Merge pull request #520 from TeamPiped/s3-storage-api

Implement persistent client-data storage with S3.
This commit is contained in:
Kavin 2023-02-02 15:53:47 +00:00 committed by GitHub
commit 16b4dfee13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 211 additions and 21 deletions

View file

@ -41,6 +41,7 @@ dependencies {
implementation 'com.squareup.okhttp3:okhttp-brotli'
implementation 'io.sentry:sentry:6.11.0'
implementation 'rocks.kavin:reqwest4j:1.0'
implementation 'io.minio:minio:8.5.1'
}
shadowJar {

View file

@ -63,6 +63,12 @@ MATRIX_SERVER:https://matrix-client.matrix.org
# Geo Restriction Checker for federated bypassing of Geo Restrictions
#GEO_RESTRICTION_CHECKER_URL:INSERT_HERE
# S3 Configuration Data (compatible with any provider that offers an S3 compatible API)
#S3_ENDPOINT:INSERT_HERE
#S3_ACCESS_KEY:INSERT_HERE
#S3_SECRET_KEY:INSERT_HERE
#S3_BUCKET:INSERT_HERE
# Hibernate properties
hibernate.connection.url:jdbc:postgresql://postgres:5432/piped
hibernate.connection.driver_class:org.postgresql.Driver

View file

@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.minio.MinioClient;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import me.kavin.piped.utils.PageMixin;
import me.kavin.piped.utils.RequestUtils;
@ -77,6 +78,16 @@ public class Constants {
public static final String SENTRY_DSN;
public static final String S3_ENDPOINT;
public static final String S3_ACCESS_KEY;
public static final String S3_SECRET_KEY;
public static final String S3_BUCKET;
public static final MinioClient S3_CLIENT;
public static final String MATRIX_ROOM = "#piped-events:matrix.org";
public static final String MATRIX_SERVER;
@ -132,6 +143,18 @@ public class Constants {
DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false"));
SUBSCRIPTIONS_EXPIRY = Integer.parseInt(getProperty(prop, "SUBSCRIPTIONS_EXPIRY", "30"));
SENTRY_DSN = getProperty(prop, "SENTRY_DSN", "");
S3_ENDPOINT = getProperty(prop, "S3_ENDPOINT");
S3_ACCESS_KEY = getProperty(prop, "S3_ACCESS_KEY");
S3_SECRET_KEY = getProperty(prop, "S3_SECRET_KEY");
S3_BUCKET = getProperty(prop, "S3_BUCKET");
if (S3_ENDPOINT != null) {
S3_CLIENT = MinioClient.builder()
.endpoint(S3_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build();
} else {
S3_CLIENT = null;
}
System.getenv().forEach((key, value) -> {
if (key.startsWith("hibernate"))
hibernateProperties.put(key, value);
@ -151,6 +174,7 @@ public class Constants {
YOUTUBE_SERVICE.getSupportedCountries().stream().map(ContentCountry::getCountryCode)
.map(JsonNodeFactory.instance::textNode).toList()
);
frontendProperties.put("s3Enabled", S3_CLIENT != null);
// transform hibernate properties for legacy configurations
hibernateProperties.replace("hibernate.dialect",

View file

@ -5,10 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import io.activej.config.Config;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpMethod;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.*;
import io.activej.inject.annotation.Provides;
import io.activej.inject.module.AbstractModule;
import io.activej.inject.module.Module;
@ -18,6 +15,7 @@ import me.kavin.piped.consts.Constants;
import me.kavin.piped.server.handlers.*;
import me.kavin.piped.server.handlers.auth.AuthPlaylistHandlers;
import me.kavin.piped.server.handlers.auth.FeedHandlers;
import me.kavin.piped.server.handlers.auth.StorageHandlers;
import me.kavin.piped.server.handlers.auth.UserHandlers;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.MatrixHelper;
@ -42,9 +40,14 @@ import static io.activej.http.HttpHeaders.*;
import static io.activej.http.HttpMethod.GET;
import static io.activej.http.HttpMethod.POST;
import static java.nio.charset.StandardCharsets.UTF_8;
import static me.kavin.piped.consts.Constants.mapper;
public class ServerLauncher extends MultithreadedHttpServerLauncher {
private static final HttpHeader FILE_NAME = HttpHeaders.of("x-file-name");
private static final HttpHeader LAST_ETAG = HttpHeaders.of("x-last-etag");
@Provides
Executor executor() {
return Multithreading.getCachedExecutor();
@ -265,7 +268,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/register", AsyncServlet.ofBlocking(executor, request -> {
try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class);
return getJsonResponse(UserHandlers.registerResponse(body.username, body.password),
"private");
@ -274,7 +277,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/login", AsyncServlet.ofBlocking(executor, request -> {
try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class);
return getJsonResponse(UserHandlers.loginResponse(body.username, body.password), "private");
} catch (Exception e) {
@ -282,7 +285,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> {
try {
SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest body = mapper
.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse(
FeedHandlers.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
@ -292,7 +295,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> {
try {
SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest body = mapper
.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse(
FeedHandlers.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
@ -331,7 +334,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(subscriptions), "public, s-maxage=120");
} catch (Exception e) {
@ -347,7 +350,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.importResponse(request.getHeader(AUTHORIZATION),
subscriptions, Boolean.parseBoolean(request.getQueryParameter("override"))), "private");
@ -356,7 +359,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/import/playlist", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.importPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -379,7 +382,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/subscriptions/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.unauthenticatedSubscriptionsResponse(subscriptions), "public, s-maxage=120");
} catch (Exception e) {
@ -387,7 +390,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/create", AsyncServlet.ofBlocking(executor, request -> {
try {
var name = Constants.mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue();
var name = mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue();
return getJsonResponse(AuthPlaylistHandlers.createPlaylist(request.getHeader(AUTHORIZATION), name), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
@ -400,7 +403,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/add", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var videoIds = new ObjectArrayList<String>();
// backwards compatibility
@ -421,7 +424,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/remove", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var index = json.get("index").intValue();
return getJsonResponse(AuthPlaylistHandlers.removeFromPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, index), "private");
@ -430,7 +433,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/clear", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.clearPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -438,7 +441,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/rename", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var newName = json.get("newName").textValue();
return getJsonResponse(AuthPlaylistHandlers.renamePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, newName), "private");
@ -447,7 +450,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/delete", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.deletePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -462,7 +465,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/delete", AsyncServlet.ofBlocking(executor, request -> {
try {
DeleteUserRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
DeleteUserRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
DeleteUserRequest.class);
return getJsonResponse(UserHandlers.deleteUserResponse(request.getHeader(AUTHORIZATION), body.password),
"private");
@ -475,7 +478,33 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL)));
})).map(GET, "/storage/stat", AsyncServlet.ofBlocking(executor, request -> {
try {
var file = request.getQueryParameter("file");
return getJsonResponse(StorageHandlers.statFile(request.getHeader(AUTHORIZATION), file), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(POST, "/storage/put", AsyncServlet.ofBlocking(executor, request -> {
try {
var data = request.loadBody().getResult().asArray();
String fileName = request.getHeader(FILE_NAME);
String etag = request.getHeader(LAST_ETAG);
return getJsonResponse(StorageHandlers.putFile(request.getHeader(AUTHORIZATION), fileName, etag, data), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(GET, "/storage/get", AsyncServlet.ofBlocking(executor, request -> {
try {
var file = request.getQueryParameter("file");
return getRawResponse(StorageHandlers.getFile(request.getHeader(AUTHORIZATION), file), "application/octet-stream", "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
}))
.map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL)));
return new CustomServletDecorator(router);
}
@ -540,7 +569,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
try {
return getJsonResponse(500, Constants.mapper
return getJsonResponse(500, mapper
.writeValueAsBytes(new StackTraceResponse(ExceptionUtils.getStackTrace(e), e.getMessage())), "private");
} catch (JsonProcessingException ex) {
return HttpResponse.ofCode(500);

View file

@ -0,0 +1,130 @@
package me.kavin.piped.server.handlers.auth;
import io.minio.GetObjectArgs;
import io.minio.PutObjectArgs;
import io.minio.StatObjectArgs;
import io.minio.errors.ErrorResponseException;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.DatabaseHelper;
import me.kavin.piped.utils.ExceptionHandler;
import me.kavin.piped.utils.obj.db.User;
import me.kavin.piped.utils.resp.SimpleErrorMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.ByteArrayInputStream;
import static me.kavin.piped.consts.Constants.mapper;
public class StorageHandlers {
public static byte[] statFile(String session, String name) throws Exception {
if (Constants.S3_CLIENT == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Storage is not configured on this instance!"));
if (!StringUtils.isAlphanumeric(name) || name.length() > 32)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!"));
User user = DatabaseHelper.getUserFromSession(session);
if (user == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!"));
try {
var statData = Constants.S3_CLIENT.statObject(
StatObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.build()
);
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "exists")
.put("etag", statData.etag())
.put("date", statData.lastModified().toInstant().toEpochMilli())
);
} catch (ErrorResponseException e) {
if (e.errorResponse().code().equals("NoSuchKey"))
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "not_exists")
);
else
throw e;
}
}
public static byte[] putFile(String session, String name, String etag, byte[] content) throws Exception {
if (Constants.S3_CLIENT == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Storage is not configured on this instance!"));
if (!StringUtils.isAlphanumeric(name) || name.length() > 32)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!"));
User user = DatabaseHelper.getUserFromSession(session);
if (user == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!"));
// check if file size is greater than 500kb
if (content.length > 500 * 1024)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("File size is too large!"));
// check if file already exists, if it does, check if the etag matches
try {
var statData = Constants.S3_CLIENT.statObject(
StatObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.build()
);
if (!statData.etag().equals(etag))
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid etag provided! (File uploaded by another client?)"));
} catch (ErrorResponseException e) {
if (!e.errorResponse().code().equals("NoSuchKey"))
ExceptionUtils.rethrow(e);
}
var stream = new ByteArrayInputStream(content);
Constants.S3_CLIENT.putObject(
PutObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.stream(stream, content.length, -1)
.build()
);
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "ok")
);
}
public static byte[] getFile(String session, String name) throws Exception {
if (Constants.S3_CLIENT == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Storage is not configured on this instance!"));
if (!StringUtils.isAlphanumeric(name) || name.length() > 32)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!"));
User user = DatabaseHelper.getUserFromSession(session);
if (user == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!"));
try (var stream = Constants.S3_CLIENT.getObject(GetObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.build())) {
return IOUtils.toByteArray(stream);
}
}
}