Rewrite the Consumer to be more stable and much faster

- Use prepared statements and batching for all inserts
- There is now one continuously running Consumer thread
- Use proper synchronization
Fixes #712, Fixes #710

Its a large rewrite so there could still be bugs..
This commit is contained in:
Brokkonaut
2018-08-03 01:53:28 +02:00
parent a37dd9cff1
commit 60a049ed49
4 changed files with 381 additions and 453 deletions

View File

@ -562,20 +562,7 @@ public class CommandsHandler implements CommandExecutor {
@Override
public void run() {
final Consumer consumer = logblock.getConsumer();
if (consumer.getQueueSize() > 0) {
sender.sendMessage(ChatColor.DARK_AQUA + "Current queue size: " + consumer.getQueueSize());
int lastSize = -1, fails = 0;
while (consumer.getQueueSize() > 0) {
fails = lastSize == consumer.getQueueSize() ? fails + 1 : 0;
if (fails > 10) {
sender.sendMessage(ChatColor.RED + "Unable to save queue");
return;
}
lastSize = consumer.getQueueSize();
consumer.run();
}
sender.sendMessage(ChatColor.GREEN + "Queue saved successfully");
}
sender.sendMessage(ChatColor.DARK_AQUA + "Current queue size: " + consumer.getQueueSize());
}
}
@ -662,9 +649,6 @@ public class CommandsHandler implements CommandExecutor {
if (!checkRestrictions(sender, params)) {
return;
}
if (logblock.getConsumer().getQueueSize() > 0) {
new CommandSaveQueue(sender, null, false);
}
if (!params.silent) {
sender.sendMessage(ChatColor.DARK_AQUA + "Searching " + params.getTitle() + ":");
}

View File

@ -1,10 +1,32 @@
package de.diddiz.LogBlock;
import de.diddiz.LogBlock.blockstate.BlockStateCodecSign;
import de.diddiz.LogBlock.blockstate.BlockStateCodecs;
import de.diddiz.LogBlock.config.Config;
import de.diddiz.LogBlock.events.BlockChangePreLogEvent;
import de.diddiz.util.Utils;
import static de.diddiz.LogBlock.config.Config.getWorldConfig;
import static de.diddiz.LogBlock.config.Config.hiddenBlocks;
import static de.diddiz.LogBlock.config.Config.hiddenPlayers;
import static de.diddiz.LogBlock.config.Config.isLogged;
import static de.diddiz.LogBlock.config.Config.logPlayerInfo;
import static de.diddiz.util.BukkitUtils.compressInventory;
import static de.diddiz.util.BukkitUtils.itemIDfromProjectileEntity;
import static de.diddiz.util.Utils.mysqlTextEscape;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.logging.Level;
import org.bukkit.Bukkit;
import org.bukkit.Location;
@ -20,26 +42,22 @@ import org.bukkit.inventory.InventoryHolder;
import org.bukkit.inventory.ItemStack;
import org.bukkit.projectiles.ProjectileSource;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.sql.*;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import de.diddiz.LogBlock.blockstate.BlockStateCodecSign;
import de.diddiz.LogBlock.blockstate.BlockStateCodecs;
import de.diddiz.LogBlock.config.Config;
import de.diddiz.LogBlock.events.BlockChangePreLogEvent;
import de.diddiz.util.Utils;
import static de.diddiz.LogBlock.config.Config.*;
import static de.diddiz.util.Utils.mysqlTextEscape;
import static de.diddiz.util.BukkitUtils.*;
public class Consumer extends TimerTask {
private final Queue<Row> queue = new LinkedBlockingQueue<Row>();
public class Consumer extends Thread {
private final Deque<Row> queue = new ArrayDeque<Row>();
private final Set<Actor> failedPlayers = new HashSet<Actor>();
private final LogBlock logblock;
private final Map<Actor, Integer> playerIds = new HashMap<Actor, Integer>();
private final Lock lock = new ReentrantLock();
private long addEntryCounter;
private long nextWarnCounter;
private boolean shutdown;
private long shutdownInitialized;
Consumer(LogBlock logblock) {
this.logblock = logblock;
@ -309,7 +327,7 @@ public class Consumer extends TimerTask {
if (victim == null || !isLogged(location.getWorld())) {
return;
}
queue.add(new KillRow(location, killer == null ? null : killer, victim, weapon == null ? 0 : MaterialConverter.getOrAddMaterialId(weapon.getType().getKey().toString())));
addQueueLast(new KillRow(location, killer == null ? null : killer, victim, weapon == null ? 0 : MaterialConverter.getOrAddMaterialId(weapon.getType().getKey().toString())));
}
/**
@ -387,127 +405,152 @@ public class Consumer extends TimerTask {
if (hiddenPlayers.contains(player.getName().toLowerCase())) {
return;
}
queue.add(new ChatRow(player, message));
addQueueLast(new ChatRow(player, message));
}
public void queueJoin(Player player) {
queue.add(new PlayerJoinRow(player));
addQueueLast(new PlayerJoinRow(player));
}
public void queueLeave(Player player, long onlineTime) {
queue.add(new PlayerLeaveRow(player, onlineTime));
addQueueLast(new PlayerLeaveRow(player, onlineTime));
}
public void shutdown() {
synchronized (queue) {
shutdown = true;
shutdownInitialized = System.currentTimeMillis();
queue.notifyAll();
}
while (isAlive()) {
try {
join();
} catch (InterruptedException e) {
// ignore
}
}
}
@Override
public synchronized void run() {
if (queue.isEmpty() || !lock.tryLock()) {
return;
}
long startTime = System.currentTimeMillis();
// int startSize = queue.size();
int count = 0;
public void run() {
ArrayList<Row> currentRows = new ArrayList<>();
Connection conn = null;
Statement state = null;
try {
conn = logblock.getConnection();
if (Config.queueWarningSize > 0 && queue.size() >= Config.queueWarningSize) {
logblock.getLogger().info("[Consumer] Queue overloaded. Size: " + getQueueSize());
}
if (conn == null) {
return;
}
conn.setAutoCommit(false);
state = conn.createStatement();
final long start = System.currentTimeMillis();
process: while (!queue.isEmpty() && (System.currentTimeMillis() - start < timePerRun || count < forceToProcessAtLeast)) {
final Row r = queue.poll();
if (r == null) {
continue;
}
for (final Actor actor : r.getActors()) {
if (!playerIds.containsKey(actor)) {
if (!addPlayer(state, actor)) {
if (!failedPlayers.contains(actor)) {
failedPlayers.add(actor);
logblock.getLogger().warning("[Consumer] Failed to add player " + actor.getName());
}
continue process;
}
}
}
if (r instanceof PreparedStatementRow) {
PreparedStatementRow PSRow = (PreparedStatementRow) r;
if (r instanceof MergeableRow) {
int batchCount = count;
// if we've reached our row target but not exceeded our time target, allow merging of up to 50% of our row limit more rows
if (count > forceToProcessAtLeast) {
batchCount = forceToProcessAtLeast / 2;
}
while (!queue.isEmpty()) {
MergeableRow mRow = (MergeableRow) PSRow;
Row s = queue.peek();
if (s == null) {
break;
}
if (!(s instanceof MergeableRow)) {
break;
}
MergeableRow mRow2 = (MergeableRow) s;
if (mRow.canMerge(mRow2)) {
PSRow = mRow.merge((MergeableRow) queue.poll());
count++;
batchCount++;
if (batchCount > forceToProcessAtLeast) {
break;
}
} else {
break;
}
}
}
PSRow.setConnection(conn);
try {
PSRow.executeStatements();
} catch (final SQLException ex) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on insertion: ", ex);
break;
}
} else {
for (final String insert : r.getInserts()) {
try {
state.execute(insert);
} catch (final SQLException ex) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on " + insert + ": ", ex);
break process;
}
}
}
count++;
}
conn.commit();
} catch (final SQLException ex) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception", ex);
} finally {
BatchHelper batchHelper = new BatchHelper();
while (true) {
try {
if (state != null) {
state.close();
if (conn == null) {
conn = logblock.getConnection();
if (conn != null) {
// initialize connection
conn.setAutoCommit(false);
} else {
// we did not get a connection
boolean wantsShutdown;
synchronized (queue) {
wantsShutdown = shutdown;
}
if (wantsShutdown) {
// lets give up
break;
}
// wait for a connection
logblock.getLogger().severe("[Consumer] Could not connect to the database!");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}
continue;
}
}
Row r;
boolean processBatch = false;
synchronized (queue) {
if (shutdown) {
// Give this thread some time to process the remaining entries
if (queue.isEmpty() || System.currentTimeMillis() - shutdownInitialized > 20000) {
if (currentRows.isEmpty()) {
break;
} else {
processBatch = true;
}
}
}
r = queue.pollFirst();
if (r == null) {
try {
if (currentRows.isEmpty() && !shutdown) {
queue.wait(); // nothing to do for us
} else {
processBatch = true;
}
} catch (InterruptedException e) {
// ignore
}
}
}
if (r != null) {
boolean failOnActors = false;
for (final Actor actor : r.getActors()) {
if (!playerIds.containsKey(actor)) {
if (!addPlayer(conn, actor)) {
if (failedPlayers.add(actor)) {
logblock.getLogger().warning("[Consumer] Failed to add player " + actor.getName());
}
failOnActors = true; // skip this row
}
}
}
if (!failOnActors) {
currentRows.add(r);
}
r.process(conn, batchHelper);
}
if (currentRows.size() >= (processBatch ? 1 : (Config.forceToProcessAtLeast * 10))) {
batchHelper.processStatements(conn);
conn.commit();
currentRows.clear();
}
} catch (Exception e) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Could not insert entries!", e);
boolean retry = false;
if (e instanceof SQLException) {
// Retry on network errors: SQLSTATE = 08S01 08001 08004 HY000 40001
String state = ((SQLException) e).getSQLState();
retry = state != null && (state.equals("08S01") || state.equals("08001") || state.equals("08004") || state.equals("HY000") || state.equals("40001"));
}
if (retry) {
// readd rows to the queue
synchronized (queue) {
while (!currentRows.isEmpty()) {
queue.addFirst(currentRows.remove(currentRows.size() - 1));
}
}
}
currentRows.clear();
batchHelper.reset();
if (conn != null) {
conn.close();
try {
conn.close();
} catch (SQLException e1) {
// ignore
}
}
} catch (final SQLException ex) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] SQL exception on close", ex);
conn = null;
}
lock.unlock();
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e1) {
// ignore
}
}
if (debug) {
long timeElapsed = System.currentTimeMillis() - startTime;
float rowPerTime = count / timeElapsed;
logblock.getLogger().log(Level.INFO, "[Consumer] Finished consumer cycle in " + timeElapsed + " milliseconds.");
logblock.getLogger().log(Level.INFO, "[Consumer] Total rows processed: " + count + ". row/time: " + String.format("%.4f", rowPerTime));
// readd to queue - this can be saved later
synchronized (queue) {
while (!currentRows.isEmpty()) {
queue.addFirst(currentRows.remove(currentRows.size() - 1));
}
}
}
@ -518,8 +561,8 @@ public class Consumer extends TimerTask {
int counter = 0;
new File("plugins/LogBlock/import/").mkdirs();
PrintWriter writer = new PrintWriter(new File("plugins/LogBlock/import/queue-" + time + "-0.sql"));
while (!queue.isEmpty()) {
final Row r = queue.poll();
while (!isQueueEmpty()) {
final Row r = pollQueueFirst();
if (r == null) {
continue;
}
@ -543,7 +586,36 @@ public class Consumer extends TimerTask {
}
int getQueueSize() {
return queue.size();
synchronized (queue) {
return queue.size();
}
}
private boolean isQueueEmpty() {
synchronized (queue) {
return queue.isEmpty();
}
}
private void addQueueLast(Row row) {
synchronized (queue) {
boolean wasEmpty = queue.isEmpty();
queue.addLast(row);
addEntryCounter++;
if (Config.queueWarningSize > 0 && queue.size() >= Config.queueWarningSize && addEntryCounter >= nextWarnCounter) {
logblock.getLogger().warning("[Consumer] Queue overloaded. Size: " + queue.size());
nextWarnCounter = addEntryCounter + 1000;
}
if (wasEmpty) {
queue.notifyAll();
}
}
}
private Row pollQueueFirst() {
synchronized (queue) {
return queue.pollFirst();
}
}
static void hide(Player player) {
@ -564,16 +636,18 @@ public class Consumer extends TimerTask {
return true;
}
private boolean addPlayer(Statement state, Actor actor) throws SQLException {
private boolean addPlayer(Connection conn, Actor actor) throws SQLException {
// Odd query contruction is to work around innodb auto increment behaviour - bug #492
String name = actor.getName();
String uuid = actor.getUUID();
Statement state = conn.createStatement();
state.execute("INSERT IGNORE INTO `lb-players` (playername,UUID) SELECT '" + mysqlTextEscape(name) + "','" + uuid + "' FROM `lb-players` WHERE NOT EXISTS (SELECT NULL FROM `lb-players` WHERE UUID = '" + uuid + "') LIMIT 1;");
final ResultSet rs = state.executeQuery("SELECT playerid FROM `lb-players` WHERE UUID = '" + uuid + "'");
if (rs.next()) {
playerIds.put(actor, rs.getInt(1));
}
rs.close();
state.close();
return playerIds.containsKey(actor);
}
@ -612,7 +686,7 @@ public class Consumer extends TimerTask {
int typeMaterialId = MaterialConverter.getOrAddMaterialId(typeString);
int typeStateId = MaterialConverter.getOrAddBlockStateId(typeString);
queue.add(new BlockRow(loc, actor, replacedMaterialId, replacedStateId, stateBefore, typeMaterialId, typeStateId, stateAfter, ca));
addQueueLast(new BlockRow(loc, actor, replacedMaterialId, replacedStateId, stateBefore, typeMaterialId, typeStateId, stateAfter, ca));
}
private String playerID(Actor actor) {
@ -636,34 +710,19 @@ public class Consumer extends TimerTask {
private static interface Row {
String[] getInserts();
/**
* @deprecated - Names are not guaranteed to be unique. Use {@link #getActors() }
*/
String[] getPlayers();
void process(Connection conn, BatchHelper batchHelper) throws SQLException;
Actor[] getActors();
}
private interface PreparedStatementRow extends Row {
abstract void setConnection(Connection connection);
abstract void executeStatements() throws SQLException;
}
private interface MergeableRow extends PreparedStatementRow {
abstract boolean isUnique();
abstract boolean canMerge(MergeableRow row);
abstract MergeableRow merge(MergeableRow second);
}
private class BlockRow extends BlockChange implements MergeableRow {
private Connection connection;
private class BlockRow extends BlockChange implements Row {
final String statementString;
public BlockRow(Location loc, Actor actor, int replaced, int replacedData, byte[] replacedState, int type, int typeData, byte[] typeState, ChestAccess ca) {
super(System.currentTimeMillis() / 1000, loc, actor, replaced, replacedData, replacedState, type, typeData, typeState, ca);
final String table = getWorldConfig(loc.getWorld()).table;
statementString = "INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replaceddata, type, typedata, x, y, z) VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?)";
}
@Override
@ -674,210 +733,52 @@ public class Consumer extends TimerTask {
inserts[0] = "INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replaceddata, type, typedata, x, y, z) VALUES (FROM_UNIXTIME(" + date + "), " + playerID(actor) + ", " + replacedMaterial + ", " + replacedData + ", " + typeMaterial + ", " + typeData + ", '" + loc.getBlockX()
+ "', " + safeY(loc) + ", '" + loc.getBlockZ() + "');";
if (replacedState != null || typeState != null) {
inserts[1] = "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES('" + Utils.mysqlEscapeBytes(replacedState) + "', '" + Utils.mysqlEscapeBytes(typeState) + "', LAST_INSERT_ID());";
inserts[1] = "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(" + Utils.mysqlPrepareBytesForInsertAllowNull(replacedState) + ", " + Utils.mysqlPrepareBytesForInsertAllowNull(typeState) + ", LAST_INSERT_ID());";
} else if (ca != null) {
inserts[1] = "INSERT INTO `" + table + "-chestdata` (id, item, itemremove, itemtype) values (LAST_INSERT_ID(), '" + Utils.mysqlEscapeBytes(Utils.saveItemStack(ca.itemStack)) + "', " + (ca.remove ? 1 : 0) + ", " + ca.itemType + ");";
}
return inserts;
}
@Override
public String[] getPlayers() {
return new String[] { actor.getName() };
}
@Override
public Actor[] getActors() {
return new Actor[] { actor };
}
@Override
public void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void executeStatements() throws SQLException {
final String table = getWorldConfig(loc.getWorld()).table;
PreparedStatement ps1 = null;
PreparedStatement ps = null;
try {
ps1 = connection.prepareStatement("INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replacedData, type, typeData, x, y, z) VALUES(FROM_UNIXTIME(?), " + playerID(actor) + ", ?, ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS);
ps1.setLong(1, date);
ps1.setInt(2, replacedMaterial);
ps1.setInt(3, replacedData);
ps1.setInt(4, typeMaterial);
ps1.setInt(5, typeData);
ps1.setInt(6, loc.getBlockX());
ps1.setInt(7, safeY(loc));
ps1.setInt(8, loc.getBlockZ());
ps1.executeUpdate();
int id;
ResultSet rs = ps1.getGeneratedKeys();
rs.next();
id = rs.getInt(1);
if (typeState != null || replacedState != null) {
ps = connection.prepareStatement("INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(?, ?, ?)");
ps.setBytes(1, replacedState);
ps.setBytes(2, typeState);
ps.setInt(3, id);
ps.executeUpdate();
} else if (ca != null) {
ps = connection.prepareStatement("INSERT INTO `" + table + "-chestdata` (item, itemremove, id, itemtype) values (?, ?, ?, ?)");
ps.setBytes(1, Utils.saveItemStack(ca.itemStack));
ps.setInt(2, ca.remove ? 1 : 0);
ps.setInt(3, id);
ps.setInt(4, ca.itemType);
ps.executeUpdate();
}
} catch (final SQLException ex) {
if (ps1 != null) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps1.toString());
}
if (ps != null) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString());
}
throw ex;
} finally {
// individual try/catch here, though ugly, prevents resource leaks
if (ps1 != null) {
try {
ps1.close();
} catch (SQLException e) {
// ideally should log to logger, none is available in this class
// at the time of this writing, so I'll leave that to the plugin
// maintainers to integrate if they wish
e.printStackTrace();
public void process(Connection conn, BatchHelper batchHelper) throws SQLException {
PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.RETURN_GENERATED_KEYS);
smt.setLong(1, date);
smt.setInt(2, playerIDAsInt(actor));
smt.setInt(3, replacedMaterial);
smt.setInt(4, replacedData);
smt.setInt(5, typeMaterial);
smt.setInt(6, typeData);
smt.setInt(7, loc.getBlockX());
smt.setInt(8, safeY(loc));
smt.setInt(9, loc.getBlockZ());
batchHelper.addBatch(smt, new IntCallback() {
@Override
public void call(int id) throws SQLException {
final String table = getWorldConfig(loc.getWorld()).table;
PreparedStatement ps;
if (typeState != null || replacedState != null) {
ps = batchHelper.getOrPrepareStatement(conn, "INSERT INTO `" + table + "-state` (replacedState, typeState, id) VALUES(?, ?, ?)", Statement.NO_GENERATED_KEYS);
ps.setBytes(1, replacedState);
ps.setBytes(2, typeState);
ps.setInt(3, id);
batchHelper.addBatch(ps, null);
}
if (ca != null) {
ps = batchHelper.getOrPrepareStatement(conn, "INSERT INTO `" + table + "-chestdata` (item, itemremove, id, itemtype) values (?, ?, ?, ?)", Statement.NO_GENERATED_KEYS);
ps.setBytes(1, Utils.saveItemStack(ca.itemStack));
ps.setInt(2, ca.remove ? 1 : 0);
ps.setInt(3, id);
ps.setInt(4, ca.itemType);
batchHelper.addBatch(ps, null);
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
@Override
public boolean isUnique() {
return !(typeState == null && replacedState == null && ca == null && playerIds.containsKey(actor));
}
@Override
public boolean canMerge(MergeableRow row) {
return !this.isUnique() && !row.isUnique() && row instanceof BlockRow && getWorldConfig(loc.getWorld()).table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow singleRow) {
return new MultiBlockChangeRow(this, (BlockRow) singleRow);
}
}
private class MultiBlockChangeRow implements MergeableRow {
private List<BlockRow> rows = new ArrayList<BlockRow>();
private Connection connection;
private Set<String> players = new HashSet<String>();
private Set<Actor> actors = new HashSet<Actor>();
private String table;
MultiBlockChangeRow(BlockRow first, BlockRow second) {
if (first.isUnique() || second.isUnique()) {
throw new IllegalArgumentException("Can't merge a unique row");
}
rows.add(first);
rows.add(second);
actors.addAll(Arrays.asList(first.getActors()));
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(first.getPlayers()));
players.addAll(Arrays.asList(second.getPlayers()));
table = getWorldConfig(first.loc.getWorld()).table;
}
@Override
public void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void executeStatements() throws SQLException {
PreparedStatement ps = null;
try {
ps = connection.prepareStatement("INSERT INTO `" + table + "-blocks` (date, playerid, replaced, replacedData, type, typeData, x, y, z) VALUES(FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?)");
for (BlockRow row : rows) {
ps.setLong(1, row.date);
ps.setInt(2, playerIds.get(row.actor));
ps.setInt(3, row.replacedMaterial);
ps.setInt(4, row.replacedData);
ps.setInt(5, row.typeMaterial);
ps.setInt(6, row.typeData);
ps.setInt(7, row.loc.getBlockX());
ps.setInt(8, safeY(row.loc));
ps.setInt(9, row.loc.getBlockZ());
ps.addBatch();
}
ps.executeBatch();
} catch (final SQLException ex) {
if (ps != null) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Troublesome query: " + ps.toString());
}
throw ex;
} finally {
// individual try/catch here, though ugly, prevents resource leaks
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
@Override
public boolean isUnique() {
return true;
}
@Override
public boolean canMerge(MergeableRow row) {
return !row.isUnique() && row instanceof BlockRow && table.equals(getWorldConfig(((BlockRow) row).loc.getWorld()).table);
}
@Override
public MergeableRow merge(MergeableRow second) {
if (second.isUnique()) {
throw new IllegalArgumentException("Can't merge a unique row");
}
rows.add((BlockRow) second);
actors.addAll(Arrays.asList(second.getActors()));
players.addAll(Arrays.asList(second.getPlayers()));
return this;
}
@Override
public String[] getInserts() {
List<String> l = new ArrayList<String>();
for (BlockRow row : rows) {
l.addAll(Arrays.asList(row.getInserts()));
}
return (String[]) l.toArray();
}
@Override
public String[] getPlayers() {
return (String[]) players.toArray();
}
@Override
public Actor[] getActors() {
return (Actor[]) actors.toArray();
});
}
}
@ -886,6 +787,7 @@ public class Consumer extends TimerTask {
final Actor killer, victim;
final int weapon;
final Location loc;
final String statementString;
KillRow(Location loc, Actor attacker, Actor defender, int weapon) {
date = System.currentTimeMillis() / 1000;
@ -893,6 +795,8 @@ public class Consumer extends TimerTask {
killer = attacker;
victim = defender;
this.weapon = weapon;
statementString = "INSERT INTO `" + getWorldConfig(loc.getWorld()).table + "-kills` (date, killer, victim, weapon, x, y, z) VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?)";
}
@Override
@ -901,22 +805,32 @@ public class Consumer extends TimerTask {
+ loc.getBlockZ() + ");" };
}
@Override
public String[] getPlayers() {
return new String[] { killer.getName(), victim.getName() };
}
@Override
public Actor[] getActors() {
return new Actor[] { killer, victim };
}
@Override
public void process(Connection conn, BatchHelper batchHelper) throws SQLException {
PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS);
smt.setLong(1, date);
smt.setInt(2, playerIDAsInt(killer));
smt.setInt(3, playerIDAsInt(victim));
smt.setInt(4, weapon);
smt.setInt(5, loc.getBlockX());
smt.setInt(6, safeY(loc));
smt.setInt(7, loc.getBlockZ());
batchHelper.addBatch(smt, null);
}
}
private class ChatRow extends ChatMessage implements PreparedStatementRow {
private Connection connection;
private class ChatRow extends ChatMessage implements Row {
private String statementString;
ChatRow(Actor player, String message) {
super(player, message);
statementString = "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(?), ?, ?)";
}
@Override
@ -924,58 +838,18 @@ public class Consumer extends TimerTask {
return new String[] { "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + date + "), " + playerID(player) + ", '" + mysqlTextEscape(message) + "');" };
}
@Override
public String[] getPlayers() {
return new String[] { player.getName() };
}
@Override
public Actor[] getActors() {
return new Actor[] { player };
}
@Override
public void setConnection(Connection connection) {
this.connection = connection;
}
@Override
public void executeStatements() throws SQLException {
boolean noID = false;
Integer id;
String sql = "INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(?), ";
if ((id = playerIDAsInt(player)) == null) {
noID = true;
sql += playerID(player) + ", ";
} else {
sql += "?, ";
}
sql += "?)";
PreparedStatement ps = null;
try {
ps = connection.prepareStatement(sql);
ps.setLong(1, date);
if (!noID) {
ps.setInt(2, id);
ps.setString(3, message);
} else {
ps.setString(2, message);
}
ps.execute();
}
// we intentionally do not catch SQLException, it is thrown to the caller
finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
// should print to a Logger instead if one is ever added to this class
e.printStackTrace();
}
}
}
public void process(Connection conn, BatchHelper batchHelper) throws SQLException {
PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS);
smt.setLong(1, date);
smt.setInt(2, playerIDAsInt(player));
smt.setString(3, message);
batchHelper.addBatch(smt, null);
}
}
@ -983,11 +857,18 @@ public class Consumer extends TimerTask {
private final Actor player;
private final long lastLogin;
private final String ip;
private String statementString;
PlayerJoinRow(Player player) {
this.player = Actor.actorFromEntity(player);
lastLogin = System.currentTimeMillis() / 1000;
ip = player.getAddress().toString().replace("'", "\\'");
if (logPlayerInfo) {
statementString = "UPDATE `lb-players` SET lastlogin = FROM_UNIXTIME(?), firstlogin = IF(firstlogin = 0, FROM_UNIXTIME(?), firstlogin), ip = ?, playername = ? WHERE UUID = ?";
} else {
statementString = "UPDATE `lb-players` SET playername = ? WHERE UUID = ?";
}
}
@Override
@ -1000,23 +881,36 @@ public class Consumer extends TimerTask {
}
@Override
public String[] getPlayers() {
return new String[] { player.getName() };
public Actor[] getActors() {
return new Actor[] { player };
}
@Override
public Actor[] getActors() {
return new Actor[] { player };
public void process(Connection conn, BatchHelper batchHelper) throws SQLException {
PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS);
if (logPlayerInfo) {
smt.setLong(1, lastLogin);
smt.setLong(2, lastLogin);
smt.setString(3, ip);
smt.setString(4, player.getName());
smt.setString(5, player.getUUID());
} else {
smt.setString(1, player.getName());
smt.setString(2, player.getUUID());
}
batchHelper.addBatch(smt, null);
}
}
private class PlayerLeaveRow implements Row {
private final long onlineTime;
private final Actor actor;
private String statementString;
PlayerLeaveRow(Player player, long onlineTime) {
this.onlineTime = onlineTime;
actor = Actor.actorFromEntity(player);
statementString = "UPDATE `lb-players` SET onlinetime = onlinetime + ? WHERE lastlogin > 0 && UUID = ?";
}
@Override
@ -1028,13 +922,16 @@ public class Consumer extends TimerTask {
}
@Override
public String[] getPlayers() {
return new String[] { actor.getName() };
public Actor[] getActors() {
return new Actor[] { actor };
}
@Override
public Actor[] getActors() {
return new Actor[] { actor };
public void process(Connection conn, BatchHelper batchHelper) throws SQLException {
PreparedStatement smt = batchHelper.getOrPrepareStatement(conn, statementString, Statement.NO_GENERATED_KEYS);
smt.setLong(1, onlineTime);
smt.setString(2, actor.getUUID());
batchHelper.addBatch(smt, null);
}
}
@ -1046,4 +943,67 @@ public class Consumer extends TimerTask {
safeY = 65535;
return safeY;
}
private class BatchHelper {
private HashMap<String, PreparedStatement> preparedStatements = new HashMap<>();
private HashSet<PreparedStatement> preparedStatementsWithGeneratedKeys = new HashSet<>();
private LinkedHashMap<PreparedStatement, ArrayList<IntCallback>> generatedKeyHandler = new LinkedHashMap<>();
public void reset() {
preparedStatements.clear();
preparedStatementsWithGeneratedKeys.clear();
generatedKeyHandler.clear();
}
public void processStatements(Connection conn) throws SQLException {
while (!generatedKeyHandler.isEmpty()) {
Entry<PreparedStatement, ArrayList<IntCallback>> entry = generatedKeyHandler.entrySet().iterator().next();
PreparedStatement smt = entry.getKey();
ArrayList<IntCallback> callbackList = entry.getValue();
generatedKeyHandler.remove(smt);
smt.executeBatch();
if (preparedStatementsWithGeneratedKeys.contains(smt)) {
ResultSet keys = smt.getGeneratedKeys();
int[] results = new int[callbackList.size()];
int pos = 0;
while (keys.next() && pos < results.length) {
results[pos++] = keys.getInt(1);
}
keys.close();
for (int i = 0; i < results.length; i++) {
IntCallback callback = callbackList.get(i);
if (callback != null) {
callback.call(results[i]);
}
}
}
}
}
public PreparedStatement getOrPrepareStatement(Connection conn, String sql, int autoGeneratedKeys) throws SQLException {
PreparedStatement smt = preparedStatements.get(sql);
if (smt == null) {
smt = conn.prepareStatement(sql, autoGeneratedKeys);
preparedStatements.put(sql, smt);
if (autoGeneratedKeys == Statement.RETURN_GENERATED_KEYS) {
preparedStatementsWithGeneratedKeys.add(smt);
}
}
return smt;
}
public void addBatch(PreparedStatement smt, IntCallback generatedKeysCallback) throws SQLException {
smt.addBatch();
ArrayList<IntCallback> callbackList = generatedKeyHandler.get(smt);
if (callbackList == null) {
callbackList = new ArrayList<>();
generatedKeyHandler.put(smt, callbackList);
}
callbackList.add(generatedKeysCallback);
}
}
protected interface IntCallback {
public void call(int value) throws SQLException;
}
}

View File

@ -117,21 +117,9 @@ public class LogBlock extends JavaPlugin {
if (enableAutoClearLog && autoClearLogDelay > 0) {
getServer().getScheduler().runTaskTimerAsynchronously(this, new AutoClearLog(this), 6000, autoClearLogDelay * 60 * 20);
}
getServer().getScheduler().runTaskAsynchronously(this, new DumpedLogImporter(this));
new DumpedLogImporter(this).run();
registerEvents();
if (useBukkitScheduler) {
if (getServer().getScheduler().runTaskTimerAsynchronously(this, consumer, delayBetweenRuns < 20 ? 20 : delayBetweenRuns, delayBetweenRuns).getTaskId() > 0) {
getLogger().info("Scheduled consumer with bukkit scheduler.");
} else {
getLogger().warning("Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer.");
timer = new Timer();
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
}
} else {
timer = new Timer();
timer.schedule(consumer, delayBetweenRuns < 20 ? 1000 : delayBetweenRuns * 50, delayBetweenRuns * 50);
getLogger().info("Scheduled consumer with timer.");
}
consumer.start();
getServer().getScheduler().runTaskAsynchronously(this, new Updater.PlayerCountChecker(this));
for (final Tool tool : toolsByType.values()) {
if (pm.getPermission("logblock.tools." + tool.name) == null) {
@ -227,25 +215,14 @@ public class LogBlock extends JavaPlugin {
}
}
getLogger().info("Waiting for consumer ...");
consumer.run();
consumer.shutdown();
if (consumer.getQueueSize() > 0) {
int tries = 9;
while (consumer.getQueueSize() > 0) {
getLogger().info("Remaining queue size: " + consumer.getQueueSize());
if (tries > 0) {
getLogger().info("Remaining tries: " + tries);
} else {
getLogger().info("Unable to save queue to database. Trying to write to a local file.");
try {
consumer.writeToFile();
getLogger().info("Successfully dumped queue.");
} catch (final FileNotFoundException ex) {
getLogger().info("Failed to write. Given up.");
break;
}
}
consumer.run();
tries--;
getLogger().info("Remaining queue size: " + consumer.getQueueSize() + ". Trying to write to a local file.");
try {
consumer.writeToFile();
getLogger().info("Successfully dumped queue.");
} catch (final FileNotFoundException ex) {
getLogger().info("Failed to write. Given up.");
}
}
}

View File

@ -220,6 +220,13 @@ public class Utils {
return new String(hexChars);
}
public static String mysqlPrepareBytesForInsertAllowNull(byte[] bytes) {
if (bytes == null) {
return "null";
}
return "'" + mysqlEscapeBytes(bytes) + "'";
}
public static String mysqlTextEscape(String untrusted) {
return untrusted.replace("\\", "\\\\").replace("'", "\\'");
}