Implement persistent client-data storage with S3.

This commit is contained in:
Kavin 2023-02-02 13:00:35 +00:00
parent 7d53a588b9
commit 0e04526217
No known key found for this signature in database
GPG key ID: 49451E4482CC5BCD
4 changed files with 168 additions and 21 deletions

View file

@ -41,6 +41,7 @@ dependencies {
implementation 'com.squareup.okhttp3:okhttp-brotli'
implementation 'io.sentry:sentry:6.11.0'
implementation 'rocks.kavin:reqwest4j:1.0'
implementation 'io.minio:minio:8.5.1'
}
shadowJar {

View file

@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.minio.MinioClient;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import me.kavin.piped.utils.PageMixin;
import me.kavin.piped.utils.RequestUtils;
@ -77,6 +78,16 @@ public class Constants {
public static final String SENTRY_DSN;
public static final String S3_ENDPOINT;
public static final String S3_ACCESS_KEY;
public static final String S3_SECRET_KEY;
public static final String S3_BUCKET;
public static final MinioClient S3_CLIENT;
public static final String MATRIX_ROOM = "#piped-events:matrix.org";
public static final String MATRIX_SERVER;
@ -132,6 +143,18 @@ public class Constants {
DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false"));
SUBSCRIPTIONS_EXPIRY = Integer.parseInt(getProperty(prop, "SUBSCRIPTIONS_EXPIRY", "30"));
SENTRY_DSN = getProperty(prop, "SENTRY_DSN", "");
S3_ENDPOINT = getProperty(prop, "S3_ENDPOINT");
S3_ACCESS_KEY = getProperty(prop, "S3_ACCESS_KEY");
S3_SECRET_KEY = getProperty(prop, "S3_SECRET_KEY");
S3_BUCKET = getProperty(prop, "S3_BUCKET");
if (S3_ENDPOINT != null) {
S3_CLIENT = MinioClient.builder()
.endpoint(S3_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build();
} else {
S3_CLIENT = null;
}
System.getenv().forEach((key, value) -> {
if (key.startsWith("hibernate"))
hibernateProperties.put(key, value);

View file

@ -5,10 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import io.activej.config.Config;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpMethod;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.*;
import io.activej.inject.annotation.Provides;
import io.activej.inject.module.AbstractModule;
import io.activej.inject.module.Module;
@ -18,6 +15,7 @@ import me.kavin.piped.consts.Constants;
import me.kavin.piped.server.handlers.*;
import me.kavin.piped.server.handlers.auth.AuthPlaylistHandlers;
import me.kavin.piped.server.handlers.auth.FeedHandlers;
import me.kavin.piped.server.handlers.auth.StorageHandlers;
import me.kavin.piped.server.handlers.auth.UserHandlers;
import me.kavin.piped.utils.*;
import me.kavin.piped.utils.obj.MatrixHelper;
@ -42,9 +40,14 @@ import static io.activej.http.HttpHeaders.*;
import static io.activej.http.HttpMethod.GET;
import static io.activej.http.HttpMethod.POST;
import static java.nio.charset.StandardCharsets.UTF_8;
import static me.kavin.piped.consts.Constants.mapper;
public class ServerLauncher extends MultithreadedHttpServerLauncher {
private static final HttpHeader FILE_NAME = HttpHeaders.of("x-file-name");
private static final HttpHeader LAST_ETAG = HttpHeaders.of("x-last-etag");
@Provides
Executor executor() {
return Multithreading.getCachedExecutor();
@ -265,7 +268,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/register", AsyncServlet.ofBlocking(executor, request -> {
try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class);
return getJsonResponse(UserHandlers.registerResponse(body.username, body.password),
"private");
@ -274,7 +277,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/login", AsyncServlet.ofBlocking(executor, request -> {
try {
LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
LoginRequest.class);
return getJsonResponse(UserHandlers.loginResponse(body.username, body.password), "private");
} catch (Exception e) {
@ -282,7 +285,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> {
try {
SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest body = mapper
.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse(
FeedHandlers.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
@ -292,7 +295,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> {
try {
SubscriptionUpdateRequest body = Constants.mapper
SubscriptionUpdateRequest body = mapper
.readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class);
return getJsonResponse(
FeedHandlers.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId),
@ -331,7 +334,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(subscriptions), "public, s-maxage=120");
} catch (Exception e) {
@ -347,7 +350,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.importResponse(request.getHeader(AUTHORIZATION),
subscriptions, Boolean.parseBoolean(request.getQueryParameter("override"))), "private");
@ -356,7 +359,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/import/playlist", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.importPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -379,7 +382,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/subscriptions/unauthenticated", AsyncServlet.ofBlocking(executor, request -> {
try {
String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(),
String[].class);
return getJsonResponse(FeedHandlers.unauthenticatedSubscriptionsResponse(subscriptions), "public, s-maxage=120");
} catch (Exception e) {
@ -387,7 +390,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/create", AsyncServlet.ofBlocking(executor, request -> {
try {
var name = Constants.mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue();
var name = mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue();
return getJsonResponse(AuthPlaylistHandlers.createPlaylist(request.getHeader(AUTHORIZATION), name), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
@ -400,7 +403,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/add", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var videoIds = new ObjectArrayList<String>();
// backwards compatibility
@ -421,7 +424,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/remove", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var index = json.get("index").intValue();
return getJsonResponse(AuthPlaylistHandlers.removeFromPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, index), "private");
@ -430,7 +433,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/clear", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.clearPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -438,7 +441,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/rename", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
var newName = json.get("newName").textValue();
return getJsonResponse(AuthPlaylistHandlers.renamePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, newName), "private");
@ -447,7 +450,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/playlists/delete", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = Constants.mapper.readTree(request.loadBody().getResult().asArray());
var json = mapper.readTree(request.loadBody().getResult().asArray());
var playlistId = json.get("playlistId").textValue();
return getJsonResponse(AuthPlaylistHandlers.deletePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private");
} catch (Exception e) {
@ -462,7 +465,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
})).map(POST, "/user/delete", AsyncServlet.ofBlocking(executor, request -> {
try {
DeleteUserRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(),
DeleteUserRequest body = mapper.readValue(request.loadBody().getResult().asArray(),
DeleteUserRequest.class);
return getJsonResponse(UserHandlers.deleteUserResponse(request.getHeader(AUTHORIZATION), body.password),
"private");
@ -475,7 +478,26 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL)));
})).map(GET, "/storage/stat", AsyncServlet.ofBlocking(executor, request -> {
try {
var json = mapper.readTree(request.loadBody().getResult().asArray());
return getJsonResponse(StorageHandlers.statFile(request.getHeader(AUTHORIZATION), json.get("file").textValue()), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
})).map(POST, "/storage/put", AsyncServlet.ofBlocking(executor, request -> {
try {
var data = request.loadBody().getResult().asArray();
String fileName = request.getHeader(FILE_NAME);
String etag = request.getHeader(LAST_ETAG);
return getJsonResponse(StorageHandlers.putFile(request.getHeader(AUTHORIZATION), fileName, etag, data), "private");
} catch (Exception e) {
return getErrorResponse(e, request.getPath());
}
}))
.map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL)));
return new CustomServletDecorator(router);
}
@ -540,7 +562,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
}
try {
return getJsonResponse(500, Constants.mapper
return getJsonResponse(500, mapper
.writeValueAsBytes(new StackTraceResponse(ExceptionUtils.getStackTrace(e), e.getMessage())), "private");
} catch (JsonProcessingException ex) {
return HttpResponse.ofCode(500);

View file

@ -0,0 +1,101 @@
package me.kavin.piped.server.handlers.auth;
import io.minio.PutObjectArgs;
import io.minio.StatObjectArgs;
import io.minio.errors.ErrorResponseException;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.DatabaseHelper;
import me.kavin.piped.utils.ExceptionHandler;
import me.kavin.piped.utils.obj.db.User;
import me.kavin.piped.utils.resp.SimpleErrorMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.ByteArrayInputStream;
import static me.kavin.piped.consts.Constants.mapper;
public class StorageHandlers {
public static byte[] statFile(String session, String name) throws Exception {
if (!StringUtils.isAlphanumeric(name) || name.length() > 32)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!"));
User user = DatabaseHelper.getUserFromSession(session);
if (user == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!"));
try {
var statData = Constants.S3_CLIENT.statObject(
StatObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.build()
);
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "exists")
.put("etag", statData.etag())
.put("date", statData.lastModified().toInstant().toEpochMilli())
);
} catch (ErrorResponseException e) {
if (e.errorResponse().code().equals("NoSuchKey"))
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "not_exists")
);
else
throw e;
}
}
public static byte[] putFile(String session, String name, String etag, byte[] content) throws Exception {
if (!StringUtils.isAlphanumeric(name) || name.length() > 32)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!"));
User user = DatabaseHelper.getUserFromSession(session);
if (user == null)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!"));
// check if file size is greater than 500kb
if (content.length > 500 * 1024)
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("File size is too large!"));
// check if file already exists, if it does, check if the etag matches
try {
var statData = Constants.S3_CLIENT.statObject(
StatObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.build()
);
if (!statData.etag().equals(etag))
ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid etag provided! (File uploaded by another client?)"));
} catch (ErrorResponseException e) {
if (!e.errorResponse().code().equals("NoSuchKey"))
ExceptionUtils.rethrow(e);
}
var stream = new ByteArrayInputStream(content);
Constants.S3_CLIENT.putObject(
PutObjectArgs.builder()
.bucket(Constants.S3_BUCKET)
.object(user.getId() + "/" + name)
.stream(stream, content.length, -1)
.build()
);
return mapper.writeValueAsBytes(
mapper.createObjectNode()
.put("status", "ok")
);
}
}