Merge pull request #651 from TeamPiped/dearrow-threading

Optimize threading when batch fetching DeArrow parallelly
This commit is contained in:
Kavin 2023-07-22 05:40:34 +01:00 committed by GitHub
commit 5bdf6674b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 32 deletions

View file

@ -155,7 +155,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
var videoIds = getArray(request.getQueryParameter("videoIds")); var videoIds = getArray(request.getQueryParameter("videoIds"));
return getJsonResponse( return getJsonResponse(
SponsorBlockUtils.getDeArrowedInfo(List.of(videoIds)) SponsorBlockUtils.getDeArrowedInfo(videoIds)
.thenApplyAsync(json -> { .thenApplyAsync(json -> {
try { try {
return mapper.writeValueAsBytes(json); return mapper.writeValueAsBytes(json);

View file

@ -1,8 +1,6 @@
package me.kavin.piped.utils; package me.kavin.piped.utils;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier; import java.util.function.Supplier;
public class Multithreading { public class Multithreading {
@ -12,11 +10,16 @@ public class Multithreading {
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8); .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8);
private static final ExecutorService esLimitedPubSub = Executors private static final ExecutorService esLimitedPubSub = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
public static void runAsync(final Runnable runnable) { public static void runAsync(final Runnable runnable) {
es.submit(runnable); es.submit(runnable);
} }
public static void runAsyncTask(final ForkJoinTask<?> task) {
forkJoinPool.submit(task);
}
public static void runAsyncLimited(final Runnable runnable) { public static void runAsyncLimited(final Runnable runnable) {
esLimited.submit(runnable); esLimited.submit(runnable);
} }

View file

@ -42,13 +42,13 @@ public class RequestUtils {
} }
} }
public static CompletableFuture<JsonNode> sendGetJson(String url) throws Exception { public static CompletableFuture<JsonNode> sendGetJson(String url) {
return ReqwestUtils.fetch(url, "GET", null, Map.of()).thenApply(Response::body).thenApplyAsync(resp -> { return ReqwestUtils.fetch(url, "GET", null, Map.of()).thenApply(Response::body).thenApplyAsync(resp -> {
try { try {
return mapper.readTree(resp); return mapper.readTree(resp);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Failed to parse JSON", e); throw new RuntimeException("Failed to parse JSON", e);
} }
}, Multithreading.getCachedExecutor()); });
} }
} }

View file

@ -12,10 +12,12 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinTask;
import static me.kavin.piped.consts.Constants.SPONSORBLOCK_SERVERS;
import static me.kavin.piped.consts.Constants.mapper; import static me.kavin.piped.consts.Constants.mapper;
public class SponsorBlockUtils { public class SponsorBlockUtils {
@ -54,47 +56,58 @@ public class SponsorBlockUtils {
return null; return null;
} }
public static CompletableFuture<ObjectNode> getDeArrowedInfo(List<String> videoIds) { public static CompletableFuture<ObjectNode> getDeArrowedInfo(String[] videoIds) {
ObjectNode objectNode = mapper.createObjectNode(); ObjectNode objectNode = mapper.createObjectNode();
var futures = videoIds.stream() var futures = Arrays.stream(videoIds)
.map(id -> getDeArrowedInfo(id).thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance())))) .map(id -> getDeArrowedInfo(id, SPONSORBLOCK_SERVERS.toArray(new String[0]))
.thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance())))
)
.toArray(CompletableFuture[]::new); .toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futures) return CompletableFuture.allOf(futures)
.thenApplyAsync(v -> objectNode, Multithreading.getCachedExecutor()); .thenApplyAsync(v -> objectNode, Multithreading.getCachedExecutor());
} }
private static CompletableFuture<Optional<JsonNode>> getDeArrowedInfo(String videoId) { private static CompletableFuture<Optional<JsonNode>> getDeArrowedInfo(String videoId, String[] servers) {
String hash = DigestUtils.sha256Hex(videoId); String hash = DigestUtils.sha256Hex(videoId);
CompletableFuture<Optional<JsonNode>> future = new CompletableFuture<>(); CompletableFuture<Optional<JsonNode>> future = new CompletableFuture<>();
Multithreading.runAsync(() -> { var task = ForkJoinTask.adapt(() -> {
for (String url : Constants.SPONSORBLOCK_SERVERS) fetchDeArrowedCf(future, videoId, hash, servers);
try { });
Optional<JsonNode> optional = RequestUtils.sendGetJson(url + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4)))
.thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.<JsonNode>empty())
.get();
optional.ifPresent(jsonNode -> { Multithreading.runAsyncTask(task);
return future;
}
private static void fetchDeArrowedCf(CompletableFuture<Optional<JsonNode>> future, String videoId, String hash, String[] servers) {
var completableFuture = RequestUtils.sendGetJson(servers[0] + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4)))
.thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.<JsonNode>empty());
completableFuture.thenAcceptAsync(optional -> optional.ifPresent(jsonNode -> {
ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails"); ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails");
for (JsonNode node : nodes) { for (JsonNode node : nodes) {
if (!node.get("original").booleanValue()) if (!node.get("original").booleanValue())
((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText()))); ((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText())));
} }
}); }));
completableFuture.whenComplete((optional, throwable) -> {
if (throwable == null)
future.complete(optional); future.complete(optional);
return; else {
} catch (Exception ignored) { if (servers.length == 1)
}
future.completeExceptionally(new Exception("All SponsorBlock servers are down")); future.completeExceptionally(new Exception("All SponsorBlock servers are down"));
else
fetchDeArrowedCf(future, videoId, hash, Arrays.copyOfRange(servers, 1, servers.length));
}
}); });
return future;
} }
} }