From 31ef2d942b2162dd606bfd603f7af784aece3b3a Mon Sep 17 00:00:00 2001 From: Brokkonaut Date: Tue, 11 Dec 2018 05:11:13 +0100 Subject: [PATCH] Return idle connection from Consumer to avoid timeout Fixes #732 --- src/main/java/de/diddiz/LogBlock/Consumer.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/diddiz/LogBlock/Consumer.java b/src/main/java/de/diddiz/LogBlock/Consumer.java index d0f1b32..afa3fe8 100644 --- a/src/main/java/de/diddiz/LogBlock/Consumer.java +++ b/src/main/java/de/diddiz/LogBlock/Consumer.java @@ -56,6 +56,7 @@ import de.diddiz.util.Utils; public class Consumer extends Thread { private static final int MAX_SHUTDOWN_TIME_MILLIS = 20000; private static final int WAIT_FOR_CONNECTION_TIME_MILLIS = 10000; + private static final int RETURN_IDLE_CONNECTION_TIME_MILLIS = 120000; private static final int RETRIES_ON_UNKNOWN_CONNECTION_ERROR = 2; private final Deque queue = new ArrayDeque(); @@ -434,7 +435,6 @@ public class Consumer extends Thread { try { if (conn == null) { batchHelper.reset(); - logblock.getLogger().info("[Consumer] Connecting to the database!"); conn = logblock.getConnection(); if (conn != null) { // initialize connection @@ -476,7 +476,19 @@ public class Consumer extends Thread { if (r == null) { try { if (currentRows.isEmpty() && !shutdown) { - queue.wait(); // nothing to do for us + // nothing to do for us + // wait some time before closing the connection + queue.wait(RETURN_IDLE_CONNECTION_TIME_MILLIS); + // if there is still nothing to do, close the connection and go to sleep + if (queue.isEmpty() && !shutdown) { + try { + conn.close(); + } catch (Exception e) { + // ignored + } + conn = null; + queue.wait(); + } } else { processBatch = true; } @@ -499,7 +511,7 @@ public class Consumer extends Thread { r.process(conn, batchHelper); } } - if (currentRows.size() >= (processBatch ? 1 : (Config.forceToProcessAtLeast * 10))) { + if (currentRows.size() >= Math.max((processBatch ? 1 : (Config.forceToProcessAtLeast * 10)), 1)) { batchHelper.processStatements(conn); conn.commit(); currentRows.clear();