forked from LogBlock/LogBlock
@ -56,6 +56,7 @@ import de.diddiz.util.Utils;
|
|||||||
public class Consumer extends Thread {
|
public class Consumer extends Thread {
|
||||||
private static final int MAX_SHUTDOWN_TIME_MILLIS = 20000;
|
private static final int MAX_SHUTDOWN_TIME_MILLIS = 20000;
|
||||||
private static final int WAIT_FOR_CONNECTION_TIME_MILLIS = 10000;
|
private static final int WAIT_FOR_CONNECTION_TIME_MILLIS = 10000;
|
||||||
|
private static final int RETURN_IDLE_CONNECTION_TIME_MILLIS = 120000;
|
||||||
private static final int RETRIES_ON_UNKNOWN_CONNECTION_ERROR = 2;
|
private static final int RETRIES_ON_UNKNOWN_CONNECTION_ERROR = 2;
|
||||||
|
|
||||||
private final Deque<Row> queue = new ArrayDeque<Row>();
|
private final Deque<Row> queue = new ArrayDeque<Row>();
|
||||||
@ -434,7 +435,6 @@ public class Consumer extends Thread {
|
|||||||
try {
|
try {
|
||||||
if (conn == null) {
|
if (conn == null) {
|
||||||
batchHelper.reset();
|
batchHelper.reset();
|
||||||
logblock.getLogger().info("[Consumer] Connecting to the database!");
|
|
||||||
conn = logblock.getConnection();
|
conn = logblock.getConnection();
|
||||||
if (conn != null) {
|
if (conn != null) {
|
||||||
// initialize connection
|
// initialize connection
|
||||||
@ -476,7 +476,19 @@ public class Consumer extends Thread {
|
|||||||
if (r == null) {
|
if (r == null) {
|
||||||
try {
|
try {
|
||||||
if (currentRows.isEmpty() && !shutdown) {
|
if (currentRows.isEmpty() && !shutdown) {
|
||||||
queue.wait(); // nothing to do for us
|
// nothing to do for us
|
||||||
|
// wait some time before closing the connection
|
||||||
|
queue.wait(RETURN_IDLE_CONNECTION_TIME_MILLIS);
|
||||||
|
// if there is still nothing to do, close the connection and go to sleep
|
||||||
|
if (queue.isEmpty() && !shutdown) {
|
||||||
|
try {
|
||||||
|
conn.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignored
|
||||||
|
}
|
||||||
|
conn = null;
|
||||||
|
queue.wait();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
processBatch = true;
|
processBatch = true;
|
||||||
}
|
}
|
||||||
@ -499,7 +511,7 @@ public class Consumer extends Thread {
|
|||||||
r.process(conn, batchHelper);
|
r.process(conn, batchHelper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (currentRows.size() >= (processBatch ? 1 : (Config.forceToProcessAtLeast * 10))) {
|
if (currentRows.size() >= Math.max((processBatch ? 1 : (Config.forceToProcessAtLeast * 10)), 1)) {
|
||||||
batchHelper.processStatements(conn);
|
batchHelper.processStatements(conn);
|
||||||
conn.commit();
|
conn.commit();
|
||||||
currentRows.clear();
|
currentRows.clear();
|
||||||
|
Reference in New Issue
Block a user