Added more consumer configuration.

This commit is contained in:
Robin Kupper
2011-04-28 12:13:13 +02:00
parent 28b85c07e1
commit 70d8f55f5d
3 changed files with 60 additions and 34 deletions

View File

@@ -13,7 +13,10 @@ public class Config {
public final String url; public final String url;
public final String user; public final String user;
public final String password; public final String password;
public final int delay; public final int delayBetweenRuns;
public final int minCountPerRun;
public final int maxCountPerRun;
public final int maxTimePerRun;
public final boolean useBukkitScheduler; public final boolean useBukkitScheduler;
public final int keepLogDays; public final int keepLogDays;
public final boolean dumpDeletedLog; public final boolean dumpDeletedLog;
@@ -65,8 +68,19 @@ public class Config {
subkeys = config.getKeys("consumer"); subkeys = config.getKeys("consumer");
if (subkeys == null) if (subkeys == null)
subkeys = new ArrayList<String>(); subkeys = new ArrayList<String>();
if (!subkeys.contains("delay")) if (!subkeys.contains("delayBetweenRuns")) {
config.setProperty("consumer.delay", 6); if (subkeys.contains("delay")) {
config.setProperty("consumer.delayBetweenRuns", config.getInt("consumer.delay", 6));
config.removeProperty("consumer.delay");
} else
config.setProperty("consumer.delayBetweenRuns", 6);
}
if (!subkeys.contains("minCountPerRun"))
config.setProperty("consumer.minCountPerRun", 100);
if (!subkeys.contains("maxCountPerRun"))
config.setProperty("consumer.maxCountPerRun", 1000);
if (!subkeys.contains("maxTimePerRun"))
config.setProperty("maxTimePerRun", 100);
if (!subkeys.contains("useBukkitScheduler")) if (!subkeys.contains("useBukkitScheduler"))
config.setProperty("consumer.useBukkitScheduler", true); config.setProperty("consumer.useBukkitScheduler", true);
subkeys = config.getKeys("clearlog"); subkeys = config.getKeys("clearlog");
@@ -122,7 +136,10 @@ public class Config {
url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getString("mysql.port") + "/" + config.getString("mysql.database"); url = "jdbc:mysql://" + config.getString("mysql.host") + ":" + config.getString("mysql.port") + "/" + config.getString("mysql.database");
user = config.getString("mysql.user"); user = config.getString("mysql.user");
password = config.getString("mysql.password"); password = config.getString("mysql.password");
delay = config.getInt("consumer.delay", 6); delayBetweenRuns = config.getInt("consumer.delayBetweenRuns", 6);
minCountPerRun = config.getInt("consumer.minCountPerRun", 100);
maxCountPerRun = config.getInt("consumer.maxCountPerRun", 1000);
maxTimePerRun = config.getInt("consumer.maxTimePerRun", 100);
useBukkitScheduler = config.getBoolean("consumer.useBukkitScheduler", true); useBukkitScheduler = config.getBoolean("consumer.useBukkitScheduler", true);
keepLogDays = config.getInt("clearlog.keepLogDays", -1); keepLogDays = config.getInt("clearlog.keepLogDays", -1);
if (keepLogDays*86400000L > System.currentTimeMillis()) if (keepLogDays*86400000L > System.currentTimeMillis())

View File

@@ -206,42 +206,51 @@ public class Consumer extends TimerTask implements Runnable
return; return;
Statement state = null; Statement state = null;
BlockRow b; KillRow k; String table; BlockRow b; KillRow k; String table;
int count = 0;
if (getQueueSize() > 1000) if (getQueueSize() > 1000)
log.info("[LogBlock Consumer] Queue overloaded. Size: " + getQueueSize()); log.info("[LogBlock Consumer] Queue overloaded. Size: " + getQueueSize());
try { try {
conn.setAutoCommit(false); conn.setAutoCommit(false);
state = conn.createStatement(); state = conn.createStatement();
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
while (count < 1000 && !bqueue.isEmpty() && (System.currentTimeMillis() - start < 100 || count < 100)) { int count = 0;
b = bqueue.poll(); if (!bqueue.isEmpty()) {
if (b == null) while (count < config.maxCountPerRun && !bqueue.isEmpty() && (System.currentTimeMillis() - start < config.maxTimePerRun || count < config.minCountPerRun)) {
continue; b = bqueue.poll();
table = config.tables.get(b.worldHash); if (b == null)
state.execute("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) SELECT now(), playerid, " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.x + "', " + b.y + ", '" + b.z + "' FROM `lb-players` WHERE playername = '" + b.name + "'", Statement.RETURN_GENERATED_KEYS); continue;
if (b.signtext != null) { table = config.tables.get(b.worldHash);
final ResultSet keys = state.getGeneratedKeys(); state.execute("INSERT INTO `" + table + "` (date, playerid, replaced, type, data, x, y, z) SELECT now(), playerid, " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.x + "', " + b.y + ", '" + b.z + "' FROM `lb-players` WHERE playername = '" + b.name + "'", Statement.RETURN_GENERATED_KEYS);
if (keys.next()) if (b.signtext != null) {
state.execute("INSERT INTO `" + table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')"); final ResultSet keys = state.getGeneratedKeys();
else if (keys.next())
log.severe("[LogBlock Consumer] Failed to get generated keys"); state.execute("INSERT INTO `" + table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')");
} else if (b.ca != null) { else
final ResultSet keys = state.getGeneratedKeys(); log.severe("[LogBlock Consumer] Failed to get generated keys");
if (keys.next()) } else if (b.ca != null) {
state.execute("INSERT INTO `" + table + "-chest` (id, intype, inamount, outtype, outamount) values (" + keys.getInt(1) + ", " + b.ca.inType + ", " + b.ca.inAmount + ", " + b.ca.outType + ", " + b.ca.outAmount + ")"); final ResultSet keys = state.getGeneratedKeys();
else if (keys.next())
log.severe("[LogBlock Consumer] Failed to get generated keys"); state.execute("INSERT INTO `" + table + "-chest` (id, intype, inamount, outtype, outamount) values (" + keys.getInt(1) + ", " + b.ca.inType + ", " + b.ca.inAmount + ", " + b.ca.outType + ", " + b.ca.outAmount + ")");
else
log.severe("[LogBlock Consumer] Failed to get generated keys");
}
count++;
if (count % 100 == 0)
conn.commit();
} }
count++; conn.commit();
} }
conn.commit(); if (!kqueue.isEmpty()) {
while (!kqueue.isEmpty() && count < 1000 && (System.currentTimeMillis() - start < 100 || count < 100)) { while (count < config.maxCountPerRun && !kqueue.isEmpty() && (System.currentTimeMillis() - start < config.maxTimePerRun || count < config.minCountPerRun)) {
k = kqueue.poll(); k = kqueue.poll();
if (k == null) if (k == null)
continue; continue;
state.execute("INSERT INTO `" + config.tables.get(k.worldHash) + "-kills` (date, killer, victim, weapon) SELECT now(), playerid, (SELECT playerid FROM `lb-players` WHERE playername = '" + k.victim + "'), " + k.weapon + " FROM `lb-players` WHERE playername = '" + k.killer + "'"); state.execute("INSERT INTO `" + config.tables.get(k.worldHash) + "-kills` (date, killer, victim, weapon) SELECT now(), playerid, (SELECT playerid FROM `lb-players` WHERE playername = '" + k.victim + "'), " + k.weapon + " FROM `lb-players` WHERE playername = '" + k.killer + "'");
count++;
if (count % 100 == 0)
conn.commit();
}
conn.commit();
} }
conn.commit();
} catch (final SQLException ex) { } catch (final SQLException ex) {
log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex); log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex);
} finally { } finally {

View File

@@ -132,16 +132,16 @@ public class LogBlock extends JavaPlugin
if (config.logKills) if (config.logKills)
pm.registerEvent(Type.ENTITY_DAMAGE, lbEntityListener, Priority.Monitor, this); pm.registerEvent(Type.ENTITY_DAMAGE, lbEntityListener, Priority.Monitor, this);
if (config.useBukkitScheduler) { if (config.useBukkitScheduler) {
if (getServer().getScheduler().scheduleAsyncRepeatingTask(this, consumer, config.delay * 20, config.delay * 20) > 0) if (getServer().getScheduler().scheduleAsyncRepeatingTask(this, consumer, config.delayBetweenRuns * 20, config.delayBetweenRuns * 20) > 0)
log.info("[LogBlock] Scheduled consumer with bukkit scheduler."); log.info("[LogBlock] Scheduled consumer with bukkit scheduler.");
else { else {
log.warning("[LogBlock] Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer."); log.warning("[LogBlock] Failed to schedule consumer with bukkit scheduler. Now trying schedule with timer.");
timer = new Timer(); timer = new Timer();
timer.scheduleAtFixedRate(consumer, config.delay*1000, config.delay*1000); timer.scheduleAtFixedRate(consumer, config.delayBetweenRuns * 1000, config.delayBetweenRuns * 1000);
} }
} else { } else {
timer = new Timer(); timer = new Timer();
timer.scheduleAtFixedRate(consumer, config.delay*1000, config.delay*1000); timer.scheduleAtFixedRate(consumer, config.delayBetweenRuns * 1000, config.delayBetweenRuns * 1000);
log.info("[LogBlock] Scheduled consumer with timer."); log.info("[LogBlock] Scheduled consumer with timer.");
} }
log.info("Logblock v" + getDescription().getVersion() + " enabled."); log.info("Logblock v" + getDescription().getVersion() + " enabled.");