ActiveJ migration. (#30)

* Initial ActiveJ migration.

* Add multi-threading.
This commit is contained in:
FireMasterK 2021-03-29 13:59:10 +00:00 committed by GitHub
parent cb4378c9f8
commit efc28a8994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 246 additions and 277 deletions

View File

@ -5,7 +5,6 @@ plugins {
}
repositories {
maven { url 'https://repo.spring.io/milestone' }
jcenter()
maven { url 'https://jitpack.io' }
}
@ -18,7 +17,6 @@ dependencies {
implementation 'commons-codec:commons-codec:1.15'
implementation 'org.bouncycastle:bcprov-jdk15on:1.67'
implementation 'org.mongodb:mongodb-driver-sync:4.2.2'
implementation 'io.projectreactor.netty:reactor-netty:1.0.5'
implementation 'com.github.TeamNewPipe.NewPipeExtractor:NewPipeExtractor:dev-SNAPSHOT'
implementation 'com.github.TeamNewPipe:nanojson:1d9e1aea9049fc9f85e68b43ba39fe7be1c1f751'
implementation 'com.fasterxml.jackson.core:jackson-core:2.12.2'
@ -29,6 +27,11 @@ dependencies {
implementation 'com.rometools:rome:1.15.0'
implementation 'com.github.ipfs:java-ipfs-http-client:v1.3.3'
implementation 'org.jsoup:jsoup:1.13.1'
implementation 'io.activej:activej-common:4.1'
implementation 'io.activej:activej-http:4.1'
implementation 'io.activej:activej-boot:4.1'
implementation 'io.activej:activej-specializer:4.1'
implementation 'io.activej:activej-launchers-http:4.1'
implementation 'net.java.dev.jna:jna-platform:5.8.0'
}

View File

@ -2,6 +2,9 @@
# The port to Listen on.
PORT: 8080
# The number of workers to use for the server
HTTP_WORKERS: 128
# Proxy
PROXY_PART: https://pipedproxy-ams.kavin.rocks

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -1,34 +1,10 @@
package me.kavin.piped;
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN;
import static io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.localization.Localization;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import com.rometools.rome.io.XmlReader;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.AsciiString;
import me.kavin.piped.consts.Constants;
import io.activej.inject.Injector;
import me.kavin.piped.utils.DownloaderImpl;
import me.kavin.piped.utils.ResponseHelper;
import me.kavin.piped.utils.SponsorBlockUtils;
import reactor.core.publisher.Flux;
import reactor.netty.ByteBufFlux;
import reactor.netty.DisposableServer;
import reactor.netty.NettyOutbound;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerResponse;
public class Main {
@ -36,245 +12,9 @@ public class Main {
NewPipe.init(new DownloaderImpl(), new Localization("en", "US"));
DisposableServer server = HttpServer.create().port(Constants.PORT).route(routes -> {
Injector.useSpecializer();
routes.get("/webhooks/pubsub", (req, res) -> {
new ServerLauncher().launch(args);
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res, query.parameters().get("hub.challenge").get(0), TEXT_PLAIN, 200,
"private", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.post("/webhooks/pubsub", (req, res) -> {
long start = System.nanoTime();
try {
req.receive().asInputStream().subscribe(in -> {
try {
SyndFeed feed = new SyndFeedInput().build(new XmlReader(in));
feed.getEntries().forEach(entry -> {
System.out.println(entry.getLinks().get(0).getHref());
System.out.println(entry.getAuthors().get(0).getUri());
});
} catch (Exception e) {
e.printStackTrace();
}
});
return writeResponse(res, "ok", TEXT_PLAIN, 200, "private", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/sponsors/{videoId}", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res, SponsorBlockUtils.getSponsors(req.param("videoId"),
query.parameters().get("category").get(0)), 200, "public, max-age=3600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/streams/{videoId}", (req, res) -> {
long start = System.nanoTime();
try {
// The stream links are valid for 6 hours.
return writeResponse(res, ResponseHelper.streamsResponse(req.param("videoId")), 200,
"public, s-maxage=21540", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/channels/{channelId}", (req, res) -> {
long start = System.nanoTime();
try {
return writeResponse(res, ResponseHelper.channelResponse(req.param("channelId")), 200,
"public, max-age=600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/nextpage/channels/{channelId}", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res, ResponseHelper.channelPageResponse(req.param("channelId"),
query.parameters().get("url").get(0)), 200, "public, max-age=3600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/playlists/{playlistId}", (req, res) -> {
long start = System.nanoTime();
try {
return writeResponse(res, ResponseHelper.playlistResponse(req.param("playlistId")), 200,
"public, max-age=600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/nextpage/playlists/{playlistId}", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res, ResponseHelper.playlistPageResponse(req.param("playlistId"),
query.parameters().get("url").get(0)), 200, "public, max-age=3600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/suggestions", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res,
ResponseHelper.suggestionsResponse(query.parameters().get("query").get(0)), 200,
"public, max-age=600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/search", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res, ResponseHelper.searchResponse(query.parameters().get("q").get(0)), 200,
"public, max-age=600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/nextpage/search", (req, res) -> {
long start = System.nanoTime();
QueryStringDecoder query = new QueryStringDecoder(req.uri());
try {
return writeResponse(res,
ResponseHelper.searchPageResponse(query.parameters().get("q").get(0),
query.parameters().get("url").get(0), query.parameters().get("id").get(0)),
200, "public, max-age=3600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
routes.get("/trending", (req, res) -> {
long start = System.nanoTime();
try {
return writeResponse(res, ResponseHelper.trendingResponse(), 200, "public, max-age=3600", start);
} catch (Exception e) {
e.printStackTrace();
return writeResponse(res, ExceptionUtils.getStackTrace(e), 500, "private", start);
}
});
}).compress(true).bindNow();
server.onDispose().block();
}
public static NettyOutbound writeResponse(HttpServerResponse res, String resp, int code, String cache, long time) {
return writeResponse(res, resp, APPLICATION_JSON, code, cache, time);
}
public static NettyOutbound writeResponse(HttpServerResponse res, String resp, AsciiString mimeType, int code,
String cache, long time) {
return writeResponse(res, resp, mimeType.toString(), code, cache, time);
}
public static NettyOutbound writeResponse(HttpServerResponse res, String resp, String mimeType, int code,
String cache, long time) {
return writeResponse(res, resp.getBytes(StandardCharsets.UTF_8), mimeType, code, cache, time);
}
public static NettyOutbound writeResponse(HttpServerResponse res, byte[] resp, int code, String cache, long time) {
return writeResponse(res, resp, APPLICATION_JSON, code, cache, time);
}
public static NettyOutbound writeResponse(HttpServerResponse res, byte[] resp, AsciiString mimeType, int code,
String cache, long time) {
return writeResponse(res, resp, mimeType.toString(), code, cache, time);
}
public static NettyOutbound writeResponse(HttpServerResponse res, byte[] resp, String mimeType, int code,
String cache, long time) {
return res.status(code).addHeader(CONTENT_TYPE, mimeType).addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.addHeader(CACHE_CONTROL, cache)
.addHeader("Server-Timing", "app;dur=" + (System.nanoTime() - time) / 1000000.0)
.sendByteArray(Flux.just(resp));
}
public static NettyOutbound writeResponse(HttpServerResponse res, Flux<String> resp, int code, String cache) {
return writeResponse(res, resp, APPLICATION_JSON, code, cache);
}
public static NettyOutbound writeResponse(HttpServerResponse res, Flux<String> resp, AsciiString mimeType, int code,
String cache) {
return writeResponse(res, resp, mimeType.toString(), code, cache);
}
public static NettyOutbound writeResponse(HttpServerResponse res, Flux<String> resp, String mimeType, int code,
String cache) {
return res.status(code).addHeader(CONTENT_TYPE, mimeType).addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.addHeader(CACHE_CONTROL, cache)
.send(ByteBufFlux.fromString(resp, java.nio.charset.StandardCharsets.UTF_8, ByteBufAllocator.DEFAULT));
}
}

View File

@ -0,0 +1,169 @@
package me.kavin.piped;
import static io.activej.config.converter.ConfigConverters.ofInetSocketAddress;
import static io.activej.http.HttpHeaders.CACHE_CONTROL;
import static io.activej.http.HttpHeaders.CONTENT_TYPE;
import java.io.ByteArrayInputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.xml.sax.InputSource;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import io.activej.config.Config;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpMethod;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.inject.annotation.Provides;
import io.activej.inject.module.AbstractModule;
import io.activej.inject.module.Module;
import io.activej.launchers.http.MultithreadedHttpServerLauncher;
import me.kavin.piped.consts.Constants;
import me.kavin.piped.utils.CustomServletDecorator;
import me.kavin.piped.utils.ResponseHelper;
import me.kavin.piped.utils.SponsorBlockUtils;
import me.kavin.piped.utils.resp.ErrorResponse;
public class ServerLauncher extends MultithreadedHttpServerLauncher {
@Provides
AsyncServlet mainServlet() {
RoutingServlet router = RoutingServlet.create().map(HttpMethod.GET, "/webhooks/pubsub", request -> {
return HttpResponse.ok200().withPlainText(request.getQueryParameter("hub.challenge"));
}).map(HttpMethod.POST, "/webhooks/pubsub", request -> {
try {
SyndFeed feed = new SyndFeedInput()
.build(new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray())));
feed.getEntries().forEach(entry -> {
System.out.println(entry.getLinks().get(0).getHref());
System.out.println(entry.getAuthors().get(0).getUri());
});
return HttpResponse.ofCode(204);
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/sponsors/:videoId", request -> {
try {
return getJsonResponse(SponsorBlockUtils
.getSponsors(request.getPathParameter("videoId"), request.getQueryParameter("category"))
.getBytes(StandardCharsets.UTF_8), "public, max-age=3600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/streams/:videoId", request -> {
try {
return getJsonResponse(ResponseHelper.streamsResponse(request.getPathParameter("videoId")),
"public, s-maxage=21540");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/channels/:channelId", request -> {
try {
return getJsonResponse(ResponseHelper.channelResponse(request.getPathParameter("channelId")),
"public, s-maxage=600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/nextpage/channels/:channelId", request -> {
try {
return getJsonResponse(
ResponseHelper.channelPageResponse(request.getPathParameter("channelId"),
request.getQueryParameter("url"), request.getQueryParameter("id")),
"public, s-maxage=3600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/playlists/:playlistId", request -> {
try {
return getJsonResponse(ResponseHelper.playlistResponse(request.getPathParameter("playlistId")),
"public, s-maxage=600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/nextpage/playlists/:playlistId", request -> {
try {
return getJsonResponse(ResponseHelper.playlistPageResponse(request.getPathParameter("playlistId"),
request.getQueryParameter("url")), "public, s-maxage=3600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/suggestions", request -> {
try {
return getJsonResponse(ResponseHelper.suggestionsResponse(request.getQueryParameter("query")),
"public, s-maxage=600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/search", request -> {
try {
return getJsonResponse(ResponseHelper.searchResponse(request.getQueryParameter("q")),
"public, s-maxage=600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/nextpage/search", request -> {
try {
return getJsonResponse(
ResponseHelper.searchPageResponse(request.getQueryParameter("q"),
request.getQueryParameter("url"), request.getQueryParameter("id")),
"public, s-maxage=3600");
} catch (Exception e) {
return getErrorResponse(e);
}
}).map("/trending", request -> {
try {
return getJsonResponse(ResponseHelper.trendingResponse(), "public, s-maxage=3600");
} catch (Exception e) {
return getErrorResponse(e);
}
});
return new CustomServletDecorator(router);
}
@Override
protected Module getOverrideModule() {
return new AbstractModule() {
@Provides
Config config() {
return Config.create()
.with("http.listenAddresses",
Config.ofValue(ofInetSocketAddress(), new InetSocketAddress(PORT)))
.with("workers", Constants.HTTP_WORKERS);
}
};
}
private @NotNull HttpResponse getJsonResponse(byte[] body, String cache) {
return getJsonResponse(200, body, cache);
}
private @NotNull HttpResponse getJsonResponse(int code, byte[] body, String cache) {
return HttpResponse.ofCode(code).withBody(body).withHeader(CONTENT_TYPE, "application/json")
.withHeader(CACHE_CONTROL, cache);
}
private @NotNull HttpResponse getErrorResponse(Exception e) {
e.printStackTrace();
try {
return getJsonResponse(500,
Constants.mapper.writeValueAsBytes(new ErrorResponse(ExceptionUtils.getStackTrace(e))), "private");
} catch (JsonProcessingException ex) {
return HttpResponse.ofCode(500);
}
}
}

View File

@ -19,6 +19,7 @@ public class Constants {
public static final String USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0";
public static final int PORT;
public static final String HTTP_WORKERS;
public static final String PROXY_PART;
@ -42,7 +43,8 @@ public class Constants {
YOUTUBE_SERVICE = NewPipe.getService(0);
prop.load(new FileReader("config.properties"));
PORT = Integer.parseInt(prop.getProperty("PORT"));
PORT = Integer.parseInt(prop.getProperty("PORT", "8080"));
HTTP_WORKERS = prop.getProperty("HTTP_WORKERS", "128");
PROXY_PART = prop.getProperty("PROXY_PART");
CAPTCHA_BASE_URL = prop.getProperty("CAPTCHA_BASE_URL");
CAPTCHA_API_KEY = prop.getProperty("CAPTCHA_API_KEY");

View File

@ -0,0 +1,36 @@
package me.kavin.piped.utils;
import static io.activej.http.HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN;
import org.jetbrains.annotations.NotNull;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpHeader;
import io.activej.http.HttpHeaderValue;
import io.activej.http.HttpHeaders;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.promise.Promisable;
public class CustomServletDecorator implements AsyncServlet {
private static final HttpHeader HEADER = HttpHeaders.of("Server-Timing");
private final AsyncServlet servlet;
public CustomServletDecorator(AsyncServlet servlet) {
this.servlet = servlet;
}
@Override
public @NotNull Promisable<HttpResponse> serve(@NotNull HttpRequest request) {
long before = System.nanoTime();
return servlet.serve(request).promise().map(response -> {
HttpHeaderValue headerValue = HttpHeaderValue.of("app;dur=" + (System.nanoTime() - before) / 1000000.0);
return response.withHeader(HEADER, headerValue).withHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
});
}
}

View File

@ -74,7 +74,7 @@ public class ResponseHelper {
final List<Subtitle> subtitles = new ObjectArrayList<>();
final StreamInfo info = futureStream.get();
final StreamInfo info = futureStream.get(10, TimeUnit.SECONDS);
// System.out.println(Constants.mapper.writeValueAsString(info.getStreamSegments()));
info.getSubtitles().forEach(subtitle -> subtitles
@ -83,7 +83,7 @@ public class ResponseHelper {
final List<PipedStream> videoStreams = new ObjectArrayList<>();
final List<PipedStream> audioStreams = new ObjectArrayList<>();
final String lbryURL = futureLBRY.get();
final String lbryURL = futureLBRY.get(10, TimeUnit.SECONDS);
if (lbryURL != null)
videoStreams.add(new PipedStream(lbryURL, "MP4", "LBRY", "video/mp4", false));
@ -169,10 +169,15 @@ public class ResponseHelper {
item.getTextualUploadDate(), item.getDuration(), item.getViewCount()));
});
String nextpage = info.hasNextPage() ? info.getNextPage().getUrl() : null;
String nextpage = null, id = null;
if (info.hasNextPage()) {
Page page = info.getNextPage();
nextpage = page.getUrl();
id = info.getNextPage().getId();
}
final Channel channel = new Channel(info.getId(), info.getName(), rewriteURL(info.getAvatarUrl()),
rewriteURL(info.getBannerUrl()), info.getDescription(), nextpage, relatedStreams);
rewriteURL(info.getBannerUrl()), info.getDescription(), nextpage, id, relatedStreams);
IPFS.publishData(channel);
@ -180,11 +185,11 @@ public class ResponseHelper {
}
public static final byte[] channelPageResponse(String channelId, String url)
public static final byte[] channelPageResponse(String channelId, String url, String id)
throws IOException, ExtractionException, InterruptedException {
InfoItemsPage<StreamInfoItem> page = ChannelInfo.getMoreItems(Constants.YOUTUBE_SERVICE,
"https://youtube.com/channel/" + channelId, new Page(url));
"https://youtube.com/channel/" + channelId, new Page(url, id));
final List<StreamItem> relatedStreams = new ObjectArrayList<>();

View File

@ -26,8 +26,8 @@ public class SponsorBlockUtils {
URI uri = URI.create("https://sponsor.ajay.app/api/skipSegments/" + URLUtils.silentEncode(hash.substring(0, 4))
+ "?categories=" + URLUtils.silentEncode(categories));
JsonArray jArray = JsonParser.array()
.from(Constants.h2client.send(HttpRequest.newBuilder(uri).build(), BodyHandlers.ofString()).body());
JsonArray jArray = JsonParser.array().from(
Constants.h2client.send(HttpRequest.newBuilder(uri).build(), BodyHandlers.ofInputStream()).body());
jArray.removeIf(jObject -> !((JsonObject) jObject).getString("videoID").equalsIgnoreCase(id));

View File

@ -4,17 +4,18 @@ import java.util.List;
public class Channel {
public String id, name, avatarUrl, bannerUrl, description, nextpage;
public String id, name, avatarUrl, bannerUrl, description, nextpage, nextid;
public List<StreamItem> relatedStreams;
public Channel(String id, String name, String avatarUrl, String bannerUrl, String description, String nextpage,
List<StreamItem> relatedStreams) {
String nextid, List<StreamItem> relatedStreams) {
this.id = id;
this.name = name;
this.avatarUrl = avatarUrl;
this.bannerUrl = bannerUrl;
this.description = description;
this.nextpage = nextpage;
this.nextid = nextid;
this.relatedStreams = relatedStreams;
}
}

View File

@ -0,0 +1,10 @@
package me.kavin.piped.utils.resp;
public class ErrorResponse {
public final String error;
public ErrorResponse(String error) {
this.error = error;
}
}