Improve recovering from broken connections

This commit is contained in:
Brokkonaut
2018-10-21 04:21:59 +02:00
parent 3e836c2f50
commit fde6927aeb

View File

@ -419,9 +419,12 @@ public class Consumer extends Thread {
ArrayList<Row> currentRows = new ArrayList<>(); ArrayList<Row> currentRows = new ArrayList<>();
Connection conn = null; Connection conn = null;
BatchHelper batchHelper = new BatchHelper(); BatchHelper batchHelper = new BatchHelper();
int lastCommitsFailed = 0;
while (true) { while (true) {
try { try {
if (conn == null) { if (conn == null) {
batchHelper.reset();
logblock.getLogger().info("[Consumer] Connecting to the database!");
conn = logblock.getConnection(); conn = logblock.getConnection();
if (conn != null) { if (conn != null) {
// initialize connection // initialize connection
@ -492,22 +495,27 @@ public class Consumer extends Thread {
currentRows.clear(); currentRows.clear();
playerIds.putAll(uncommitedPlayerIds); playerIds.putAll(uncommitedPlayerIds);
uncommitedPlayerIds.clear(); uncommitedPlayerIds.clear();
lastCommitsFailed = 0;
} }
} catch (Exception e) { } catch (Exception e) {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Could not insert entries!", e); boolean retry = lastCommitsFailed < 2;
boolean retry = false; String state = "unknown";
if (e instanceof SQLException) { if (e instanceof SQLException) {
// Retry on network errors: SQLSTATE = 08S01 08001 08004 HY000 40001 // Retry on network errors: SQLSTATE = 08S01 08001 08004 HY000 40001
String state = ((SQLException) e).getSQLState(); state = ((SQLException) e).getSQLState();
retry = state != null && (state.equals("08S01") || state.equals("08001") || state.equals("08004") || state.equals("HY000") || state.equals("40001")); retry = retry || (state != null && (state.equals("08S01") || state.equals("08001") || state.equals("08004") || state.equals("HY000") || state.equals("40001")));
} }
lastCommitsFailed += 1;
if (retry) { if (retry) {
logblock.getLogger().log(Level.WARNING, "[Consumer] Database connection lost, reconnecting! SQLState: " + state);
// readd rows to the queue // readd rows to the queue
synchronized (queue) { synchronized (queue) {
while (!currentRows.isEmpty()) { while (!currentRows.isEmpty()) {
queue.addFirst(currentRows.remove(currentRows.size() - 1)); queue.addFirst(currentRows.remove(currentRows.size() - 1));
} }
} }
} else {
logblock.getLogger().log(Level.SEVERE, "[Consumer] Could not insert entries! SQLState: " + state, e);
} }
currentRows.clear(); currentRows.clear();
batchHelper.reset(); batchHelper.reset();