Merge identically-formed db INSERT rows into a single batch statement

This increases insert speed by up to 6 times when the DB server is on the same
box as the minecraft server, and many HUNDREDS of times when it is accessed via
a network connection.

This creates a new kind of BlockRow which is a collection of many similar rows, each
of which uses the same PreparedStatement.  As such, when asked to run, it can
use the executeBatch() method.  It also adds code to BlockRow to see if they are mergeable
(it won't if they have sign or chest actions associated) and to merge two rows into one of
these new objects.  The consumer processing loop is altered to check for merges, and do so if
possible.

Also, prevent consumer race condition on shutdown

On shutdown, LogBlock invokes the consumer's run() method up to twn times on
the main thread.  However, the consumer may still be running from the last
scheduled task, and this could result in two threads running the run() code
simultaneously, resulting in inconsistent row insertion order.

Another scenario is that the consumer has just started processing the last
row in the queue.  With the queue empty, the server will terminate, but the
consumer could still not have fully executed the databse query.

To solve this, the run() method is syncronized, so it can only be run on the
object by one thread at a time, and is made to run at least once (to force LB
to wait on any already processing run)

Also, the pause between consumer runs was altered to be in ticks (50ms) rather than
in seconds, to prevent needless pauses between runs.

Fixes #580 Fixes #56
This commit is contained in:
Philip Cass
2015-02-22 10:45:37 +00:00
parent 82b4ffc2a2
commit 6a38708a32
3 changed files with 146 additions and 9 deletions

View File

@ -268,7 +268,7 @@ public class Consumer extends TimerTask
}
@Override
public void run() {
public synchronized void run() {
if (queue.isEmpty() || !lock.tryLock())
return;
final Connection conn = logblock.getConnection();
@ -302,6 +302,24 @@ public class Consumer extends TimerTask
}
if (r instanceof PreparedStatementRow) {
PreparedStatementRow PSRow = (PreparedStatementRow) r;
if (r instanceof MergeableRow) {
int batchCount=count;
// if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows
if (count > forceToProcessAtLeast) batchCount = forceToProcessAtLeast / 2;
while(!queue.isEmpty()) {
MergeableRow mRow = (MergeableRow) PSRow;
Row s = queue.peek();
if (s == null) break;
if (!(s instanceof MergeableRow)) break;
MergeableRow mRow2 = (MergeableRow) s;
if (mRow.canMerge(mRow2)) {
PSRow = mRow.merge((MergeableRow) queue.poll());
count++;
batchCount++;
if (batchCount > forceToProcessAtLeast) break;
} else break;
}
}
PSRow.setConnection(conn);
try {
PSRow.executeStatements();
@ -446,10 +464,16 @@ public class Consumer extends TimerTask
abstract void setConnection(Connection connection);
abstract void executeStatements() throws SQLException;
}
private class BlockRow extends BlockChange implements PreparedStatementRow
private interface MergeableRow extends PreparedStatementRow
{
abstract boolean isUnique();
abstract boolean canMerge(MergeableRow row);
abstract MergeableRow merge(MergeableRow second);
}
private class BlockRow extends BlockChange implements MergeableRow
{
private Connection connection;
@ -554,6 +578,118 @@ public class Consumer extends TimerTask
}
}
}
@Override
public boolean isUnique() {
return !(signtext == null && ca == null && playerIds.containsKey(actor));
}
@Override
public boolean canMerge(MergeableRow row) {
return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow singleRow) {
return new MultiBlockChangeRow(this,(BlockRow) singleRow);
}
}
private class MultiBlockChangeRow implements MergeableRow{
private List<BlockRow> rows = new ArrayList<BlockRow>();
private Connection connection;
private Set<String> players = new HashSet<String>();
private Set<Actor> actors = new HashSet<Actor>();
private String table;
MultiBlockChangeRow (BlockRow first, BlockRow second) {
if (first.isUnique() || second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row");
rows.add(first);
rows.add(second);
actors.addAll(Arrays.asList(first.getActors()));
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(first.getPlayers()));
players.addAll(Arrays.asList(second.getPlayers()));
table = getWorldConfig(first.loc.getWorld()).table;
}
@Override
public void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void executeStatements() throws SQLException {
PreparedStatement ps = null;
try {
ps = connection.prepareStatement("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?)");
for (BlockRow row : rows) {
ps.setLong(1, row.date );
ps.setInt(2, playerIds.get(row.actor));
ps.setInt(3, row.replaced);
ps.setInt(4, row.type);
ps.setInt(5, row.data);
ps.setInt(6, row.loc.getBlockX());
ps.setInt(7, row.loc.getBlockY());
ps.setInt(8, row.loc.getBlockZ());
ps.addBatch();
}
ps.executeBatch();
} catch (final SQLException ex) {
if (ps != null) {
getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString());
}
throw ex;
} finally {
// individual try/catch here, though ugly, prevents resource leaks
if( ps != null ) {
try {
ps.close();
}
catch(SQLException e) {
e.printStackTrace();
}
}
}
}
@Override
public boolean isUnique() {
return true;
}
@Override
public boolean canMerge(MergeableRow row) {
return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow second) {
if (second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row");
rows.add((BlockRow) second);
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(second.getPlayers()));
return this;
}
@Override
public String[] getInserts() {
List<String> l = new ArrayList<String>();
for (BlockRow row : rows) {
l.addAll(Arrays.asList(row.getInserts()));
}
return (String[]) l.toArray();
}
@Override
public String[] getPlayers() {
return (String[]) players.toArray();
}
@Override
public Actor[] getActors() {
return (Actor[]) actors.toArray();
}
}
private class KillRow implements Row

View File

@ -126,16 +126,16 @@ public class LogBlock extends JavaPlugin
getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this));
registerEvents();
if (useBukkitScheduler) {
if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns * 20, delayBetweenRuns * 20).getTaskId() > 0)
if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0)
getLogger().info("Scheduled consumer with bukkit scheduler.");
else {
getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer.");
timer = new Timer();
timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000);
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
}
} else {
timer = new Timer();
timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000);
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
getLogger().info("Scheduled consumer with timer.");
}
getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this));
@ -209,9 +209,10 @@ public class LogBlock extends JavaPlugin
if (logPlayerInfo && getServer().getOnlinePlayers() != null)
for (final Player player : getServer().getOnlinePlayers())
consumer.queueLeave(player);
getLogger().info("Waiting for consumer ...");
consumer.run();
if (consumer.getQueueSize() > 0) {
getLogger().info("Waiting for consumer ...");
int tries = 10;
int tries = 9;
while (consumer.getQueueSize() > 0) {
getLogger().info("Remaining queue size: " + consumer.getQueueSize());
if (tries > 0)

View File

@ -133,7 +133,7 @@ public class Config
if (!config.contains(e.getKey()))
config.set(e.getKey(), e.getValue());
logblock.saveConfig();
url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8";
url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true";
user = getStringIncludingInts(config, "mysql.user");
password = getStringIncludingInts(config, "mysql.password");
delayBetweenRuns = config.getInt("consumer.delayBetweenRuns", 2);