From 65469ed579ee104a86051665677a5f39f5deeb66 Mon Sep 17 00:00:00 2001 From: games647 Date: Fri, 20 Mar 2020 13:44:58 +0100 Subject: [PATCH] Make scheduler platform dependent --- .../fastlogin/bukkit/BukkitScheduler.java | 27 ++++++++++ .../fastlogin/bukkit/FastLoginBukkit.java | 16 +++++- .../bukkit/command/CrackedCommand.java | 4 +- .../bukkit/command/PremiumCommand.java | 4 +- .../protocollib/ProtocolLibListener.java | 4 +- .../fastlogin/bungee/FastLoginBungee.java | 12 ++++- .../bungee/listener/ConnectListener.java | 4 +- .../listener/PluginMessageListener.java | 6 +-- .../fastlogin/core/AsyncScheduler.java | 53 ++++++++++++++----- .../fastlogin/core/shared/FastLoginCore.java | 9 +--- .../fastlogin/core/shared/PlatformPlugin.java | 3 ++ 11 files changed, 107 insertions(+), 35 deletions(-) create mode 100644 bukkit/src/main/java/com/github/games647/fastlogin/bukkit/BukkitScheduler.java diff --git a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/BukkitScheduler.java b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/BukkitScheduler.java new file mode 100644 index 00000000..3f55e5b5 --- /dev/null +++ b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/BukkitScheduler.java @@ -0,0 +1,27 @@ +package com.github.games647.fastlogin.bukkit; + +import com.github.games647.fastlogin.core.AsyncScheduler; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; +import org.slf4j.Logger; + +public class BukkitScheduler extends AsyncScheduler { + + private final Plugin plugin; + private final Executor syncExecutor; + + public BukkitScheduler(Plugin plugin, Logger logger, ThreadFactory threadFactory) { + super(logger, threadFactory); + this.plugin = plugin; + + syncExecutor = r -> Bukkit.getScheduler().runTask(plugin, r); + } + + public Executor getSyncExecutor() { + return syncExecutor; + } +} diff --git a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/FastLoginBukkit.java b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/FastLoginBukkit.java index 16cea7b4..61106d89 100644 --- a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/FastLoginBukkit.java +++ b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/FastLoginBukkit.java @@ -41,17 +41,24 @@ public class FastLoginBukkit extends JavaPlugin implements PlatformPlugin loginSession = CommonUtil.buildCache(1, -1); - private final Logger logger = CommonUtil.createLoggerFromJDK(getLogger()); private final Map premiumPlayers = new ConcurrentHashMap<>(); + private final Logger logger; private boolean serverStarted; private boolean bungeeCord; + private final BukkitScheduler scheduler; private FastLoginCore core; + public FastLoginBukkit() { + this.logger = CommonUtil.createLoggerFromJDK(getLogger()); + this.scheduler = new BukkitScheduler(this, logger, getThreadFactory()); + } + @Override public void onEnable() { core = new FastLoginCore<>(this); core.load(); + try { bungeeCord = Class.forName("org.spigotmc.SpigotConfig").getDeclaredField("bungee").getBoolean(null); } catch (ClassNotFoundException notFoundEx) { @@ -72,7 +79,7 @@ public class FastLoginBukkit extends JavaPlugin implements PlatformPlugin { + plugin.getScheduler().runAsync(() -> { plugin.getCore().getStorage().save(profile); }); } else { @@ -75,7 +75,7 @@ public class CrackedCommand extends ToggleCommand { plugin.getCore().sendLocaleMessage("remove-premium", sender); profile.setPremium(false); - plugin.getCore().getAsyncScheduler().runAsync(() -> { + plugin.getScheduler().runAsync(() -> { plugin.getCore().getStorage().save(profile); }); } diff --git a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/command/PremiumCommand.java b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/command/PremiumCommand.java index 190848e8..28974bae 100644 --- a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/command/PremiumCommand.java +++ b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/command/PremiumCommand.java @@ -55,7 +55,7 @@ public class PremiumCommand extends ToggleCommand { } else { //todo: resolve uuid profile.setPremium(true); - plugin.getCore().getAsyncScheduler().runAsync(() -> { + plugin.getScheduler().runAsync(() -> { plugin.getCore().getStorage().save(profile); }); @@ -84,7 +84,7 @@ public class PremiumCommand extends ToggleCommand { } else { //todo: resolve uuid profile.setPremium(true); - plugin.getCore().getAsyncScheduler().runAsync(() -> { + plugin.getScheduler().runAsync(() -> { plugin.getCore().getStorage().save(profile); }); diff --git a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/listener/protocollib/ProtocolLibListener.java b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/listener/protocollib/ProtocolLibListener.java index e8525e6d..42d1963c 100644 --- a/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/listener/protocollib/ProtocolLibListener.java +++ b/bukkit/src/main/java/com/github/games647/fastlogin/bukkit/listener/protocollib/ProtocolLibListener.java @@ -66,7 +66,7 @@ public class ProtocolLibListener extends PacketAdapter { packetEvent.getAsyncMarker().incrementProcessingDelay(); Runnable verifyTask = new VerifyResponseTask(plugin, packetEvent, sender, sharedSecret, keyPair); - plugin.getCore().getAsyncScheduler().runAsync(verifyTask); + plugin.getScheduler().runAsync(verifyTask); } private void onLogin(PacketEvent packetEvent, Player player) { @@ -84,6 +84,6 @@ public class ProtocolLibListener extends PacketAdapter { packetEvent.getAsyncMarker().incrementProcessingDelay(); Runnable nameCheckTask = new NameCheckTask(plugin, packetEvent, random, player, username, keyPair.getPublic()); - plugin.getCore().getAsyncScheduler().runAsync(nameCheckTask); + plugin.getScheduler().runAsync(nameCheckTask); } } diff --git a/bungee/src/main/java/com/github/games647/fastlogin/bungee/FastLoginBungee.java b/bungee/src/main/java/com/github/games647/fastlogin/bungee/FastLoginBungee.java index 2d0874d4..3f753241 100644 --- a/bungee/src/main/java/com/github/games647/fastlogin/bungee/FastLoginBungee.java +++ b/bungee/src/main/java/com/github/games647/fastlogin/bungee/FastLoginBungee.java @@ -3,6 +3,7 @@ package com.github.games647.fastlogin.bungee; import com.github.games647.fastlogin.bungee.hook.BungeeAuthHook; import com.github.games647.fastlogin.bungee.listener.ConnectListener; import com.github.games647.fastlogin.bungee.listener.PluginMessageListener; +import com.github.games647.fastlogin.core.AsyncScheduler; import com.github.games647.fastlogin.core.CommonUtil; import com.github.games647.fastlogin.core.message.ChangePremiumMessage; import com.github.games647.fastlogin.core.message.ChannelMessage; @@ -37,11 +38,13 @@ public class FastLoginBungee extends Plugin implements PlatformPlugin session = new MapMaker().weakKeys().makeMap(); private FastLoginCore core; + private AsyncScheduler scheduler; private Logger logger; @Override public void onEnable() { logger = CommonUtil.createLoggerFromJDK(getLogger()); + scheduler = new AsyncScheduler(logger, getThreadFactory()); core = new FastLoginCore<>(this); core.load(); @@ -54,8 +57,8 @@ public class FastLoginBungee extends Plugin implements PlatformPlugin readMessage(forPlayer, channel, data)); + plugin.getScheduler().runAsync(() -> readMessage(forPlayer, channel, data)); } private void readMessage(ProxiedPlayer forPlayer, String channel, byte[] data) { @@ -81,10 +81,10 @@ public class PluginMessageListener implements Listener { core.getPendingConfirms().remove(forPlayer.getUniqueId()); Runnable task = new AsyncToggleMessage(core, forPlayer, playerName, true, isSourceInvoker); - plugin.getCore().getAsyncScheduler().runAsync(task); + plugin.getScheduler().runAsync(task); } else { Runnable task = new AsyncToggleMessage(core, forPlayer, playerName, false, isSourceInvoker); - plugin.getCore().getAsyncScheduler().runAsync(task); + plugin.getScheduler().runAsync(task); } } } diff --git a/core/src/main/java/com/github/games647/fastlogin/core/AsyncScheduler.java b/core/src/main/java/com/github/games647/fastlogin/core/AsyncScheduler.java index 86081299..9bec2369 100644 --- a/core/src/main/java/com/github/games647/fastlogin/core/AsyncScheduler.java +++ b/core/src/main/java/com/github/games647/fastlogin/core/AsyncScheduler.java @@ -1,10 +1,16 @@ package com.github.games647.fastlogin.core; +import com.google.common.util.concurrent.MoreExecutors; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; + /** * This limits the number of threads that are used at maximum. Thread creation can be very heavy for the CPU and * context switching between threads too. However we need many threads for blocking HTTP and database calls. @@ -14,27 +20,50 @@ import java.util.concurrent.TimeUnit; */ public class AsyncScheduler { + private static final int MAX_CAPACITY = 1024; + + //todo: single thread for delaying and scheduling tasks + private final Logger logger; + // 30 threads are still too many - the optimal solution is to separate into processing and blocking threads // where processing threads could only be max number of cores while blocking threads could be minimized using // non-blocking I/O and a single event executor - private final ThreadPoolExecutor executorService; + private final ExecutorService processingPool; - public AsyncScheduler(ThreadFactory threadFactory) { - executorService = new ThreadPoolExecutor(5, 30, + /* + private final ExecutorService databaseExecutor = new ThreadPoolExecutor(1, 10, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(MAX_CAPACITY)); + */ + + public AsyncScheduler(Logger logger, ThreadFactory threadFactory) { + this.logger = logger; + processingPool = new ThreadPoolExecutor(6, 32, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), threadFactory); + new LinkedBlockingQueue<>(MAX_CAPACITY), threadFactory); } - public void runAsync(Runnable task) { - executorService.execute(task); + /* + public CompletableFuture runDatabaseTask(Supplier databaseTask) { + return CompletableFuture.supplyAsync(databaseTask, databaseExecutor) + .exceptionally(error -> { + logger.warn("Error occurred on thread pool", error); + return null; + }) + // change context to the processing pool + .thenApplyAsync(r -> r, processingPool); + } + */ + + public CompletableFuture runAsync(Runnable task) { + return CompletableFuture.runAsync(task, processingPool).exceptionally(error -> { + logger.warn("Error occurred on thread pool", error); + return null; + }); } public void shutdown() { - executorService.shutdown(); - try { - executorService.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException interruptEx) { - Thread.currentThread().interrupt(); - } + MoreExecutors.shutdownAndAwaitTermination(processingPool, 1, TimeUnit.MINUTES); + //MoreExecutors.shutdownAndAwaitTermination(databaseExecutor, 1, TimeUnit.MINUTES); } } diff --git a/core/src/main/java/com/github/games647/fastlogin/core/shared/FastLoginCore.java b/core/src/main/java/com/github/games647/fastlogin/core/shared/FastLoginCore.java index eefecc9e..7962ec52 100644 --- a/core/src/main/java/com/github/games647/fastlogin/core/shared/FastLoginCore.java +++ b/core/src/main/java/com/github/games647/fastlogin/core/shared/FastLoginCore.java @@ -4,7 +4,6 @@ import com.github.games647.craftapi.resolver.MojangResolver; import com.github.games647.craftapi.resolver.http.RotatingProxySelector; import com.github.games647.fastlogin.core.AuthStorage; import com.github.games647.fastlogin.core.CommonUtil; -import com.github.games647.fastlogin.core.AsyncScheduler; import com.github.games647.fastlogin.core.hooks.AuthPlugin; import com.github.games647.fastlogin.core.hooks.DefaultPasswordGenerator; import com.github.games647.fastlogin.core.hooks.PasswordGenerator; @@ -52,7 +51,6 @@ public class FastLoginCore

> { private final T plugin; private final MojangResolver resolver = new MojangResolver(); - private final AsyncScheduler scheduler; private Configuration config; private AuthStorage storage; @@ -61,7 +59,6 @@ public class FastLoginCore

> { public FastLoginCore(T plugin) { this.plugin = plugin; - this.scheduler = new AsyncScheduler(plugin.getThreadFactory()); } public void load() { @@ -242,13 +239,9 @@ public class FastLoginCore

> { } } - public AsyncScheduler getAsyncScheduler() { - return scheduler; - } - public void close() { plugin.getLog().info("Safely shutting down scheduler. This could take up to one minute."); - scheduler.shutdown(); + plugin.getScheduler().shutdown(); if (storage != null) { storage.close(); diff --git a/core/src/main/java/com/github/games647/fastlogin/core/shared/PlatformPlugin.java b/core/src/main/java/com/github/games647/fastlogin/core/shared/PlatformPlugin.java index 9ecbccfc..c98f3964 100644 --- a/core/src/main/java/com/github/games647/fastlogin/core/shared/PlatformPlugin.java +++ b/core/src/main/java/com/github/games647/fastlogin/core/shared/PlatformPlugin.java @@ -1,5 +1,6 @@ package com.github.games647.fastlogin.core.shared; +import com.github.games647.fastlogin.core.AsyncScheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.nio.file.Path; @@ -17,6 +18,8 @@ public interface PlatformPlugin { void sendMessage(C receiver, String message); + AsyncScheduler getScheduler(); + default void sendMultiLineMessage(C receiver, String message) { for (String line : message.split("%nl%")) { sendMessage(receiver, line);