Close threads at server shutdown (#4029)

Co-authored-by: Peter Svensson <petersv@psv.nu>
Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com>
This commit is contained in:
petersv5 2023-10-02 00:15:44 +02:00 committed by GitHub
parent be5b437e5f
commit 7d489c7354
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 17 deletions

View file

@ -47,6 +47,10 @@ public class GeyserServerInitializer extends BedrockServerInitializer {
this.geyser = geyser; this.geyser = geyser;
} }
public DefaultEventLoopGroup getEventLoopGroup() {
return eventLoopGroup;
}
@Override @Override
public void initSession(@Nonnull BedrockServerSession bedrockServerSession) { public void initSession(@Nonnull BedrockServerSession bedrockServerSession) {
try { try {
@ -72,4 +76,4 @@ public class GeyserServerInitializer extends BedrockServerInitializer {
protected BedrockPeer createPeer(Channel channel) { protected BedrockPeer createPeer(Channel channel) {
return new GeyserBedrockPeer(channel, this::createSession); return new GeyserBedrockPeer(channel, this::createSession);
} }
} }

View file

@ -39,6 +39,7 @@ import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.Future;
import lombok.Getter; import lombok.Getter;
import net.jodah.expiringmap.ExpirationPolicy; import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap; import net.jodah.expiringmap.ExpiringMap;
@ -58,6 +59,7 @@ import org.geysermc.geyser.network.netty.handler.RakPingHandler;
import org.geysermc.geyser.network.netty.proxy.ProxyServerHandler; import org.geysermc.geyser.network.netty.proxy.ProxyServerHandler;
import org.geysermc.geyser.ping.GeyserPingInfo; import org.geysermc.geyser.ping.GeyserPingInfo;
import org.geysermc.geyser.ping.IGeyserPingPassthrough; import org.geysermc.geyser.ping.IGeyserPingPassthrough;
import org.geysermc.geyser.skin.SkinProvider;
import org.geysermc.geyser.text.GeyserLocale; import org.geysermc.geyser.text.GeyserLocale;
import org.geysermc.geyser.translator.text.MessageTranslator; import org.geysermc.geyser.translator.text.MessageTranslator;
@ -83,14 +85,21 @@ public final class GeyserServer {
private static final Transport TRANSPORT = compatibleTransport(); private static final Transport TRANSPORT = compatibleTransport();
/**
* See {@link EventLoopGroup#shutdownGracefully(long, long, TimeUnit)}
*/
private static final int SHUTDOWN_QUIET_PERIOD_MS = 100;
private static final int SHUTDOWN_TIMEOUT_MS = 500;
private final GeyserImpl geyser; private final GeyserImpl geyser;
private final EventLoopGroup group; private EventLoopGroup group;
private final ServerBootstrap bootstrap; private final ServerBootstrap bootstrap;
private EventLoopGroup playerGroup;
@Getter @Getter
private final ExpiringMap<InetSocketAddress, InetSocketAddress> proxiedAddresses; private final ExpiringMap<InetSocketAddress, InetSocketAddress> proxiedAddresses;
private ChannelFuture future; private ChannelFuture bootstrapFuture;
public GeyserServer(GeyserImpl geyser, int threadCount) { public GeyserServer(GeyserImpl geyser, int threadCount) {
this.geyser = geyser; this.geyser = geyser;
@ -109,7 +118,7 @@ public final class GeyserServer {
public CompletableFuture<Void> bind(InetSocketAddress address) { public CompletableFuture<Void> bind(InetSocketAddress address) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
this.future = this.bootstrap.bind(address).addListener(bindResult -> { this.bootstrapFuture = this.bootstrap.bind(address).addListener(bindResult -> {
if (bindResult.cause() != null) { if (bindResult.cause() != null) {
future.completeExceptionally(bindResult.cause()); future.completeExceptionally(bindResult.cause());
return; return;
@ -117,7 +126,7 @@ public final class GeyserServer {
future.complete(null); future.complete(null);
}); });
Channel channel = this.future.channel(); Channel channel = this.bootstrapFuture.channel();
// Add our ping handler // Add our ping handler
channel.pipeline() channel.pipeline()
@ -132,8 +141,19 @@ public final class GeyserServer {
} }
public void shutdown() { public void shutdown() {
this.group.shutdownGracefully(); try {
this.future.channel().closeFuture().syncUninterruptibly(); Future<?> future1 = this.group.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.group = null;
Future<?> future2 = this.playerGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.playerGroup = null;
future1.sync();
future2.sync();
SkinProvider.shutdown();
} catch (InterruptedException e) {
GeyserImpl.getInstance().getLogger().severe("Exception in shutdown process", e);
}
this.bootstrapFuture.channel().closeFuture().syncUninterruptibly();
} }
private ServerBootstrap createBootstrap(EventLoopGroup group) { private ServerBootstrap createBootstrap(EventLoopGroup group) {
@ -149,11 +169,13 @@ public final class GeyserServer {
} }
} }
GeyserServerInitializer serverInitializer = new GeyserServerInitializer(this.geyser);
playerGroup = serverInitializer.getEventLoopGroup();
return new ServerBootstrap() return new ServerBootstrap()
.channelFactory(RakChannelFactory.server(TRANSPORT.datagramChannel())) .channelFactory(RakChannelFactory.server(TRANSPORT.datagramChannel()))
.group(group) .group(group)
.option(RakChannelOption.RAK_HANDLE_PING, true) .option(RakChannelOption.RAK_HANDLE_PING, true)
.childHandler(new GeyserServerInitializer(this.geyser)); .childHandler(serverInitializer);
} }
public boolean onConnectionRequest(InetSocketAddress inetSocketAddress) { public boolean onConnectionRequest(InetSocketAddress inetSocketAddress) {
@ -217,7 +239,7 @@ public final class GeyserServer {
.version(GameProtocol.DEFAULT_BEDROCK_CODEC.getMinecraftVersion()) // Required to not be empty as of 1.16.210.59. Can only contain . and numbers. .version(GameProtocol.DEFAULT_BEDROCK_CODEC.getMinecraftVersion()) // Required to not be empty as of 1.16.210.59. Can only contain . and numbers.
.ipv4Port(this.geyser.getConfig().getBedrock().port()) .ipv4Port(this.geyser.getConfig().getBedrock().port())
.ipv6Port(this.geyser.getConfig().getBedrock().port()) .ipv6Port(this.geyser.getConfig().getBedrock().port())
.serverId(future.channel().config().getOption(RakChannelOption.RAK_GUID)); .serverId(bootstrapFuture.channel().config().getOption(RakChannelOption.RAK_GUID));
if (config.isPassthroughMotd() && pingInfo != null && pingInfo.getDescription() != null) { if (config.isPassthroughMotd() && pingInfo != null && pingInfo.getDescription() != null) {
String[] motd = MessageTranslator.convertMessageLenient(pingInfo.getDescription()).split("\n"); String[] motd = MessageTranslator.convertMessageLenient(pingInfo.getDescription()).split("\n");

View file

@ -58,7 +58,7 @@ import java.util.function.Predicate;
public class SkinProvider { public class SkinProvider {
private static final boolean ALLOW_THIRD_PARTY_CAPES = GeyserImpl.getInstance().getConfig().isAllowThirdPartyCapes(); private static final boolean ALLOW_THIRD_PARTY_CAPES = GeyserImpl.getInstance().getConfig().isAllowThirdPartyCapes();
static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(ALLOW_THIRD_PARTY_CAPES ? 21 : 14); static ExecutorService EXECUTOR_SERVICE;
static final Skin EMPTY_SKIN; static final Skin EMPTY_SKIN;
static final Cape EMPTY_CAPE = new Cape("", "no-cape", ByteArrays.EMPTY_ARRAY, -1, true); static final Cape EMPTY_CAPE = new Cape("", "no-cape", ByteArrays.EMPTY_ARRAY, -1, true);
@ -133,6 +133,20 @@ public class SkinProvider {
WEARING_CUSTOM_SKULL_SLIM = new SkinGeometry("{\"geometry\" :{\"default\" :\"geometry.humanoid.wearingCustomSkullSlim\"}}", wearingCustomSkullSlim, false); WEARING_CUSTOM_SKULL_SLIM = new SkinGeometry("{\"geometry\" :{\"default\" :\"geometry.humanoid.wearingCustomSkullSlim\"}}", wearingCustomSkullSlim, false);
} }
private static ExecutorService getExecutorService() {
if (EXECUTOR_SERVICE == null) {
EXECUTOR_SERVICE = Executors.newFixedThreadPool(ALLOW_THIRD_PARTY_CAPES ? 21 : 14);
}
return EXECUTOR_SERVICE;
}
public static void shutdown() {
if (EXECUTOR_SERVICE != null) {
EXECUTOR_SERVICE.shutdown();
EXECUTOR_SERVICE = null;
}
}
public static void registerCacheImageTask(GeyserImpl geyser) { public static void registerCacheImageTask(GeyserImpl geyser) {
// Schedule Daily Image Expiry if we are caching them // Schedule Daily Image Expiry if we are caching them
if (geyser.getConfig().getCacheImages() > 0) { if (geyser.getConfig().getCacheImages() > 0) {
@ -302,7 +316,7 @@ public class SkinProvider {
GeyserImpl.getInstance().getLogger().debug("Took " + (System.currentTimeMillis() - time) + "ms for " + playerId); GeyserImpl.getInstance().getLogger().debug("Took " + (System.currentTimeMillis() - time) + "ms for " + playerId);
return skinAndCape; return skinAndCape;
}, EXECUTOR_SERVICE); }, getExecutorService());
} }
static CompletableFuture<Skin> requestSkin(UUID playerId, String textureUrl, boolean newThread) { static CompletableFuture<Skin> requestSkin(UUID playerId, String textureUrl, boolean newThread) {
@ -320,7 +334,7 @@ public class SkinProvider {
CompletableFuture<Skin> future; CompletableFuture<Skin> future;
if (newThread) { if (newThread) {
future = CompletableFuture.supplyAsync(() -> supplySkin(playerId, textureUrl), EXECUTOR_SERVICE) future = CompletableFuture.supplyAsync(() -> supplySkin(playerId, textureUrl), getExecutorService())
.whenCompleteAsync((skin, throwable) -> { .whenCompleteAsync((skin, throwable) -> {
skin.updated = true; skin.updated = true;
CACHED_JAVA_SKINS.put(textureUrl, skin); CACHED_JAVA_SKINS.put(textureUrl, skin);
@ -349,7 +363,7 @@ public class SkinProvider {
CompletableFuture<Cape> future; CompletableFuture<Cape> future;
if (newThread) { if (newThread) {
future = CompletableFuture.supplyAsync(() -> supplyCape(capeUrl, provider), EXECUTOR_SERVICE) future = CompletableFuture.supplyAsync(() -> supplyCape(capeUrl, provider), getExecutorService())
.whenCompleteAsync((cape, throwable) -> { .whenCompleteAsync((cape, throwable) -> {
CACHED_JAVA_CAPES.put(capeUrl, cape); CACHED_JAVA_CAPES.put(capeUrl, cape);
requestedCapes.remove(capeUrl); requestedCapes.remove(capeUrl);
@ -388,7 +402,7 @@ public class SkinProvider {
CompletableFuture<Skin> future; CompletableFuture<Skin> future;
if (newThread) { if (newThread) {
future = CompletableFuture.supplyAsync(() -> supplyEars(skin, earsUrl), EXECUTOR_SERVICE) future = CompletableFuture.supplyAsync(() -> supplyEars(skin, earsUrl), getExecutorService())
.whenCompleteAsync((outSkin, throwable) -> { }); .whenCompleteAsync((outSkin, throwable) -> { });
} else { } else {
Skin ears = supplyEars(skin, earsUrl); // blocking Skin ears = supplyEars(skin, earsUrl); // blocking
@ -620,7 +634,7 @@ public class SkinProvider {
} }
return null; return null;
} }
}, EXECUTOR_SERVICE); }, getExecutorService());
} }
/** /**
@ -646,7 +660,7 @@ public class SkinProvider {
} }
return null; return null;
} }
}, EXECUTOR_SERVICE).thenCompose(uuid -> { }, getExecutorService()).thenCompose(uuid -> {
if (uuid == null) { if (uuid == null) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

View file

@ -14,7 +14,7 @@ protocol-connection = "3.0.0.Beta1-20230908.171156-105"
raknet = "1.0.0.CR1-20230703.195238-9" raknet = "1.0.0.CR1-20230703.195238-9"
blockstateupdater="1.20.30-20230918.203831-4" blockstateupdater="1.20.30-20230918.203831-4"
mcauthlib = "d9d773e" mcauthlib = "d9d773e"
mcprotocollib = "1.20.2-1-20231001.173210-4" mcprotocollib = "1.20.2-1-20231001.201013-5"
adventure = "4.14.0" adventure = "4.14.0"
adventure-platform = "4.3.0" adventure-platform = "4.3.0"
junit = "5.9.2" junit = "5.9.2"