From eadba213212764870ef9074f2528a932b75b4c8c Mon Sep 17 00:00:00 2001 From: FireMasterK <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 14 Jul 2021 03:09:39 +0530 Subject: [PATCH] Implement Feed, PubSub, query optimizations. --- src/main/java/me/kavin/piped/Main.java | 49 +++ .../java/me/kavin/piped/ServerLauncher.java | 347 ++++++++++-------- .../java/me/kavin/piped/consts/Constants.java | 3 + .../me/kavin/piped/utils/DatabaseHelper.java | 75 ++++ .../me/kavin/piped/utils/Multithreading.java | 8 +- .../me/kavin/piped/utils/ResponseHelper.java | 191 +++++++++- .../me/kavin/piped/utils/obj/FeedItem.java | 24 ++ .../me/kavin/piped/utils/obj/db/Video.java | 11 +- .../piped/utils/resp/DatabaseHelper.java | 21 -- .../utils/resp/SubscribeStatusResponse.java | 10 + 10 files changed, 545 insertions(+), 194 deletions(-) create mode 100644 src/main/java/me/kavin/piped/utils/DatabaseHelper.java create mode 100644 src/main/java/me/kavin/piped/utils/obj/FeedItem.java delete mode 100644 src/main/java/me/kavin/piped/utils/resp/DatabaseHelper.java create mode 100644 src/main/java/me/kavin/piped/utils/resp/SubscribeStatusResponse.java diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index e79c379..4dedab6 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -1,11 +1,20 @@ package me.kavin.piped; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.hibernate.Session; +import org.hibernate.Transaction; +import org.hibernate.query.Query; import org.schabi.newpipe.extractor.NewPipe; import org.schabi.newpipe.extractor.localization.Localization; import io.activej.inject.Injector; +import me.kavin.piped.utils.DatabaseHelper; import me.kavin.piped.utils.DatabaseSessionFactory; import me.kavin.piped.utils.DownloaderImpl; +import me.kavin.piped.utils.ResponseHelper; public class Main { @@ -19,6 +28,46 @@ public class Main { DatabaseSessionFactory.createSession().close(); }).start(); + new Timer().scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + Session s = DatabaseSessionFactory.createSession(); + + for (String channelId : DatabaseHelper.getGlobalSubscribedChannelIds(s)) + ResponseHelper.subscribePubSub(channelId); + + s.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 0, TimeUnit.MINUTES.toMillis(90)); + + new Timer().scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + Session s = DatabaseSessionFactory.createSession(); + + Transaction tr = s.getTransaction(); + + tr.begin(); + + Query query = s.createQuery("delete from Video where uploaded < :time").setParameter("time", + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10)); + + System.out.println(String.format("Cleanup: Removed %o old videos", query.executeUpdate())); + + tr.commit(); + + s.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 0, TimeUnit.MINUTES.toMillis(60)); + new ServerLauncher().launch(args); } diff --git a/src/main/java/me/kavin/piped/ServerLauncher.java b/src/main/java/me/kavin/piped/ServerLauncher.java index cce57ca..4dfd006 100644 --- a/src/main/java/me/kavin/piped/ServerLauncher.java +++ b/src/main/java/me/kavin/piped/ServerLauncher.java @@ -4,6 +4,8 @@ import static io.activej.config.converter.ConfigConverters.ofInetSocketAddress; import static io.activej.http.HttpHeaders.AUTHORIZATION; import static io.activej.http.HttpHeaders.CACHE_CONTROL; import static io.activej.http.HttpHeaders.CONTENT_TYPE; +import static io.activej.http.HttpMethod.GET; +import static io.activej.http.HttpMethod.POST; import java.io.ByteArrayInputStream; import java.net.InetSocketAddress; @@ -33,6 +35,7 @@ import io.activej.inject.module.Module; import io.activej.launchers.http.MultithreadedHttpServerLauncher; import me.kavin.piped.consts.Constants; import me.kavin.piped.utils.CustomServletDecorator; +import me.kavin.piped.utils.Multithreading; import me.kavin.piped.utils.ResponseHelper; import me.kavin.piped.utils.SponsorBlockUtils; import me.kavin.piped.utils.resp.ErrorResponse; @@ -49,169 +52,195 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { @Provides AsyncServlet mainServlet(Executor executor) { - RoutingServlet router = RoutingServlet.create().map(HttpMethod.GET, "/webhooks/pubsub", request -> { - return HttpResponse.ok200().withPlainText(request.getQueryParameter("hub.challenge")); - }).map(HttpMethod.POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { - try { + RoutingServlet router = RoutingServlet.create() + .map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200)) + .map(GET, "/webhooks/pubsub", request -> { + return HttpResponse.ok200().withPlainText(request.getQueryParameter("hub.challenge")); + }).map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { + try { - SyndFeed feed = new SyndFeedInput() - .build(new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray()))); + SyndFeed feed = new SyndFeedInput().build( + new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray()))); - feed.getEntries().forEach(entry -> { - System.out.println(entry.getLinks().get(0).getHref()); - System.out.println(entry.getAuthors().get(0).getUri()); - }); + Multithreading.runAsync(() -> { + feed.getEntries().forEach(entry -> { + System.out.println(entry.getLinks().get(0).getHref()); + ResponseHelper.handleNewVideo(entry.getLinks().get(0).getHref(), + entry.getPublishedDate().getTime()); + }); + }); - return HttpResponse.ofCode(204); + return HttpResponse.ofCode(204); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/sponsors/:videoId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(SponsorBlockUtils - .getSponsors(request.getPathParameter("videoId"), request.getQueryParameter("category")) - .getBytes(StandardCharsets.UTF_8), "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/streams/:videoId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.streamsResponse(request.getPathParameter("videoId")), - "public, s-maxage=21540"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse( - ResponseHelper.channelResponse("channel/" + request.getPathParameter("channelId")), - "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/c/:name", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.channelResponse("c/" + request.getPathParameter("name")), - "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/user/:name", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.channelResponse("user/" + request.getPathParameter("name")), - "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/nextpage/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.channelPageResponse(request.getPathParameter("channelId"), - request.getQueryParameter("nextpage")), "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.playlistResponse(request.getPathParameter("playlistId")), - "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/nextpage/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.playlistPageResponse(request.getPathParameter("playlistId"), - request.getQueryParameter("nextpage")), "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/rss/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.playlistRSSResponse(request.getPathParameter("playlistId")), - "public, s-maxage=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/suggestions", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.suggestionsResponse(request.getQueryParameter("query")), - "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/search", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.searchResponse(request.getQueryParameter("q"), - request.getQueryParameter("filter")), "public, max-age=600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/nextpage/search", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse( - ResponseHelper.searchPageResponse(request.getQueryParameter("q"), - request.getQueryParameter("filter"), request.getQueryParameter("nextpage")), - "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/trending", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.trendingResponse(request.getQueryParameter("region")), - "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.commentsResponse(request.getPathParameter("videoId")), - "public, max-age=1200"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/nextpage/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { - try { - return getJsonResponse(ResponseHelper.commentsPageResponse(request.getPathParameter("videoId"), - request.getQueryParameter("url")), "public, max-age=3600"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/register", AsyncServlet.ofBlocking(executor, request -> { - try { - LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), - LoginRequest.class); - return getJsonResponse(ResponseHelper.registerResponse(body.username, body.password), "private"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map("/login", AsyncServlet.ofBlocking(executor, request -> { - try { - LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), - LoginRequest.class); - return getJsonResponse(ResponseHelper.loginResponse(body.username, body.password), "private"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map(HttpMethod.POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> { - try { - SubscriptionUpdateRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), - SubscriptionUpdateRequest.class); - return getJsonResponse( - ResponseHelper.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), "private"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map(HttpMethod.POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> { - try { - SubscriptionUpdateRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), - SubscriptionUpdateRequest.class); - return getJsonResponse( - ResponseHelper.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), - "private"); - } catch (Exception e) { - return getErrorResponse(e); - } - })).map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200)); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/sponsors/:videoId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + SponsorBlockUtils.getSponsors(request.getPathParameter("videoId"), + request.getQueryParameter("category")).getBytes(StandardCharsets.UTF_8), + "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/streams/:videoId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.streamsResponse(request.getPathParameter("videoId")), + "public, s-maxage=21540"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + ResponseHelper.channelResponse("channel/" + request.getPathParameter("channelId")), + "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/c/:name", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.channelResponse("c/" + request.getPathParameter("name")), + "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/user/:name", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + ResponseHelper.channelResponse("user/" + request.getPathParameter("name")), + "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/nextpage/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.channelPageResponse(request.getPathParameter("channelId"), + request.getQueryParameter("nextpage")), "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.playlistResponse(request.getPathParameter("playlistId")), + "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/nextpage/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + ResponseHelper.playlistPageResponse(request.getPathParameter("playlistId"), + request.getQueryParameter("nextpage")), + "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/rss/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + ResponseHelper.playlistRSSResponse(request.getPathParameter("playlistId")), + "public, s-maxage=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/suggestions", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.suggestionsResponse(request.getQueryParameter("query")), + "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/search", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.searchResponse(request.getQueryParameter("q"), + request.getQueryParameter("filter")), "public, max-age=600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/nextpage/search", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse( + ResponseHelper.searchPageResponse(request.getQueryParameter("q"), + request.getQueryParameter("filter"), request.getQueryParameter("nextpage")), + "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/trending", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.trendingResponse(request.getQueryParameter("region")), + "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.commentsResponse(request.getPathParameter("videoId")), + "public, max-age=1200"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/nextpage/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.commentsPageResponse(request.getPathParameter("videoId"), + request.getQueryParameter("url")), "public, max-age=3600"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(POST, "/register", AsyncServlet.ofBlocking(executor, request -> { + try { + LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + LoginRequest.class); + return getJsonResponse(ResponseHelper.registerResponse(body.username, body.password), + "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(POST, "/login", AsyncServlet.ofBlocking(executor, request -> { + try { + LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + LoginRequest.class); + return getJsonResponse(ResponseHelper.loginResponse(body.username, body.password), "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> { + try { + SubscriptionUpdateRequest body = Constants.mapper + .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class); + return getJsonResponse( + ResponseHelper.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), + "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> { + try { + SubscriptionUpdateRequest body = Constants.mapper + .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class); + return getJsonResponse( + ResponseHelper.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), + "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/subscribed", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.isSubscribedResponse(request.getHeader(AUTHORIZATION), + request.getQueryParameter("channelId")), "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })).map(GET, "/feed", AsyncServlet.ofBlocking(executor, request -> { + try { + return getJsonResponse(ResponseHelper.feedResponse(request.getQueryParameter("authToken")), + "private"); + } catch (Exception e) { + return getErrorResponse(e); + } + })); return new CustomServletDecorator(router); } diff --git a/src/main/java/me/kavin/piped/consts/Constants.java b/src/main/java/me/kavin/piped/consts/Constants.java index fba82b1..876be91 100644 --- a/src/main/java/me/kavin/piped/consts/Constants.java +++ b/src/main/java/me/kavin/piped/consts/Constants.java @@ -29,6 +29,8 @@ public class Constants { public static final StreamingService YOUTUBE_SERVICE; + public static final String PUBLIC_URL; + public static final HttpClient h2client = HttpClient.newBuilder().followRedirects(Redirect.NORMAL) .version(Version.HTTP_2).build(); public static final HttpClient h2_no_redir_client = HttpClient.newBuilder().followRedirects(Redirect.NEVER) @@ -48,6 +50,7 @@ public class Constants { PROXY_PART = prop.getProperty("PROXY_PART"); CAPTCHA_BASE_URL = prop.getProperty("CAPTCHA_BASE_URL"); CAPTCHA_API_KEY = prop.getProperty("CAPTCHA_API_KEY"); + PUBLIC_URL = prop.getProperty("API_URL"); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/me/kavin/piped/utils/DatabaseHelper.java b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java new file mode 100644 index 0000000..1b47bee --- /dev/null +++ b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java @@ -0,0 +1,75 @@ +package me.kavin.piped.utils; + +import java.util.LinkedHashSet; +import java.util.List; + +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.JoinType; +import javax.persistence.criteria.Root; + +import org.hibernate.Session; + +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import me.kavin.piped.utils.obj.db.Channel; +import me.kavin.piped.utils.obj.db.User; +import me.kavin.piped.utils.obj.db.Video; + +public class DatabaseHelper { + + public static final User getUserFromSession(Session s, String session) { + CriteriaBuilder cb = s.getCriteriaBuilder(); + CriteriaQuery cr = cb.createQuery(User.class); + Root root = cr.from(User.class); + cr.select(root).where(root.get("sessionId").in(session)); + + return s.createQuery(cr).uniqueResult(); + } + + public static final User getUserFromSessionWithSubscribed(Session s, String session) { + CriteriaBuilder cb = s.getCriteriaBuilder(); + CriteriaQuery cr = cb.createQuery(User.class); + Root root = cr.from(User.class); + root.fetch("subscribed_ids", JoinType.LEFT); + cr.select(root).where(root.get("sessionId").in(session)); + + return s.createQuery(cr).uniqueResult(); + } + + public static final Channel getChannelFromId(Session s, String id) { + CriteriaBuilder cb = s.getCriteriaBuilder(); + CriteriaQuery cr = cb.createQuery(Channel.class); + Root root = cr.from(Channel.class); + cr.select(root).where(root.get("uploader_id").in(id)); + + return s.createQuery(cr).uniqueResult(); + } + + public static final List