Merge pull request #581 from frymaster/merge-rows

Batch execution code and consumer timing optimising - massive DB insertion speedup
This commit is contained in:
Philip Cass
2015-03-02 20:30:16 +00:00
3 changed files with 146 additions and 9 deletions

View File

@ -268,7 +268,7 @@ public class Consumer extends TimerTask
}
@Override
public void run() {
public synchronized void run() {
if (queue.isEmpty() || !lock.tryLock())
return;
final Connection conn = logblock.getConnection();
@ -302,6 +302,24 @@ public class Consumer extends TimerTask
}
if (r instanceof PreparedStatementRow) {
PreparedStatementRow PSRow = (PreparedStatementRow) r;
if (r instanceof MergeableRow) {
int batchCount=count;
// if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows
if (count > forceToProcessAtLeast) batchCount = forceToProcessAtLeast / 2;
while(!queue.isEmpty()) {
MergeableRow mRow = (MergeableRow) PSRow;
Row s = queue.peek();
if (s == null) break;
if (!(s instanceof MergeableRow)) break;
MergeableRow mRow2 = (MergeableRow) s;
if (mRow.canMerge(mRow2)) {
PSRow = mRow.merge((MergeableRow) queue.poll());
count++;
batchCount++;
if (batchCount > forceToProcessAtLeast) break;
} else break;
}
}
PSRow.setConnection(conn);
try {
PSRow.executeStatements();
@ -446,10 +464,16 @@ public class Consumer extends TimerTask
abstract void setConnection(Connection connection);
abstract void executeStatements() throws SQLException;
}
private class BlockRow extends BlockChange implements PreparedStatementRow
private interface MergeableRow extends PreparedStatementRow
{
abstract boolean isUnique();
abstract boolean canMerge(MergeableRow row);
abstract MergeableRow merge(MergeableRow second);
}
private class BlockRow extends BlockChange implements MergeableRow
{
private Connection connection;
@ -554,6 +578,118 @@ public class Consumer extends TimerTask
}
}
}
@Override
public boolean isUnique() {
return !(signtext == null && ca == null && playerIds.containsKey(actor));
}
@Override
public boolean canMerge(MergeableRow row) {
return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow singleRow) {
return new MultiBlockChangeRow(this,(BlockRow) singleRow);
}
}
private class MultiBlockChangeRow implements MergeableRow{
private List<BlockRow> rows = new ArrayList<BlockRow>();
private Connection connection;
private Set<String> players = new HashSet<String>();
private Set<Actor> actors = new HashSet<Actor>();
private String table;
MultiBlockChangeRow (BlockRow first, BlockRow second) {
if (first.isUnique() || second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row");
rows.add(first);
rows.add(second);
actors.addAll(Arrays.asList(first.getActors()));
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(first.getPlayers()));
players.addAll(Arrays.asList(second.getPlayers()));
table = getWorldConfig(first.loc.getWorld()).table;
}
@Override
public void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void executeStatements() throws SQLException {
PreparedStatement ps = null;
try {
ps = connection.prepareStatement("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?)");
for (BlockRow row : rows) {
ps.setLong(1, row.date );
ps.setInt(2, playerIds.get(row.actor));
ps.setInt(3, row.replaced);
ps.setInt(4, row.type);
ps.setInt(5, row.data);
ps.setInt(6, row.loc.getBlockX());
ps.setInt(7, row.loc.getBlockY());
ps.setInt(8, row.loc.getBlockZ());
ps.addBatch();
}
ps.executeBatch();
} catch (final SQLException ex) {
if (ps != null) {
getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString());
}
throw ex;
} finally {
// individual try/catch here, though ugly, prevents resource leaks
if( ps != null ) {
try {
ps.close();
}
catch(SQLException e) {
e.printStackTrace();
}
}
}
}
@Override
public boolean isUnique() {
return true;
}
@Override
public boolean canMerge(MergeableRow row) {
return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow second) {
if (second.isUnique()) throw new IllegalArgumentException("Can't merge a unique row");
rows.add((BlockRow) second);
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(second.getPlayers()));
return this;
}
@Override
public String[] getInserts() {
List<String> l = new ArrayList<String>();
for (BlockRow row : rows) {
l.addAll(Arrays.asList(row.getInserts()));
}
return (String[]) l.toArray();
}
@Override
public String[] getPlayers() {
return (String[]) players.toArray();
}
@Override
public Actor[] getActors() {
return (Actor[]) actors.toArray();
}
}
private class KillRow implements Row

View File

@ -132,16 +132,16 @@ public class LogBlock extends JavaPlugin
getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this));
registerEvents();
if (useBukkitScheduler) {
if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns * 20, delayBetweenRuns * 20).getTaskId() > 0)
if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0)
getLogger().info("Scheduled consumer with bukkit scheduler.");
else {
getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer.");
timer = new Timer();
timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000);
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
}
} else {
timer = new Timer();
timer.scheduleAtFixedRate(consumer, delayBetweenRuns * 1000, delayBetweenRuns * 1000);
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
getLogger().info("Scheduled consumer with timer.");
}
getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this));
@ -215,9 +215,10 @@ public class LogBlock extends JavaPlugin
if (logPlayerInfo && getServer().getOnlinePlayers() != null)
for (final Player player : getServer().getOnlinePlayers())
consumer.queueLeave(player);
getLogger().info("Waiting for consumer ...");
consumer.run();
if (consumer.getQueueSize() > 0) {
getLogger().info("Waiting for consumer ...");
int tries = 10;
int tries = 9;
while (consumer.getQueueSize() > 0) {
getLogger().info("Remaining queue size: " + consumer.getQueueSize());
if (tries > 0)

View File

@ -133,7 +133,7 @@ public class Config
if (!config.contains(e.getKey()))
config.set(e.getKey(), e.getValue());
logblock.saveConfig();
url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8";
url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getInt("mysql.port") + "/" + getStringIncludingInts(config, "mysql.database") + "?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true";
user = getStringIncludingInts(config, "mysql.user");
password = getStringIncludingInts(config, "mysql.password");
delayBetweenRuns = config.getInt("consumer.delayBetweenRuns", 2);