diff --git a/src/main/java/de/diddiz/LogBlock/CommandsHandler.java b/src/main/java/de/diddiz/LogBlock/CommandsHandler.java index e654076..ec8d8a8 100755 --- a/src/main/java/de/diddiz/LogBlock/CommandsHandler.java +++ b/src/main/java/de/diddiz/LogBlock/CommandsHandler.java @@ -562,20 +562,7 @@ public class CommandsHandler implements CommandExecutor { @Override public void run() { final Consumer consumer = logblock.getConsumer(); - if (consumer.getQueueSize() > 0) { - sender.sendMessage(ChatColor.DARK_AQUA + "Current queue size: " + consumer.getQueueSize()); - int lastSize = -1, fails = 0; - while (consumer.getQueueSize() > 0) { - fails = lastSize == consumer.getQueueSize() ? fails + 1 : 0; - if (fails > 10) { - sender.sendMessage(ChatColor.RED + "Unable to save queue"); - return; - } - lastSize = consumer.getQueueSize(); - consumer.run(); - } - sender.sendMessage(ChatColor.GREEN + "Queue saved successfully"); - } + sender.sendMessage(ChatColor.DARK_AQUA + "Current queue size: " + consumer.getQueueSize()); } } @@ -662,9 +649,6 @@ public class CommandsHandler implements CommandExecutor { if (!checkRestrictions(sender, params)) { return; } - if (logblock.getConsumer().getQueueSize() > 0) { - new CommandSaveQueue(sender, null, false); - } if (!params.silent) { sender.sendMessage(ChatColor.DARK_AQUA + "Searching " + params.getTitle() + ":"); } diff --git a/src/main/java/de/diddiz/LogBlock/Consumer.java b/src/main/java/de/diddiz/LogBlock/Consumer.java index 047e074..f1d793e 100644 --- a/src/main/java/de/diddiz/LogBlock/Consumer.java +++ b/src/main/java/de/diddiz/LogBlock/Consumer.java @@ -1,10 +1,32 @@ package de.diddiz.LogBlock; -import de.diddiz.LogBlock.blockstate.BlockStateCodecSign; -import de.diddiz.LogBlock.blockstate.BlockStateCodecs; -import de.diddiz.LogBlock.config.Config; -import de.diddiz.LogBlock.events.BlockChangePreLogEvent; -import de.diddiz.util.Utils; +import static de.diddiz.LogBlock.config.Config.getWorldConfig; +import static de.diddiz.LogBlock.config.Config.hiddenBlocks; +import static de.diddiz.LogBlock.config.Config.hiddenPlayers; +import static de.diddiz.LogBlock.config.Config.isLogged; +import static de.diddiz.LogBlock.config.Config.logPlayerInfo; +import static de.diddiz.util.BukkitUtils.compressInventory; +import static de.diddiz.util.BukkitUtils.itemIDfromProjectileEntity; +import static de.diddiz.util.Utils.mysqlTextEscape; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.logging.Level; import org.bukkit.Bukkit; import org.bukkit.Location; @@ -20,26 +42,22 @@ import org.bukkit.inventory.InventoryHolder; import org.bukkit.inventory.ItemStack; import org.bukkit.projectiles.ProjectileSource; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.PrintWriter; -import java.sql.*; -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; +import de.diddiz.LogBlock.blockstate.BlockStateCodecSign; +import de.diddiz.LogBlock.blockstate.BlockStateCodecs; +import de.diddiz.LogBlock.config.Config; +import de.diddiz.LogBlock.events.BlockChangePreLogEvent; +import de.diddiz.util.Utils; -import static de.diddiz.LogBlock.config.Config.*; -import static de.diddiz.util.Utils.mysqlTextEscape; -import static de.diddiz.util.BukkitUtils.*; - -public class Consumer extends TimerTask { - private final Queue queue = new LinkedBlockingQueue(); +public class Consumer extends Thread { + private final Deque queue = new ArrayDeque(); private final Set failedPlayers = new HashSet(); private final LogBlock logblock; private final Map playerIds = new HashMap(); - private final Lock lock = new ReentrantLock(); + private long addEntryCounter; + private long nextWarnCounter; + + private boolean shutdown; + private long shutdownInitialized; Consumer(LogBlock logblock) { this.logblock = logblock; @@ -309,7 +327,7 @@ public class Consumer extends TimerTask { if (victim == null || !isLogged(location.getWorld())) { return; } - queue.add(new KillRow(location, killer == null ? null : killer, victim, weapon == null ? 0 : MaterialConverter.getOrAddMaterialId(weapon.getType().getKey().toString()))); + addQueueLast(new KillRow(location, killer == null ? null : killer, victim, weapon == null ? 0 : MaterialConverter.getOrAddMaterialId(weapon.getType().getKey().toString()))); } /** @@ -387,127 +405,152 @@ public class Consumer extends TimerTask { if (hiddenPlayers.contains(player.getName().toLowerCase())) { return; } - queue.add(new ChatRow(player, message)); + addQueueLast(new ChatRow(player, message)); } public void queueJoin(Player player) { - queue.add(new PlayerJoinRow(player)); + addQueueLast(new PlayerJoinRow(player)); } public void queueLeave(Player player, long onlineTime) { - queue.add(new PlayerLeaveRow(player, onlineTime)); + addQueueLast(new PlayerLeaveRow(player, onlineTime)); + } + + public void shutdown() { + synchronized (queue) { + shutdown = true; + shutdownInitialized = System.currentTimeMillis(); + queue.notifyAll(); + } + while (isAlive()) { + try { + join(); + } catch (InterruptedException e) { + // ignore + } + } } @Override - public synchronized void run() { - if (queue.isEmpty() || !lock.tryLock()) { - return; - } - long startTime = System.currentTimeMillis(); - // int startSize = queue.size(); - int count = 0; + public void run() { + ArrayList currentRows = new ArrayList<>(); Connection conn = null; - Statement state = null; - try { - - conn = logblock.getConnection(); - if (Config.queueWarningSize > 0 && queue.size() >= Config.queueWarningSize) { - logblock.getLogger().info("[Consumer] Queue overloaded. Size: " + getQueueSize()); - } - - if (conn == null) { - return; - } - conn.setAutoCommit(false); - state = conn.createStatement(); - final long start = System.currentTimeMillis(); - process: while (!queue.isEmpty() && (System.currentTimeMillis() - start < timePerRun || count < forceToProcessAtLeast)) { - final Row r = queue.poll(); - if (r == null) { - continue; - } - for (final Actor actor : r.getActors()) { - if (!playerIds.containsKey(actor)) { - if (!addPlayer(state, actor)) { - if (!failedPlayers.contains(actor)) { - failedPlayers.add(actor); - logblock.getLogger().warning("[Consumer] Failed to add player " + actor.getName()); - } - continue process; - } - } - } - if (r instanceof PreparedStatementRow) { - PreparedStatementRow PSRow = (PreparedStatementRow) r; - if (r instanceof MergeableRow) { - int batchCount = count; - // if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows - if (count > forceToProcessAtLeast) { - batchCount = forceToProcessAtLeast / 2; - } - while (!queue.isEmpty()) { - MergeableRow mRow = (MergeableRow) PSRow; - Row s = queue.peek(); - if (s == null) { - break; - } - if (!(s instanceof MergeableRow)) { - break; - } - MergeableRow mRow2 = (MergeableRow) s; - if (mRow.canMerge(mRow2)) { - PSRow = mRow.merge((MergeableRow) queue.poll()); - count++; - batchCount++; - if (batchCount > forceToProcessAtLeast) { - break; - } - } else { - break; - } - } - } - PSRow.setConnection(conn); - try { - PSRow.executeStatements(); - } catch (final SQLException ex) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on insertion: ", ex); - break; - } - } else { - for (final String insert : r.getInserts()) { - try { - state.execute(insert); - } catch (final SQLException ex) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on " + insert + ": ", ex); - break process; - } - } - } - - count++; - } - conn.commit(); - } catch (final SQLException ex) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception", ex); - } finally { + BatchHelper batchHelper = new BatchHelper(); + while (true) { try { - if (state != null) { - state.close(); + if (conn == null) { + conn = logblock.getConnection(); + if (conn != null) { + // initialize connection + conn.setAutoCommit(false); + } else { + // we did not get a connection + boolean wantsShutdown; + synchronized (queue) { + wantsShutdown = shutdown; + } + if (wantsShutdown) { + // lets give up + break; + } + // wait for a connection + logblock.getLogger().severe("[Consumer] Could not connect to the database!"); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // ignore + } + continue; + } } + Row r; + boolean processBatch = false; + synchronized (queue) { + if (shutdown) { + // Give this thread some time to process the remaining entries + if (queue.isEmpty() || System.currentTimeMillis() - shutdownInitialized > 20000) { + if (currentRows.isEmpty()) { + break; + } else { + processBatch = true; + } + } + } + r = queue.pollFirst(); + if (r == null) { + try { + if (currentRows.isEmpty() && !shutdown) { + queue.wait(); // nothing to do for us + } else { + processBatch = true; + } + } catch (InterruptedException e) { + // ignore + } + } + } + if (r != null) { + boolean failOnActors = false; + for (final Actor actor : r.getActors()) { + if (!playerIds.containsKey(actor)) { + if (!addPlayer(conn, actor)) { + if (failedPlayers.add(actor)) { + logblock.getLogger().warning("[Consumer] Failed to add player " + actor.getName()); + } + failOnActors = true; // skip this row + } + } + } + if (!failOnActors) { + currentRows.add(r); + } + r.process(conn, batchHelper); + } + if (currentRows.size() >= (processBatch ? 1 : (Config.forceToProcessAtLeast * 10))) { + batchHelper.processStatements(conn); + conn.commit(); + currentRows.clear(); + } + } catch (Exception e) { + logblock.getLogger().log(Level.SEVERE, "[Consumer] Could not insert entries!", e); + boolean retry = false; + if (e instanceof SQLException) { + // Retry on network errors: SQLSTATE = 08S01 08001 08004 HY000 40001 + String state = ((SQLException) e).getSQLState(); + retry = state != null && (state.equals("08S01") || state.equals("08001") || state.equals("08004") || state.equals("HY000") || state.equals("40001")); + } + if (retry) { + // readd rows to the queue + synchronized (queue) { + while (!currentRows.isEmpty()) { + queue.addFirst(currentRows.remove(currentRows.size() - 1)); + } + } + } + currentRows.clear(); + batchHelper.reset(); if (conn != null) { - conn.close(); + try { + conn.close(); + } catch (SQLException e1) { + // ignore + } } - } catch (final SQLException ex) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on close", ex); + conn = null; } - lock.unlock(); + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e1) { + // ignore + } + } - if (debug) { - long timeElapsed = System.currentTimeMillis() - startTime; - float rowPerTime = count / timeElapsed; - logblock.getLogger().log(Level.INFO, "[Consumer] Finished consumer cycle in " + timeElapsed + " milliseconds."); - logblock.getLogger().log(Level.INFO, "[Consumer] Total rows processed: " + count + ". row/time: " + String.format("%.4f", rowPerTime)); + // readd to queue - this can be saved later + synchronized (queue) { + while (!currentRows.isEmpty()) { + queue.addFirst(currentRows.remove(currentRows.size() - 1)); } } } @@ -518,8 +561,8 @@ public class Consumer extends TimerTask { int counter = 0; new File("plugins/LogBlock/import/").mkdirs(); PrintWriter writer = new PrintWriter(new File("plugins/LogBlock/import/queue-" + time + "-0.sql")); - while (!queue.isEmpty()) { - final Row r = queue.poll(); + while (!isQueueEmpty()) { + final Row r = pollQueueFirst(); if (r == null) { continue; } @@ -543,7 +586,36 @@ public class Consumer extends TimerTask { } int getQueueSize() { - return queue.size(); + synchronized (queue) { + return queue.size(); + } + } + + private boolean isQueueEmpty() { + synchronized (queue) { + return queue.isEmpty(); + } + } + + private void addQueueLast(Row row) { + synchronized (queue) { + boolean wasEmpty = queue.isEmpty(); + queue.addLast(row); + addEntryCounter++; + if (Config.queueWarningSize > 0 && queue.size() >= Config.queueWarningSize && addEntryCounter >= nextWarnCounter) { + logblock.getLogger().warning("[Consumer] Queue overloaded. Size: " + queue.size()); + nextWarnCounter = addEntryCounter + 1000; + } + if (wasEmpty) { + queue.notifyAll(); + } + } + } + + private Row pollQueueFirst() { + synchronized (queue) { + return queue.pollFirst(); + } } static void hide(Player player) { @@ -564,16 +636,18 @@ public class Consumer extends TimerTask { return true; } - private boolean addPlayer(Statement state, Actor actor) throws SQLException { + private boolean addPlayer(Connection conn, Actor actor) throws SQLException { // Odd query contruction is to work around innodb auto increment behaviour - bug #492 String name = actor.getName(); String uuid = actor.getUUID(); + Statement state = conn.createStatement(); state.execute("INSERT IGNORE INTO `lb-players` (playername,UUID) SELECT '" + mysqlTextEscape(name) + "','" + uuid + "' FROM `lb-players` WHERE NOT EXISTS (SELECT NULL FROM `lb-players` WHERE UUID = '" + uuid + "') LIMIT 1;"); final ResultSet rs = state.executeQuery("SELECT playerid FROM `lb-players` WHERE UUID = '" + uuid + "'"); if (rs.next()) { playerIds.put(actor, rs.getInt(1)); } rs.close(); + state.close(); return playerIds.containsKey(actor); } @@ -612,7 +686,7 @@ public class Consumer extends TimerTask { int typeMaterialId = MaterialConverter.getOrAddMaterialId(typeString); int typeStateId = MaterialConverter.getOrAddBlockStateId(typeString); - queue.add(new BlockRow(loc, actor, replacedMaterialId, replacedStateId, stateBefore, typeMaterialId, typeStateId, stateAfter, ca)); + addQueueLast(new BlockRow(loc, actor, replacedMaterialId, replacedStateId, stateBefore, typeMaterialId, typeStateId, stateAfter, ca)); } private String playerID(Actor actor) { @@ -636,34 +710,19 @@ public class Consumer extends TimerTask { private static interface Row { String[] getInserts(); - /** - * @deprecated - Names are not guaranteed to be unique. Use {@link #getActors() } - */ - String[] getPlayers(); + void process(Connection conn, BatchHelper batchHelper) throws SQLException; Actor[] getActors(); } - private interface PreparedStatementRow extends Row { - - abstract void setConnection(Connection connection); - - abstract void executeStatements() throws SQLException; - } - - private interface MergeableRow extends PreparedStatementRow { - abstract boolean isUnique(); - - abstract boolean canMerge(MergeableRow row); - - abstract MergeableRow merge(MergeableRow second); - } - - private class BlockRow extends BlockChange implements MergeableRow { - private Connection connection; + private class BlockRow extends BlockChange implements Row { + final String statementString; public BlockRow(Location loc, Actor actor, int replaced, int replacedData, byte[] replacedState, int type, int typeData, byte[] typeState, ChestAccess ca) { super(System.currentTimeMillis() / 1000, loc, actor, replaced, replacedData, replacedState, type, typeData, typeState, ca); + + final String table = getWorldConfig(loc.getWorld()).table; + statementString = "INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replaceddata, type, typedata, x, y, z) VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?)"; } @Override @@ -674,210 +733,52 @@ public class Consumer extends TimerTask { inserts[0] = "INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replaceddata, type, typedata, x, y, z) VALUES (FROM_UNIXTIME(" + date + "), " + playerID(actor) + ", " + replacedMaterial + ", " + replacedData + ", " + typeMaterial + ", " + typeData + ", '" + loc.getBlockX() + "', " + safeY(loc) + ", '" + loc.getBlockZ() + "');"; if (replacedState != null || typeState != null) { - inserts[1] = "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES('" + Utils.mysqlEscapeBytes(replacedState) + "', '" + Utils.mysqlEscapeBytes(typeState) + "', LAST_INSERT_ID());"; + inserts[1] = "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(" + Utils.mysqlPrepareBytesForInsertAllowNull(replacedState) + ", " + Utils.mysqlPrepareBytesForInsertAllowNull(typeState) + ", LAST_INSERT_ID());"; } else if (ca != null) { inserts[1] = "INSERT INTO `" + table + "-chestdata` (id, item, itemremove, itemtype) values (LAST_INSERT_ID(), '" + Utils.mysqlEscapeBytes(Utils.saveItemStack(ca.itemStack)) + "', " + (ca.remove ? 1 : 0) + ", " + ca.itemType + ");"; } return inserts; } - @Override - public String[] getPlayers() { - return new String[] { actor.getName() }; - } - @Override public Actor[] getActors() { return new Actor[] { actor }; } @Override - public void setConnection(Connection connection) { - this.connection = connection; - } - - @Override - public void executeStatements() throws SQLException { - final String table = getWorldConfig(loc.getWorld()).table; - - PreparedStatement ps1 = null; - PreparedStatement ps = null; - try { - ps1 = connection.prepareStatement("INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replacedData, type, typeData, x, y, z) VALUES(FROM_UNIXTIME(?), " + playerID(actor) + ", ?, ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS); - ps1.setLong(1, date); - ps1.setInt(2, replacedMaterial); - ps1.setInt(3, replacedData); - ps1.setInt(4, typeMaterial); - ps1.setInt(5, typeData); - ps1.setInt(6, loc.getBlockX()); - ps1.setInt(7, safeY(loc)); - ps1.setInt(8, loc.getBlockZ()); - ps1.executeUpdate(); - - int id; - ResultSet rs = ps1.getGeneratedKeys(); - rs.next(); - id = rs.getInt(1); - - if (typeState != null || replacedState != null) { - ps = connection.prepareStatement("INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(?, ?, ?)"); - ps.setBytes(1, replacedState); - ps.setBytes(2, typeState); - ps.setInt(3, id); - ps.executeUpdate(); - } else if (ca != null) { - ps = connection.prepareStatement("INSERT INTO `" + table + "-chestdata` (item, itemremove, id, itemtype) values (?, ?, ?, ?)"); - ps.setBytes(1, Utils.saveItemStack(ca.itemStack)); - ps.setInt(2, ca.remove ? 1 : 0); - ps.setInt(3, id); - ps.setInt(4, ca.itemType); - ps.executeUpdate(); - } - } catch (final SQLException ex) { - if (ps1 != null) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps1.toString()); - } - if (ps != null) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString()); - } - throw ex; - } finally { - // individual try/catch here, though ugly, prevents resource leaks - if (ps1 != null) { - try { - ps1.close(); - } catch (SQLException e) { - // ideally should log to logger, none is available in this class - // at the time of this writing, so I'll leave that to the plugin - // maintainers to integrate if they wish - e.printStackTrace(); + public void process(Connection conn, BatchHelper batchHelper) throws SQLException { + PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.RETURN_GENERATED_KEYS); + smt.setLong(1, date); + smt.setInt(2, playerIDAsInt(actor)); + smt.setInt(3, replacedMaterial); + smt.setInt(4, replacedData); + smt.setInt(5, typeMaterial); + smt.setInt(6, typeData); + smt.setInt(7, loc.getBlockX()); + smt.setInt(8, safeY(loc)); + smt.setInt(9, loc.getBlockZ()); + batchHelper.addBatch(smt, new IntCallback() { + @Override + public void call(int id) throws SQLException { + final String table = getWorldConfig(loc.getWorld()).table; + PreparedStatement ps; + if (typeState != null || replacedState != null) { + ps = batchHelper.getOrPrepareStatement(conn, "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(?, ?, ?)", Statement.NO_GENERATED_KEYS); + ps.setBytes(1, replacedState); + ps.setBytes(2, typeState); + ps.setInt(3, id); + batchHelper.addBatch(ps, null); + } + if (ca != null) { + ps = batchHelper.getOrPrepareStatement(conn, "INSERT INTO `" + table + "-chestdata` (item, itemremove, id, itemtype) values (?, ?, ?, ?)", Statement.NO_GENERATED_KEYS); + ps.setBytes(1, Utils.saveItemStack(ca.itemStack)); + ps.setInt(2, ca.remove ? 1 : 0); + ps.setInt(3, id); + ps.setInt(4, ca.itemType); + batchHelper.addBatch(ps, null); } } - - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - } - } - - @Override - public boolean isUnique() { - return !(typeState == null && replacedState == null && ca == null && playerIds.containsKey(actor)); - } - - @Override - public boolean canMerge(MergeableRow row) { - return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); - } - - @Override - public MergeableRow merge(MergeableRow singleRow) { - return new MultiBlockChangeRow(this, (BlockRow) singleRow); - } - } - - private class MultiBlockChangeRow implements MergeableRow { - private List rows = new ArrayList(); - private Connection connection; - private Set players = new HashSet(); - private Set actors = new HashSet(); - private String table; - - MultiBlockChangeRow(BlockRow first, BlockRow second) { - if (first.isUnique() || second.isUnique()) { - throw new IllegalArgumentException("Can't merge a unique row"); - } - rows.add(first); - rows.add(second); - actors.addAll(Arrays.asList(first.getActors())); - actors.addAll(Arrays.asList(second.getActors())); - players.addAll(Arrays.asList(first.getPlayers())); - players.addAll(Arrays.asList(second.getPlayers())); - table = getWorldConfig(first.loc.getWorld()).table; - } - - @Override - public void setConnection(Connection connection) { - this.connection = connection; - } - - @Override - public void executeStatements() throws SQLException { - PreparedStatement ps = null; - try { - ps = connection.prepareStatement("INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replacedData, type, typeData, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?)"); - for (BlockRow row : rows) { - ps.setLong(1, row.date); - ps.setInt(2, playerIds.get(row.actor)); - ps.setInt(3, row.replacedMaterial); - ps.setInt(4, row.replacedData); - ps.setInt(5, row.typeMaterial); - ps.setInt(6, row.typeData); - ps.setInt(7, row.loc.getBlockX()); - ps.setInt(8, safeY(row.loc)); - ps.setInt(9, row.loc.getBlockZ()); - ps.addBatch(); - } - ps.executeBatch(); - } catch (final SQLException ex) { - if (ps != null) { - logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString()); - } - throw ex; - } finally { - // individual try/catch here, though ugly, prevents resource leaks - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - } - } - - @Override - public boolean isUnique() { - return true; - } - - @Override - public boolean canMerge(MergeableRow row) { - return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); - } - - @Override - public MergeableRow merge(MergeableRow second) { - if (second.isUnique()) { - throw new IllegalArgumentException("Can't merge a unique row"); - } - rows.add((BlockRow) second); - actors.addAll(Arrays.asList(second.getActors())); - players.addAll(Arrays.asList(second.getPlayers())); - return this; - } - - @Override - public String[] getInserts() { - List l = new ArrayList(); - for (BlockRow row : rows) { - l.addAll(Arrays.asList(row.getInserts())); - } - return (String[]) l.toArray(); - } - - @Override - public String[] getPlayers() { - return (String[]) players.toArray(); - } - - @Override - public Actor[] getActors() { - return (Actor[]) actors.toArray(); + }); } } @@ -886,6 +787,7 @@ public class Consumer extends TimerTask { final Actor killer, victim; final int weapon; final Location loc; + final String statementString; KillRow(Location loc, Actor attacker, Actor defender, int weapon) { date = System.currentTimeMillis() / 1000; @@ -893,6 +795,8 @@ public class Consumer extends TimerTask { killer = attacker; victim = defender; this.weapon = weapon; + + statementString = "INSERT INTO `" + getWorldConfig(loc.getWorld()).table + "-kills` (date, killer, victim, weapon, x, y, z) VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?)"; } @Override @@ -901,22 +805,32 @@ public class Consumer extends TimerTask { + loc.getBlockZ() + ");" }; } - @Override - public String[] getPlayers() { - return new String[] { killer.getName(), victim.getName() }; - } - @Override public Actor[] getActors() { return new Actor[] { killer, victim }; } + + @Override + public void process(Connection conn, BatchHelper batchHelper) throws SQLException { + PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS); + smt.setLong(1, date); + smt.setInt(2, playerIDAsInt(killer)); + smt.setInt(3, playerIDAsInt(victim)); + smt.setInt(4, weapon); + smt.setInt(5, loc.getBlockX()); + smt.setInt(6, safeY(loc)); + smt.setInt(7, loc.getBlockZ()); + batchHelper.addBatch(smt, null); + } } - private class ChatRow extends ChatMessage implements PreparedStatementRow { - private Connection connection; + private class ChatRow extends ChatMessage implements Row { + private String statementString; ChatRow(Actor player, String message) { super(player, message); + + statementString = "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(?), ?, ?)"; } @Override @@ -924,58 +838,18 @@ public class Consumer extends TimerTask { return new String[] { "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + date + "), " + playerID(player) + ", '" + mysqlTextEscape(message) + "');" }; } - @Override - public String[] getPlayers() { - return new String[] { player.getName() }; - } - @Override public Actor[] getActors() { return new Actor[] { player }; } @Override - public void setConnection(Connection connection) { - this.connection = connection; - } - - @Override - public void executeStatements() throws SQLException { - boolean noID = false; - Integer id; - - String sql = "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(?), "; - if ((id = playerIDAsInt(player)) == null) { - noID = true; - sql += playerID(player) + ", "; - } else { - sql += "?, "; - } - sql += "?)"; - - PreparedStatement ps = null; - try { - ps = connection.prepareStatement(sql); - ps.setLong(1, date); - if (!noID) { - ps.setInt(2, id); - ps.setString(3, message); - } else { - ps.setString(2, message); - } - ps.execute(); - } - // we intentionally do not catch SQLException, it is thrown to the caller - finally { - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - // should print to a Logger instead if one is ever added to this class - e.printStackTrace(); - } - } - } + public void process(Connection conn, BatchHelper batchHelper) throws SQLException { + PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS); + smt.setLong(1, date); + smt.setInt(2, playerIDAsInt(player)); + smt.setString(3, message); + batchHelper.addBatch(smt, null); } } @@ -983,11 +857,18 @@ public class Consumer extends TimerTask { private final Actor player; private final long lastLogin; private final String ip; + private String statementString; PlayerJoinRow(Player player) { this.player = Actor.actorFromEntity(player); lastLogin = System.currentTimeMillis() / 1000; ip = player.getAddress().toString().replace("'", "\\'"); + + if (logPlayerInfo) { + statementString = "UPDATE `lb-players` SET lastlogin = FROM_UNIXTIME(?), firstlogin = IF(firstlogin = 0, FROM_UNIXTIME(?), firstlogin), ip = ?, playername = ? WHERE UUID = ?"; + } else { + statementString = "UPDATE `lb-players` SET playername = ? WHERE UUID = ?"; + } } @Override @@ -1000,23 +881,36 @@ public class Consumer extends TimerTask { } @Override - public String[] getPlayers() { - return new String[] { player.getName() }; + public Actor[] getActors() { + return new Actor[] { player }; } @Override - public Actor[] getActors() { - return new Actor[] { player }; + public void process(Connection conn, BatchHelper batchHelper) throws SQLException { + PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS); + if (logPlayerInfo) { + smt.setLong(1, lastLogin); + smt.setLong(2, lastLogin); + smt.setString(3, ip); + smt.setString(4, player.getName()); + smt.setString(5, player.getUUID()); + } else { + smt.setString(1, player.getName()); + smt.setString(2, player.getUUID()); + } + batchHelper.addBatch(smt, null); } } private class PlayerLeaveRow implements Row { private final long onlineTime; private final Actor actor; + private String statementString; PlayerLeaveRow(Player player, long onlineTime) { this.onlineTime = onlineTime; actor = Actor.actorFromEntity(player); + statementString = "UPDATE `lb-players` SET onlinetime = onlinetime + ? WHERE lastlogin > 0 && UUID = ?"; } @Override @@ -1028,13 +922,16 @@ public class Consumer extends TimerTask { } @Override - public String[] getPlayers() { - return new String[] { actor.getName() }; + public Actor[] getActors() { + return new Actor[] { actor }; } @Override - public Actor[] getActors() { - return new Actor[] { actor }; + public void process(Connection conn, BatchHelper batchHelper) throws SQLException { + PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS); + smt.setLong(1, onlineTime); + smt.setString(2, actor.getUUID()); + batchHelper.addBatch(smt, null); } } @@ -1046,4 +943,67 @@ public class Consumer extends TimerTask { safeY = 65535; return safeY; } + + private class BatchHelper { + private HashMap preparedStatements = new HashMap<>(); + private HashSet preparedStatementsWithGeneratedKeys = new HashSet<>(); + private LinkedHashMap> generatedKeyHandler = new LinkedHashMap<>(); + + public void reset() { + preparedStatements.clear(); + preparedStatementsWithGeneratedKeys.clear(); + generatedKeyHandler.clear(); + } + + public void processStatements(Connection conn) throws SQLException { + while (!generatedKeyHandler.isEmpty()) { + Entry> entry = generatedKeyHandler.entrySet().iterator().next(); + PreparedStatement smt = entry.getKey(); + ArrayList callbackList = entry.getValue(); + generatedKeyHandler.remove(smt); + smt.executeBatch(); + if (preparedStatementsWithGeneratedKeys.contains(smt)) { + ResultSet keys = smt.getGeneratedKeys(); + int[] results = new int[callbackList.size()]; + int pos = 0; + while (keys.next() && pos < results.length) { + results[pos++] = keys.getInt(1); + } + keys.close(); + for (int i = 0; i < results.length; i++) { + IntCallback callback = callbackList.get(i); + if (callback != null) { + callback.call(results[i]); + } + } + } + } + } + + public PreparedStatement getOrPrepareStatement(Connection conn, String sql, int autoGeneratedKeys) throws SQLException { + PreparedStatement smt = preparedStatements.get(sql); + if (smt == null) { + smt = conn.prepareStatement(sql, autoGeneratedKeys); + preparedStatements.put(sql, smt); + if (autoGeneratedKeys == Statement.RETURN_GENERATED_KEYS) { + preparedStatementsWithGeneratedKeys.add(smt); + } + } + return smt; + } + + public void addBatch(PreparedStatement smt, IntCallback generatedKeysCallback) throws SQLException { + smt.addBatch(); + ArrayList callbackList = generatedKeyHandler.get(smt); + if (callbackList == null) { + callbackList = new ArrayList<>(); + generatedKeyHandler.put(smt, callbackList); + } + callbackList.add(generatedKeysCallback); + } + } + + protected interface IntCallback { + public void call(int value) throws SQLException; + } } diff --git a/src/main/java/de/diddiz/LogBlock/LogBlock.java b/src/main/java/de/diddiz/LogBlock/LogBlock.java index 1585c78..ae95ba7 100644 --- a/src/main/java/de/diddiz/LogBlock/LogBlock.java +++ b/src/main/java/de/diddiz/LogBlock/LogBlock.java @@ -117,21 +117,9 @@ public class LogBlock extends JavaPlugin { if (enableAutoClearLog && autoClearLogDelay > 0) { getServer().getScheduler().runTaskTimerAsynchronously(this, new AutoClearLog(this), 6000, autoClearLogDelay * 60 * 20); } - getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this)); + new DumpedLogImporter(this).run(); registerEvents(); - if (useBukkitScheduler) { - if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0) { - getLogger().info("Scheduled consumer with bukkit scheduler."); - } else { - getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer."); - timer = new Timer(); - timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); - } - } else { - timer = new Timer(); - timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); - getLogger().info("Scheduled consumer with timer."); - } + consumer.start(); getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this)); for (final Tool tool : toolsByType.values()) { if (pm.getPermission("logblock.tools." + tool.name) == null) { @@ -227,25 +215,14 @@ public class LogBlock extends JavaPlugin { } } getLogger().info("Waiting for consumer ..."); - consumer.run(); + consumer.shutdown(); if (consumer.getQueueSize() > 0) { - int tries = 9; - while (consumer.getQueueSize() > 0) { - getLogger().info("Remaining queue size: " + consumer.getQueueSize()); - if (tries > 0) { - getLogger().info("Remaining tries: " + tries); - } else { - getLogger().info("Unable to save queue to database. Trying to write to a local file."); - try { - consumer.writeToFile(); - getLogger().info("Successfully dumped queue."); - } catch (final FileNotFoundException ex) { - getLogger().info("Failed to write. Given up."); - break; - } - } - consumer.run(); - tries--; + getLogger().info("Remaining queue size: " + consumer.getQueueSize() + ". Trying to write to a local file."); + try { + consumer.writeToFile(); + getLogger().info("Successfully dumped queue."); + } catch (final FileNotFoundException ex) { + getLogger().info("Failed to write. Given up."); } } } diff --git a/src/main/java/de/diddiz/util/Utils.java b/src/main/java/de/diddiz/util/Utils.java index 6b7b19f..beb1305 100644 --- a/src/main/java/de/diddiz/util/Utils.java +++ b/src/main/java/de/diddiz/util/Utils.java @@ -220,6 +220,13 @@ public class Utils { return new String(hexChars); } + public static String mysqlPrepareBytesForInsertAllowNull(byte[] bytes) { + if (bytes == null) { + return "null"; + } + return "'" + mysqlEscapeBytes(bytes) + "'"; + } + public static String mysqlTextEscape(String untrusted) { return untrusted.replace("\\", "\\\\").replace("'", "\\'"); }