diff --git a/src/main/java/de/diddiz/LogBlock/Consumer.java b/src/main/java/de/diddiz/LogBlock/Consumer.java index a845ec4..3a7c9cb 100644 --- a/src/main/java/de/diddiz/LogBlock/Consumer.java +++ b/src/main/java/de/diddiz/LogBlock/Consumer.java @@ -268,7 +268,7 @@ public class Consumer extends TimerTask } @Override - public void run() { + public synchronized void run() { if (queue.isEmpty() || !lock.tryLock()) return; final Connection conn = logblock.getConnection(); @@ -302,6 +302,24 @@ public class Consumer extends TimerTask } if (r instanceof PreparedStatementRow) { PreparedStatementRow PSRow = (PreparedStatementRow) r; + if (r instanceof MergeableRow) { + int batchCount=count; + // if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows + if (count > forceToProcessAtLeast) batchCount = forceToProcessAtLeast / 2; + while(!queue.isEmpty()) { + MergeableRow mRow = (MergeableRow) PSRow; + Row s = queue.peek(); + if (s == null) break; + if (!(s instanceof MergeableRow)) break; + MergeableRow mRow2 = (MergeableRow) s; + if (mRow.canMerge(mRow2)) { + PSRow = mRow.merge((MergeableRow) queue.poll()); + count++; + batchCount++; + if (batchCount > forceToProcessAtLeast) break; + } else break; + } + } PSRow.setConnection(conn); try { PSRow.executeStatements(); @@ -446,10 +464,16 @@ public class Consumer extends TimerTask abstract void setConnection(Connection connection); abstract void executeStatements() throws SQLException; - } - private class BlockRow extends BlockChange implements PreparedStatementRow + private interface MergeableRow extends PreparedStatementRow + { + abstract boolean isUnique(); + abstract boolean canMerge(MergeableRow row); + abstract MergeableRow merge(MergeableRow second); + } + + private class BlockRow extends BlockChange implements MergeableRow { private Connection connection; @@ -554,6 +578,118 @@ public class Consumer extends TimerTask } } } + + @Override + public boolean isUnique() { + return !(signtext == null && ca == null && playerIds.containsKey(actor)); + } + + @Override + public boolean canMerge(MergeableRow row) { + return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); + } + + @Override + public MergeableRow merge(MergeableRow singleRow) { + return new MultiBlockChangeRow(this,(BlockRow) singleRow); + } + } + + private class MultiBlockChangeRow implements MergeableRow{ + private List rows = new ArrayList(); + private Connection connection; + private Set players = new HashSet(); + private Set actors = new HashSet(); + private String table; + + MultiBlockChangeRow (BlockRow first, BlockRow second) { + if (first.isUnique() || second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row"); + rows.add(first); + rows.add(second); + actors.addAll(Arrays.asList(first.getActors())); + actors.addAll(Arrays.asList(second.getActors())); + players.addAll(Arrays.asList(first.getPlayers())); + players.addAll(Arrays.asList(second.getPlayers())); + table = getWorldConfig(first.loc.getWorld()).table; + } + + @Override + public void setConnection(Connection connection) { + this.connection = connection; + } + + @Override + public void executeStatements() throws SQLException { + PreparedStatement ps = null; + try { + ps = connection.prepareStatement("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?)"); + for (BlockRow row : rows) { + ps.setLong(1, row.date ); + ps.setInt(2, playerIds.get(row.actor)); + ps.setInt(3, row.replaced); + ps.setInt(4, row.type); + ps.setInt(5, row.data); + ps.setInt(6, row.loc.getBlockX()); + ps.setInt(7, row.loc.getBlockY()); + ps.setInt(8, row.loc.getBlockZ()); + ps.addBatch(); + } + ps.executeBatch(); + } catch (final SQLException ex) { + if (ps != null) { + getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString()); + } + throw ex; + } finally { + // individual try/catch here, though ugly, prevents resource leaks + if( ps != null ) { + try { + ps.close(); + } + catch(SQLException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public boolean isUnique() { + return true; + } + + @Override + public boolean canMerge(MergeableRow row) { + return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table); + } + + @Override + public MergeableRow merge(MergeableRow second) { + if (second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row"); + rows.add((BlockRow) second); + actors.addAll(Arrays.asList(second.getActors())); + players.addAll(Arrays.asList(second.getPlayers())); + return this; + } + + @Override + public String[] getInserts() { + List l = new ArrayList(); + for (BlockRow row : rows) { + l.addAll(Arrays.asList(row.getInserts())); + } + return (String[]) l.toArray(); + } + + @Override + public String[] getPlayers() { + return (String[]) players.toArray(); + } + + @Override + public Actor[] getActors() { + return (Actor[]) actors.toArray(); + } } private class KillRow implements Row diff --git a/src/main/java/de/diddiz/LogBlock/LogBlock.java b/src/main/java/de/diddiz/LogBlock/LogBlock.java index 6b3d809..860701e 100644 --- a/src/main/java/de/diddiz/LogBlock/LogBlock.java +++ b/src/main/java/de/diddiz/LogBlock/LogBlock.java @@ -132,16 +132,16 @@ public class LogBlock extends JavaPlugin getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this)); registerEvents(); if (useBukkitScheduler) { - if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns * 20, delayBetweenRuns * 20).getTaskId() > 0) + if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0) getLogger().info("Scheduled consumer with bukkit scheduler."); else { getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer."); timer = new Timer(); - timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000); + timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); } } else { timer = new Timer(); - timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000); + timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50); getLogger().info("Scheduled consumer with timer."); } getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this)); @@ -215,9 +215,10 @@ public class LogBlock extends JavaPlugin if (logPlayerInfo && getServer().getOnlinePlayers() != null) for (final Player player : getServer().getOnlinePlayers()) consumer.queueLeave(player); + getLogger().info("Waiting for consumer ..."); + consumer.run(); if (consumer.getQueueSize() > 0) { - getLogger().info("Waiting for consumer ..."); - int tries = 10; + int tries = 9; while (consumer.getQueueSize() > 0) { getLogger().info("Remaining queue size: " + consumer.getQueueSize()); if (tries > 0) diff --git a/src/main/java/de/diddiz/LogBlock/config/Config.java b/src/main/java/de/diddiz/LogBlock/config/Config.java index 88812e8..27bca7d 100644 --- a/src/main/java/de/diddiz/LogBlock/config/Config.java +++ b/src/main/java/de/diddiz/LogBlock/config/Config.java @@ -133,7 +133,7 @@ public class Config if (!config.contains(e.getKey())) config.set(e.getKey(), e.getValue()); logblock.saveConfig(); - url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8"; + url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"; user = getStringIncludingInts(config, "mysql.user"); password = getStringIncludingInts(config, "mysql.password"); delayBetweenRuns = config.getInt("consumer.delayBetweenRuns", 2);