Hold pubsub session for lesser time. (#298)

This commit is contained in:
Kavin 2022-07-02 18:13:58 +01:00 committed by GitHub
parent 3a009407ad
commit 0158744506
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 27 deletions

View file

@ -4,10 +4,7 @@ import io.activej.inject.Injector;
import jakarta.persistence.criteria.CriteriaBuilder; import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery; import jakarta.persistence.criteria.CriteriaQuery;
import me.kavin.piped.consts.Constants; import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.DatabaseSessionFactory; import me.kavin.piped.utils.*;
import me.kavin.piped.utils.DownloaderImpl;
import me.kavin.piped.utils.Multithreading;
import me.kavin.piped.utils.ResponseHelper;
import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.PubSub;
import me.kavin.piped.utils.obj.db.User; import me.kavin.piped.utils.obj.db.User;
import org.hibernate.Session; import org.hibernate.Session;
@ -18,6 +15,7 @@ import org.schabi.newpipe.extractor.localization.Localization;
import org.schabi.newpipe.extractor.services.youtube.YoutubeThrottlingDecrypter; import org.schabi.newpipe.extractor.services.youtube.YoutubeThrottlingDecrypter;
import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor; import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -75,7 +73,13 @@ public class Main {
pubSubList.stream().parallel() pubSubList.stream().parallel()
.forEach(pubSub -> { .forEach(pubSub -> {
if (pubSub != null) if (pubSub != null)
Multithreading.runAsyncLimitedPubSub(() -> ResponseHelper.subscribePubSub(pubSub.getId())); Multithreading.runAsyncLimitedPubSub(() -> {
try {
ResponseHelper.subscribePubSub(pubSub.getId());
} catch (IOException e) {
ExceptionHandler.handle(e);
}
});
}); });
} catch (Exception e) { } catch (Exception e) {

View file

@ -104,4 +104,11 @@ public class DatabaseHelper {
return s.createQuery(cr).uniqueResult(); return s.createQuery(cr).uniqueResult();
} }
public static PubSub getPubSubFromId(String id) {
try (Session s = DatabaseSessionFactory.createSession()) {
s.setHibernateFlushMode(FlushMode.MANUAL);
return getPubSubFromId(s, id);
}
}
} }

View file

@ -1425,7 +1425,13 @@ public class ResponseHelper {
s.persist(channel); s.persist(channel);
s.beginTransaction().commit(); s.beginTransaction().commit();
Multithreading.runAsync(() -> subscribePubSub(channelId)); Multithreading.runAsync(() -> {
try {
subscribePubSub(channelId);
} catch (IOException e) {
ExceptionHandler.handle(e);
}
});
Multithreading.runAsync(() -> { Multithreading.runAsync(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) { try (Session sess = DatabaseSessionFactory.createSession()) {
@ -1443,17 +1449,9 @@ public class ResponseHelper {
} }
} }
public static void subscribePubSub(String channelId) { public static void subscribePubSub(String channelId) throws IOException {
try (Session s = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, s);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
}
private static void subscribePubSub(String channelId, Session s) throws IOException { PubSub pubsub = DatabaseHelper.getPubSubFromId(channelId);
PubSub pubsub = DatabaseHelper.getPubSubFromId(s, channelId);
if (pubsub == null || System.currentTimeMillis() - pubsub.getSubbedAt() > TimeUnit.DAYS.toMillis(4)) { if (pubsub == null || System.currentTimeMillis() - pubsub.getSubbedAt() > TimeUnit.DAYS.toMillis(4)) {
@ -1476,19 +1474,18 @@ public class ResponseHelper {
.build()).execute(); .build()).execute();
if (resp.code() == 202) { if (resp.code() == 202) {
try (Session s = DatabaseSessionFactory.createSession()) {
var tr = s.beginTransaction(); var tr = s.beginTransaction();
if (pubsub == null) {
if (pubsub == null) { pubsub = new PubSub(channelId, System.currentTimeMillis());
pubsub = new PubSub(channelId, System.currentTimeMillis()); s.persist(pubsub);
s.persist(pubsub); } else {
} else { pubsub.setSubbedAt(System.currentTimeMillis());
pubsub.setSubbedAt(System.currentTimeMillis()); s.merge(pubsub);
s.merge(pubsub); }
tr.commit();
} }
tr.commit();
} else } else
System.out.println("Failed to subscribe: " + resp.code() + "\n" + Objects.requireNonNull(resp.body()).string()); System.out.println("Failed to subscribe: " + resp.code() + "\n" + Objects.requireNonNull(resp.body()).string());