Fixed reconnect, added flatfile queue dump

This commit is contained in:
Robin Kupper
2011-07-17 01:37:26 +02:00
parent b78c4ceab0
commit 6a2f400f59
2 changed files with 147 additions and 81 deletions

View File

@@ -3,6 +3,9 @@ package de.diddiz.LogBlock;
import static de.diddiz.util.BukkitUtils.compressInventory;
import static de.diddiz.util.BukkitUtils.entityName;
import static de.diddiz.util.BukkitUtils.rawData;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -29,10 +32,9 @@ import org.bukkit.inventory.ItemStack;
public class Consumer extends TimerTask
{
private final Queue<BlockChange> bqueue = new LinkedBlockingQueue<BlockChange>();
private final Queue<KillRow> kqueue = new LinkedBlockingQueue<KillRow>();
private final Queue<ChatRow> cqueue = new LinkedBlockingQueue<ChatRow>();
private final Queue<Row> queue = new LinkedBlockingQueue<Row>();
private final Config config;
private final Map<Integer, String> tables;
private final Set<Integer> hiddenPlayers, hiddenBlocks;
private final Map<Integer, Integer> lastAttackedEntity = new HashMap<Integer, Integer>();
private final Map<Integer, Long> lastAttackTime = new HashMap<Integer, Long>();
@@ -47,6 +49,7 @@ public class Consumer extends TimerTask
config = logblock.getConfig();
hiddenPlayers = config.hiddenPlayers;
hiddenBlocks = config.hiddenBlocks;
tables = config.tables;
}
/**
@@ -195,9 +198,11 @@ public class Consumer extends TimerTask
* Item id of the weapon. 0 for no weapon.
*/
public void queueKill(World world, String killerName, String victimName, int weapon) {
if (victimName == null || !config.tables.containsKey(world.getName().hashCode()))
if (victimName == null || !tables.containsKey(world.getName().hashCode()))
return;
kqueue.add(new KillRow(world.getName().hashCode(), killerName, victimName, weapon));
killerName = killerName.replaceAll("[^a-zA-Z0-9_]", "");
victimName = victimName.replaceAll("[^a-zA-Z0-9_]", "");
queue.add(new KillRow(world.getName().hashCode(), killerName, victimName, weapon));
}
/**
@@ -233,16 +238,15 @@ public class Consumer extends TimerTask
}
public void queueChat(String player, String message) {
cqueue.add(new ChatRow(player, message.replace("\\", "\\\\").replace("'", "\\'")));
queue.add(new ChatRow(player, message.replace("\\", "\\\\").replace("'", "\\'")));
}
@Override
public void run() {
if (!lock.tryLock())
if (queue.isEmpty() || !lock.tryLock())
return;
final Connection conn = logblock.getConnection();
Statement state = null;
String table;
if (getQueueSize() > 1000)
log.info("[LogBlock Consumer] Queue overloaded. Size: " + getQueueSize());
try {
@@ -252,70 +256,26 @@ public class Consumer extends TimerTask
state = conn.createStatement();
final long start = System.currentTimeMillis();
int count = 0;
if (!bqueue.isEmpty()) {
while (!bqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) {
final BlockChange b = bqueue.poll();
if (b == null)
continue;
final int playerHash = b.playerName.hashCode();
if (!players.containsKey(playerHash))
if (!addPlayer(conn, state, b.playerName)) {
log.warning("[LogBlock Consumer] Failed to add player " + b.playerName);
continue;
process: while (!queue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) {
final Row r = queue.poll();
if (r == null)
continue;
for (final String player : r.getPlayers())
if (!players.containsKey(player.hashCode()))
if (!addPlayer(state, player)) {
log.warning("[LogBlock Consumer] Failed to add player " + player);
continue process;
}
final boolean needKeys = b.signtext != null || b.ca != null;
table = config.tables.get(b.loc.getWorld().getName().hashCode());
state.execute("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) VALUES (FROM_UNIXTIME(" + b.date + "), " + players.get(playerHash) + ", " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.loc.getBlockX() + "', " + b.loc.getBlockY() + ", '" + b.loc.getBlockZ() + "')", needKeys ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS);
if (needKeys) {
final ResultSet keys = state.getGeneratedKeys();
if (keys.next()) {
if (b.signtext != null)
state.execute("INSERT INTO `" + table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')");
if (b.ca != null)
state.execute("INSERT INTO `" + table + "-chest` (id, itemtype, itemamount, itemdata) values (" + keys.getInt(1) + ", " + b.ca.itemType + ", " + b.ca.itemAmount + ", " + b.ca.itemData + ")");
} else
log.warning("[LogBlock Consumer] Failed to get generated keys. You may have to repair " + table);
for (final String insert : r.getInserts())
try {
state.execute(insert);
} catch (final SQLException ex) {
log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception on " + insert + ": ", ex);
break process;
}
count++;
}
conn.commit();
}
if (!kqueue.isEmpty()) {
while (!kqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) {
final KillRow k = kqueue.poll();
if (k == null || k.victim == null)
continue;
if (k.killer != null && !players.containsKey(k.killer.hashCode()))
if (!addPlayer(conn, state, k.killer)) {
log.warning("[LogBlock Consumer] Failed to add player " + k.killer);
continue;
}
if (!players.containsKey(k.victim.hashCode()))
if (!addPlayer(conn, state, k.victim)) {
log.warning("[LogBlock Consumer] Failed to add player " + k.victim);
continue;
}
state.execute("INSERT INTO `" + config.tables.get(k.worldHash) + "-kills` (date, killer, victim, weapon) VALUES (now(), " + (k.killer == null ? "null" : players.get(k.killer.hashCode())) + ", " + players.get(k.victim.hashCode()) + ", " + k.weapon + ")");
count++;
}
conn.commit();
}
if (!cqueue.isEmpty()) {
while (!cqueue.isEmpty() && (System.currentTimeMillis() - start < config.timePerRun || count < config.forceToProcessAtLeast)) {
final ChatRow c = cqueue.poll();
if (c == null)
continue;
final int playerHash = c.player.hashCode();
if (!players.containsKey(playerHash))
if (!addPlayer(conn, state, c.player)) {
log.warning("[LogBlock Consumer] Failed to add player " + c.player);
continue;
}
state.execute("INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + c.date + "), " + players.get(playerHash) + ", '" + c.message + "')");
count++;
}
conn.commit();
count++;
}
conn.commit();
} catch (final SQLException ex) {
log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex);
} finally {
@@ -331,8 +291,25 @@ public class Consumer extends TimerTask
}
}
public void writeToFile() throws FileNotFoundException {
final File file = new File("plugins/LogBlock/consumer/queue.sql");
file.getParentFile().mkdirs();
final PrintWriter writer = new PrintWriter(file);
while (!queue.isEmpty()) {
final Row r = queue.poll();
if (r == null)
continue;
for (final String player : r.getPlayers())
if (!players.containsKey(player.hashCode()))
writer.println("INSERT IGNORE INTO `lb-players` (playername) VALUES ('" + player + "');");
for (final String insert : r.getInserts())
writer.println(insert);
}
writer.close();
}
int getQueueSize() {
return bqueue.size() + kqueue.size() + cqueue.size();
return queue.size();
}
boolean hide(Player player) {
@@ -345,9 +322,8 @@ public class Consumer extends TimerTask
return true;
}
private boolean addPlayer(Connection conn, Statement state, String playerName) throws SQLException {
private boolean addPlayer(Statement state, String playerName) throws SQLException {
state.execute("INSERT IGNORE INTO `lb-players` (playername) VALUES ('" + playerName + "')");
conn.commit();
final ResultSet rs = state.executeQuery("SELECT playerid FROM `lb-players` WHERE playername = '" + playerName + "'");
if (rs.next())
players.put(playerName.hashCode(), rs.getInt(1));
@@ -356,30 +332,71 @@ public class Consumer extends TimerTask
}
private void queueBlock(String playerName, Location loc, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) {
if (playerName == null || loc == null || typeBefore < 0 || typeAfter < 0 || hiddenPlayers.contains(playerName.hashCode()) || !config.tables.containsKey(loc.getWorld().getName().hashCode()) || typeBefore != typeAfter && hiddenBlocks.contains(typeBefore) && hiddenBlocks.contains(typeAfter))
if (playerName == null || loc == null || typeBefore < 0 || typeAfter < 0 || hiddenPlayers.contains(playerName.hashCode()) || !tables.containsKey(loc.getWorld().getName().hashCode()) || typeBefore != typeAfter && hiddenBlocks.contains(typeBefore) && hiddenBlocks.contains(typeAfter))
return;
playerName = playerName.replaceAll("[^a-zA-Z0-9_]", "");
if (signtext != null)
signtext = signtext.replace("\\", "\\\\").replace("'", "\\'");
bqueue.add(new BlockChange(System.currentTimeMillis() / 1000, loc, playerName, typeBefore, typeAfter, data, signtext, ca));
queue.add(new BlockRow(loc, playerName, typeBefore, typeAfter, data, signtext, ca));
}
private static class KillRow
private static interface Row
{
final String killer;
final String victim;
String[] getInserts();
String[] getPlayers();
}
private class BlockRow extends BlockChange implements Row
{
public BlockRow(Location loc, String playerName, int replaced, int type, byte data, String signtext, ChestAccess ca) {
super(System.currentTimeMillis() / 1000, loc, playerName, replaced, type, data, signtext, ca);
}
@Override
public String[] getInserts() {
final String[] inserts = new String[ca != null || signtext != null ? 2 : 1];
inserts[0] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "` (date, playerid, replaced, type, data, x, y, z) VALUES (FROM_UNIXTIME(" + date + "), " + players.get(playerName.hashCode()) + ", " + replaced + ", " + type + ", " + data + ", '" + loc.getBlockX() + "', " + loc.getBlockY() + ", '" + loc.getBlockZ() + "');";
if (signtext != null)
inserts[1] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "-sign` (id, signtext) values (LAST_INSERT_ID(), '" + signtext + "');";
else if (ca != null)
inserts[1] = "INSERT INTO `" + tables.get(loc.getWorld().getName().hashCode()) + "-chest` (id, itemtype, itemamount, itemdata) values (LAST_INSERT_ID(), " + ca.itemType + ", " + ca.itemAmount + ", " + ca.itemData + ");";
return inserts;
}
@Override
public String[] getPlayers() {
return new String[]{playerName};
}
}
private class KillRow implements Row
{
final long date;
final String killer, victim;
final int weapon;
final int worldHash;
KillRow(int worldHash, String attacker, String defender, int weapon) {
date = System.currentTimeMillis() / 1000;
this.worldHash = worldHash;
killer = attacker;
victim = defender;
this.weapon = weapon;
}
@Override
public String[] getInserts() {
return new String[]{"INSERT INTO `" + tables.get(worldHash) + "-kills` (date, killer, victim, weapon) VALUES (FROM_UNIXTIME(" + date + "), " + (killer == null ? "NULL" : players.get(killer.hashCode())) + ", " + players.get(victim.hashCode()) + ", " + weapon + ");"};
}
@Override
public String[] getPlayers() {
return new String[]{killer, victim};
}
}
private static class ChatRow
private class ChatRow implements Row
{
final long date;
final String player, message;
@@ -389,5 +406,15 @@ public class Consumer extends TimerTask
this.player = player;
this.message = message;
}
@Override
public String[] getInserts() {
return new String[]{"INSERT INTO `lb-chat` (date, playerid, message) VALUES (FROM_UNIXTIME(" + date + "), " + players.get(player.hashCode()) + ", '" + message + "');"};
}
@Override
public String[] getPlayers() {
return new String[]{player};
}
}
}

View File

@@ -1,10 +1,13 @@
package de.diddiz.LogBlock;
import static de.diddiz.util.Utils.download;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.net.URL;
import java.sql.Connection;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
@@ -59,7 +62,7 @@ public class LogBlock extends JavaPlugin
updater = new Updater(this);
log.info("[LogBlock] Version check: " + updater.checkVersion());
config = new Config(this);
final File file = new File("lib/mysql-connector-java-bin.jar");
File file = new File("lib/mysql-connector-java-bin.jar");
if (!file.exists() || file.length() == 0)
download(log, new URL("http://diddiz.insane-architects.net/download/mysql-connector-java-bin.jar"), file);
if (!file.exists() || file.length() == 0)
@@ -70,6 +73,28 @@ public class LogBlock extends JavaPlugin
if (updater.update())
config = new Config(this);
updater.checkTables();
file = new File("plugins/LogBlock/consumer/queue.sql");
if (file.exists()) {
log.info("[LogBlock] Found stored queue. Try to import now.");
Connection conn = null;
try {
conn = getConnection();
conn.setAutoCommit(false);
final Statement state = conn.createStatement();
final BufferedReader in = new BufferedReader(new FileReader(file));
String insert = null;
while ((insert = in.readLine()) != null)
state.execute(insert);
conn.commit();
file.delete();
log.info("[LogBlock] Successfully imported stored queue.");
} catch (final Exception ex) {
log.log(Level.WARNING, "[LogBlock] Failed to import stored queue: ", ex);
} finally {
if (conn != null)
conn.close();
}
}
} catch (final Exception ex) {
log.log(Level.SEVERE, "[LogBlock] Error while loading: ", ex);
errorAtLoading = true;
@@ -182,10 +207,24 @@ public class LogBlock extends JavaPlugin
getServer().getScheduler().cancelTasks(this);
if (consumer != null && consumer.getQueueSize() > 0) {
log.info("[LogBlock] Waiting for consumer ...");
final Thread thread = new Thread(consumer);
int lastSize = -1, fails = 0;
while (consumer.getQueueSize() > 0) {
log.info("[LogBlock] Remaining queue size: " + consumer.getQueueSize());
thread.run();
if (lastSize == consumer.getQueueSize())
fails++;
else
fails = 0;
if (fails > 10) {
log.info("Unable to save queue to database. Trying to write to a local file.");
try {
consumer.writeToFile();
} catch (final FileNotFoundException ex) {
log.info("Failed to write. Given up.");
break;
}
}
lastSize = consumer.getQueueSize();
consumer.run();
}
}
if (pool != null)