Use platform scheduler to reuse threads between plugins

The platforms usually use a caching thread executor. It makes to use
this executor to reuse threads as they are expensive to create.
This commit is contained in:
games647
2022-02-22 16:07:11 +01:00
parent 1d3fcb9929
commit 9d2c346235
5 changed files with 21 additions and 32 deletions

View File

@ -28,7 +28,6 @@ package com.github.games647.fastlogin.bukkit;
import com.github.games647.fastlogin.core.AsyncScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
@ -38,8 +37,8 @@ public class BukkitScheduler extends AsyncScheduler {
private final Executor syncExecutor;
public BukkitScheduler(Plugin plugin, Logger logger, ThreadFactory threadFactory) {
super(logger, threadFactory);
public BukkitScheduler(Plugin plugin, Logger logger) {
super(logger, command -> Bukkit.getScheduler().runTaskAsynchronously(plugin, command));
syncExecutor = r -> Bukkit.getScheduler().runTask(plugin, r);
}

View File

@ -82,7 +82,7 @@ public class FastLoginBukkit extends JavaPlugin implements PlatformPlugin<Comman
public FastLoginBukkit() {
this.logger = CommonUtil.initializeLoggerService(getLogger());
this.scheduler = new BukkitScheduler(this, logger, getThreadFactory());
this.scheduler = new BukkitScheduler(this, logger);
}
@Override

View File

@ -82,7 +82,7 @@ public class FastLoginBungee extends Plugin implements PlatformPlugin<CommandSen
@Override
public void onEnable() {
logger = CommonUtil.initializeLoggerService(getLogger());
scheduler = new AsyncScheduler(logger, getThreadFactory());
scheduler = new AsyncScheduler(logger, task -> getProxy().getScheduler().runAsync(this, task));
core = new FastLoginCore<>(this);
core.load();

View File

@ -25,14 +25,9 @@
*/
package com.github.games647.fastlogin.core;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
@ -53,7 +48,9 @@ public class AsyncScheduler {
// 30 threads are still too many - the optimal solution is to separate into processing and blocking threads
// where processing threads could only be max number of cores while blocking threads could be minimized using
// non-blocking I/O and a single event executor
private final ExecutorService processingPool;
private final Executor processingPool;
private final AtomicInteger currentlyRunning = new AtomicInteger();
/*
private final ExecutorService databaseExecutor = new ThreadPoolExecutor(1, 10,
@ -61,34 +58,27 @@ public class AsyncScheduler {
new LinkedBlockingQueue<>(MAX_CAPACITY));
*/
public AsyncScheduler(Logger logger, ThreadFactory threadFactory) {
public AsyncScheduler(Logger logger, Executor processingPool) {
this.logger = logger;
processingPool = new ThreadPoolExecutor(6, 32,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(MAX_CAPACITY), threadFactory);
this.processingPool = processingPool;
}
/*
public <R> CompletableFuture<R> runDatabaseTask(Supplier<R> databaseTask) {
return CompletableFuture.supplyAsync(databaseTask, databaseExecutor)
.exceptionally(error -> {
logger.warn("Error occurred on thread pool", error);
return null;
})
// change context to the processing pool
.thenApplyAsync(r -> r, processingPool);
}
*/
public CompletableFuture<Void> runAsync(Runnable task) {
return CompletableFuture.runAsync(task, processingPool).exceptionally(error -> {
return CompletableFuture.runAsync(() -> {
currentlyRunning.incrementAndGet();
try {
task.run();
} finally {
currentlyRunning.getAndDecrement();
}
}, processingPool).exceptionally(error -> {
logger.warn("Error occurred on thread pool", error);
return null;
});
}
public void shutdown() {
MoreExecutors.shutdownAndAwaitTermination(processingPool, 1, TimeUnit.MINUTES);
// MoreExecutors.shutdownAndAwaitTermination(processingPool, 1, TimeUnit.MINUTES);
//MoreExecutors.shutdownAndAwaitTermination(databaseExecutor, 1, TimeUnit.MINUTES);
}
}

View File

@ -88,7 +88,7 @@ public class FastLoginVelocity implements PlatformPlugin<CommandSource> {
@Subscribe
public void onProxyInitialization(ProxyInitializeEvent event) {
scheduler = new AsyncScheduler(logger, getThreadFactory());
scheduler = new AsyncScheduler(logger, task -> server.getScheduler().buildTask(this, task).schedule());
core = new FastLoginCore<>(this);
core.load();
loadOrGenerateProxyId();