Implement Feed, PubSub, query optimizations.

This commit is contained in:
FireMasterK 2021-07-14 03:09:39 +05:30
parent 27e3eee8ca
commit eadba21321
No known key found for this signature in database
GPG key ID: 49451E4482CC5BCD
10 changed files with 545 additions and 194 deletions

View file

@ -1,11 +1,20 @@
package me.kavin.piped; package me.kavin.piped;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.query.Query;
import org.schabi.newpipe.extractor.NewPipe; import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.localization.Localization; import org.schabi.newpipe.extractor.localization.Localization;
import io.activej.inject.Injector; import io.activej.inject.Injector;
import me.kavin.piped.utils.DatabaseHelper;
import me.kavin.piped.utils.DatabaseSessionFactory; import me.kavin.piped.utils.DatabaseSessionFactory;
import me.kavin.piped.utils.DownloaderImpl; import me.kavin.piped.utils.DownloaderImpl;
import me.kavin.piped.utils.ResponseHelper;
public class Main { public class Main {
@ -19,6 +28,46 @@ public class Main {
DatabaseSessionFactory.createSession().close(); DatabaseSessionFactory.createSession().close();
}).start(); }).start();
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
Session s = DatabaseSessionFactory.createSession();
for (String channelId : DatabaseHelper.getGlobalSubscribedChannelIds(s))
ResponseHelper.subscribePubSub(channelId);
s.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, TimeUnit.MINUTES.toMillis(90));
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
Session s = DatabaseSessionFactory.createSession();
Transaction tr = s.getTransaction();
tr.begin();
Query<?> query = s.createQuery("delete from Video where uploaded < :time").setParameter("time",
System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10));
System.out.println(String.format("Cleanup: Removed %o old videos", query.executeUpdate()));
tr.commit();
s.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, TimeUnit.MINUTES.toMillis(60));
new ServerLauncher().launch(args); new ServerLauncher().launch(args);
} }

View file

@ -4,6 +4,8 @@ import static io.activej.config.converter.ConfigConverters.ofInetSocketAddress;
import static io.activej.http.HttpHeaders.AUTHORIZATION; import static io.activej.http.HttpHeaders.AUTHORIZATION;
import static io.activej.http.HttpHeaders.CACHE_CONTROL; import static io.activej.http.HttpHeaders.CACHE_CONTROL;
import static io.activej.http.HttpHeaders.CONTENT_TYPE; import static io.activej.http.HttpHeaders.CONTENT_TYPE;
import static io.activej.http.HttpMethod.GET;
import static io.activej.http.HttpMethod.POST;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -33,6 +35,7 @@ import io.activej.inject.module.Module;
import io.activej.launchers.http.MultithreadedHttpServerLauncher; import io.activej.launchers.http.MultithreadedHttpServerLauncher;
import me.kavin.piped.consts.Constants; import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.CustomServletDecorator; import me.kavin.piped.utils.CustomServletDecorator;
import me.kavin.piped.utils.Multithreading;
import me.kavin.piped.utils.ResponseHelper; import me.kavin.piped.utils.ResponseHelper;
import me.kavin.piped.utils.SponsorBlockUtils; import me.kavin.piped.utils.SponsorBlockUtils;
import me.kavin.piped.utils.resp.ErrorResponse; import me.kavin.piped.utils.resp.ErrorResponse;
@ -49,17 +52,22 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
@Provides @Provides
AsyncServlet mainServlet(Executor executor) { AsyncServlet mainServlet(Executor executor) {
RoutingServlet router = RoutingServlet.create().map(HttpMethod.GET, "/webhooks/pubsub", request -> { RoutingServlet router = RoutingServlet.create()
.map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200))
.map(GET, "/webhooks/pubsub", request -> {
return HttpResponse.ok200().withPlainText(request.getQueryParameter("hub.challenge")); return HttpResponse.ok200().withPlainText(request.getQueryParameter("hub.challenge"));
}).map(HttpMethod.POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> { }).map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
try { try {
SyndFeed feed = new SyndFeedInput() SyndFeed feed = new SyndFeedInput().build(
.build(new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray()))); new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray())));
Multithreading.runAsync(() -> {
feed.getEntries().forEach(entry -> { feed.getEntries().forEach(entry -> {
System.out.println(entry.getLinks().get(0).getHref()); System.out.println(entry.getLinks().get(0).getHref());
System.out.println(entry.getAuthors().get(0).getUri()); ResponseHelper.handleNewVideo(entry.getLinks().get(0).getHref(),
entry.getPublishedDate().getTime());
});
}); });
return HttpResponse.ofCode(204); return HttpResponse.ofCode(204);
@ -67,22 +75,23 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/sponsors/:videoId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/sponsors/:videoId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(SponsorBlockUtils return getJsonResponse(
.getSponsors(request.getPathParameter("videoId"), request.getQueryParameter("category")) SponsorBlockUtils.getSponsors(request.getPathParameter("videoId"),
.getBytes(StandardCharsets.UTF_8), "public, max-age=3600"); request.getQueryParameter("category")).getBytes(StandardCharsets.UTF_8),
"public, max-age=3600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/streams/:videoId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/streams/:videoId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.streamsResponse(request.getPathParameter("videoId")), return getJsonResponse(ResponseHelper.streamsResponse(request.getPathParameter("videoId")),
"public, s-maxage=21540"); "public, s-maxage=21540");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse( return getJsonResponse(
ResponseHelper.channelResponse("channel/" + request.getPathParameter("channelId")), ResponseHelper.channelResponse("channel/" + request.getPathParameter("channelId")),
@ -90,63 +99,67 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/c/:name", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/c/:name", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.channelResponse("c/" + request.getPathParameter("name")), return getJsonResponse(ResponseHelper.channelResponse("c/" + request.getPathParameter("name")),
"public, max-age=600"); "public, max-age=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/user/:name", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/user/:name", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.channelResponse("user/" + request.getPathParameter("name")), return getJsonResponse(
ResponseHelper.channelResponse("user/" + request.getPathParameter("name")),
"public, max-age=600"); "public, max-age=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/nextpage/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/nextpage/channel/:channelId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.channelPageResponse(request.getPathParameter("channelId"), return getJsonResponse(ResponseHelper.channelPageResponse(request.getPathParameter("channelId"),
request.getQueryParameter("nextpage")), "public, max-age=3600"); request.getQueryParameter("nextpage")), "public, max-age=3600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.playlistResponse(request.getPathParameter("playlistId")), return getJsonResponse(ResponseHelper.playlistResponse(request.getPathParameter("playlistId")),
"public, max-age=600"); "public, max-age=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/nextpage/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/nextpage/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.playlistPageResponse(request.getPathParameter("playlistId"), return getJsonResponse(
request.getQueryParameter("nextpage")), "public, max-age=3600"); ResponseHelper.playlistPageResponse(request.getPathParameter("playlistId"),
request.getQueryParameter("nextpage")),
"public, max-age=3600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/rss/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/rss/playlists/:playlistId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.playlistRSSResponse(request.getPathParameter("playlistId")), return getJsonResponse(
ResponseHelper.playlistRSSResponse(request.getPathParameter("playlistId")),
"public, s-maxage=600"); "public, s-maxage=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/suggestions", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/suggestions", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.suggestionsResponse(request.getQueryParameter("query")), return getJsonResponse(ResponseHelper.suggestionsResponse(request.getQueryParameter("query")),
"public, max-age=600"); "public, max-age=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/search", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/search", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.searchResponse(request.getQueryParameter("q"), return getJsonResponse(ResponseHelper.searchResponse(request.getQueryParameter("q"),
request.getQueryParameter("filter")), "public, max-age=600"); request.getQueryParameter("filter")), "public, max-age=600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/nextpage/search", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/nextpage/search", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse( return getJsonResponse(
ResponseHelper.searchPageResponse(request.getQueryParameter("q"), ResponseHelper.searchPageResponse(request.getQueryParameter("q"),
@ -155,36 +168,37 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/trending", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/trending", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.trendingResponse(request.getQueryParameter("region")), return getJsonResponse(ResponseHelper.trendingResponse(request.getQueryParameter("region")),
"public, max-age=3600"); "public, max-age=3600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.commentsResponse(request.getPathParameter("videoId")), return getJsonResponse(ResponseHelper.commentsResponse(request.getPathParameter("videoId")),
"public, max-age=1200"); "public, max-age=1200");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/nextpage/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> { })).map(GET, "/nextpage/comments/:videoId", AsyncServlet.ofBlocking(executor, request -> {
try { try {
return getJsonResponse(ResponseHelper.commentsPageResponse(request.getPathParameter("videoId"), return getJsonResponse(ResponseHelper.commentsPageResponse(request.getPathParameter("videoId"),
request.getQueryParameter("url")), "public, max-age=3600"); request.getQueryParameter("url")), "public, max-age=3600");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/register", AsyncServlet.ofBlocking(executor, request -> { })).map(POST, "/register", AsyncServlet.ofBlocking(executor, request -> {
try { try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class); LoginRequest.class);
return getJsonResponse(ResponseHelper.registerResponse(body.username, body.password), "private"); return getJsonResponse(ResponseHelper.registerResponse(body.username, body.password),
"private");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map("/login", AsyncServlet.ofBlocking(executor, request -> { })).map(POST, "/login", AsyncServlet.ofBlocking(executor, request -> {
try { try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class); LoginRequest.class);
@ -192,26 +206,41 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map(HttpMethod.POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> { })).map(POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> {
try { try {
SubscriptionUpdateRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest.class); .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse( return getJsonResponse(
ResponseHelper.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), "private"); ResponseHelper.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
"private");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map(HttpMethod.POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> { })).map(POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> {
try { try {
SubscriptionUpdateRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest.class); .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse( return getJsonResponse(
ResponseHelper.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), ResponseHelper.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
"private"); "private");
} catch (Exception e) { } catch (Exception e) {
return getErrorResponse(e); return getErrorResponse(e);
} }
})).map(HttpMethod.OPTIONS, "/*", request -> HttpResponse.ofCode(200)); })).map(GET, "/subscribed", AsyncServlet.ofBlocking(executor, request -> {
try {
return getJsonResponse(ResponseHelper.isSubscribedResponse(request.getHeader(AUTHORIZATION),
request.getQueryParameter("channelId")), "private");
} catch (Exception e) {
return getErrorResponse(e);
}
})).map(GET, "/feed", AsyncServlet.ofBlocking(executor, request -> {
try {
return getJsonResponse(ResponseHelper.feedResponse(request.getQueryParameter("authToken")),
"private");
} catch (Exception e) {
return getErrorResponse(e);
}
}));
return new CustomServletDecorator(router); return new CustomServletDecorator(router);
} }

View file

@ -29,6 +29,8 @@ public class Constants {
public static final StreamingService YOUTUBE_SERVICE; public static final StreamingService YOUTUBE_SERVICE;
public static final String PUBLIC_URL;
public static final HttpClient h2client = HttpClient.newBuilder().followRedirects(Redirect.NORMAL) public static final HttpClient h2client = HttpClient.newBuilder().followRedirects(Redirect.NORMAL)
.version(Version.HTTP_2).build(); .version(Version.HTTP_2).build();
public static final HttpClient h2_no_redir_client = HttpClient.newBuilder().followRedirects(Redirect.NEVER) public static final HttpClient h2_no_redir_client = HttpClient.newBuilder().followRedirects(Redirect.NEVER)
@ -48,6 +50,7 @@ public class Constants {
PROXY_PART = prop.getProperty("PROXY_PART"); PROXY_PART = prop.getProperty("PROXY_PART");
CAPTCHA_BASE_URL = prop.getProperty("CAPTCHA_BASE_URL"); CAPTCHA_BASE_URL = prop.getProperty("CAPTCHA_BASE_URL");
CAPTCHA_API_KEY = prop.getProperty("CAPTCHA_API_KEY"); CAPTCHA_API_KEY = prop.getProperty("CAPTCHA_API_KEY");
PUBLIC_URL = prop.getProperty("API_URL");
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View file

@ -0,0 +1,75 @@
package me.kavin.piped.utils;
import java.util.LinkedHashSet;
import java.util.List;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.JoinType;
import javax.persistence.criteria.Root;
import org.hibernate.Session;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import me.kavin.piped.utils.obj.db.Channel;
import me.kavin.piped.utils.obj.db.User;
import me.kavin.piped.utils.obj.db.Video;
public class DatabaseHelper {
public static final User getUserFromSession(Session s, String session) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<User> cr = cb.createQuery(User.class);
Root<User> root = cr.from(User.class);
cr.select(root).where(root.get("sessionId").in(session));
return s.createQuery(cr).uniqueResult();
}
public static final User getUserFromSessionWithSubscribed(Session s, String session) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<User> cr = cb.createQuery(User.class);
Root<User> root = cr.from(User.class);
root.fetch("subscribed_ids", JoinType.LEFT);
cr.select(root).where(root.get("sessionId").in(session));
return s.createQuery(cr).uniqueResult();
}
public static final Channel getChannelFromId(Session s, String id) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<Channel> cr = cb.createQuery(Channel.class);
Root<Channel> root = cr.from(Channel.class);
cr.select(root).where(root.get("uploader_id").in(id));
return s.createQuery(cr).uniqueResult();
}
public static final List<Video> getVideosFromChannelIds(Session s, List<String> id) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<Video> cr = cb.createQuery(Video.class);
Root<Video> root = cr.from(Video.class);
root.fetch("channel", JoinType.LEFT);
cr.select(root).where(root.get("channel").get("uploader_id").in(id));
return s.createQuery(cr).getResultList();
}
public static final List<String> getGlobalSubscribedChannelIds(Session s) {
@SuppressWarnings("unchecked")
List<String> subscriptions = new ObjectArrayList<>(
new LinkedHashSet<>(s.createNativeQuery("Select channel from users_subscribed").getResultList()));
return subscriptions;
}
public static final Video getVideoFromId(Session s, String id) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<Video> cr = cb.createQuery(Video.class);
Root<Video> root = cr.from(Video.class);
cr.select(root).where(root.get("id").in(id));
return s.createQuery(cr).uniqueResult();
}
}

View file

@ -5,15 +5,9 @@ import java.util.concurrent.Executors;
public class Multithreading { public class Multithreading {
private static final ExecutorService es = Executors.newFixedThreadPool(16); private static final ExecutorService es = Executors.newCachedThreadPool();
public static void runAsync(final Runnable runnable) { public static void runAsync(final Runnable runnable) {
es.submit(runnable); es.submit(runnable);
} }
public static void runAsyncTimeout(final Runnable runnable) {
es.submit(runnable);
}
} }

View file

@ -6,12 +6,14 @@ import java.net.URI;
import java.net.URL; import java.net.URL;
import java.net.http.HttpRequest; import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException; import java.security.spec.InvalidKeySpecException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -52,6 +54,7 @@ import com.rometools.rome.feed.synd.SyndFeedImpl;
import com.rometools.rome.io.FeedException; import com.rometools.rome.io.FeedException;
import com.rometools.rome.io.SyndFeedOutput; import com.rometools.rome.io.SyndFeedOutput;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import me.kavin.piped.consts.Constants; import me.kavin.piped.consts.Constants;
import me.kavin.piped.ipfs.IPFS; import me.kavin.piped.ipfs.IPFS;
@ -59,6 +62,7 @@ import me.kavin.piped.utils.obj.Channel;
import me.kavin.piped.utils.obj.ChapterSegment; import me.kavin.piped.utils.obj.ChapterSegment;
import me.kavin.piped.utils.obj.Comment; import me.kavin.piped.utils.obj.Comment;
import me.kavin.piped.utils.obj.CommentsPage; import me.kavin.piped.utils.obj.CommentsPage;
import me.kavin.piped.utils.obj.FeedItem;
import me.kavin.piped.utils.obj.PipedStream; import me.kavin.piped.utils.obj.PipedStream;
import me.kavin.piped.utils.obj.Playlist; import me.kavin.piped.utils.obj.Playlist;
import me.kavin.piped.utils.obj.SearchResults; import me.kavin.piped.utils.obj.SearchResults;
@ -67,6 +71,7 @@ import me.kavin.piped.utils.obj.Streams;
import me.kavin.piped.utils.obj.StreamsPage; import me.kavin.piped.utils.obj.StreamsPage;
import me.kavin.piped.utils.obj.Subtitle; import me.kavin.piped.utils.obj.Subtitle;
import me.kavin.piped.utils.obj.db.User; import me.kavin.piped.utils.obj.db.User;
import me.kavin.piped.utils.obj.db.Video;
import me.kavin.piped.utils.obj.search.SearchChannel; import me.kavin.piped.utils.obj.search.SearchChannel;
import me.kavin.piped.utils.obj.search.SearchItem; import me.kavin.piped.utils.obj.search.SearchItem;
import me.kavin.piped.utils.obj.search.SearchPlaylist; import me.kavin.piped.utils.obj.search.SearchPlaylist;
@ -74,9 +79,9 @@ import me.kavin.piped.utils.obj.search.SearchStream;
import me.kavin.piped.utils.resp.AcceptedResponse; import me.kavin.piped.utils.resp.AcceptedResponse;
import me.kavin.piped.utils.resp.AlreadyRegisteredResponse; import me.kavin.piped.utils.resp.AlreadyRegisteredResponse;
import me.kavin.piped.utils.resp.AuthenticationFailureResponse; import me.kavin.piped.utils.resp.AuthenticationFailureResponse;
import me.kavin.piped.utils.resp.DatabaseHelper;
import me.kavin.piped.utils.resp.IncorrectCredentialsResponse; import me.kavin.piped.utils.resp.IncorrectCredentialsResponse;
import me.kavin.piped.utils.resp.LoginResponse; import me.kavin.piped.utils.resp.LoginResponse;
import me.kavin.piped.utils.resp.SubscribeStatusResponse;
public class ResponseHelper { public class ResponseHelper {
@ -159,6 +164,10 @@ public class ResponseHelper {
info.getStreamSegments().forEach( info.getStreamSegments().forEach(
segment -> segments.add(new ChapterSegment(segment.getTitle(), segment.getStartTimeSeconds()))); segment -> segments.add(new ChapterSegment(segment.getTitle(), segment.getStartTimeSeconds())));
if (info.getUploadDate() != null && System.currentTimeMillis()
- info.getUploadDate().offsetDateTime().toInstant().toEpochMilli() < TimeUnit.DAYS.toMillis(10))
updateViews(info.getId(), info.getViewCount());
final Streams streams = new Streams(info.getName(), info.getDescription().getContent(), final Streams streams = new Streams(info.getName(), info.getDescription().getContent(),
info.getTextualUploadDate(), info.getUploaderName(), info.getUploaderUrl().substring(23), info.getTextualUploadDate(), info.getUploaderName(), info.getUploaderUrl().substring(23),
rewriteURL(info.getUploaderAvatarUrl()), rewriteURL(info.getThumbnailUrl()), info.getDuration(), rewriteURL(info.getUploaderAvatarUrl()), rewriteURL(info.getThumbnailUrl()), info.getDuration(),
@ -544,6 +553,44 @@ public class ResponseHelper {
s.beginTransaction().commit(); s.beginTransaction().commit();
} }
s.close(); s.close();
Multithreading.runAsync(() -> {
Session sess = DatabaseSessionFactory.createSession();
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(sess, channelId);
if (channel == null) {
ChannelInfo info = null;
try {
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
} catch (IOException | ExtractionException e) {
ExceptionUtils.rethrow(e);
}
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(), info.getAvatarUrl(),
false);
sess.save(channel);
sess.beginTransaction().commit();
try {
subscribePubSub(channelId);
} catch (IOException | InterruptedException e) {
ExceptionUtils.rethrow(e);
}
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(10))
handleNewVideo(item.getUrl(), time);
}
}
sess.close();
});
return Constants.mapper.writeValueAsBytes(new AcceptedResponse()); return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
} }
@ -558,7 +605,7 @@ public class ResponseHelper {
Session s = DatabaseSessionFactory.createSession(); Session s = DatabaseSessionFactory.createSession();
User user = DatabaseHelper.getUserFromSession(s, session); User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session);
if (user != null) { if (user != null) {
if (user.getSubscribed().remove(channelId)) { if (user.getSubscribed().remove(channelId)) {
@ -575,6 +622,60 @@ public class ResponseHelper {
} }
public static final byte[] isSubscribedResponse(String session, String channelId)
throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
Session s = DatabaseSessionFactory.createSession();
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session);
if (user != null) {
if (user.getSubscribed().contains(channelId)) {
s.close();
return Constants.mapper.writeValueAsBytes(new SubscribeStatusResponse(true));
}
s.close();
return Constants.mapper.writeValueAsBytes(new SubscribeStatusResponse(false));
}
s.close();
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
}
public static final byte[] feedResponse(String session)
throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
Session s = DatabaseSessionFactory.createSession();
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session);
if (user != null) {
List<Video> videos = DatabaseHelper.getVideosFromChannelIds(s, user.getSubscribed());
List<FeedItem> feedItems = new ObjectArrayList<>();
videos.forEach(video -> {
feedItems.add(new FeedItem(video.getId(), video.getTitle(), rewriteURL(video.getThumbnail()),
video.getChannel().getUploaderId(), video.getChannel().getUploader(),
rewriteURL(video.getChannel().getUploaderAvatar()), video.getViews(), video.getDuration(),
video.getUploaded(), video.getChannel().isVerified()));
});
Collections.sort(feedItems, (a, b) -> (int) (b.uploaded - a.uploaded));
s.close();
return Constants.mapper.writeValueAsBytes(feedItems);
}
s.close();
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
}
private static final String getLBRYStreamURL(String videoId) throws IOException, InterruptedException { private static final String getLBRYStreamURL(String videoId) throws IOException, InterruptedException {
String lbryId = new JSONObject(Constants.h2client.send(HttpRequest String lbryId = new JSONObject(Constants.h2client.send(HttpRequest
@ -594,6 +695,92 @@ public class ResponseHelper {
} }
public static void handleNewVideo(String url, long time) {
try {
handleNewVideo(StreamInfo.getInfo(url), time);
} catch (IOException | ExtractionException e) {
ExceptionUtils.rethrow(e);
}
}
private static void handleNewVideo(StreamInfo info, long time) {
Session s = DatabaseSessionFactory.createSession();
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(s,
info.getUploaderUrl().substring("https://www.youtube.com/channel/".length()));
long infoTime = info.getUploadDate() != null ? info.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
Video video;
if (channel != null && (video = DatabaseHelper.getVideoFromId(s, info.getId())) == null
&& (System.currentTimeMillis() - infoTime) < TimeUnit.DAYS.toMillis(10)) {
video = new Video(info.getId(), info.getName(), info.getViewCount(), info.getDuration(),
Math.max(infoTime, time), info.getThumbnailUrl(), channel);
s.save(video);
s.beginTransaction().commit();
}
s.close();
}
private static void updateViews(String id, long views) {
Multithreading.runAsync(() -> {
try {
Session s = DatabaseSessionFactory.createSession();
Video video = DatabaseHelper.getVideoFromId(s, id);
if (video != null) {
video.setViews(views);
s.update(video);
s.beginTransaction().commit();
}
s.close();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public static void subscribePubSub(String channelId) throws IOException, InterruptedException {
System.out.println(String.format("PubSub: Subscribing to %s", channelId));
String callback = Constants.PUBLIC_URL + "/webhooks/pubsub";
String topic = "https://www.youtube.com/xml/feeds/videos.xml?channel_id=" + channelId;
Builder builder = HttpRequest.newBuilder(URI.create("https://pubsubhubbub.appspot.com/subscribe"));
Map<String, String> formParams = new Object2ObjectOpenHashMap<>();
StringBuilder formBody = new StringBuilder();
builder.header("content-type", "application/x-www-form-urlencoded");
formParams.put("hub.callback", callback);
formParams.put("hub.topic", topic);
formParams.put("hub.verify", "async");
formParams.put("hub.mode", "subscribe");
formParams.put("hub.lease_seconds", "432000");
formParams.forEach((name, value) -> {
formBody.append(name + "=" + URLUtils.silentEncode(value) + "&");
});
builder.method("POST", BodyPublishers.ofString(String.valueOf(formBody.substring(0, formBody.length() - 1))));
Constants.h2client.send(builder.build(), BodyHandlers.ofInputStream());
}
private static final String optionalSubstring(String s, int index) { private static final String optionalSubstring(String s, int index) {
return s == null || s.isEmpty() ? null : s.substring(index); return s == null || s.isEmpty() ? null : s.substring(index);
} }

View file

@ -0,0 +1,24 @@
package me.kavin.piped.utils.obj;
public class FeedItem {
public String id, title, thumbnail, uploader_id, uploader, uploaderAvatar;
public long views, duration, uploaded;
public boolean verified;
public FeedItem(String id, String title, String thumbnail, String uploader_id, String uploader,
String uploaderAvatar, long views, long duration, long uploaded, boolean verified) {
this.id = id;
this.title = title;
this.thumbnail = thumbnail;
this.uploader_id = uploader_id;
this.uploader = uploader;
this.uploaderAvatar = uploaderAvatar;
this.views = views;
this.duration = duration;
this.uploaded = uploaded;
this.verified = verified;
}
}

View file

@ -10,7 +10,8 @@ import javax.persistence.ManyToOne;
import javax.persistence.Table; import javax.persistence.Table;
@Entity @Entity
@Table(name = "videos", indexes = { @Index(columnList = "id", name = "id_idx") }) @Table(name = "videos", indexes = { @Index(columnList = "id", name = "id_idx"),
@Index(columnList = "uploader_id", name = "uploader_id_idx") })
public class Video { public class Video {
@Id @Id
@ -24,7 +25,7 @@ public class Video {
private long views; private long views;
@Column(name = "duration") @Column(name = "duration")
private int duration; private long duration;
@Column(name = "uploaded") @Column(name = "uploaded")
private long uploaded; private long uploaded;
@ -39,7 +40,7 @@ public class Video {
public Video() { public Video() {
} }
public Video(String id, String title, long views, int duration, long uploaded, String thumbnail, Channel channel) { public Video(String id, String title, long views, long duration, long uploaded, String thumbnail, Channel channel) {
this.id = id; this.id = id;
this.title = title; this.title = title;
this.views = views; this.views = views;
@ -73,11 +74,11 @@ public class Video {
this.views = views; this.views = views;
} }
public int getDuration() { public long getDuration() {
return duration; return duration;
} }
public void setDuration(int duration) { public void setDuration(long duration) {
this.duration = duration; this.duration = duration;
} }

View file

@ -1,21 +0,0 @@
package me.kavin.piped.utils.resp;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import org.hibernate.Session;
import me.kavin.piped.utils.obj.db.User;
public class DatabaseHelper {
public static final User getUserFromSession(Session s, String session) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<User> cr = cb.createQuery(User.class);
Root<User> root = cr.from(User.class);
cr.select(root).where(root.get("sessionId").in(session));
return s.createQuery(cr).uniqueResult();
}
}

View file

@ -0,0 +1,10 @@
package me.kavin.piped.utils.resp;
public class SubscribeStatusResponse {
public boolean subscribed;
public SubscribeStatusResponse(boolean subscribed) {
this.subscribed = subscribed;
}
}