Run delayed velocity tasks under our scheduler too

This commit is contained in:
games647
2024-05-07 20:31:55 +02:00
parent 7b6d2062cf
commit bb78043d64
9 changed files with 45 additions and 61 deletions

View File

@ -25,7 +25,7 @@
*/ */
package com.github.games647.fastlogin.bukkit; package com.github.games647.fastlogin.bukkit;
import com.github.games647.fastlogin.core.AsyncScheduler; import com.github.games647.fastlogin.core.scheduler.AsyncScheduler;
import org.bukkit.Bukkit; import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin; import org.bukkit.plugin.Plugin;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -28,7 +28,6 @@ package com.github.games647.fastlogin.bungee;
import com.github.games647.fastlogin.bungee.hook.BungeeAuthHook; import com.github.games647.fastlogin.bungee.hook.BungeeAuthHook;
import com.github.games647.fastlogin.bungee.listener.ConnectListener; import com.github.games647.fastlogin.bungee.listener.ConnectListener;
import com.github.games647.fastlogin.bungee.listener.PluginMessageListener; import com.github.games647.fastlogin.bungee.listener.PluginMessageListener;
import com.github.games647.fastlogin.core.AsyncScheduler;
import com.github.games647.fastlogin.core.CommonUtil; import com.github.games647.fastlogin.core.CommonUtil;
import com.github.games647.fastlogin.core.hooks.AuthPlugin; import com.github.games647.fastlogin.core.hooks.AuthPlugin;
import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService; import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService;
@ -38,6 +37,7 @@ import com.github.games647.fastlogin.core.message.ChangePremiumMessage;
import com.github.games647.fastlogin.core.message.ChannelMessage; import com.github.games647.fastlogin.core.message.ChannelMessage;
import com.github.games647.fastlogin.core.message.NamespaceKey; import com.github.games647.fastlogin.core.message.NamespaceKey;
import com.github.games647.fastlogin.core.message.SuccessMessage; import com.github.games647.fastlogin.core.message.SuccessMessage;
import com.github.games647.fastlogin.core.scheduler.AsyncScheduler;
import com.github.games647.fastlogin.core.shared.FastLoginCore; import com.github.games647.fastlogin.core.shared.FastLoginCore;
import com.github.games647.fastlogin.core.shared.PlatformPlugin; import com.github.games647.fastlogin.core.shared.PlatformPlugin;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;

View File

@ -23,47 +23,31 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE. * SOFTWARE.
*/ */
package com.github.games647.fastlogin.core; package com.github.games647.fastlogin.core.scheduler;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** public abstract class AbstractAsyncScheduler {
* This limits the number of threads that are used at maximum. Thread creation can be very heavy for the CPU and
* context switching between threads too. However, we need many threads for blocking HTTP and database calls.
* Nevertheless, this number can be further limited, because the number of actually working database threads
* is limited by the size of our database pool. The goal is to separate concerns into processing and blocking only
* threads.
*/
public class AsyncScheduler {
private final Logger logger; protected final Logger logger;
protected final Executor processingPool;
protected final AtomicInteger currentlyRunning = new AtomicInteger();
private final Executor asyncPool; public AbstractAsyncScheduler(Logger logger, Executor processingPool) {
private final AtomicInteger currentlyRunning = new AtomicInteger();
public AsyncScheduler(Logger logger, Executor processingPool) {
this.logger = logger; this.logger = logger;
this.processingPool = processingPool;
logger.info("Using optimized green threads with Java 21");
this.asyncPool = Executors.newVirtualThreadPerTaskExecutor();
} }
public CompletableFuture<Void> runAsync(Runnable task) { public abstract CompletableFuture<Void> runAsync(Runnable task);
return CompletableFuture
.runAsync(() -> process(task), asyncPool)
.exceptionally(error -> {
logger.warn("Error occurred on thread pool", error);
return null;
});
}
private void process(Runnable task) { public abstract CompletableFuture<Void> runAsyncDelayed(Runnable task, Duration delay);
protected void process(Runnable task) {
currentlyRunning.incrementAndGet(); currentlyRunning.incrementAndGet();
try { try {
task.run(); task.run();

View File

@ -23,13 +23,13 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE. * SOFTWARE.
*/ */
package com.github.games647.fastlogin.core; package com.github.games647.fastlogin.core.scheduler;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This limits the number of threads that are used at maximum. Thread creation can be very heavy for the CPU and * This limits the number of threads that are used at maximum. Thread creation can be very heavy for the CPU and
@ -38,21 +38,14 @@ import java.util.concurrent.atomic.AtomicInteger;
* is limited by the size of our database pool. The goal is to separate concerns into processing and blocking only * is limited by the size of our database pool. The goal is to separate concerns into processing and blocking only
* threads. * threads.
*/ */
public class AsyncScheduler { public class AsyncScheduler extends AbstractAsyncScheduler {
private final Logger logger;
private final Executor processingPool;
private final AtomicInteger currentlyRunning = new AtomicInteger();
public AsyncScheduler(Logger logger, Executor processingPool) { public AsyncScheduler(Logger logger, Executor processingPool) {
this.logger = logger; super(logger, processingPool);
logger.info("Using legacy scheduler"); logger.info("Using legacy scheduler");
this.processingPool = processingPool;
} }
@Override
public CompletableFuture<Void> runAsync(Runnable task) { public CompletableFuture<Void> runAsync(Runnable task) {
return CompletableFuture.runAsync(() -> process(task), processingPool).exceptionally(error -> { return CompletableFuture.runAsync(() -> process(task), processingPool).exceptionally(error -> {
logger.warn("Error occurred on thread pool", error); logger.warn("Error occurred on thread pool", error);
@ -60,12 +53,22 @@ public class AsyncScheduler {
}); });
} }
private void process(Runnable task) { @Override
currentlyRunning.incrementAndGet(); public CompletableFuture<Void> runAsyncDelayed(Runnable task, Duration delay) {
try { return CompletableFuture.runAsync(() -> {
task.run(); currentlyRunning.incrementAndGet();
} finally { try {
currentlyRunning.getAndDecrement(); Thread.sleep(delay.toMillis());
} process(task);
} catch (InterruptedException interruptedException) {
// restore interrupt flag
Thread.currentThread().interrupt();
} finally {
currentlyRunning.getAndDecrement();
}
}, processingPool).exceptionally(error -> {
logger.warn("Error occurred on thread pool", error);
return null;
});
} }
} }

View File

@ -25,8 +25,8 @@
*/ */
package com.github.games647.fastlogin.core.shared; package com.github.games647.fastlogin.core.shared;
import com.github.games647.fastlogin.core.AsyncScheduler;
import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService; import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService;
import com.github.games647.fastlogin.core.scheduler.AsyncScheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -25,6 +25,7 @@
*/ */
package com.github.games647.fastlogin.core; package com.github.games647.fastlogin.core;
import com.github.games647.fastlogin.core.scheduler.AsyncScheduler;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange; import org.junit.jupiter.api.condition.DisabledForJreRange;

View File

@ -25,13 +25,13 @@
*/ */
package com.github.games647.fastlogin.velocity; package com.github.games647.fastlogin.velocity;
import com.github.games647.fastlogin.core.AsyncScheduler;
import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService; import com.github.games647.fastlogin.core.hooks.bedrock.BedrockService;
import com.github.games647.fastlogin.core.hooks.bedrock.FloodgateService; import com.github.games647.fastlogin.core.hooks.bedrock.FloodgateService;
import com.github.games647.fastlogin.core.hooks.bedrock.GeyserService; import com.github.games647.fastlogin.core.hooks.bedrock.GeyserService;
import com.github.games647.fastlogin.core.message.ChangePremiumMessage; import com.github.games647.fastlogin.core.message.ChangePremiumMessage;
import com.github.games647.fastlogin.core.message.ChannelMessage; import com.github.games647.fastlogin.core.message.ChannelMessage;
import com.github.games647.fastlogin.core.message.SuccessMessage; import com.github.games647.fastlogin.core.message.SuccessMessage;
import com.github.games647.fastlogin.core.scheduler.AsyncScheduler;
import com.github.games647.fastlogin.core.shared.FastLoginCore; import com.github.games647.fastlogin.core.shared.FastLoginCore;
import com.github.games647.fastlogin.core.shared.PlatformPlugin; import com.github.games647.fastlogin.core.shared.PlatformPlugin;
import com.github.games647.fastlogin.velocity.listener.ConnectListener; import com.github.games647.fastlogin.velocity.listener.ConnectListener;

View File

@ -53,11 +53,11 @@ import net.kyori.adventure.text.serializer.legacy.LegacyComponentSerializer;
import org.geysermc.floodgate.api.player.FloodgatePlayer; import org.geysermc.floodgate.api.player.FloodgatePlayer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class ConnectListener { public class ConnectListener {
@ -167,10 +167,9 @@ public class ConnectListener {
// In this case it means that the force command (plugin message) is already received and processed while // In this case it means that the force command (plugin message) is already received and processed while
// player is still in the login phase and reported to be offline. // player is still in the login phase and reported to be offline.
Runnable loginTask = new ForceLoginTask(plugin.getCore(), player, server, session); Runnable loginTask = new ForceLoginTask(plugin.getCore(), player, server, session);
plugin.getProxy().getScheduler()
.buildTask(plugin, loginTask) // Delay at least one second, otherwise the login command can be missed
.delay(1L, TimeUnit.SECONDS) // Delay at least one second, otherwise the login command can be missed plugin.getScheduler().runAsyncDelayed(loginTask, Duration.ofSeconds(1));
.schedule();
} }
@Subscribe @Subscribe

View File

@ -35,8 +35,8 @@ import com.velocitypowered.api.proxy.server.RegisteredServer;
import org.geysermc.floodgate.api.player.FloodgatePlayer; import org.geysermc.floodgate.api.player.FloodgatePlayer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class FloodgateAuthTask public class FloodgateAuthTask
extends FloodgateManagement<Player, CommandSource, VelocityLoginSession, FastLoginVelocity> { extends FloodgateManagement<Player, CommandSource, VelocityLoginSession, FastLoginVelocity> {
@ -62,10 +62,7 @@ public class FloodgateAuthTask
// In this case it means that the force command (plugin message) is already received and processed while // In this case it means that the force command (plugin message) is already received and processed while
// player is still in the login phase and reported to be offline. // player is still in the login phase and reported to be offline.
Runnable loginTask = new ForceLoginTask(core.getPlugin().getCore(), player, server, session, forcedOnlineMode); Runnable loginTask = new ForceLoginTask(core.getPlugin().getCore(), player, server, session, forcedOnlineMode);
core.getPlugin().getProxy().getScheduler() core.getPlugin().getScheduler().runAsyncDelayed(loginTask, Duration.ofSeconds(1));
.buildTask(core.getPlugin(), () -> core.getPlugin().getScheduler().runAsync(loginTask))
.delay(1L, TimeUnit.SECONDS) // Delay at least one second, otherwise the login command can be missed
.schedule();
} }
@Override @Override