Moved consumer to an own class file.

Saved queue on server shutdown.
This commit is contained in:
Robin Kupper
2011-04-04 02:20:58 +02:00
parent 5029a0bbbc
commit b321668cde
2 changed files with 149 additions and 163 deletions

View File

@@ -0,0 +1,131 @@
package de.diddiz.LogBlock;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import org.bukkit.block.Block;
import org.bukkit.entity.Player;
public class Consumer implements Runnable
{
private LinkedBlockingQueue<BlockRow> bqueue = new LinkedBlockingQueue<BlockRow>();
public void queueBlock(Player player, Block block, int typeAfter) {
queueBlock(player.getName(), block, 0, typeAfter, (byte)0, null, null);
}
public void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data) {
queueBlock(playerName, block, typeBefore, typeAfter, data, null, null);
}
public void queueBlock(Player player, Block block, short inType, byte inAmount, short outType, byte outAmount) {
queueBlock(player.getName(), block, block.getTypeId(), block.getTypeId(), (byte)0, null, new ChestAccess(inType, inAmount, outType, outAmount));
}
public void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) {
if (block == null || typeBefore < 0 || typeAfter < 0)
return;
String table = LogBlock.config.worldTables.get(LogBlock.config.worldNames.indexOf(block.getWorld().getName()));
if (table == null)
return;
if (playerName.length() > 32)
playerName = playerName.substring(0, 32);
BlockRow row = new BlockRow(table, playerName, typeBefore, typeAfter, data, block.getX(), block.getY(), block.getZ());
if (signtext != null)
row.signtext = signtext;
if (ca != null)
row.ca = ca;
if (!bqueue.offer(row))
LogBlock.log.info("[LogBlock] Failed to queue block for " + playerName);
}
public int getQueueSize() {
return bqueue.size();
}
public void run() {
Connection conn = null;
Statement state = null;
BlockRow b;
int count = 0;
if (bqueue.size() > 100)
LogBlock.log.info("[LogBlock Consumer] Queue overloaded. Size: " + bqueue.size());
try {
conn = DriverManager.getConnection("jdbc:jdc:jdcpool");
conn.setAutoCommit(false);
state = conn.createStatement();
long start = System.currentTimeMillis();
while (count < 1000 && !bqueue.isEmpty() && System.currentTimeMillis() - start < 100) {
b = bqueue.poll();
if (b == null)
continue;
state.execute("INSERT INTO `" + b.table + "` (date, playerid, replaced, type, data, x, y, z) SELECT now(), playerid, " + b.replaced + ", " + b.type + ", " + b.data + ", '" + b.x + "', " + b.y + ", '" + b.z + "' FROM `lb-players` WHERE playername = '" + b.name + "'", Statement.RETURN_GENERATED_KEYS);
if (b.signtext != null) {
ResultSet keys = state.getGeneratedKeys();
keys.next();
state.execute("INSERT INTO `" + b.table + "-sign` (id, signtext) values (" + keys.getInt(1) + ", '" + b.signtext + "')");
} else if (b.ca != null) {
ResultSet keys = state.getGeneratedKeys();
keys.next();
state.execute("INSERT INTO `" + b.table + "-chest` (id, intype, inamount, outtype, outamount) values (" + keys.getInt(1) + ", " + b.ca.inType + ", " + b.ca.inAmount + ", " + b.ca.outType + ", " + b.ca.outAmount + ")");
}
count++;
}
conn.commit();
LogBlock.log.info("[LogBlock Consumer] Took: " + (System.currentTimeMillis() - start) + "ms for " + count + " blocks");
} catch (SQLException ex) {
LogBlock.log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception", ex);
} finally {
try {
if (state != null)
state.close();
if (conn != null)
conn.close();
} catch (SQLException ex) {
LogBlock.log.log(Level.SEVERE, "[LogBlock Consumer] SQL exception on close", ex);
}
}
}
private class ChestAccess
{
public short inType, outType;
public byte inAmount, outAmount;
ChestAccess(short inType, byte inAmount, short outType, byte outAmount) {
this.inType = inType;
this.inAmount = inAmount;
this.outType = outType;
this.outAmount = outAmount;
}
}
private class BlockRow
{
public String table;
public String name;
public int replaced, type;
public byte data;
public int x, y, z;
public String signtext;
public ChestAccess ca;
BlockRow(String table, String name, int replaced, int type, byte data, int x, int y, int z) {
this.table = table;
this.name = name;
this.replaced = replaced;
this.type = type;
this.data = data;
this.x = x;
this.y = y;
this.z = z;
this.signtext = null;
this.ca = null;
}
}
}

View File

@@ -5,12 +5,8 @@ import java.net.URL;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -57,7 +53,6 @@ public class LogBlock extends JavaPlugin
static Logger log;
static Config config;
private Consumer consumer = null;
private LinkedBlockingQueue<BlockRow> bqueue = new LinkedBlockingQueue<BlockRow>();
@Override
public void onEnable() {
@@ -116,15 +111,19 @@ public class LogBlock extends JavaPlugin
if (config.logLeavesDecay)
pm.registerEvent(Type.LEAVES_DECAY, lbBlockListener, Event.Priority.Monitor, this);
consumer = new Consumer();
new Thread(consumer).start();
getServer().getScheduler().scheduleAsyncRepeatingTask(this, consumer, config.delay * 20, config.delay * 20);
log.info("Logblock v" + getDescription().getVersion() + " enabled.");
}
@Override
public void onDisable() {
if (consumer != null) {
log.info("[LogBlock] Stopping consumer");
consumer.stop();
if (consumer != null && consumer.getQueueSize() > 0) {
log.info("[LogBlock] Waiting for consumer ...");
Thread thread = new Thread(consumer);
while (consumer.getQueueSize() > 0) {
log.info("[LogBlock] Remaining queue size: " + consumer.getQueueSize());
thread.run();
}
}
log.info("LogBlock disabled.");
}
@@ -420,35 +419,6 @@ public class LogBlock extends JavaPlugin
return config.worldTables.get(idx);
}
private void queueBlock(Player player, Block block, int typeAfter) {
queueBlock(player.getName(), block, 0, typeAfter, (byte)0, null, null);
}
private void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data) {
queueBlock(playerName, block, typeBefore, typeAfter, data, null, null);
}
private void queueBlock(Player player, Block block, short inType, byte inAmount, short outType, byte outAmount) {
queueBlock(player.getName(), block, block.getTypeId(), block.getTypeId(), (byte)0, null, new ChestAccess(inType, inAmount, outType, outAmount));
}
private void queueBlock(String playerName, Block block, int typeBefore, int typeAfter, byte data, String signtext, ChestAccess ca) {
if (block == null || typeBefore < 0 || typeAfter < 0)
return;
String table = getTable(block);
if (table == null)
return;
if (playerName.length() > 32)
playerName = playerName.substring(0, 31);
BlockRow row = new BlockRow(table, playerName, typeBefore, typeAfter, data, block.getX(), block.getY(), block.getZ());
if (signtext != null)
row.signtext = signtext;
if (ca != null)
row.ca = ca;
if (!bqueue.offer(row))
log.info("[LogBlock] Failed to queue block for " + playerName);
}
private boolean CheckPermission(Player player, String permission) {
if (config.usePermissions)
return Permissions.Security.permission(player, permission);
@@ -490,28 +460,28 @@ public class LogBlock extends JavaPlugin
{
public void onBlockPlace(BlockPlaceEvent event) {
if (!event.isCancelled() && !(config.logSignTexts && (event.getBlock().getType() == Material.WALL_SIGN || event.getBlock().getType() == Material.SIGN_POST))) {
queueBlock(event.getPlayer().getName(), event.getBlockPlaced(), event.getBlockReplacedState().getTypeId(), event.getBlockPlaced().getTypeId(), event.getBlockPlaced().getData());
consumer.queueBlock(event.getPlayer().getName(), event.getBlockPlaced(), event.getBlockReplacedState().getTypeId(), event.getBlockPlaced().getTypeId(), event.getBlockPlaced().getData());
}
}
public void onBlockBreak(BlockBreakEvent event) {
if (!event.isCancelled())
queueBlock(event.getPlayer().getName(), event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
consumer.queueBlock(event.getPlayer().getName(), event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
}
public void onSignChange(SignChangeEvent event) {
if (!event.isCancelled())
queueBlock(event.getPlayer().getName(), event.getBlock(), 0, event.getBlock().getTypeId(), event.getBlock().getData(), "sign [" + event.getLine(0) + "] [" + event.getLine(1) + "] [" + event.getLine(2) + "] [" + event.getLine(3) + "]", null);
consumer.queueBlock(event.getPlayer().getName(), event.getBlock(), 0, event.getBlock().getTypeId(), event.getBlock().getData(), "sign [" + event.getLine(0) + "] [" + event.getLine(1) + "] [" + event.getLine(2) + "] [" + event.getLine(3) + "]", null);
}
public void onBlockBurn(BlockBurnEvent event) {
if (!event.isCancelled())
queueBlock(config.logFireAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
consumer.queueBlock(config.logFireAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
}
public void onLeavesDecay(LeavesDecayEvent event) {
if (!event.isCancelled())
queueBlock(config.logLeavesDecayAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
consumer.queueBlock(config.logLeavesDecayAs, event.getBlock(), event.getBlock().getTypeId(), 0, event.getBlock().getData());
}
}
@@ -529,7 +499,7 @@ public class LogBlock extends JavaPlugin
else
name = "Environment";
for (Block block : event.blockList())
queueBlock(name, block, block.getTypeId(), 0, block.getData());
consumer.queueBlock(name, block, block.getTypeId(), 0, block.getData());
}
}
}
@@ -539,22 +509,22 @@ public class LogBlock extends JavaPlugin
public void onPlayerInteract(PlayerInteractEvent event) {
if (!event.isCancelled()) {
if (event.getAction() == Action.RIGHT_CLICK_BLOCK && (event.getClickedBlock().getType() == Material.CHEST || event.getClickedBlock().getType() == Material.FURNACE ||event.getClickedBlock().getType() == Material.DISPENSER)) {
queueBlock(event.getPlayer(), event.getClickedBlock(), (short)0, (byte)0, (short)0, (byte)0);
consumer.queueBlock(event.getPlayer(), event.getClickedBlock(), (short)0, (byte)0, (short)0, (byte)0);
}
}
}
public void onPlayerBucketFill(PlayerBucketFillEvent event) {
if (!event.isCancelled()) {
queueBlock(event.getPlayer().getName(), event.getBlockClicked(), event.getBlockClicked().getTypeId(), 0, event.getBlockClicked().getData());
consumer.queueBlock(event.getPlayer().getName(), event.getBlockClicked(), event.getBlockClicked().getTypeId(), 0, event.getBlockClicked().getData());
}
}
public void onPlayerBucketEmpty(PlayerBucketEmptyEvent event) {
if (event.getBucket() == Material.WATER_BUCKET)
queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_WATER.getId());
consumer.queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_WATER.getId());
else if (event.getBucket() == Material.LAVA_BUCKET)
queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_LAVA.getId());
consumer.queueBlock(event.getPlayer(), event.getBlockClicked().getFace(event.getBlockFace()), Material.STATIONARY_LAVA.getId());
}
public void onPlayerJoin(PlayerJoinEvent event) {
@@ -604,119 +574,4 @@ public class LogBlock extends JavaPlugin
return false;
}
}
private class Consumer implements Runnable
{
private boolean stop = false;
Consumer() {
stop = false;
}
public void stop() {
stop = true;
}
public void run() {
PreparedStatement ps = null;
Connection conn = null;
BlockRow b;
while (!stop) {
long start = System.currentTimeMillis()/1000L;
int count = 0;
if (bqueue.size() > 100)
log.info("[LogBlock] Queue overloaded. Size: " + bqueue.size());
try {
conn = getConnection();
conn.setAutoCommit(false);
while (count < 100 && start + config.delay > (System.currentTimeMillis()/1000L)) {
b = bqueue.poll(1L, TimeUnit.SECONDS);
if (b == null)
continue;
ps = conn.prepareStatement("INSERT INTO `" + b.table + "` (`date`, `playerid`, `replaced`, `type`, `data`, `x`, `y`, `z`) SELECT now(), `playerid`, ?, ?, ?, ?, ? , ? FROM `lb-players` WHERE `playername` = ?", Statement.RETURN_GENERATED_KEYS);
ps.setInt(1, b.replaced);
ps.setInt(2, b.type);
ps.setByte(3, b.data);
ps.setInt(4, b.x);
ps.setInt(5, b.y);
ps.setInt(6, b.z);
ps.setString(7, b.name);
ps.executeUpdate();
if (b.signtext != null) {
ResultSet keys = ps.getGeneratedKeys();
keys.next();
int key = keys.getInt(1);
ps = conn.prepareStatement("INSERT INTO `" + b.table + "-sign` (`id`, `signtext`) values (?,?)");
ps.setInt(1, key);
ps.setString(2, b.signtext);
ps.executeUpdate();
} else if (b.ca != null) {
ResultSet keys = ps.getGeneratedKeys();
keys.next();
int key = keys.getInt(1);
ps = conn.prepareStatement("INSERT INTO `" + b.table + "-chest` (`id`, `intype`, `inamount`, `outtype`, `outamount`) values (?,?,?,?,?)");
ps.setInt(1, key);
ps.setShort(2, b.ca.inType);
ps.setByte(3, b.ca.inAmount);
ps.setShort(4, b.ca.outType);
ps.setByte(5, b.ca.outAmount);
ps.executeUpdate();
}
count++;
}
conn.commit();
} catch (InterruptedException ex) {
log.log(Level.SEVERE, "[LogBlock] Interrupted exception", ex);
} catch (SQLException ex) {
log.log(Level.SEVERE, "[LogBlock] SQL exception", ex);
} finally {
try {
if (ps != null)
ps.close();
if (conn != null)
conn.close();
} catch (SQLException ex) {
log.log(Level.SEVERE, "[LogBlock] SQL exception on close", ex);
}
}
}
}
}
private class ChestAccess
{
public short inType, outType;
public byte inAmount, outAmount;
ChestAccess(short inType, byte inAmount, short outType, byte outAmount) {
this.inType = inType;
this.inAmount = inAmount;
this.outType = outType;
this.outAmount = outAmount;
}
}
private class BlockRow
{
public String table;
public String name;
public int replaced, type;
public byte data;
public int x, y, z;
public String signtext;
public ChestAccess ca;
BlockRow(String table, String name, int replaced, int type, byte data, int x, int y, int z) {
this.table = table;
this.name = name;
this.replaced = replaced;
this.type = type;
this.data = data;
this.x = x;
this.y = y;
this.z = z;
this.signtext = null;
this.ca = null;
}
}
}