diff --git a/config.properties b/config.properties index aa923d6..1b79a6c 100644 --- a/config.properties +++ b/config.properties @@ -1,48 +1,72 @@ -# The port to Listen on. -PORT:8080 -# The number of workers to use for the server -HTTP_WORKERS:2 -# Proxy -PROXY_PART:https://pipedproxy-ams.kavin.rocks -# Outgoing HTTP Proxy - eg: 127.0.0.1:8118 -#HTTP_PROXY: 127.0.0.1:8118 -# Captcha Parameters -CAPTCHA_BASE_URL:https://api.capmonster.cloud/ -CAPTCHA_API_KEY:INSERT_HERE -# Public API URL -API_URL:https://pipedapi.kavin.rocks -# Public Frontend URL -FRONTEND_URL:https://piped.video -# Enable haveibeenpwned compromised password API -COMPROMISED_PASSWORD_CHECK:true -# Disable Registration -DISABLE_REGISTRATION:false -# Feed Retention Time in Days -FEED_RETENTION:30 -# Disable CPU expensive timers (for nodes with low CPU, at least one node should have this disabled) -DISABLE_TIMERS:false -# RYD Proxy URL (see https://github.com/TeamPiped/RYD-Proxy) -RYD_PROXY_URL:https://ryd-proxy.kavin.rocks -# SponsorBlock Servers(s) -# Comma separated list of SponsorBlock Servers to use -SPONSORBLOCK_SERVERS:https://sponsor.ajay.app,https://sponsorblock.kavin.rocks -# Disable the usage of RYD -DISABLE_RYD:false -# Disable API server (node just runs timers if enabled) -DISABLE_SERVER:false -# Disable the inclusion of LBRY streams -DISABLE_LBRY:false -# How long should unauthenticated subscriptions last for -SUBSCRIPTIONS_EXPIRY:30 -# Sentry DSN -# Use Sentry to log errors and trace performance -#SENTRY_DSN:INSERT_HERE -# Hibernate properties -hibernate.connection.url:jdbc:postgresql://postgres:5432/piped -hibernate.connection.driver_class:org.postgresql.Driver -hibernate.dialect:org.hibernate.dialect.PostgreSQLDialect -hibernate.connection.username:piped -hibernate.connection.password:changeme -# Frontend configuration -#frontend.statusPageUrl:https://kavin.rocks -#frontend.donationUrl:https://kavin.rocks +# The port to Listen on. +PORT:8080 +# The number of workers to use for the server +HTTP_WORKERS:2 + +# Proxy +PROXY_PART:https://pipedproxy-ams.kavin.rocks + +# Outgoing HTTP Proxy - eg: 127.0.0.1:8118 +#HTTP_PROXY: 127.0.0.1:8118 + +# Captcha Parameters +CAPTCHA_BASE_URL:https://api.capmonster.cloud/ +CAPTCHA_API_KEY:INSERT_HERE + +# Public API URL +API_URL:https://pipedapi.kavin.rocks + +# Public Frontend URL +FRONTEND_URL:https://piped.video + +# Enable haveibeenpwned compromised password API +COMPROMISED_PASSWORD_CHECK:true + +# Disable Registration +DISABLE_REGISTRATION:false + +# Feed Retention Time in Days +FEED_RETENTION:30 + +# Disable CPU expensive timers (for nodes with low CPU, at least one node should have this disabled) +DISABLE_TIMERS:false + +# RYD Proxy URL (see https://github.com/TeamPiped/RYD-Proxy) +RYD_PROXY_URL:https://ryd-proxy.kavin.rocks + +# SponsorBlock Servers(s) +# Comma separated list of SponsorBlock Servers to use +SPONSORBLOCK_SERVERS:https://sponsor.ajay.app,https://sponsorblock.kavin.rocks + +# Disable the usage of RYD +DISABLE_RYD:false + +# Disable API server (node just runs timers if enabled) +DISABLE_SERVER:false + +# Disable the inclusion of LBRY streams +DISABLE_LBRY:false + +# How long should unauthenticated subscriptions last for +SUBSCRIPTIONS_EXPIRY:30 + +# Sentry DSN +# Use Sentry to log errors and trace performance +#SENTRY_DSN:INSERT_HERE + +# Matrix Client Server URL +MATRIX_SERVER:https://matrix-client.matrix.org +# Matrix Access Token +# If not present, will work in anon mode +#MATRIX_TOKEN:INSERT_HERE + +# Hibernate properties +hibernate.connection.url:jdbc:postgresql://postgres:5432/piped +hibernate.connection.driver_class:org.postgresql.Driver +hibernate.dialect:org.hibernate.dialect.PostgreSQLDialect +hibernate.connection.username:piped +hibernate.connection.password:changeme + +# Frontend configuration +#frontend.statusPageUrl:https://kavin.rocks +#frontend.donationUrl:https://kavin.rocks diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index ea2af43..007e32a 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -6,8 +6,11 @@ import jakarta.persistence.criteria.CriteriaBuilder; import me.kavin.piped.consts.Constants; import me.kavin.piped.server.ServerLauncher; import me.kavin.piped.utils.*; +import me.kavin.piped.utils.matrix.SyncRunner; +import me.kavin.piped.utils.obj.MatrixHelper; import me.kavin.piped.utils.obj.db.PlaylistVideo; import me.kavin.piped.utils.obj.db.Video; +import okhttp3.OkHttpClient; import org.hibernate.Session; import org.hibernate.StatelessSession; import org.schabi.newpipe.extractor.NewPipe; @@ -21,6 +24,8 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static me.kavin.piped.consts.Constants.MATRIX_SERVER; + public class Main { public static void main(String[] args) throws Exception { @@ -38,6 +43,12 @@ public class Main { Injector.useSpecializer(); + new Thread(new SyncRunner( + new OkHttpClient.Builder().readTimeout(60, TimeUnit.SECONDS).build(), + MATRIX_SERVER, + MatrixHelper.MATRIX_TOKEN) + ).start(); + new Timer().scheduleAtFixedRate(new TimerTask() { @Override public void run() { @@ -136,7 +147,7 @@ public class Main { var subQuery = pvQuery.subquery(me.kavin.piped.utils.obj.db.Playlist.class); var subRoot = subQuery.from(me.kavin.piped.utils.obj.db.Playlist.class); - subQuery.select(subRoot.join("videos").get("id")); + subQuery.select(subRoot.join("videos").get("id")).distinct(true); pvQuery.where(cb.not(pvRoot.get("id").in(subQuery))); diff --git a/src/main/java/me/kavin/piped/consts/Constants.java b/src/main/java/me/kavin/piped/consts/Constants.java index 63de8e9..4475acd 100644 --- a/src/main/java/me/kavin/piped/consts/Constants.java +++ b/src/main/java/me/kavin/piped/consts/Constants.java @@ -70,6 +70,12 @@ public class Constants { public static final String SENTRY_DSN; + public static final String MATRIX_ROOM = "#piped-events:matrix.org"; + + public static final String MATRIX_SERVER; + + public static final String MATRIX_TOKEN; + public static final String VERSION; public static final ObjectMapper mapper = JsonMapper.builder() @@ -114,6 +120,8 @@ public class Constants { if (key.startsWith("hibernate")) hibernateProperties.put(key, value); }); + MATRIX_SERVER = getProperty(prop, "MATRIX_SERVER", "https://matrix-client.matrix.org"); + MATRIX_TOKEN = getProperty(prop, "MATRIX_TOKEN"); prop.forEach((_key, _value) -> { String key = String.valueOf(_key), value = String.valueOf(_value); if (key.startsWith("hibernate")) diff --git a/src/main/java/me/kavin/piped/server/handlers/ChannelHandlers.java b/src/main/java/me/kavin/piped/server/handlers/ChannelHandlers.java index 4099edd..35efd71 100644 --- a/src/main/java/me/kavin/piped/server/handlers/ChannelHandlers.java +++ b/src/main/java/me/kavin/piped/server/handlers/ChannelHandlers.java @@ -7,6 +7,7 @@ import me.kavin.piped.ipfs.IPFS; import me.kavin.piped.utils.*; import me.kavin.piped.utils.obj.*; import me.kavin.piped.utils.obj.db.Video; +import me.kavin.piped.utils.obj.federation.FederatedVideoInfo; import me.kavin.piped.utils.resp.InvalidRequestResponse; import org.apache.commons.lang3.StringUtils; import org.hibernate.StatelessSession; @@ -39,6 +40,23 @@ public class ChannelHandlers { final List relatedStreams = collectRelatedItems(info.getRelatedItems()); + Multithreading.runAsync(() -> info.getRelatedItems().forEach(infoItem -> { + if ( + infoItem.getUploadDate() != null && + System.currentTimeMillis() - infoItem.getUploadDate().offsetDateTime().toInstant().toEpochMilli() + < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION) + ) + try { + MatrixHelper.sendEvent("video.piped.stream.info", new FederatedVideoInfo( + StringUtils.substring(infoItem.getUrl(), -11), StringUtils.substring(infoItem.getUploaderUrl(), -24), + infoItem.getName(), + infoItem.getDuration(), infoItem.getViewCount()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + Multithreading.runAsync(() -> { var channel = DatabaseHelper.getChannelFromId(info.getId()); diff --git a/src/main/java/me/kavin/piped/server/handlers/StreamHandlers.java b/src/main/java/me/kavin/piped/server/handlers/StreamHandlers.java index 5197379..e25d2c8 100644 --- a/src/main/java/me/kavin/piped/server/handlers/StreamHandlers.java +++ b/src/main/java/me/kavin/piped/server/handlers/StreamHandlers.java @@ -9,6 +9,7 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList; import me.kavin.piped.consts.Constants; import me.kavin.piped.utils.*; import me.kavin.piped.utils.obj.*; +import me.kavin.piped.utils.obj.federation.FederatedVideoInfo; import me.kavin.piped.utils.resp.InvalidRequestResponse; import me.kavin.piped.utils.resp.VideoResolvedResponse; import org.apache.commons.lang3.StringUtils; @@ -147,8 +148,14 @@ public class StreamHandlers { long time = info.getUploadDate() != null ? info.getUploadDate().offsetDateTime().toInstant().toEpochMilli() : System.currentTimeMillis(); - if (info.getUploadDate() != null && System.currentTimeMillis() - time < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) + if (info.getUploadDate() != null && System.currentTimeMillis() - time < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) { VideoHelpers.updateVideo(info.getId(), info, time); + MatrixHelper.sendEvent("video.piped.stream.info", new FederatedVideoInfo( + info.getId(), StringUtils.substring(info.getUploaderUrl(), -24), + info.getName(), + info.getDuration(), info.getViewCount()) + ); + } String lbryId; diff --git a/src/main/java/me/kavin/piped/utils/RequestUtils.java b/src/main/java/me/kavin/piped/utils/RequestUtils.java index d663ecb..0687703 100644 --- a/src/main/java/me/kavin/piped/utils/RequestUtils.java +++ b/src/main/java/me/kavin/piped/utils/RequestUtils.java @@ -1,6 +1,8 @@ package me.kavin.piped.utils; +import com.fasterxml.jackson.databind.JsonNode; import me.kavin.piped.consts.Constants; +import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -29,4 +31,16 @@ public class RequestUtils { return responseString; } + + public static JsonNode getJsonNode(OkHttpClient client, Request request) throws IOException { + try (var resp = client.newCall(request).execute()) { + try { + return Constants.mapper.readTree(resp.body().byteStream()); + } catch (Exception e) { + if (!resp.isSuccessful()) + ExceptionHandler.handle(e); + throw new RuntimeException("Failed to parse JSON", e); + } + } + } } diff --git a/src/main/java/me/kavin/piped/utils/matrix/SyncRunner.java b/src/main/java/me/kavin/piped/utils/matrix/SyncRunner.java new file mode 100644 index 0000000..d34ab4b --- /dev/null +++ b/src/main/java/me/kavin/piped/utils/matrix/SyncRunner.java @@ -0,0 +1,210 @@ +package me.kavin.piped.utils.matrix; + +import com.fasterxml.jackson.databind.JsonNode; +import me.kavin.piped.utils.*; +import me.kavin.piped.utils.obj.db.Channel; +import me.kavin.piped.utils.obj.federation.FederatedChannelInfo; +import me.kavin.piped.utils.obj.federation.FederatedVideoInfo; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.hibernate.StatelessSession; + +import java.io.IOException; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static me.kavin.piped.consts.Constants.mapper; +import static me.kavin.piped.utils.obj.MatrixHelper.*; + +public class SyncRunner implements Runnable { + + private final OkHttpClient client; + private final String url; + private final String token; + + /** + * @param client The OkHttpClient to use + * @param url The URL to send the request to + * @param token The access token to use requests. + */ + public SyncRunner(OkHttpClient client, String url, String token) { + this.client = client; + this.url = url; + this.token = token; + } + + @Override + public void run() { + + try { + String user_id = null; + + if (!UNAUTHENTICATED) { + // whoami to get the user id + user_id = RequestUtils.getJsonNode(client, new Request.Builder() + .url(url + "/_matrix/client/v3/account/whoami") + .header("Authorization", "Bearer " + token) + .build()) + .get("user_id") + .asText(); + } + + System.out.println("Logged in as user: " + user_id); + + // Join room and get the room id + System.out.println("Room ID: " + ROOM_ID); + + String filter_id = null; + + // We have to filter on client-side if unauthenticated + if (!UNAUTHENTICATED) { + // Get the filter id + filter_id = getFilterId(user_id, ROOM_ID); + } + + System.out.println("Filter ID: " + filter_id); + + String next_batch = null; + + //noinspection InfiniteLoopStatement + while (true) { + try { + String url; + + if (UNAUTHENTICATED) { + url = this.url + "/_matrix/client/v3/events?room_id=" + URLUtils.silentEncode(ROOM_ID); + } else { + url = this.url + "/_matrix/client/v3/sync?filter=" + filter_id; + } + + boolean initial_sync = next_batch == null; + + if (initial_sync) { + url += "&timeout=0"; + } else { + url += "&" + (UNAUTHENTICATED ? "from" : "since") + "=" + next_batch; + url += "&timeout=30000"; + } + + var response = RequestUtils.getJsonNode(client, new Request.Builder() + .url(url) + .header("Authorization", "Bearer " + token) + .build()); + + Set events; + + if (UNAUTHENTICATED) { + events = StreamSupport.stream(response.get("chunk").spliterator(), true) + .filter(event -> event.get("type").asText().startsWith("video.piped.")) + .filter(event -> { + var sender = event.get("sender").asText(); + for (var user : AUTHORIZED_USERS) + if (user.asText().equals(sender)) + return true; + return false; + }) + .collect(Collectors.toUnmodifiableSet()); + } else { + var resp_events = response.at("/rooms/join/" + ROOM_ID + "/timeline").get("events"); + if (resp_events != null) { + events = StreamSupport.stream(resp_events.spliterator(), true) + .collect(Collectors.toUnmodifiableSet()); + } else { + events = Set.of(); + } + } + + if (!initial_sync && events.size() > 0) { + + System.out.println("Got " + events.size() + " events"); + + for (var event : events) { + + var type = event.get("type").asText(); + + if (event.get("sender").asText().equals(user_id)) { + + if (type.startsWith("video.piped.stream.bypass.")) { + // TODO: Implement geo-restriction bypassing + } + + continue; + } + + switch (type) { + case "video.piped.stream.info" -> { + FederatedVideoInfo info = mapper.treeToValue(event.at("/content/content"), FederatedVideoInfo.class); + Multithreading.runAsync(() -> { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + var video = DatabaseHelper.getVideoFromId(s, info.getVideoId()); + Channel channel; + if (video != null) + VideoHelpers.updateVideo(s, video, + info.getViews(), + info.getDuration(), + info.getTitle()); + else if ((channel = DatabaseHelper.getChannelFromId(s, info.getUploaderId())) != null) { + VideoHelpers.handleNewVideo("https://www.youtube.com/watch?v=" + info.getVideoId(), System.currentTimeMillis(), channel); + } + } + }); + } + case "video.piped.channel.info" -> { + FederatedChannelInfo info = mapper.treeToValue(event.at("/content/content"), FederatedChannelInfo.class); + // TODO: Handle and send channel updates + } + default -> System.err.println("Unknown event type: " + type); + } + } + } + + next_batch = UNAUTHENTICATED ? + response.get("end").asText() : + response.get("next_batch").asText(); + + } catch (Exception ignored) { + Thread.sleep(1000); + } + } + + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String getFilterId(String user_id, String room_id) throws IOException { + + var root = mapper.createObjectNode(); + + var room = root.putObject("room"); + var timeline = room + .putObject("timeline") + .put("lazy_load_members", true) + .put("limit", 50); + + room.putArray("rooms").add(room_id); + timeline.set("senders", AUTHORIZED_USERS); + + timeline.putArray("types").add("video.piped.*"); + + root.putObject("account_data").putArray("not_types").add("*"); + root.putObject("presence").putArray("not_types").add("*"); + room.putObject("account_data").put("lazy_load_members", true).putArray("not_types").add("*"); + room.putObject("ephemeral").put("lazy_load_members", true).putArray("not_types").add("*"); + room.putObject("state").put("lazy_load_members", true).putArray("not_types").add("*"); + + // Create a filter + return RequestUtils.getJsonNode(client, new Request.Builder() + .url(url + "/_matrix/client/v3/user/" + URLUtils.silentEncode(user_id) + "/filter") + .header("Authorization", "Bearer " + token) + .post(RequestBody.create(mapper.writeValueAsBytes( + root + ), MediaType.get("application/json"))) + .build()) + .get("filter_id") + .asText(); + } +} diff --git a/src/main/java/me/kavin/piped/utils/obj/MatrixHelper.java b/src/main/java/me/kavin/piped/utils/obj/MatrixHelper.java new file mode 100644 index 0000000..fdccabb --- /dev/null +++ b/src/main/java/me/kavin/piped/utils/obj/MatrixHelper.java @@ -0,0 +1,85 @@ +package me.kavin.piped.utils.obj; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import me.kavin.piped.consts.Constants; +import me.kavin.piped.utils.ExceptionHandler; +import me.kavin.piped.utils.RequestUtils; +import me.kavin.piped.utils.URLUtils; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; + +import static me.kavin.piped.consts.Constants.*; +import static me.kavin.piped.utils.URLUtils.silentEncode; + +public class MatrixHelper { + + public static final boolean UNAUTHENTICATED; + public static final String MATRIX_TOKEN; + public static final String ROOM_ID; + + public static final ArrayNode AUTHORIZED_USERS; + + static { + UNAUTHENTICATED = StringUtils.isEmpty(Constants.MATRIX_TOKEN); + + try { + + if (UNAUTHENTICATED) { + MATRIX_TOKEN = RequestUtils.getJsonNode(h2client, new Request.Builder() + .url(MATRIX_SERVER + "/_matrix/client/v3/register?kind=guest") + .post(RequestBody.create(mapper.writeValueAsBytes( + mapper.createObjectNode().put("initial_device_display_name", "Piped's Backend")), + MediaType.get("application/json"))) + .build() + ).get("access_token").asText(); + } else { + MATRIX_TOKEN = Constants.MATRIX_TOKEN; + } + + if (UNAUTHENTICATED) { + ROOM_ID = RequestUtils.getJsonNode(h2client, new Request.Builder() + .url(MATRIX_SERVER + "/_matrix/client/v3/directory/room/" + URLUtils.silentEncode(MATRIX_ROOM)) + .header("Authorization", "Bearer " + MATRIX_TOKEN) + .build()) + .get("room_id") + .asText(); + } else { + ROOM_ID = RequestUtils.getJsonNode(h2client, new Request.Builder() + .url(MATRIX_SERVER + "/_matrix/client/v3/join/" + URLUtils.silentEncode(MATRIX_ROOM)) + .post(RequestBody.create(mapper.writeValueAsBytes(mapper.createObjectNode()), MediaType.get("application/json"))) + .header("Authorization", "Bearer " + MATRIX_TOKEN) + .build()) + .get("room_id") + .asText(); + } + + AUTHORIZED_USERS = (ArrayNode) mapper.readTree(RequestUtils.sendGet("https://raw.githubusercontent.com/TeamPiped/piped-federation/main/authorized-users.json")); + + } catch (Exception e) { + ExceptionHandler.handle(e); + throw new RuntimeException(e); + } + } + + public static String sendEvent(String type, Object content) throws IOException { + + if (UNAUTHENTICATED) + return null; + + return RequestUtils.getJsonNode(h2client, new Request.Builder() + .url(MATRIX_SERVER + "/_matrix/client/v3/rooms/" + silentEncode(ROOM_ID) + "/send/" + type + "/" + RandomStringUtils.randomAlphanumeric(12)) + .header("Authorization", "Bearer " + MATRIX_TOKEN) + .put(RequestBody.create(mapper.writeValueAsBytes( + mapper.createObjectNode() + .put("msgtype", type) + .set("body", mapper.valueToTree(content)) + ), MediaType.get("application/json"))) + .build() + ).get("event_id").asText(); + } +} diff --git a/src/main/java/me/kavin/piped/utils/obj/federation/FederatedChannelInfo.java b/src/main/java/me/kavin/piped/utils/obj/federation/FederatedChannelInfo.java new file mode 100644 index 0000000..c354018 --- /dev/null +++ b/src/main/java/me/kavin/piped/utils/obj/federation/FederatedChannelInfo.java @@ -0,0 +1,7 @@ +package me.kavin.piped.utils.obj.federation; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties +public class FederatedChannelInfo { +} diff --git a/src/main/java/me/kavin/piped/utils/obj/federation/FederatedVideoInfo.java b/src/main/java/me/kavin/piped/utils/obj/federation/FederatedVideoInfo.java new file mode 100644 index 0000000..2da82cb --- /dev/null +++ b/src/main/java/me/kavin/piped/utils/obj/federation/FederatedVideoInfo.java @@ -0,0 +1,41 @@ +package me.kavin.piped.utils.obj.federation; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class FederatedVideoInfo { + + private String videoId, uploaderId, title; + private long duration, views; + + public FederatedVideoInfo() { + } + + public FederatedVideoInfo(String videoId, String uploaderId, String title, long duration, long views) { + this.videoId = videoId; + this.uploaderId = uploaderId; + this.title = title; + this.duration = duration; + this.views = views; + } + + public String getVideoId() { + return videoId; + } + + public String getUploaderId() { + return uploaderId; + } + + public String getTitle() { + return title; + } + + public long getDuration() { + return duration; + } + + public long getViews() { + return views; + } +}