From b1b7dbb44a0e8b8463ef83c86238b69f33acf547 Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Sat, 10 Dec 2022 04:56:31 +0000 Subject: [PATCH] Attempt to improve pubsub subscription handling. --- src/main/java/me/kavin/piped/Main.java | 3 +- .../me/kavin/piped/server/ServerLauncher.java | 11 +++++-- .../me/kavin/piped/utils/PubSubHelper.java | 30 ++++++++++--------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index 82045a9..d06921e 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -19,7 +19,6 @@ import org.schabi.newpipe.extractor.localization.Localization; import org.schabi.newpipe.extractor.services.youtube.YoutubeThrottlingDecrypter; import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor; -import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -99,7 +98,7 @@ public class Main { .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { try { PubSubHelper.subscribePubSub(id); - } catch (IOException e) { + } catch (Exception e) { ExceptionHandler.handle(e); } })); diff --git a/src/main/java/me/kavin/piped/server/ServerLauncher.java b/src/main/java/me/kavin/piped/server/ServerLauncher.java index 35c8540..bccba11 100644 --- a/src/main/java/me/kavin/piped/server/ServerLauncher.java +++ b/src/main/java/me/kavin/piped/server/ServerLauncher.java @@ -69,8 +69,15 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { })) .map(GET, "/version", AsyncServlet.ofBlocking(executor, request -> getRawResponse(Constants.VERSION.getBytes(UTF_8), "text/plain", "no-store"))) .map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200)) - .map(GET, "/webhooks/pubsub", request -> HttpResponse.ok200().withPlainText(Objects.requireNonNull(request.getQueryParameter("hub.challenge")))) - .map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { + .map(GET, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { + var topic = request.getQueryParameter("hub.topic"); + if (topic != null) + Multithreading.runAsync(() -> { + String channelId = StringUtils.substringAfter(topic, "channel_id="); + PubSubHelper.updatePubSub(channelId); + }); + return HttpResponse.ok200().withPlainText(Objects.requireNonNull(request.getQueryParameter("hub.challenge"))); + })).map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { try { SyndFeed feed = new SyndFeedInput().build( diff --git a/src/main/java/me/kavin/piped/utils/PubSubHelper.java b/src/main/java/me/kavin/piped/utils/PubSubHelper.java index db57eda..d5d0d58 100644 --- a/src/main/java/me/kavin/piped/utils/PubSubHelper.java +++ b/src/main/java/me/kavin/piped/utils/PubSubHelper.java @@ -35,24 +35,26 @@ public class PubSubHelper { .newCall(builder.post(formBuilder.build()) .build()).execute()) { - if (resp.code() == 202) { - try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - var tr = s.beginTransaction(); - if (pubsub == null) { - pubsub = new PubSub(channelId, System.currentTimeMillis()); - s.insert(pubsub); - } else { - pubsub.setSubbedAt(System.currentTimeMillis()); - s.update(pubsub); - } - tr.commit(); - } - - } else + if (resp.code() != 202) System.out.println("Failed to subscribe: " + resp.code() + "\n" + Objects.requireNonNull(resp.body()).string()); } } } + + public static void updatePubSub(String channelId) { + var pubsub = DatabaseHelper.getPubSubFromId(channelId); + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + s.beginTransaction(); + if (pubsub == null) { + pubsub = new PubSub(channelId, System.currentTimeMillis()); + s.insert(pubsub); + } else { + pubsub.setSubbedAt(System.currentTimeMillis()); + s.update(pubsub); + } + s.getTransaction().commit(); + } + } }