From 6a2f400f591b0c5833290697bdc551ebc4c79ca2 Mon Sep 17 00:00:00 2001 From: Robin Kupper Date: Sun, 17 Jul 2011 01:37:26 +0200 Subject: [PATCH] Fixed reconnect, added flatfile queue dump --- src/de/diddiz/LogBlock/Consumer.java | 183 +++++++++++++++------------ src/de/diddiz/LogBlock/LogBlock.java | 45 ++++++- 2 files changed, 147 insertions(+), 81 deletions(-) diff --git a/src/de/diddiz/LogBlock/Consumer.java b/src/de/diddiz/LogBlock/Consumer.java index 672d540..7ab63ab 100644 --- a/src/de/diddiz/LogBlock/Consumer.java +++ b/src/de/diddiz/LogBlock/Consumer.java @@ -3,6 +3,9 @@ package de.diddiz.LogBlock; import static de.diddiz.util.BukkitUtils.compressInventory; import static de.diddiz.util.BukkitUtils.entityName; import static de.diddiz.util.BukkitUtils.rawData; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -29,10 +32,9 @@ import org.bukkit.inventory.ItemStack; public class Consumer extends TimerTask { - private final Queue bqueue = new LinkedBlockingQueue(); - private final Queue kqueue = new LinkedBlockingQueue(); - private final Queue cqueue = new LinkedBlockingQueue(); + private final Queue queue = new LinkedBlockingQueue(); private final Config config; + private final Map tables; private final Set hiddenPlayers, hiddenBlocks; private final Map lastAttackedEntity = new HashMap(); private final Map lastAttackTime = new HashMap(); @@ -47,6 +49,7 @@ public class Consumer extends TimerTask config = logblock.getConfig(); hiddenPlayers = config.hiddenPlayers; hiddenBlocks = config.hiddenBlocks; + tables = config.tables; } /** @@ -195,9 +198,11 @@ public class Consumer extends TimerTask * Item id of the weapon. 0 for no weapon. */ public void queueKill(World world, String killerName, String victimName, int weapon) { - if (victimName == null || !config.tables.containsKey(world.getName().hashCode())) + if (victimName == null || !tables.containsKey(world.getName().hashCode())) return; - kqueue.add(new KillRow(world.getName().hashCode(), killerName, victimName, weapon)); + killerName = killerName.replaceAll("[^a-zA-Z0-9_]", ""); + victimName = victimName.replaceAll("[^a-zA-Z0-9_]", ""); + queue.add(new KillRow(world.getName().hashCode(), killerName, victimName, weapon)); } /** @@ -233,16 +238,15 @@ public class Consumer extends TimerTask } public void queueChat(String player, String message) { - cqueue.add(new ChatRow(player, message.replace("\\", "\\\\").replace("'", "\\'"))); + queue.add(new ChatRow(player, message.replace("\\", "\\\\").replace("'", "\\'"))); } @Override public void run() { - if (!lock.tryLock()) + if (queue.isEmpty() || !lock.tryLock()) return; final Connection conn = logblock.getConnection(); Statement state = null; - String table; if (getQueueSize() > 1000) log.info("[LogBlock Consumer] Queue overloaded. Size: " + getQueueSize()); try { @@ -252,70 +256,26 @@ public class Consumer extends TimerTask state = conn.createStatement(); final long start = System.currentTimeMillis(); int count = 0; - if (!bqueue.isEmpty()) { - while (!bqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) { - final BlockChange b = bqueue.poll(); - if (b == null) - continue; - final int playerHash = b.playerName.hashCode(); - if (!players.containsKey(playerHash)) - if (!addPlayer(conn, state, b.playerName)) { - log.warning("[LogBlock Consumer] Failed to add player " + b.playerName); - continue; + process: while (!queue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) { + final Row r = queue.poll(); + if (r == null) + continue; + for (final String player : r.getPlayers()) + if (!players.containsKey(player.hashCode())) + if (!addPlayer(state, player)) { + log.warning("[LogBlock Consumer] Failed to add player " + player); + continue process; } - final boolean needKeys = b.signtext != null || b.ca != null; - table = config.tables.get(b.loc.getWorld().getName().hashCode()); - state.execute("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES (FROM_UNIXTIME(" + b.date + "), " + players.get(playerHash) + ", " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.loc.getBlockX() + "', " + b.loc.getBlockY() + ", '" + b.loc.getBlockZ() + "')", needKeys ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS); - if (needKeys) { - final ResultSet keys = state.getGeneratedKeys(); - if (keys.next()) { - if (b.signtext != null) - state.execute("INSERT INTO `" + table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')"); - if (b.ca != null) - state.execute("INSERT INTO `" + table + "-chest` (id, itemtype, itemamount, itemdata) values (" + keys.getInt(1) + ", " + b.ca.itemType + ", " + b.ca.itemAmount + ", " + b.ca.itemData + ")"); - } else - log.warning("[LogBlock Consumer] Failed to get generated keys. You may have to repair " + table); + for (final String insert : r.getInserts()) + try { + state.execute(insert); + } catch (final SQLException ex) { + log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception on " + insert + ": ", ex); + break process; } - count++; - } - conn.commit(); - } - if (!kqueue.isEmpty()) { - while (!kqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) { - final KillRow k = kqueue.poll(); - if (k == null || k.victim == null) - continue; - if (k.killer != null && !players.containsKey(k.killer.hashCode())) - if (!addPlayer(conn, state, k.killer)) { - log.warning("[LogBlock Consumer] Failed to add player " + k.killer); - continue; - } - if (!players.containsKey(k.victim.hashCode())) - if (!addPlayer(conn, state, k.victim)) { - log.warning("[LogBlock Consumer] Failed to add player " + k.victim); - continue; - } - state.execute("INSERT INTO `" + config.tables.get(k.worldHash) + "-kills` (date, killer, victim, weapon) VALUES (now(), " + (k.killer == null ? "null" : players.get(k.killer.hashCode())) + ", " + players.get(k.victim.hashCode()) + ", " + k.weapon + ")"); - count++; - } - conn.commit(); - } - if (!cqueue.isEmpty()) { - while (!cqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) { - final ChatRow c = cqueue.poll(); - if (c == null) - continue; - final int playerHash = c.player.hashCode(); - if (!players.containsKey(playerHash)) - if (!addPlayer(conn, state, c.player)) { - log.warning("[LogBlock Consumer] Failed to add player " + c.player); - continue; - } - state.execute("INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + c.date + "), " + players.get(playerHash) + ", '" + c.message + "')"); - count++; - } - conn.commit(); + count++; } + conn.commit(); } catch (final SQLException ex) { log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex); } finally { @@ -331,8 +291,25 @@ public class Consumer extends TimerTask } } + public void writeToFile() throws FileNotFoundException { + final File file = new File("plugins/LogBlock/consumer/queue.sql"); + file.getParentFile().mkdirs(); + final PrintWriter writer = new PrintWriter(file); + while (!queue.isEmpty()) { + final Row r = queue.poll(); + if (r == null) + continue; + for (final String player : r.getPlayers()) + if (!players.containsKey(player.hashCode())) + writer.println("INSERT IGNORE INTO `lb-players` (playername) VALUES ('" + player + "');"); + for (final String insert : r.getInserts()) + writer.println(insert); + } + writer.close(); + } + int getQueueSize() { - return bqueue.size() + kqueue.size() + cqueue.size(); + return queue.size(); } boolean hide(Player player) { @@ -345,9 +322,8 @@ public class Consumer extends TimerTask return true; } - private boolean addPlayer(Connection conn, Statement state, String playerName) throws SQLException { + private boolean addPlayer(Statement state, String playerName) throws SQLException { state.execute("INSERT IGNORE INTO `lb-players` (playername) VALUES ('" + playerName + "')"); - conn.commit(); final ResultSet rs = state.executeQuery("SELECT playerid FROM `lb-players` WHERE playername = '" + playerName + "'"); if (rs.next()) players.put(playerName.hashCode(), rs.getInt(1)); @@ -356,30 +332,71 @@ public class Consumer extends TimerTask } private void queueBlock(String playerName, Location loc, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) { - if (playerName == null || loc == null || typeBefore < 0 || typeAfter < 0 || hiddenPlayers.contains(playerName.hashCode()) || !config.tables.containsKey(loc.getWorld().getName().hashCode()) || typeBefore != typeAfter && hiddenBlocks.contains(typeBefore) && hiddenBlocks.contains(typeAfter)) + if (playerName == null || loc == null || typeBefore < 0 || typeAfter < 0 || hiddenPlayers.contains(playerName.hashCode()) || !tables.containsKey(loc.getWorld().getName().hashCode()) || typeBefore != typeAfter && hiddenBlocks.contains(typeBefore) && hiddenBlocks.contains(typeAfter)) return; playerName = playerName.replaceAll("[^a-zA-Z0-9_]", ""); if (signtext != null) signtext = signtext.replace("\\", "\\\\").replace("'", "\\'"); - bqueue.add(new BlockChange(System.currentTimeMillis() / 1000, loc, playerName, typeBefore, typeAfter, data, signtext, ca)); + queue.add(new BlockRow(loc, playerName, typeBefore, typeAfter, data, signtext, ca)); } - private static class KillRow + private static interface Row { - final String killer; - final String victim; + String[] getInserts(); + + String[] getPlayers(); + } + + private class BlockRow extends BlockChange implements Row + { + public BlockRow(Location loc, String playerName, int replaced, int type, byte data, String signtext, ChestAccess ca) { + super(System.currentTimeMillis() / 1000, loc, playerName, replaced, type, data, signtext, ca); + } + + @Override + public String[] getInserts() { + final String[] inserts = new String[ca != null || signtext != null ? 2 : 1]; + inserts[0] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "` (date, playerid, replaced, type, data, x, y, z) VALUES (FROM_UNIXTIME(" + date + "), " + players.get(playerName.hashCode()) + ", " + replaced + ", " + type + ", " + data + ", '" + loc.getBlockX() + "', " + loc.getBlockY() + ", '" + loc.getBlockZ() + "');"; + if (signtext != null) + inserts[1] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "-sign` (id, signtext) values (LAST_INSERT_ID(), '" + signtext + "');"; + else if (ca != null) + inserts[1] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "-chest` (id, itemtype, itemamount, itemdata) values (LAST_INSERT_ID(), " + ca.itemType + ", " + ca.itemAmount + ", " + ca.itemData + ");"; + return inserts; + } + + @Override + public String[] getPlayers() { + return new String[]{playerName}; + } + } + + private class KillRow implements Row + { + final long date; + final String killer, victim; final int weapon; final int worldHash; KillRow(int worldHash, String attacker, String defender, int weapon) { + date = System.currentTimeMillis() / 1000; this.worldHash = worldHash; killer = attacker; victim = defender; this.weapon = weapon; } + + @Override + public String[] getInserts() { + return new String[]{"INSERT INTO `" + tables.get(worldHash) + "-kills` (date, killer, victim, weapon) VALUES (FROM_UNIXTIME(" + date + "), " + (killer == null ? "NULL" : players.get(killer.hashCode())) + ", " + players.get(victim.hashCode()) + ", " + weapon + ");"}; + } + + @Override + public String[] getPlayers() { + return new String[]{killer, victim}; + } } - private static class ChatRow + private class ChatRow implements Row { final long date; final String player, message; @@ -389,5 +406,15 @@ public class Consumer extends TimerTask this.player = player; this.message = message; } + + @Override + public String[] getInserts() { + return new String[]{"INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + date + "), " + players.get(player.hashCode()) + ", '" + message + "');"}; + } + + @Override + public String[] getPlayers() { + return new String[]{player}; + } } } diff --git a/src/de/diddiz/LogBlock/LogBlock.java b/src/de/diddiz/LogBlock/LogBlock.java index 5b7e381..d8165a5 100644 --- a/src/de/diddiz/LogBlock/LogBlock.java +++ b/src/de/diddiz/LogBlock/LogBlock.java @@ -1,10 +1,13 @@ package de.diddiz.LogBlock; import static de.diddiz.util.Utils.download; +import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; +import java.io.FileReader; import java.net.URL; import java.sql.Connection; +import java.sql.Statement; import java.util.HashMap; import java.util.Map; import java.util.Timer; @@ -59,7 +62,7 @@ public class LogBlock extends JavaPlugin updater = new Updater(this); log.info("[LogBlock] Version check: " + updater.checkVersion()); config = new Config(this); - final File file = new File("lib/mysql-connector-java-bin.jar"); + File file = new File("lib/mysql-connector-java-bin.jar"); if (!file.exists() || file.length() == 0) download(log, new URL("http://diddiz.insane-architects.net/download/mysql-connector-java-bin.jar"), file); if (!file.exists() || file.length() == 0) @@ -70,6 +73,28 @@ public class LogBlock extends JavaPlugin if (updater.update()) config = new Config(this); updater.checkTables(); + file = new File("plugins/LogBlock/consumer/queue.sql"); + if (file.exists()) { + log.info("[LogBlock] Found stored queue. Try to import now."); + Connection conn = null; + try { + conn = getConnection(); + conn.setAutoCommit(false); + final Statement state = conn.createStatement(); + final BufferedReader in = new BufferedReader(new FileReader(file)); + String insert = null; + while ((insert = in.readLine()) != null) + state.execute(insert); + conn.commit(); + file.delete(); + log.info("[LogBlock] Successfully imported stored queue."); + } catch (final Exception ex) { + log.log(Level.WARNING, "[LogBlock] Failed to import stored queue: ", ex); + } finally { + if (conn != null) + conn.close(); + } + } } catch (final Exception ex) { log.log(Level.SEVERE, "[LogBlock] Error while loading: ", ex); errorAtLoading = true; @@ -182,10 +207,24 @@ public class LogBlock extends JavaPlugin getServer().getScheduler().cancelTasks(this); if (consumer != null && consumer.getQueueSize() > 0) { log.info("[LogBlock] Waiting for consumer ..."); - final Thread thread = new Thread(consumer); + int lastSize = -1, fails = 0; while (consumer.getQueueSize() > 0) { log.info("[LogBlock] Remaining queue size: " + consumer.getQueueSize()); - thread.run(); + if (lastSize == consumer.getQueueSize()) + fails++; + else + fails = 0; + if (fails > 10) { + log.info("Unable to save queue to database. Trying to write to a local file."); + try { + consumer.writeToFile(); + } catch (final FileNotFoundException ex) { + log.info("Failed to write. Given up."); + break; + } + } + lastSize = consumer.getQueueSize(); + consumer.run(); } } if (pool != null)