From 522fd2cd66e3da129268a90904031ffcb43d61ea Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 14 Dec 2022 13:44:35 +0000 Subject: [PATCH] Add missing pubsub channels. --- src/main/java/me/kavin/piped/Main.java | 34 +++++++++++++++++++ .../me/kavin/piped/utils/PubSubHelper.java | 8 +++++ 2 files changed, 42 insertions(+) diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index d06921e..bc857ed 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -9,8 +9,10 @@ import me.kavin.piped.utils.*; import me.kavin.piped.utils.matrix.SyncRunner; import me.kavin.piped.utils.obj.MatrixHelper; import me.kavin.piped.utils.obj.db.PlaylistVideo; +import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.Video; import okhttp3.OkHttpClient; +import org.apache.commons.lang3.StringUtils; import org.hibernate.Session; import org.hibernate.StatelessSession; import org.schabi.newpipe.extractor.NewPipe; @@ -109,6 +111,38 @@ public class Main { } }, 0, TimeUnit.MINUTES.toMillis(90)); + new Timer().scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + + s.createNativeQuery("SELECT channel_id.channel FROM " + + "(SELECT DISTINCT channel FROM users_subscribed UNION SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed) " + + "channel_id LEFT JOIN pubsub on pubsub.id = channel_id.channel " + + "WHERE pubsub.id IS NULL", String.class) + .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) + .getResultStream() + .parallel() + .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { + System.out.println(id); + if (StringUtils.isBlank(id) || !id.matches("UC[A-Za-z\\d_-]{22}")) + return; + else + System.out.println("Subscribing to " + id); + try (StatelessSession sess = DatabaseSessionFactory.createStatelessSession()) { + var pubsub = new PubSub(id, -1); + var tr = sess.beginTransaction(); + sess.insert(pubsub); + tr.commit(); + } + })); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 0, TimeUnit.DAYS.toMillis(1)); + new Timer().scheduleAtFixedRate(new TimerTask() { @Override public void run() { diff --git a/src/main/java/me/kavin/piped/utils/PubSubHelper.java b/src/main/java/me/kavin/piped/utils/PubSubHelper.java index d5d0d58..66bf178 100644 --- a/src/main/java/me/kavin/piped/utils/PubSubHelper.java +++ b/src/main/java/me/kavin/piped/utils/PubSubHelper.java @@ -31,6 +31,14 @@ public class PubSubHelper { formBuilder.add("hub.mode", "subscribe"); formBuilder.add("hub.lease_seconds", "432000"); + if (pubsub == null) + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + pubsub = new PubSub(channelId, -1); + var tr = s.beginTransaction(); + s.insert(pubsub); + tr.commit(); + } + try (var resp = Constants.h2client .newCall(builder.post(formBuilder.build()) .build()).execute()) {