diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index 33471cc..6aedabb 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -2,6 +2,7 @@ package me.kavin.piped; import io.activej.inject.Injector; import io.sentry.Sentry; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import jakarta.persistence.criteria.CriteriaBuilder; import me.kavin.piped.consts.Constants; import me.kavin.piped.server.ServerLauncher; @@ -81,9 +82,7 @@ public class Main { public void run() { try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - final Random random = new Random(); - - s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" + + List channelIds = s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" + "SELECT DISTINCT channel FROM users_subscribed" + " UNION " + "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" + @@ -92,9 +91,12 @@ public class Main { .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) .stream() .filter(Objects::nonNull) - .collect(Collectors.toUnmodifiableSet()) - .stream() - .sorted(Comparator.comparingInt(o -> random.nextInt())) + .distinct() + .collect(Collectors.toCollection(ObjectArrayList::new)); + + Collections.shuffle(channelIds); + + channelIds.stream() .parallel() .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { try {