From 383c017f5c0fb81bfe0f9dfaf1743de6637055a6 Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Sat, 17 Sep 2022 20:29:26 +0530 Subject: [PATCH] Optimize pubsub subscription query. --- src/main/java/me/kavin/piped/Main.java | 60 +++++++++++--------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index d627919..fa385c1 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -2,10 +2,10 @@ package me.kavin.piped; import io.activej.inject.Injector; import jakarta.persistence.criteria.CriteriaBuilder; -import jakarta.persistence.criteria.CriteriaQuery; import me.kavin.piped.consts.Constants; import me.kavin.piped.utils.*; -import me.kavin.piped.utils.obj.db.*; +import me.kavin.piped.utils.obj.db.PlaylistVideo; +import me.kavin.piped.utils.obj.db.Video; import org.hibernate.Session; import org.hibernate.StatelessSession; import org.schabi.newpipe.extractor.NewPipe; @@ -17,6 +17,7 @@ import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExt import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class Main { @@ -64,41 +65,28 @@ public class Main { public void run() { try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - CriteriaBuilder cb = s.getCriteriaBuilder(); - CriteriaQuery criteria = cb.createQuery(PubSub.class); - var root = criteria.from(PubSub.class); - var userSq = criteria.subquery(User.class); - var userRoot = userSq.from(User.class); - userSq.select(userRoot.get("subscribed_ids")); - var subquery = criteria.subquery(UnauthenticatedSubscription.class); - var subRoot = subquery.from(UnauthenticatedSubscription.class); - subquery.select(subRoot.get("id")) - .where(cb.gt(subRoot.get("subscribedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY))); - criteria.select(root) - .where(cb.and( - cb.lessThan(root.get("subbedAt"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)), - cb.or( - root.get("id").in(userSq), - root.get("id").in(subquery) - ) - ) - ); + final Random random = new Random(); - List pubSubList = s.createQuery(criteria).list(); - - Collections.shuffle(pubSubList); - - pubSubList.stream().parallel() - .forEach(pubSub -> { - if (pubSub != null) - Multithreading.runAsyncLimitedPubSub(() -> { - try { - ResponseHelper.subscribePubSub(pubSub.getId()); - } catch (IOException e) { - ExceptionHandler.handle(e); - } - }); - }); + s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" + + "SELECT DISTINCT channel FROM users_subscribed" + + " UNION " + + "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" + + ")", String.class) + .setParameter("subbedTime", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)) + .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableSet()) + .stream() + .sorted(Comparator.comparingInt(o -> random.nextInt())) + .parallel() + .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { + try { + ResponseHelper.subscribePubSub(id); + } catch (IOException e) { + ExceptionHandler.handle(e); + } + })); } catch (Exception e) { e.printStackTrace();