From fa588aaf83a091d63b19fea74ece5738fc381fea Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Sat, 22 Jul 2023 05:39:22 +0100 Subject: [PATCH] Optimize threading when batch fetching DeArrow parallelly --- .../me/kavin/piped/server/ServerLauncher.java | 2 +- .../me/kavin/piped/utils/Multithreading.java | 9 ++- .../me/kavin/piped/utils/RequestUtils.java | 4 +- .../kavin/piped/utils/SponsorBlockUtils.java | 65 +++++++++++-------- 4 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/main/java/me/kavin/piped/server/ServerLauncher.java b/src/main/java/me/kavin/piped/server/ServerLauncher.java index 85e0eac..9c10d57 100644 --- a/src/main/java/me/kavin/piped/server/ServerLauncher.java +++ b/src/main/java/me/kavin/piped/server/ServerLauncher.java @@ -155,7 +155,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { var videoIds = getArray(request.getQueryParameter("videoIds")); return getJsonResponse( - SponsorBlockUtils.getDeArrowedInfo(List.of(videoIds)) + SponsorBlockUtils.getDeArrowedInfo(videoIds) .thenApplyAsync(json -> { try { return mapper.writeValueAsBytes(json); diff --git a/src/main/java/me/kavin/piped/utils/Multithreading.java b/src/main/java/me/kavin/piped/utils/Multithreading.java index ccc54fd..2e4efd4 100644 --- a/src/main/java/me/kavin/piped/utils/Multithreading.java +++ b/src/main/java/me/kavin/piped/utils/Multithreading.java @@ -1,8 +1,6 @@ package me.kavin.piped.utils; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.function.Supplier; public class Multithreading { @@ -12,11 +10,16 @@ public class Multithreading { .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8); private static final ExecutorService esLimitedPubSub = Executors .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + private static final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); public static void runAsync(final Runnable runnable) { es.submit(runnable); } + public static void runAsyncTask(final ForkJoinTask task) { + forkJoinPool.submit(task); + } + public static void runAsyncLimited(final Runnable runnable) { esLimited.submit(runnable); } diff --git a/src/main/java/me/kavin/piped/utils/RequestUtils.java b/src/main/java/me/kavin/piped/utils/RequestUtils.java index d886418..f5c25b0 100644 --- a/src/main/java/me/kavin/piped/utils/RequestUtils.java +++ b/src/main/java/me/kavin/piped/utils/RequestUtils.java @@ -42,13 +42,13 @@ public class RequestUtils { } } - public static CompletableFuture sendGetJson(String url) throws Exception { + public static CompletableFuture sendGetJson(String url) { return ReqwestUtils.fetch(url, "GET", null, Map.of()).thenApply(Response::body).thenApplyAsync(resp -> { try { return mapper.readTree(resp); } catch (Exception e) { throw new RuntimeException("Failed to parse JSON", e); } - }, Multithreading.getCachedExecutor()); + }); } } diff --git a/src/main/java/me/kavin/piped/utils/SponsorBlockUtils.java b/src/main/java/me/kavin/piped/utils/SponsorBlockUtils.java index 76c4c9a..1e9db55 100644 --- a/src/main/java/me/kavin/piped/utils/SponsorBlockUtils.java +++ b/src/main/java/me/kavin/piped/utils/SponsorBlockUtils.java @@ -12,10 +12,12 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.util.List; +import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinTask; +import static me.kavin.piped.consts.Constants.SPONSORBLOCK_SERVERS; import static me.kavin.piped.consts.Constants.mapper; public class SponsorBlockUtils { @@ -54,47 +56,58 @@ public class SponsorBlockUtils { return null; } - public static CompletableFuture getDeArrowedInfo(List videoIds) { + public static CompletableFuture getDeArrowedInfo(String[] videoIds) { ObjectNode objectNode = mapper.createObjectNode(); - var futures = videoIds.stream() - .map(id -> getDeArrowedInfo(id).thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance())))) + var futures = Arrays.stream(videoIds) + .map(id -> getDeArrowedInfo(id, SPONSORBLOCK_SERVERS.toArray(new String[0])) + .thenAcceptAsync(jsonNode -> objectNode.set(id, jsonNode.orElse(NullNode.getInstance()))) + ) .toArray(CompletableFuture[]::new); return CompletableFuture.allOf(futures) .thenApplyAsync(v -> objectNode, Multithreading.getCachedExecutor()); } - private static CompletableFuture> getDeArrowedInfo(String videoId) { + private static CompletableFuture> getDeArrowedInfo(String videoId, String[] servers) { String hash = DigestUtils.sha256Hex(videoId); CompletableFuture> future = new CompletableFuture<>(); - Multithreading.runAsync(() -> { - for (String url : Constants.SPONSORBLOCK_SERVERS) - try { - Optional optional = RequestUtils.sendGetJson(url + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4))) - .thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.empty()) - .get(); - - optional.ifPresent(jsonNode -> { - ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails"); - for (JsonNode node : nodes) { - if (!node.get("original").booleanValue()) - ((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText()))); - } - }); - - - future.complete(optional); - return; - } catch (Exception ignored) { - } - future.completeExceptionally(new Exception("All SponsorBlock servers are down")); + var task = ForkJoinTask.adapt(() -> { + fetchDeArrowedCf(future, videoId, hash, servers); }); + Multithreading.runAsyncTask(task); + return future; } + + private static void fetchDeArrowedCf(CompletableFuture> future, String videoId, String hash, String[] servers) { + + var completableFuture = RequestUtils.sendGetJson(servers[0] + "/api/branding/" + URLUtils.silentEncode(hash.substring(0, 4))) + .thenApplyAsync(json -> json.has(videoId) ? Optional.of(json.get(videoId)) : Optional.empty()); + + completableFuture.thenAcceptAsync(optional -> optional.ifPresent(jsonNode -> { + ArrayNode nodes = (ArrayNode) jsonNode.get("thumbnails"); + for (JsonNode node : nodes) { + if (!node.get("original").booleanValue()) + ((ObjectNode) node).set("thumbnail", new TextNode(URLUtils.rewriteURL("https://dearrow-thumb.ajay.app/api/v1/getThumbnail?videoID=" + videoId + "&time=" + node.get("timestamp").asText()))); + } + })); + + + completableFuture.whenComplete((optional, throwable) -> { + if (throwable == null) + future.complete(optional); + else { + if (servers.length == 1) + future.completeExceptionally(new Exception("All SponsorBlock servers are down")); + else + fetchDeArrowedCf(future, videoId, hash, Arrays.copyOfRange(servers, 1, servers.length)); + } + }); + } }