From 0158744506eaf81865a1b7af7f9fa3dc5ae63899 Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Sat, 2 Jul 2022 18:13:58 +0100 Subject: [PATCH] Hold pubsub session for lesser time. (#298) --- src/main/java/me/kavin/piped/Main.java | 14 ++++--- .../me/kavin/piped/utils/DatabaseHelper.java | 7 ++++ .../me/kavin/piped/utils/ResponseHelper.java | 41 +++++++++---------- 3 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index 592c097..ea6cb02 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -4,10 +4,7 @@ import io.activej.inject.Injector; import jakarta.persistence.criteria.CriteriaBuilder; import jakarta.persistence.criteria.CriteriaQuery; import me.kavin.piped.consts.Constants; -import me.kavin.piped.utils.DatabaseSessionFactory; -import me.kavin.piped.utils.DownloaderImpl; -import me.kavin.piped.utils.Multithreading; -import me.kavin.piped.utils.ResponseHelper; +import me.kavin.piped.utils.*; import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.User; import org.hibernate.Session; @@ -18,6 +15,7 @@ import org.schabi.newpipe.extractor.localization.Localization; import org.schabi.newpipe.extractor.services.youtube.YoutubeThrottlingDecrypter; import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor; +import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -75,7 +73,13 @@ public class Main { pubSubList.stream().parallel() .forEach(pubSub -> { if (pubSub != null) - Multithreading.runAsyncLimitedPubSub(() -> ResponseHelper.subscribePubSub(pubSub.getId())); + Multithreading.runAsyncLimitedPubSub(() -> { + try { + ResponseHelper.subscribePubSub(pubSub.getId()); + } catch (IOException e) { + ExceptionHandler.handle(e); + } + }); }); } catch (Exception e) { diff --git a/src/main/java/me/kavin/piped/utils/DatabaseHelper.java b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java index e4e83aa..0c54471 100644 --- a/src/main/java/me/kavin/piped/utils/DatabaseHelper.java +++ b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java @@ -104,4 +104,11 @@ public class DatabaseHelper { return s.createQuery(cr).uniqueResult(); } + + public static PubSub getPubSubFromId(String id) { + try (Session s = DatabaseSessionFactory.createSession()) { + s.setHibernateFlushMode(FlushMode.MANUAL); + return getPubSubFromId(s, id); + } + } } diff --git a/src/main/java/me/kavin/piped/utils/ResponseHelper.java b/src/main/java/me/kavin/piped/utils/ResponseHelper.java index d0e56f2..a2ad192 100644 --- a/src/main/java/me/kavin/piped/utils/ResponseHelper.java +++ b/src/main/java/me/kavin/piped/utils/ResponseHelper.java @@ -1425,7 +1425,13 @@ public class ResponseHelper { s.persist(channel); s.beginTransaction().commit(); - Multithreading.runAsync(() -> subscribePubSub(channelId)); + Multithreading.runAsync(() -> { + try { + subscribePubSub(channelId); + } catch (IOException e) { + ExceptionHandler.handle(e); + } + }); Multithreading.runAsync(() -> { try (Session sess = DatabaseSessionFactory.createSession()) { @@ -1443,17 +1449,9 @@ public class ResponseHelper { } } - public static void subscribePubSub(String channelId) { - try (Session s = DatabaseSessionFactory.createSession()) { - subscribePubSub(channelId, s); - } catch (Exception e) { - ExceptionHandler.handle(e); - } - } + public static void subscribePubSub(String channelId) throws IOException { - private static void subscribePubSub(String channelId, Session s) throws IOException { - - PubSub pubsub = DatabaseHelper.getPubSubFromId(s, channelId); + PubSub pubsub = DatabaseHelper.getPubSubFromId(channelId); if (pubsub == null || System.currentTimeMillis() - pubsub.getSubbedAt() > TimeUnit.DAYS.toMillis(4)) { @@ -1476,19 +1474,18 @@ public class ResponseHelper { .build()).execute(); if (resp.code() == 202) { - - var tr = s.beginTransaction(); - - if (pubsub == null) { - pubsub = new PubSub(channelId, System.currentTimeMillis()); - s.persist(pubsub); - } else { - pubsub.setSubbedAt(System.currentTimeMillis()); - s.merge(pubsub); + try (Session s = DatabaseSessionFactory.createSession()) { + var tr = s.beginTransaction(); + if (pubsub == null) { + pubsub = new PubSub(channelId, System.currentTimeMillis()); + s.persist(pubsub); + } else { + pubsub.setSubbedAt(System.currentTimeMillis()); + s.merge(pubsub); + } + tr.commit(); } - tr.commit(); - } else System.out.println("Failed to subscribe: " + resp.code() + "\n" + Objects.requireNonNull(resp.body()).string());