From 6a38708a3218745042703d36a8f765ae5fb1f238 Mon Sep 17 00:00:00 2001 From: Philip Cass Date: Sun, 22 Feb 2015 10:45:37 +0000 Subject: [PATCH] Merge identically-formed db INSERT rows into a single batch statement This increases insert speed by up to 6 times when the DB server is on the same box as the minecraft server, and many HUNDREDS of times when it is accessed via a network connection. This creates a new kind of BlockRow which is a collection of many similar rows, each of which uses the same PreparedStatement. As such, when asked to run, it can use the executeBatch() method. It also adds code to BlockRow to see if they are mergeable (it won't if they have sign or chest actions associated) and to merge two rows into one of these new objects. The consumer processing loop is altered to check for merges, and do so if possible. Also, prevent consumer race condition on shutdown On shutdown, LogBlock invokes the consumer's run() method up to twn times on the main thread. However, the consumer may still be running from the last scheduled task, and this could result in two threads running the run() code simultaneously, resulting in inconsistent row insertion order. Another scenario is that the consumer has just started processing the last row in the queue. With the queue empty, the server will terminate, but the consumer could still not have fully executed the databse query. To solve this, the run() method is syncronized, so it can only be run on the object by one thread at a time, and is made to run at least once (to force LB to wait on any already processing run) Also, the pause between consumer runs was altered to be in ticks (50ms) rather than in seconds, to prevent needless pauses between runs. Fixes #580 Fixes #56 --- .../java/de/diddiz/LogBlock/Consumer.java | 142 +++++++++++++++++- .../java/de/diddiz/LogBlock/LogBlock.java | 11 +- .../de/diddiz/LogBlock/config/Config.java | 2 +- 3 files changed, 146 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/diddiz/LogBlock/Consumer.java b/src/main/java/de/diddiz/LogBlock/Consumer.java index a845ec4..3a7c9cb 100644 --- a/src/main/java/de/diddiz/LogBlock/Consumer.java +++ b/src/main/java/de/diddiz/LogBlock/Consumer.java @@ -268,7 +268,7 @@ public class Consumer extends TimerTask } @Override - public void run() { + public synchronized void run() { if (queue.isEmpty() || !lock.tryLock()) return; final Connection conn = logblock.getConnection(); @@ -302,6 +302,24 @@ public class Consumer extends TimerTask } if (r instanceof PreparedStatementRow) { PreparedStatementRow PSRow = (PreparedStatementRow) r; + if (r instanceof MergeableRow) { + int batchCount=count; + // if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows + if (count > forceToProcessAtLeast) batchCount = forceToProcessAtLeast / 2; + while(!queue.isEmpty()) { + MergeableRow mRow = (MergeableRow) PSRow; + Row s = queue.peek(); + if (s == null) break; + if (!(s instanceof MergeableRow)) break; + MergeableRow mRow2 = (MergeableRow) s; + if (mRow.canMerge(mRow2)) { + PSRow = mRow.merge((MergeableRow) queue.poll()); + count++; + batchCount++; + if (batchCount > forceToProcessAtLeast) break; + } else break; + } + } PSRow.setConnection(conn); try { PSRow.executeStatements(); @@ -446,10 +464,16 @@ public class Consumer extends TimerTask abstract void setConnection(Connection connection); abstract void executeStatements() throws SQLException; - } - private class BlockRow extends BlockChange implements PreparedStatementRow + private interface MergeableRow extends PreparedStatementRow + { + abstract boolean isUnique(); + abstract boolean canMerge(MergeableRow row); + abstract MergeableRow merge(MergeableRow second); + } + + private class BlockRow extends BlockChange implements MergeableRow { private Connection connection; @@ -554,6 +578,118 @@ public class Consumer extends TimerTask } } } + + @Override + public boolean isUnique() { + return !(signtext == null && ca == null && playerIds.containsKey(actor)); + } + + @Override + public boolean canMerge(MergeableRow row) { + return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); + } + + @Override + public MergeableRow merge(MergeableRow singleRow) { + return new MultiBlockChangeRow(this,(BlockRow) singleRow); + } + } + + private class MultiBlockChangeRow implements MergeableRow{ + private List rows = new ArrayList(); + private Connection connection; + private Set players = new HashSet(); + private Set actors = new HashSet(); + private String table; + + MultiBlockChangeRow (BlockRow first, BlockRow second) { + if (first.isUnique() || second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row"); + rows.add(first); + rows.add(second); + actors.addAll(Arrays.asList(first.getActors())); + actors.addAll(Arrays.asList(second.getActors())); + players.addAll(Arrays.asList(first.getPlayers())); + players.addAll(Arrays.asList(second.getPlayers())); + table = getWorldConfig(first.loc.getWorld()).table; + } + + @Override + public void setConnection(Connection connection) { + this.connection = connection; + } + + @Override + public void executeStatements() throws SQLException { + PreparedStatement ps = null; + try { + ps = connection.prepareStatement("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?)"); + for (BlockRow row : rows) { + ps.setLong(1, row.date ); + ps.setInt(2, playerIds.get(row.actor)); + ps.setInt(3, row.replaced); + ps.setInt(4, row.type); + ps.setInt(5, row.data); + ps.setInt(6, row.loc.getBlockX()); + ps.setInt(7, row.loc.getBlockY()); + ps.setInt(8, row.loc.getBlockZ()); + ps.addBatch(); + } + ps.executeBatch(); + } catch (final SQLException ex) { + if (ps != null) { + getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString()); + } + throw ex; + } finally { + // individual try/catch here, though ugly, prevents resource leaks + if( ps != null ) { + try { + ps.close(); + } + catch(SQLException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public boolean isUnique() { + return true; + } + + @Override + public boolean canMerge(MergeableRow row) { + return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); + } + + @Override + public MergeableRow merge(MergeableRow second) { + if (second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row"); + rows.add((BlockRow) second); + actors.addAll(Arrays.asList(second.getActors())); + players.addAll(Arrays.asList(second.getPlayers())); + return this; + } + + @Override + public String[] getInserts() { + List l = new ArrayList(); + for (BlockRow row : rows) { + l.addAll(Arrays.asList(row.getInserts())); + } + return (String[]) l.toArray(); + } + + @Override + public String[] getPlayers() { + return (String[]) players.toArray(); + } + + @Override + public Actor[] getActors() { + return (Actor[]) actors.toArray(); + } } private class KillRow implements Row diff --git a/src/main/java/de/diddiz/LogBlock/LogBlock.java b/src/main/java/de/diddiz/LogBlock/LogBlock.java index 322eeb1..fd4ca97 100644 --- a/src/main/java/de/diddiz/LogBlock/LogBlock.java +++ b/src/main/java/de/diddiz/LogBlock/LogBlock.java @@ -126,16 +126,16 @@ public class LogBlock extends JavaPlugin getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this)); registerEvents(); if (useBukkitScheduler) { - if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns * 20, delayBetweenRuns * 20).getTaskId() > 0) + if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0) getLogger().info("Scheduled consumer with bukkit scheduler."); else { getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer."); timer = new Timer(); - timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000); + timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); } } else { timer = new Timer(); - timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000); + timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); getLogger().info("Scheduled consumer with timer."); } getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this)); @@ -209,9 +209,10 @@ public class LogBlock extends JavaPlugin if (logPlayerInfo && getServer().getOnlinePlayers() != null) for (final Player player : getServer().getOnlinePlayers()) consumer.queueLeave(player); + getLogger().info("Waiting for consumer ..."); + consumer.run(); if (consumer.getQueueSize() > 0) { - getLogger().info("Waiting for consumer ..."); - int tries = 10; + int tries = 9; while (consumer.getQueueSize() > 0) { getLogger().info("Remaining queue size: " + consumer.getQueueSize()); if (tries > 0) diff --git a/src/main/java/de/diddiz/LogBlock/config/Config.java b/src/main/java/de/diddiz/LogBlock/config/Config.java index 88812e8..27bca7d 100644 --- a/src/main/java/de/diddiz/LogBlock/config/Config.java +++ b/src/main/java/de/diddiz/LogBlock/config/Config.java @@ -133,7 +133,7 @@ public class Config if (!config.contains(e.getKey())) config.set(e.getKey(), e.getValue()); logblock.saveConfig(); - url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8"; + url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"; user = getStringIncludingInts(config, "mysql.user"); password = getStringIncludingInts(config, "mysql.password"); delayBetweenRuns = config.getInt("consumer.delayBetweenRuns", 2);