forked from LogBlock/LogBlock
Use "count" var to determine how many rows have been processed
This commit is contained in:
@@ -293,13 +293,14 @@ public class Consumer extends TimerTask
|
|||||||
getLogger().info("[Consumer] Queue overloaded. Size: " + getQueueSize());
|
getLogger().info("[Consumer] Queue overloaded. Size: " + getQueueSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (conn == null)
|
if (conn == null)
|
||||||
return;
|
return;
|
||||||
conn.setAutoCommit(false);
|
conn.setAutoCommit(false);
|
||||||
state = conn.createStatement();
|
state = conn.createStatement();
|
||||||
final long start = System.currentTimeMillis();
|
final long start = System.currentTimeMillis();
|
||||||
int count = 0;
|
|
||||||
process:
|
process:
|
||||||
while (!queue.isEmpty() && (System.currentTimeMillis() - start < timePerRun || count < forceToProcessAtLeast)) {
|
while (!queue.isEmpty() && (System.currentTimeMillis() - start < timePerRun || count < forceToProcessAtLeast)) {
|
||||||
final Row r = queue.poll();
|
final Row r = queue.poll();
|
||||||
@@ -371,10 +372,9 @@ public class Consumer extends TimerTask
|
|||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
long timeElapsed = System.currentTimeMillis() - startTime;
|
long timeElapsed = System.currentTimeMillis() - startTime;
|
||||||
int rowsProcessed = startSize - queue.size();
|
float rowPerTime = count / timeElapsed;
|
||||||
float rowPerTime = rowsProcessed / timeElapsed;
|
|
||||||
getLogger().log(Level.INFO, "[Consumer] Finished consumer cycle in " + timeElapsed + " milliseconds.");
|
getLogger().log(Level.INFO, "[Consumer] Finished consumer cycle in " + timeElapsed + " milliseconds.");
|
||||||
getLogger().log(Level.INFO, "[Consumer] Total rows processed: " + rowsProcessed + ". row/time: " + String.format("%.4f", rowPerTime));
|
getLogger().log(Level.INFO, "[Consumer] Total rows processed: " + count + ". row/time: " + String.format("%.4f", rowPerTime));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user