Implement federation of video data.

This commit is contained in:
Kavin 2022-11-13 00:12:55 +00:00
parent f27924dc99
commit cd6e63130b
No known key found for this signature in database
GPG key ID: 49451E4482CC5BCD
10 changed files with 475 additions and 50 deletions

View file

@ -1,48 +1,72 @@
# The port to Listen on.
PORT:8080
# The number of workers to use for the server
HTTP_WORKERS:2
# Proxy
PROXY_PART:https://pipedproxy-ams.kavin.rocks
# Outgoing HTTP Proxy - eg: 127.0.0.1:8118
#HTTP_PROXY: 127.0.0.1:8118
# Captcha Parameters
CAPTCHA_BASE_URL:https://api.capmonster.cloud/
CAPTCHA_API_KEY:INSERT_HERE
# Public API URL
API_URL:https://pipedapi.kavin.rocks
# Public Frontend URL
FRONTEND_URL:https://piped.video
# Enable haveibeenpwned compromised password API
COMPROMISED_PASSWORD_CHECK:true
# Disable Registration
DISABLE_REGISTRATION:false
# Feed Retention Time in Days
FEED_RETENTION:30
# Disable CPU expensive timers (for nodes with low CPU, at least one node should have this disabled)
DISABLE_TIMERS:false
# RYD Proxy URL (see https://github.com/TeamPiped/RYD-Proxy)
RYD_PROXY_URL:https://ryd-proxy.kavin.rocks
# SponsorBlock Servers(s)
# Comma separated list of SponsorBlock Servers to use
SPONSORBLOCK_SERVERS:https://sponsor.ajay.app,https://sponsorblock.kavin.rocks
# Disable the usage of RYD
DISABLE_RYD:false
# Disable API server (node just runs timers if enabled)
DISABLE_SERVER:false
# Disable the inclusion of LBRY streams
DISABLE_LBRY:false
# How long should unauthenticated subscriptions last for
SUBSCRIPTIONS_EXPIRY:30
# Sentry DSN
# Use Sentry to log errors and trace performance
#SENTRY_DSN:INSERT_HERE
# Hibernate properties
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
hibernate.connection.driver_class:org.postgresql.Driver
hibernate.dialect:org.hibernate.dialect.PostgreSQLDialect
hibernate.connection.username:piped
hibernate.connection.password:changeme
# Frontend configuration
#frontend.statusPageUrl:https://kavin.rocks
#frontend.donationUrl:https://kavin.rocks
# The port to Listen on.
PORT:8080
# The number of workers to use for the server
HTTP_WORKERS:2
# Proxy
PROXY_PART:https://pipedproxy-ams.kavin.rocks
# Outgoing HTTP Proxy - eg: 127.0.0.1:8118
#HTTP_PROXY: 127.0.0.1:8118
# Captcha Parameters
CAPTCHA_BASE_URL:https://api.capmonster.cloud/
CAPTCHA_API_KEY:INSERT_HERE
# Public API URL
API_URL:https://pipedapi.kavin.rocks
# Public Frontend URL
FRONTEND_URL:https://piped.video
# Enable haveibeenpwned compromised password API
COMPROMISED_PASSWORD_CHECK:true
# Disable Registration
DISABLE_REGISTRATION:false
# Feed Retention Time in Days
FEED_RETENTION:30
# Disable CPU expensive timers (for nodes with low CPU, at least one node should have this disabled)
DISABLE_TIMERS:false
# RYD Proxy URL (see https://github.com/TeamPiped/RYD-Proxy)
RYD_PROXY_URL:https://ryd-proxy.kavin.rocks
# SponsorBlock Servers(s)
# Comma separated list of SponsorBlock Servers to use
SPONSORBLOCK_SERVERS:https://sponsor.ajay.app,https://sponsorblock.kavin.rocks
# Disable the usage of RYD
DISABLE_RYD:false
# Disable API server (node just runs timers if enabled)
DISABLE_SERVER:false
# Disable the inclusion of LBRY streams
DISABLE_LBRY:false
# How long should unauthenticated subscriptions last for
SUBSCRIPTIONS_EXPIRY:30
# Sentry DSN
# Use Sentry to log errors and trace performance
#SENTRY_DSN:INSERT_HERE
# Matrix Client Server URL
MATRIX_SERVER:https://matrix-client.matrix.org
# Matrix Access Token
# If not present, will work in anon mode
#MATRIX_TOKEN:INSERT_HERE
# Hibernate properties
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
hibernate.connection.driver_class:org.postgresql.Driver
hibernate.dialect:org.hibernate.dialect.PostgreSQLDialect
hibernate.connection.username:piped
hibernate.connection.password:changeme
# Frontend configuration
#frontend.statusPageUrl:https://kavin.rocks
#frontend.donationUrl:https://kavin.rocks

View file

@ -6,8 +6,11 @@ import jakarta.persistence.criteria.CriteriaBuilder;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.server.ServerLauncher;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.matrix.SyncRunner;
import me.kavin.piped.utils.obj.MatrixHelper;
import me.kavin.piped.utils.obj.db.PlaylistVideo;
import me.kavin.piped.utils.obj.db.Video;
import okhttp3.OkHttpClient;
import org.hibernate.Session;
import org.hibernate.StatelessSession;
import org.schabi.newpipe.extractor.NewPipe;
@ -21,6 +24,8 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static me.kavin.piped.consts.Constants.MATRIX_SERVER;
public class Main {
public static void main(String[] args) throws Exception {
@ -38,6 +43,12 @@ public class Main {
Injector.useSpecializer();
new Thread(new SyncRunner(
new OkHttpClient.Builder().readTimeout(60, TimeUnit.SECONDS).build(),
MATRIX_SERVER,
MatrixHelper.MATRIX_TOKEN)
).start();
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
@ -136,7 +147,7 @@ public class Main {
var subQuery = pvQuery.subquery(me.kavin.piped.utils.obj.db.Playlist.class);
var subRoot = subQuery.from(me.kavin.piped.utils.obj.db.Playlist.class);
subQuery.select(subRoot.join("videos").get("id"));
subQuery.select(subRoot.join("videos").get("id")).distinct(true);
pvQuery.where(cb.not(pvRoot.get("id").in(subQuery)));

View file

@ -70,6 +70,12 @@ public class Constants {
public static final String SENTRY_DSN;
public static final String MATRIX_ROOM = "#piped-events:matrix.org";
public static final String MATRIX_SERVER;
public static final String MATRIX_TOKEN;
public static final String VERSION;
public static final ObjectMapper mapper = JsonMapper.builder()
@ -114,6 +120,8 @@ public class Constants {
if (key.startsWith("hibernate"))
hibernateProperties.put(key, value);
});
MATRIX_SERVER = getProperty(prop, "MATRIX_SERVER", "https://matrix-client.matrix.org");
MATRIX_TOKEN = getProperty(prop, "MATRIX_TOKEN");
prop.forEach((_key, _value) -> {
String key = String.valueOf(_key), value = String.valueOf(_value);
if (key.startsWith("hibernate"))

View file

@ -7,6 +7,7 @@ import me.kavin.piped.ipfs.IPFS;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.*;
import me.kavin.piped.utils.obj.db.Video;
import me.kavin.piped.utils.obj.federation.FederatedVideoInfo;
import me.kavin.piped.utils.resp.InvalidRequestResponse;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.StatelessSession;
@ -39,6 +40,23 @@ public class ChannelHandlers {
final List<ContentItem> relatedStreams = collectRelatedItems(info.getRelatedItems());
Multithreading.runAsync(() -> info.getRelatedItems().forEach(infoItem -> {
if (
infoItem.getUploadDate() != null &&
System.currentTimeMillis() - infoItem.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
< TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)
)
try {
MatrixHelper.sendEvent("video.piped.stream.info", new FederatedVideoInfo(
StringUtils.substring(infoItem.getUrl(), -11), StringUtils.substring(infoItem.getUploaderUrl(), -24),
infoItem.getName(),
infoItem.getDuration(), infoItem.getViewCount())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
Multithreading.runAsync(() -> {
var channel = DatabaseHelper.getChannelFromId(info.getId());

View file

@ -9,6 +9,7 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.*;
import me.kavin.piped.utils.obj.federation.FederatedVideoInfo;
import me.kavin.piped.utils.resp.InvalidRequestResponse;
import me.kavin.piped.utils.resp.VideoResolvedResponse;
import org.apache.commons.lang3.StringUtils;
@ -147,8 +148,14 @@ public class StreamHandlers {
long time = info.getUploadDate() != null ? info.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if (info.getUploadDate() != null && System.currentTimeMillis() - time < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
if (info.getUploadDate() != null && System.currentTimeMillis() - time < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) {
VideoHelpers.updateVideo(info.getId(), info, time);
MatrixHelper.sendEvent("video.piped.stream.info", new FederatedVideoInfo(
info.getId(), StringUtils.substring(info.getUploaderUrl(), -24),
info.getName(),
info.getDuration(), info.getViewCount())
);
}
String lbryId;

View file

@ -1,6 +1,8 @@
package me.kavin.piped.utils;
import com.fasterxml.jackson.databind.JsonNode;
import me.kavin.piped.consts.Constants;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@ -29,4 +31,16 @@ public class RequestUtils {
return responseString;
}
public static JsonNode getJsonNode(OkHttpClient client, Request request) throws IOException {
try (var resp = client.newCall(request).execute()) {
try {
return Constants.mapper.readTree(resp.body().byteStream());
} catch (Exception e) {
if (!resp.isSuccessful())
ExceptionHandler.handle(e);
throw new RuntimeException("Failed to parse JSON", e);
}
}
}
}

View file

@ -0,0 +1,210 @@
package me.kavin.piped.utils.matrix;
import com.fasterxml.jackson.databind.JsonNode;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.db.Channel;
import me.kavin.piped.utils.obj.federation.FederatedChannelInfo;
import me.kavin.piped.utils.obj.federation.FederatedVideoInfo;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.hibernate.StatelessSession;
import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static me.kavin.piped.consts.Constants.mapper;
import static me.kavin.piped.utils.obj.MatrixHelper.*;
public class SyncRunner implements Runnable {
private final OkHttpClient client;
private final String url;
private final String token;
/**
* @param client The OkHttpClient to use
* @param url The URL to send the request to
* @param token The access token to use requests.
*/
public SyncRunner(OkHttpClient client, String url, String token) {
this.client = client;
this.url = url;
this.token = token;
}
@Override
public void run() {
try {
String user_id = null;
if (!UNAUTHENTICATED) {
// whoami to get the user id
user_id = RequestUtils.getJsonNode(client, new Request.Builder()
.url(url + "/_matrix/client/v3/account/whoami")
.header("Authorization", "Bearer " + token)
.build())
.get("user_id")
.asText();
}
System.out.println("Logged in as user: " + user_id);
// Join room and get the room id
System.out.println("Room ID: " + ROOM_ID);
String filter_id = null;
// We have to filter on client-side if unauthenticated
if (!UNAUTHENTICATED) {
// Get the filter id
filter_id = getFilterId(user_id, ROOM_ID);
}
System.out.println("Filter ID: " + filter_id);
String next_batch = null;
//noinspection InfiniteLoopStatement
while (true) {
try {
String url;
if (UNAUTHENTICATED) {
url = this.url + "/_matrix/client/v3/events?room_id=" + URLUtils.silentEncode(ROOM_ID);
} else {
url = this.url + "/_matrix/client/v3/sync?filter=" + filter_id;
}
boolean initial_sync = next_batch == null;
if (initial_sync) {
url += "&timeout=0";
} else {
url += "&" + (UNAUTHENTICATED ? "from" : "since") + "=" + next_batch;
url += "&timeout=30000";
}
var response = RequestUtils.getJsonNode(client, new Request.Builder()
.url(url)
.header("Authorization", "Bearer " + token)
.build());
Set<JsonNode> events;
if (UNAUTHENTICATED) {
events = StreamSupport.stream(response.get("chunk").spliterator(), true)
.filter(event -> event.get("type").asText().startsWith("video.piped."))
.filter(event -> {
var sender = event.get("sender").asText();
for (var user : AUTHORIZED_USERS)
if (user.asText().equals(sender))
return true;
return false;
})
.collect(Collectors.toUnmodifiableSet());
} else {
var resp_events = response.at("/rooms/join/" + ROOM_ID + "/timeline").get("events");
if (resp_events != null) {
events = StreamSupport.stream(resp_events.spliterator(), true)
.collect(Collectors.toUnmodifiableSet());
} else {
events = Set.of();
}
}
if (!initial_sync && events.size() > 0) {
System.out.println("Got " + events.size() + " events");
for (var event : events) {
var type = event.get("type").asText();
if (event.get("sender").asText().equals(user_id)) {
if (type.startsWith("video.piped.stream.bypass.")) {
// TODO: Implement geo-restriction bypassing
}
continue;
}
switch (type) {
case "video.piped.stream.info" -> {
FederatedVideoInfo info = mapper.treeToValue(event.at("/content/content"), FederatedVideoInfo.class);
Multithreading.runAsync(() -> {
try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
var video = DatabaseHelper.getVideoFromId(s, info.getVideoId());
Channel channel;
if (video != null)
VideoHelpers.updateVideo(s, video,
info.getViews(),
info.getDuration(),
info.getTitle());
else if ((channel = DatabaseHelper.getChannelFromId(s, info.getUploaderId())) != null) {
VideoHelpers.handleNewVideo("https://www.youtube.com/watch?v=" + info.getVideoId(), System.currentTimeMillis(), channel);
}
}
});
}
case "video.piped.channel.info" -> {
FederatedChannelInfo info = mapper.treeToValue(event.at("/content/content"), FederatedChannelInfo.class);
// TODO: Handle and send channel updates
}
default -> System.err.println("Unknown event type: " + type);
}
}
}
next_batch = UNAUTHENTICATED ?
response.get("end").asText() :
response.get("next_batch").asText();
} catch (Exception ignored) {
Thread.sleep(1000);
}
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private String getFilterId(String user_id, String room_id) throws IOException {
var root = mapper.createObjectNode();
var room = root.putObject("room");
var timeline = room
.putObject("timeline")
.put("lazy_load_members", true)
.put("limit", 50);
room.putArray("rooms").add(room_id);
timeline.set("senders", AUTHORIZED_USERS);
timeline.putArray("types").add("video.piped.*");
root.putObject("account_data").putArray("not_types").add("*");
root.putObject("presence").putArray("not_types").add("*");
room.putObject("account_data").put("lazy_load_members", true).putArray("not_types").add("*");
room.putObject("ephemeral").put("lazy_load_members", true).putArray("not_types").add("*");
room.putObject("state").put("lazy_load_members", true).putArray("not_types").add("*");
// Create a filter
return RequestUtils.getJsonNode(client, new Request.Builder()
.url(url + "/_matrix/client/v3/user/" + URLUtils.silentEncode(user_id) + "/filter")
.header("Authorization", "Bearer " + token)
.post(RequestBody.create(mapper.writeValueAsBytes(
root
), MediaType.get("application/json")))
.build())
.get("filter_id")
.asText();
}
}

View file

@ -0,0 +1,85 @@
package me.kavin.piped.utils.obj;
import com.fasterxml.jackson.databind.node.ArrayNode;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.ExceptionHandler;
import me.kavin.piped.utils.RequestUtils;
import me.kavin.piped.utils.URLUtils;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import static me.kavin.piped.consts.Constants.*;
import static me.kavin.piped.utils.URLUtils.silentEncode;
public class MatrixHelper {
public static final boolean UNAUTHENTICATED;
public static final String MATRIX_TOKEN;
public static final String ROOM_ID;
public static final ArrayNode AUTHORIZED_USERS;
static {
UNAUTHENTICATED = StringUtils.isEmpty(Constants.MATRIX_TOKEN);
try {
if (UNAUTHENTICATED) {
MATRIX_TOKEN = RequestUtils.getJsonNode(h2client, new Request.Builder()
.url(MATRIX_SERVER + "/_matrix/client/v3/register?kind=guest")
.post(RequestBody.create(mapper.writeValueAsBytes(
mapper.createObjectNode().put("initial_device_display_name", "Piped's Backend")),
MediaType.get("application/json")))
.build()
).get("access_token").asText();
} else {
MATRIX_TOKEN = Constants.MATRIX_TOKEN;
}
if (UNAUTHENTICATED) {
ROOM_ID = RequestUtils.getJsonNode(h2client, new Request.Builder()
.url(MATRIX_SERVER + "/_matrix/client/v3/directory/room/" + URLUtils.silentEncode(MATRIX_ROOM))
.header("Authorization", "Bearer " + MATRIX_TOKEN)
.build())
.get("room_id")
.asText();
} else {
ROOM_ID = RequestUtils.getJsonNode(h2client, new Request.Builder()
.url(MATRIX_SERVER + "/_matrix/client/v3/join/" + URLUtils.silentEncode(MATRIX_ROOM))
.post(RequestBody.create(mapper.writeValueAsBytes(mapper.createObjectNode()), MediaType.get("application/json")))
.header("Authorization", "Bearer " + MATRIX_TOKEN)
.build())
.get("room_id")
.asText();
}
AUTHORIZED_USERS = (ArrayNode) mapper.readTree(RequestUtils.sendGet("https://raw.githubusercontent.com/TeamPiped/piped-federation/main/authorized-users.json"));
} catch (Exception e) {
ExceptionHandler.handle(e);
throw new RuntimeException(e);
}
}
public static String sendEvent(String type, Object content) throws IOException {
if (UNAUTHENTICATED)
return null;
return RequestUtils.getJsonNode(h2client, new Request.Builder()
.url(MATRIX_SERVER + "/_matrix/client/v3/rooms/" + silentEncode(ROOM_ID) + "/send/" + type + "/" + RandomStringUtils.randomAlphanumeric(12))
.header("Authorization", "Bearer " + MATRIX_TOKEN)
.put(RequestBody.create(mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("msgtype", type)
.set("body", mapper.valueToTree(content))
), MediaType.get("application/json")))
.build()
).get("event_id").asText();
}
}

View file

@ -0,0 +1,7 @@
package me.kavin.piped.utils.obj.federation;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties
public class FederatedChannelInfo {
}

View file

@ -0,0 +1,41 @@
package me.kavin.piped.utils.obj.federation;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class FederatedVideoInfo {
private String videoId, uploaderId, title;
private long duration, views;
public FederatedVideoInfo() {
}
public FederatedVideoInfo(String videoId, String uploaderId, String title, long duration, long views) {
this.videoId = videoId;
this.uploaderId = uploaderId;
this.title = title;
this.duration = duration;
this.views = views;
}
public String getVideoId() {
return videoId;
}
public String getUploaderId() {
return uploaderId;
}
public String getTitle() {
return title;
}
public long getDuration() {
return duration;
}
public long getViews() {
return views;
}
}