Merge pull request #484 from TeamPiped/pubsub-improve

Attempt to improve pubsub subscription handling.
This commit is contained in:
Kavin 2022-12-10 04:59:05 +00:00 committed by GitHub
commit 02a1c33d55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 18 deletions

View file

@ -19,7 +19,6 @@ import org.schabi.newpipe.extractor.localization.Localization;
import org.schabi.newpipe.extractor.services.youtube.YoutubeThrottlingDecrypter;
import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -99,7 +98,7 @@ public class Main {
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
try {
PubSubHelper.subscribePubSub(id);
} catch (IOException e) {
} catch (Exception e) {
ExceptionHandler.handle(e);
}
}));

View file

@ -69,8 +69,15 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}))
.map(GET, "/version", AsyncServlet.ofBlocking(executor, request -> getRawResponse(Constants.VERSION.getBytes(UTF_8), "text/plain", "no-store")))
.map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200))
.map(GET, "/webhooks/pubsub", request -> HttpResponse.ok200().withPlainText(Objects.requireNonNull(request.getQueryParameter("hub.challenge"))))
.map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
.map(GET, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
var topic = request.getQueryParameter("hub.topic");
if (topic != null)
Multithreading.runAsync(() -> {
String channelId = StringUtils.substringAfter(topic, "channel_id=");
PubSubHelper.updatePubSub(channelId);
});
return HttpResponse.ok200().withPlainText(Objects.requireNonNull(request.getQueryParameter("hub.challenge")));
})).map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
try {
SyndFeed feed = new SyndFeedInput().build(

View file

@ -35,24 +35,26 @@ public class PubSubHelper {
.newCall(builder.post(formBuilder.build())
.build()).execute()) {
if (resp.code() == 202) {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
var tr = s.beginTransaction();
if (pubsub == null) {
pubsub = new PubSub(channelId, System.currentTimeMillis());
s.insert(pubsub);
} else {
pubsub.setSubbedAt(System.currentTimeMillis());
s.update(pubsub);
}
tr.commit();
}
} else
if (resp.code() != 202)
System.out.println("Failed to subscribe: " + resp.code() + "\n" + Objects.requireNonNull(resp.body()).string());
}
}
}
public static void updatePubSub(String channelId) {
var pubsub = DatabaseHelper.getPubSubFromId(channelId);
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
s.beginTransaction();
if (pubsub == null) {
pubsub = new PubSub(channelId, System.currentTimeMillis());
s.insert(pubsub);
} else {
pubsub.setSubbedAt(System.currentTimeMillis());
s.update(pubsub);
}
s.getTransaction().commit();
}
}
}