Merge pull request #486 from TeamPiped/missing-pubsub

Add missing pubsub channels
This commit is contained in:
Kavin 2022-12-14 13:49:48 +00:00 committed by GitHub
commit 47b938433e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 0 deletions

View File

@ -9,8 +9,10 @@ import me.kavin.piped.utils.*;
import me.kavin.piped.utils.matrix.SyncRunner; import me.kavin.piped.utils.matrix.SyncRunner;
import me.kavin.piped.utils.obj.MatrixHelper; import me.kavin.piped.utils.obj.MatrixHelper;
import me.kavin.piped.utils.obj.db.PlaylistVideo; import me.kavin.piped.utils.obj.db.PlaylistVideo;
import me.kavin.piped.utils.obj.db.PubSub;
import me.kavin.piped.utils.obj.db.Video; import me.kavin.piped.utils.obj.db.Video;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.Session; import org.hibernate.Session;
import org.hibernate.StatelessSession; import org.hibernate.StatelessSession;
import org.schabi.newpipe.extractor.NewPipe; import org.schabi.newpipe.extractor.NewPipe;
@ -109,6 +111,38 @@ public class Main {
} }
}, 0, TimeUnit.MINUTES.toMillis(90)); }, 0, TimeUnit.MINUTES.toMillis(90));
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
s.createNativeQuery("SELECT channel_id.channel FROM " +
"(SELECT DISTINCT channel FROM users_subscribed UNION SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed) " +
"channel_id LEFT JOIN pubsub on pubsub.id = channel_id.channel " +
"WHERE pubsub.id IS NULL", String.class)
.setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY))
.getResultStream()
.parallel()
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
System.out.println(id);
if (StringUtils.isBlank(id) || !id.matches("UC[A-Za-z\\d_-]{22}"))
return;
else
System.out.println("Subscribing to " + id);
try (StatelessSession sess = DatabaseSessionFactory.createStatelessSession()) {
var pubsub = new PubSub(id, -1);
var tr = sess.beginTransaction();
sess.insert(pubsub);
tr.commit();
}
}));
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, TimeUnit.DAYS.toMillis(1));
new Timer().scheduleAtFixedRate(new TimerTask() { new Timer().scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {

View File

@ -31,6 +31,14 @@ public class PubSubHelper {
formBuilder.add("hub.mode", "subscribe"); formBuilder.add("hub.mode", "subscribe");
formBuilder.add("hub.lease_seconds", "432000"); formBuilder.add("hub.lease_seconds", "432000");
if (pubsub == null)
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
pubsub = new PubSub(channelId, -1);
var tr = s.beginTransaction();
s.insert(pubsub);
tr.commit();
}
try (var resp = Constants.h2client try (var resp = Constants.h2client
.newCall(builder.post(formBuilder.build()) .newCall(builder.post(formBuilder.build())
.build()).execute()) { .build()).execute()) {