Handle PubSub better with a queue and actually waiting

This commit is contained in:
Kavin 2023-07-23 19:27:01 +01:00
parent 5bdf6674b4
commit c027fbefe8
No known key found for this signature in database
GPG key ID: 6E4598CA5C92C41F
2 changed files with 50 additions and 14 deletions

View file

@ -23,6 +23,8 @@ import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExt
import rocks.kavin.reqwest4j.ReqwestUtils;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -102,15 +104,39 @@ public class Main {
Collections.shuffle(channelIds);
channelIds.stream()
.parallel()
.forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> {
var queue = new ConcurrentLinkedQueue<>(channelIds);
System.out.println("PubSub: queue size - " + queue.size() + " channels");
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
new Thread(() -> {
Object o = new Object();
String channelId;
while ((channelId = queue.poll()) != null) {
try {
PubSubHelper.subscribePubSub(id);
CompletableFuture<?> future = PubSubHelper.subscribePubSub(channelId);
if (future == null)
continue;
future.whenComplete((resp, throwable) -> {
synchronized (o) {
o.notify();
}
});
synchronized (o) {
o.wait();
}
} catch (Exception e) {
ExceptionHandler.handle(e);
}
}));
}
}, "PubSub-" + i).start();
}
} catch (Exception e) {
e.printStackTrace();

View file

@ -6,16 +6,21 @@ import okhttp3.FormBody;
import okio.Buffer;
import org.hibernate.StatelessSession;
import rocks.kavin.reqwest4j.ReqwestUtils;
import rocks.kavin.reqwest4j.Response;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PubSubHelper {
public static void subscribePubSub(String channelId) throws IOException {
@Nullable
public static CompletableFuture<Response> subscribePubSub(String channelId) throws IOException {
if (!ChannelHelpers.isValidId(channelId))
return;
return null;
PubSub pubsub = DatabaseHelper.getPubSubFromId(channelId);
@ -44,16 +49,21 @@ public class PubSubHelper {
var buffer = new Buffer();
formBuilder.build().writeTo(buffer);
ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of())
.thenAccept(resp -> {
if (resp.status() != 202)
var completableFuture = ReqwestUtils.fetch(Constants.PUBSUB_HUB_URL, "POST", buffer.readByteArray(), Map.of());
completableFuture
.whenComplete((resp, e) -> {
if (e != null) {
ExceptionHandler.handle((Exception) e);
return;
}
if (resp != null && resp.status() != 202)
System.out.println("Failed to subscribe: " + resp.status() + "\n" + new String(resp.body()));
})
.exceptionally(e -> {
ExceptionHandler.handle((Exception) e);
return null;
});
return completableFuture;
}
return null;
}
public static void updatePubSub(String channelId) {