diff --git a/build.gradle b/build.gradle index b1a5d30..bdd6693 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation 'com.squareup.okhttp3:okhttp-brotli' implementation 'io.sentry:sentry:6.11.0' implementation 'rocks.kavin:reqwest4j:1.0' + implementation 'io.minio:minio:8.5.1' } shadowJar { diff --git a/src/main/java/me/kavin/piped/consts/Constants.java b/src/main/java/me/kavin/piped/consts/Constants.java index 760adde..98f8b24 100644 --- a/src/main/java/me/kavin/piped/consts/Constants.java +++ b/src/main/java/me/kavin/piped/consts/Constants.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.minio.MinioClient; import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; import me.kavin.piped.utils.PageMixin; import me.kavin.piped.utils.RequestUtils; @@ -77,6 +78,16 @@ public class Constants { public static final String SENTRY_DSN; + public static final String S3_ENDPOINT; + + public static final String S3_ACCESS_KEY; + + public static final String S3_SECRET_KEY; + + public static final String S3_BUCKET; + + public static final MinioClient S3_CLIENT; + public static final String MATRIX_ROOM = "#piped-events:matrix.org"; public static final String MATRIX_SERVER; @@ -132,6 +143,18 @@ public class Constants { DISABLE_LBRY = Boolean.parseBoolean(getProperty(prop, "DISABLE_LBRY", "false")); SUBSCRIPTIONS_EXPIRY = Integer.parseInt(getProperty(prop, "SUBSCRIPTIONS_EXPIRY", "30")); SENTRY_DSN = getProperty(prop, "SENTRY_DSN", ""); + S3_ENDPOINT = getProperty(prop, "S3_ENDPOINT"); + S3_ACCESS_KEY = getProperty(prop, "S3_ACCESS_KEY"); + S3_SECRET_KEY = getProperty(prop, "S3_SECRET_KEY"); + S3_BUCKET = getProperty(prop, "S3_BUCKET"); + if (S3_ENDPOINT != null) { + S3_CLIENT = MinioClient.builder() + .endpoint(S3_ENDPOINT) + .credentials(S3_ACCESS_KEY, S3_SECRET_KEY) + .build(); + } else { + S3_CLIENT = null; + } System.getenv().forEach((key, value) -> { if (key.startsWith("hibernate")) hibernateProperties.put(key, value); diff --git a/src/main/java/me/kavin/piped/server/ServerLauncher.java b/src/main/java/me/kavin/piped/server/ServerLauncher.java index 9a93023..76595f8 100644 --- a/src/main/java/me/kavin/piped/server/ServerLauncher.java +++ b/src/main/java/me/kavin/piped/server/ServerLauncher.java @@ -5,10 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.rometools.rome.feed.synd.SyndFeed; import com.rometools.rome.io.SyndFeedInput; import io.activej.config.Config; -import io.activej.http.AsyncServlet; -import io.activej.http.HttpMethod; -import io.activej.http.HttpResponse; -import io.activej.http.RoutingServlet; +import io.activej.http.*; import io.activej.inject.annotation.Provides; import io.activej.inject.module.AbstractModule; import io.activej.inject.module.Module; @@ -18,6 +15,7 @@ import me.kavin.piped.consts.Constants; import me.kavin.piped.server.handlers.*; import me.kavin.piped.server.handlers.auth.AuthPlaylistHandlers; import me.kavin.piped.server.handlers.auth.FeedHandlers; +import me.kavin.piped.server.handlers.auth.StorageHandlers; import me.kavin.piped.server.handlers.auth.UserHandlers; import me.kavin.piped.utils.*; import me.kavin.piped.utils.obj.MatrixHelper; @@ -42,9 +40,14 @@ import static io.activej.http.HttpHeaders.*; import static io.activej.http.HttpMethod.GET; import static io.activej.http.HttpMethod.POST; import static java.nio.charset.StandardCharsets.UTF_8; +import static me.kavin.piped.consts.Constants.mapper; public class ServerLauncher extends MultithreadedHttpServerLauncher { + private static final HttpHeader FILE_NAME = HttpHeaders.of("x-file-name"); + private static final HttpHeader LAST_ETAG = HttpHeaders.of("x-last-etag"); + + @Provides Executor executor() { return Multithreading.getCachedExecutor(); @@ -265,7 +268,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/register", AsyncServlet.ofBlocking(executor, request -> { try { - LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(), LoginRequest.class); return getJsonResponse(UserHandlers.registerResponse(body.username, body.password), "private"); @@ -274,7 +277,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/login", AsyncServlet.ofBlocking(executor, request -> { try { - LoginRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + LoginRequest body = mapper.readValue(request.loadBody().getResult().asArray(), LoginRequest.class); return getJsonResponse(UserHandlers.loginResponse(body.username, body.password), "private"); } catch (Exception e) { @@ -282,7 +285,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/subscribe", AsyncServlet.ofBlocking(executor, request -> { try { - SubscriptionUpdateRequest body = Constants.mapper + SubscriptionUpdateRequest body = mapper .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class); return getJsonResponse( FeedHandlers.subscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), @@ -292,7 +295,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/unsubscribe", AsyncServlet.ofBlocking(executor, request -> { try { - SubscriptionUpdateRequest body = Constants.mapper + SubscriptionUpdateRequest body = mapper .readValue(request.loadBody().getResult().asArray(), SubscriptionUpdateRequest.class); return getJsonResponse( FeedHandlers.unsubscribeResponse(request.getHeader(AUTHORIZATION), body.channelId), @@ -331,7 +334,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/feed/unauthenticated", AsyncServlet.ofBlocking(executor, request -> { try { - String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(), String[].class); return getJsonResponse(FeedHandlers.unauthenticatedFeedResponse(subscriptions), "public, s-maxage=120"); } catch (Exception e) { @@ -347,7 +350,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/import", AsyncServlet.ofBlocking(executor, request -> { try { - String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(), String[].class); return getJsonResponse(FeedHandlers.importResponse(request.getHeader(AUTHORIZATION), subscriptions, Boolean.parseBoolean(request.getQueryParameter("override"))), "private"); @@ -356,7 +359,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/import/playlist", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); return getJsonResponse(AuthPlaylistHandlers.importPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private"); } catch (Exception e) { @@ -379,7 +382,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/subscriptions/unauthenticated", AsyncServlet.ofBlocking(executor, request -> { try { - String[] subscriptions = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + String[] subscriptions = mapper.readValue(request.loadBody().getResult().asArray(), String[].class); return getJsonResponse(FeedHandlers.unauthenticatedSubscriptionsResponse(subscriptions), "public, s-maxage=120"); } catch (Exception e) { @@ -387,7 +390,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/create", AsyncServlet.ofBlocking(executor, request -> { try { - var name = Constants.mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue(); + var name = mapper.readTree(request.loadBody().getResult().asArray()).get("name").textValue(); return getJsonResponse(AuthPlaylistHandlers.createPlaylist(request.getHeader(AUTHORIZATION), name), "private"); } catch (Exception e) { return getErrorResponse(e, request.getPath()); @@ -400,7 +403,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/add", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); var videoIds = new ObjectArrayList(); // backwards compatibility @@ -421,7 +424,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/remove", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); var index = json.get("index").intValue(); return getJsonResponse(AuthPlaylistHandlers.removeFromPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, index), "private"); @@ -430,7 +433,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/clear", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); return getJsonResponse(AuthPlaylistHandlers.clearPlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private"); } catch (Exception e) { @@ -438,7 +441,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/rename", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); var newName = json.get("newName").textValue(); return getJsonResponse(AuthPlaylistHandlers.renamePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId, newName), "private"); @@ -447,7 +450,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/playlists/delete", AsyncServlet.ofBlocking(executor, request -> { try { - var json = Constants.mapper.readTree(request.loadBody().getResult().asArray()); + var json = mapper.readTree(request.loadBody().getResult().asArray()); var playlistId = json.get("playlistId").textValue(); return getJsonResponse(AuthPlaylistHandlers.deletePlaylistResponse(request.getHeader(AUTHORIZATION), playlistId), "private"); } catch (Exception e) { @@ -462,7 +465,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } })).map(POST, "/user/delete", AsyncServlet.ofBlocking(executor, request -> { try { - DeleteUserRequest body = Constants.mapper.readValue(request.loadBody().getResult().asArray(), + DeleteUserRequest body = mapper.readValue(request.loadBody().getResult().asArray(), DeleteUserRequest.class); return getJsonResponse(UserHandlers.deleteUserResponse(request.getHeader(AUTHORIZATION), body.password), "private"); @@ -475,7 +478,26 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } catch (Exception e) { return getErrorResponse(e, request.getPath()); } - })).map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL))); + })).map(GET, "/storage/stat", AsyncServlet.ofBlocking(executor, request -> { + try { + var json = mapper.readTree(request.loadBody().getResult().asArray()); + return getJsonResponse(StorageHandlers.statFile(request.getHeader(AUTHORIZATION), json.get("file").textValue()), "private"); + } catch (Exception e) { + return getErrorResponse(e, request.getPath()); + } + })).map(POST, "/storage/put", AsyncServlet.ofBlocking(executor, request -> { + try { + var data = request.loadBody().getResult().asArray(); + + String fileName = request.getHeader(FILE_NAME); + String etag = request.getHeader(LAST_ETAG); + + return getJsonResponse(StorageHandlers.putFile(request.getHeader(AUTHORIZATION), fileName, etag, data), "private"); + } catch (Exception e) { + return getErrorResponse(e, request.getPath()); + } + })) + .map(GET, "/", AsyncServlet.ofBlocking(executor, request -> HttpResponse.redirect302(Constants.FRONTEND_URL))); return new CustomServletDecorator(router); } @@ -540,7 +562,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { } try { - return getJsonResponse(500, Constants.mapper + return getJsonResponse(500, mapper .writeValueAsBytes(new StackTraceResponse(ExceptionUtils.getStackTrace(e), e.getMessage())), "private"); } catch (JsonProcessingException ex) { return HttpResponse.ofCode(500); diff --git a/src/main/java/me/kavin/piped/server/handlers/auth/StorageHandlers.java b/src/main/java/me/kavin/piped/server/handlers/auth/StorageHandlers.java new file mode 100644 index 0000000..b57270d --- /dev/null +++ b/src/main/java/me/kavin/piped/server/handlers/auth/StorageHandlers.java @@ -0,0 +1,101 @@ +package me.kavin.piped.server.handlers.auth; + +import io.minio.PutObjectArgs; +import io.minio.StatObjectArgs; +import io.minio.errors.ErrorResponseException; +import me.kavin.piped.consts.Constants; +import me.kavin.piped.utils.DatabaseHelper; +import me.kavin.piped.utils.ExceptionHandler; +import me.kavin.piped.utils.obj.db.User; +import me.kavin.piped.utils.resp.SimpleErrorMessage; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.io.ByteArrayInputStream; + +import static me.kavin.piped.consts.Constants.mapper; + +public class StorageHandlers { + + public static byte[] statFile(String session, String name) throws Exception { + + if (!StringUtils.isAlphanumeric(name) || name.length() > 32) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!")); + + User user = DatabaseHelper.getUserFromSession(session); + + if (user == null) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!")); + + try { + var statData = Constants.S3_CLIENT.statObject( + StatObjectArgs.builder() + .bucket(Constants.S3_BUCKET) + .object(user.getId() + "/" + name) + .build() + ); + + return mapper.writeValueAsBytes( + mapper.createObjectNode() + .put("status", "exists") + .put("etag", statData.etag()) + .put("date", statData.lastModified().toInstant().toEpochMilli()) + ); + } catch (ErrorResponseException e) { + if (e.errorResponse().code().equals("NoSuchKey")) + return mapper.writeValueAsBytes( + mapper.createObjectNode() + .put("status", "not_exists") + ); + else + throw e; + } + } + + public static byte[] putFile(String session, String name, String etag, byte[] content) throws Exception { + + if (!StringUtils.isAlphanumeric(name) || name.length() > 32) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid path provided!")); + + User user = DatabaseHelper.getUserFromSession(session); + + if (user == null) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid session provided!")); + + // check if file size is greater than 500kb + if (content.length > 500 * 1024) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("File size is too large!")); + + // check if file already exists, if it does, check if the etag matches + try { + var statData = Constants.S3_CLIENT.statObject( + StatObjectArgs.builder() + .bucket(Constants.S3_BUCKET) + .object(user.getId() + "/" + name) + .build() + ); + + if (!statData.etag().equals(etag)) + ExceptionHandler.throwErrorResponse(new SimpleErrorMessage("Invalid etag provided! (File uploaded by another client?)")); + + } catch (ErrorResponseException e) { + if (!e.errorResponse().code().equals("NoSuchKey")) + ExceptionUtils.rethrow(e); + } + + var stream = new ByteArrayInputStream(content); + + Constants.S3_CLIENT.putObject( + PutObjectArgs.builder() + .bucket(Constants.S3_BUCKET) + .object(user.getId() + "/" + name) + .stream(stream, content.length, -1) + .build() + ); + + return mapper.writeValueAsBytes( + mapper.createObjectNode() + .put("status", "ok") + ); + } +}