From b321668cde7aba0fa7864e816e0900fe8d273308 Mon Sep 17 00:00:00 2001 From: Robin Kupper Date: Mon, 4 Apr 2011 02:20:58 +0200 Subject: [PATCH] Moved consumer to an own class file. Saved queue on server shutdown. --- src/de/diddiz/LogBlock/Consumer.java | 131 +++++++++++++++++++ src/de/diddiz/LogBlock/LogBlock.java | 181 +++------------------------ 2 files changed, 149 insertions(+), 163 deletions(-) create mode 100644 src/de/diddiz/LogBlock/Consumer.java diff --git a/src/de/diddiz/LogBlock/Consumer.java b/src/de/diddiz/LogBlock/Consumer.java new file mode 100644 index 0000000..6458417 --- /dev/null +++ b/src/de/diddiz/LogBlock/Consumer.java @@ -0,0 +1,131 @@ +package de.diddiz.LogBlock; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; + +import org.bukkit.block.Block; +import org.bukkit.entity.Player; + +public class Consumer implements Runnable +{ + private LinkedBlockingQueue bqueue = new LinkedBlockingQueue(); + + public void queueBlock(Player player, Block block, int typeAfter) { + queueBlock(player.getName(), block, 0, typeAfter, (byte)0, null, null); + } + + public void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data) { + queueBlock(playerName, block, typeBefore, typeAfter, data, null, null); + } + + public void queueBlock(Player player, Block block, short inType, byte inAmount, short outType, byte outAmount) { + queueBlock(player.getName(), block, block.getTypeId(), block.getTypeId(), (byte)0, null, new ChestAccess(inType, inAmount, outType, outAmount)); + } + + public void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) { + if (block == null || typeBefore < 0 || typeAfter < 0) + return; + String table = LogBlock.config.worldTables.get(LogBlock.config.worldNames.indexOf(block.getWorld().getName())); + if (table == null) + return; + if (playerName.length() > 32) + playerName = playerName.substring(0, 32); + BlockRow row = new BlockRow(table, playerName, typeBefore, typeAfter, data, block.getX(), block.getY(), block.getZ()); + if (signtext != null) + row.signtext = signtext; + if (ca != null) + row.ca = ca; + if (!bqueue.offer(row)) + LogBlock.log.info("[LogBlock] Failed to queue block for " + playerName); + } + + public int getQueueSize() { + return bqueue.size(); + } + + public void run() { + Connection conn = null; + Statement state = null; + BlockRow b; + int count = 0; + if (bqueue.size() > 100) + LogBlock.log.info("[LogBlock Consumer] Queue overloaded. Size: " + bqueue.size()); + try { + conn = DriverManager.getConnection("jdbc:jdc:jdcpool"); + conn.setAutoCommit(false); + state = conn.createStatement(); + long start = System.currentTimeMillis(); + while (count < 1000 && !bqueue.isEmpty() && System.currentTimeMillis() - start < 100) { + b = bqueue.poll(); + if (b == null) + continue; + state.execute("INSERT INTO `" + b.table + "` (date, playerid, replaced, type, data, x, y, z) SELECT now(), playerid, " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.x + "', " + b.y + ", '" + b.z + "' FROM `lb-players` WHERE playername = '" + b.name + "'", Statement.RETURN_GENERATED_KEYS); + if (b.signtext != null) { + ResultSet keys = state.getGeneratedKeys(); + keys.next(); + state.execute("INSERT INTO `" + b.table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')"); + } else if (b.ca != null) { + ResultSet keys = state.getGeneratedKeys(); + keys.next(); + state.execute("INSERT INTO `" + b.table + "-chest` (id, intype, inamount, outtype, outamount) values (" + keys.getInt(1) + ", " + b.ca.inType + ", " + b.ca.inAmount + ", " + b.ca.outType + ", " + b.ca.outAmount + ")"); + } + count++; + } + conn.commit(); + LogBlock.log.info("[LogBlock Consumer] Took: " + (System.currentTimeMillis() - start) + "ms for " + count + " blocks"); + } catch (SQLException ex) { + LogBlock.log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex); + } finally { + try { + if (state != null) + state.close(); + if (conn != null) + conn.close(); + } catch (SQLException ex) { + LogBlock.log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception on close", ex); + } + } + } + + private class ChestAccess + { + public short inType, outType; + public byte inAmount, outAmount; + + ChestAccess(short inType, byte inAmount, short outType, byte outAmount) { + this.inType = inType; + this.inAmount = inAmount; + this.outType = outType; + this.outAmount = outAmount; + } + } + + private class BlockRow + { + public String table; + public String name; + public int replaced, type; + public byte data; + public int x, y, z; + public String signtext; + public ChestAccess ca; + + BlockRow(String table, String name, int replaced, int type, byte data, int x, int y, int z) { + this.table = table; + this.name = name; + this.replaced = replaced; + this.type = type; + this.data = data; + this.x = x; + this.y = y; + this.z = z; + this.signtext = null; + this.ca = null; + } + } +} diff --git a/src/de/diddiz/LogBlock/LogBlock.java b/src/de/diddiz/LogBlock/LogBlock.java index 74bacd7..b50afa3 100644 --- a/src/de/diddiz/LogBlock/LogBlock.java +++ b/src/de/diddiz/LogBlock/LogBlock.java @@ -5,12 +5,8 @@ import java.net.URL; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,7 +53,6 @@ public class LogBlock extends JavaPlugin static Logger log; static Config config; private Consumer consumer = null; - private LinkedBlockingQueue bqueue = new LinkedBlockingQueue(); @Override public void onEnable() { @@ -116,15 +111,19 @@ public class LogBlock extends JavaPlugin if (config.logLeavesDecay) pm.registerEvent(Type.LEAVES_DECAY, lbBlockListener, Event.Priority.Monitor, this); consumer = new Consumer(); - new Thread(consumer).start(); + getServer().getScheduler().scheduleAsyncRepeatingTask(this, consumer, config.delay * 20, config.delay * 20); log.info("Logblock v" + getDescription().getVersion() + " enabled."); } @Override public void onDisable() { - if (consumer != null) { - log.info("[LogBlock] Stopping consumer"); - consumer.stop(); + if (consumer != null && consumer.getQueueSize() > 0) { + log.info("[LogBlock] Waiting for consumer ..."); + Thread thread = new Thread(consumer); + while (consumer.getQueueSize() > 0) { + log.info("[LogBlock] Remaining queue size: " + consumer.getQueueSize()); + thread.run(); + } } log.info("LogBlock disabled."); } @@ -420,35 +419,6 @@ public class LogBlock extends JavaPlugin return config.worldTables.get(idx); } - private void queueBlock(Player player, Block block, int typeAfter) { - queueBlock(player.getName(), block, 0, typeAfter, (byte)0, null, null); - } - - private void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data) { - queueBlock(playerName, block, typeBefore, typeAfter, data, null, null); - } - - private void queueBlock(Player player, Block block, short inType, byte inAmount, short outType, byte outAmount) { - queueBlock(player.getName(), block, block.getTypeId(), block.getTypeId(), (byte)0, null, new ChestAccess(inType, inAmount, outType, outAmount)); - } - - private void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) { - if (block == null || typeBefore < 0 || typeAfter < 0) - return; - String table = getTable(block); - if (table == null) - return; - if (playerName.length() > 32) - playerName = playerName.substring(0, 31); - BlockRow row = new BlockRow(table, playerName, typeBefore, typeAfter, data, block.getX(), block.getY(), block.getZ()); - if (signtext != null) - row.signtext = signtext; - if (ca != null) - row.ca = ca; - if (!bqueue.offer(row)) - log.info("[LogBlock] Failed to queue block for " + playerName); - } - private boolean CheckPermission(Player player, String permission) { if (config.usePermissions) return Permissions.Security.permission(player, permission); @@ -490,28 +460,28 @@ public class LogBlock extends JavaPlugin { public void onBlockPlace(BlockPlaceEvent event) { if (!event.isCancelled() && !(config.logSignTexts && (event.getBlock().getType() == Material.WALL_SIGN || event.getBlock().getType() == Material.SIGN_POST))) { - queueBlock(event.getPlayer().getName(), event.getBlockPlaced(), event.getBlockReplacedState().getTypeId(), event.getBlockPlaced().getTypeId(), event.getBlockPlaced().getData()); + consumer.queueBlock(event.getPlayer().getName(), event.getBlockPlaced(), event.getBlockReplacedState().getTypeId(), event.getBlockPlaced().getTypeId(), event.getBlockPlaced().getData()); } } public void onBlockBreak(BlockBreakEvent event) { if (!event.isCancelled()) - queueBlock(event.getPlayer().getName(), event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); + consumer.queueBlock(event.getPlayer().getName(), event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); } public void onSignChange(SignChangeEvent event) { if (!event.isCancelled()) - queueBlock(event.getPlayer().getName(), event.getBlock(), 0, event.getBlock().getTypeId(), event.getBlock().getData(), "sign [" + event.getLine(0) + "] [" + event.getLine(1) + "] [" + event.getLine(2) + "] [" + event.getLine(3) + "]", null); + consumer.queueBlock(event.getPlayer().getName(), event.getBlock(), 0, event.getBlock().getTypeId(), event.getBlock().getData(), "sign [" + event.getLine(0) + "] [" + event.getLine(1) + "] [" + event.getLine(2) + "] [" + event.getLine(3) + "]", null); } public void onBlockBurn(BlockBurnEvent event) { if (!event.isCancelled()) - queueBlock(config.logFireAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); + consumer.queueBlock(config.logFireAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); } public void onLeavesDecay(LeavesDecayEvent event) { if (!event.isCancelled()) - queueBlock(config.logLeavesDecayAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); + consumer.queueBlock(config.logLeavesDecayAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData()); } } @@ -529,7 +499,7 @@ public class LogBlock extends JavaPlugin else name = "Environment"; for (Block block : event.blockList()) - queueBlock(name, block, block.getTypeId(), 0, block.getData()); + consumer.queueBlock(name, block, block.getTypeId(), 0, block.getData()); } } } @@ -539,22 +509,22 @@ public class LogBlock extends JavaPlugin public void onPlayerInteract(PlayerInteractEvent event) { if (!event.isCancelled()) { if (event.getAction() == Action.RIGHT_CLICK_BLOCK && (event.getClickedBlock().getType() == Material.CHEST || event.getClickedBlock().getType() == Material.FURNACE ||event.getClickedBlock().getType() == Material.DISPENSER)) { - queueBlock(event.getPlayer(), event.getClickedBlock(), (short)0, (byte)0, (short)0, (byte)0); + consumer.queueBlock(event.getPlayer(), event.getClickedBlock(), (short)0, (byte)0, (short)0, (byte)0); } } } public void onPlayerBucketFill(PlayerBucketFillEvent event) { if (!event.isCancelled()) { - queueBlock(event.getPlayer().getName(), event.getBlockClicked(), event.getBlockClicked().getTypeId(), 0, event.getBlockClicked().getData()); + consumer.queueBlock(event.getPlayer().getName(), event.getBlockClicked(), event.getBlockClicked().getTypeId(), 0, event.getBlockClicked().getData()); } } public void onPlayerBucketEmpty(PlayerBucketEmptyEvent event) { if (event.getBucket() == Material.WATER_BUCKET) - queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_WATER.getId()); + consumer.queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_WATER.getId()); else if (event.getBucket() == Material.LAVA_BUCKET) - queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_LAVA.getId()); + consumer.queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_LAVA.getId()); } public void onPlayerJoin(PlayerJoinEvent event) { @@ -604,119 +574,4 @@ public class LogBlock extends JavaPlugin return false; } } - - private class Consumer implements Runnable - { - private boolean stop = false; - - Consumer() { - stop = false; - } - - public void stop() { - stop = true; - } - - public void run() { - PreparedStatement ps = null; - Connection conn = null; - BlockRow b; - while (!stop) { - long start = System.currentTimeMillis()/1000L; - int count = 0; - if (bqueue.size() > 100) - log.info("[LogBlock] Queue overloaded. Size: " + bqueue.size()); - try { - conn = getConnection(); - conn.setAutoCommit(false); - while (count < 100 && start + config.delay > (System.currentTimeMillis()/1000L)) { - b = bqueue.poll(1L, TimeUnit.SECONDS); - if (b == null) - continue; - ps = conn.prepareStatement("INSERT INTO `" + b.table + "` (`date`, `playerid`, `replaced`, `type`, `data`, `x`, `y`, `z`) SELECT now(), `playerid`, ?, ?, ?, ?, ? , ? FROM `lb-players` WHERE `playername` = ?", Statement.RETURN_GENERATED_KEYS); - ps.setInt(1, b.replaced); - ps.setInt(2, b.type); - ps.setByte(3, b.data); - ps.setInt(4, b.x); - ps.setInt(5, b.y); - ps.setInt(6, b.z); - ps.setString(7, b.name); - ps.executeUpdate(); - if (b.signtext != null) { - ResultSet keys = ps.getGeneratedKeys(); - keys.next(); - int key = keys.getInt(1); - ps = conn.prepareStatement("INSERT INTO `" + b.table + "-sign` (`id`, `signtext`) values (?,?)"); - ps.setInt(1, key); - ps.setString(2, b.signtext); - ps.executeUpdate(); - } else if (b.ca != null) { - ResultSet keys = ps.getGeneratedKeys(); - keys.next(); - int key = keys.getInt(1); - ps = conn.prepareStatement("INSERT INTO `" + b.table + "-chest` (`id`, `intype`, `inamount`, `outtype`, `outamount`) values (?,?,?,?,?)"); - ps.setInt(1, key); - ps.setShort(2, b.ca.inType); - ps.setByte(3, b.ca.inAmount); - ps.setShort(4, b.ca.outType); - ps.setByte(5, b.ca.outAmount); - ps.executeUpdate(); - } - count++; - } - conn.commit(); - } catch (InterruptedException ex) { - log.log(Level.SEVERE, "[LogBlock] Interrupted exception", ex); - } catch (SQLException ex) { - log.log(Level.SEVERE, "[LogBlock] SQL exception", ex); - } finally { - try { - if (ps != null) - ps.close(); - if (conn != null) - conn.close(); - } catch (SQLException ex) { - log.log(Level.SEVERE, "[LogBlock] SQL exception on close", ex); - } - } - } - } - } - - private class ChestAccess - { - public short inType, outType; - public byte inAmount, outAmount; - - ChestAccess(short inType, byte inAmount, short outType, byte outAmount) { - this.inType = inType; - this.inAmount = inAmount; - this.outType = outType; - this.outAmount = outAmount; - } - } - - private class BlockRow - { - public String table; - public String name; - public int replaced, type; - public byte data; - public int x, y, z; - public String signtext; - public ChestAccess ca; - - BlockRow(String table, String name, int replaced, int type, byte data, int x, int y, int z) { - this.table = table; - this.name = name; - this.replaced = replaced; - this.type = type; - this.data = data; - this.x = x; - this.y = y; - this.z = z; - this.signtext = null; - this.ca = null; - } - } }