mirror of
https://github.com/TeamPiped/Piped-Backend.git
synced 2024-08-14 23:51:41 +00:00
Major multi-threading and pubsub improvements.
This commit is contained in:
parent
1c39359279
commit
c45b723135
8 changed files with 264 additions and 131 deletions
|
@ -1,5 +1,7 @@
|
|||
package me.kavin.piped;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -14,6 +16,7 @@ import io.activej.inject.Injector;
|
|||
import me.kavin.piped.utils.DatabaseHelper;
|
||||
import me.kavin.piped.utils.DatabaseSessionFactory;
|
||||
import me.kavin.piped.utils.DownloaderImpl;
|
||||
import me.kavin.piped.utils.Multithreading;
|
||||
import me.kavin.piped.utils.ResponseHelper;
|
||||
|
||||
public class Main {
|
||||
|
@ -24,18 +27,24 @@ public class Main {
|
|||
|
||||
Injector.useSpecializer();
|
||||
|
||||
new Thread(() -> {
|
||||
DatabaseSessionFactory.createSession().close();
|
||||
}).start();
|
||||
|
||||
new Timer().scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Session s = DatabaseSessionFactory.createSession();
|
||||
|
||||
for (String channelId : DatabaseHelper.getGlobalSubscribedChannelIds(s))
|
||||
ResponseHelper.subscribePubSub(channelId);
|
||||
List<String> channels = DatabaseHelper.getGlobalSubscribedChannelIds(s);
|
||||
|
||||
for (String channelId : channels)
|
||||
Multithreading.runAsyncLimitedPubSub(() -> {
|
||||
Session sess = DatabaseSessionFactory.createSession();
|
||||
try {
|
||||
ResponseHelper.subscribePubSub(channelId, sess);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
sess.close();
|
||||
});
|
||||
|
||||
s.close();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -12,9 +12,9 @@ import java.net.InetSocketAddress;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.hibernate.Session;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.schabi.newpipe.extractor.exceptions.AgeRestrictedContentException;
|
||||
import org.schabi.newpipe.extractor.exceptions.ContentNotAvailableException;
|
||||
|
@ -35,6 +35,7 @@ import io.activej.inject.module.Module;
|
|||
import io.activej.launchers.http.MultithreadedHttpServerLauncher;
|
||||
import me.kavin.piped.consts.Constants;
|
||||
import me.kavin.piped.utils.CustomServletDecorator;
|
||||
import me.kavin.piped.utils.DatabaseSessionFactory;
|
||||
import me.kavin.piped.utils.Multithreading;
|
||||
import me.kavin.piped.utils.ResponseHelper;
|
||||
import me.kavin.piped.utils.SponsorBlockUtils;
|
||||
|
@ -46,7 +47,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
|
|||
|
||||
@Provides
|
||||
Executor executor() {
|
||||
return Executors.newCachedThreadPool();
|
||||
return Multithreading.getCachedExecutor();
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
@ -63,11 +64,13 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
|
|||
new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray())));
|
||||
|
||||
Multithreading.runAsync(() -> {
|
||||
Session s = DatabaseSessionFactory.createSession();
|
||||
feed.getEntries().forEach(entry -> {
|
||||
System.out.println(entry.getLinks().get(0).getHref());
|
||||
ResponseHelper.handleNewVideo(entry.getLinks().get(0).getHref(),
|
||||
entry.getPublishedDate().getTime());
|
||||
entry.getPublishedDate().getTime(), null, s);
|
||||
});
|
||||
s.close();
|
||||
});
|
||||
|
||||
return HttpResponse.ofCode(204);
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.hibernate.Session;
|
|||
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||
import me.kavin.piped.utils.obj.db.Channel;
|
||||
import me.kavin.piped.utils.obj.db.PubSub;
|
||||
import me.kavin.piped.utils.obj.db.User;
|
||||
import me.kavin.piped.utils.obj.db.Video;
|
||||
|
||||
|
@ -59,7 +60,7 @@ public class DatabaseHelper {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> subscriptions = new ObjectArrayList<>(
|
||||
new LinkedHashSet<>(s.createNativeQuery("Select channel from users_subscribed").getResultList()));
|
||||
new LinkedHashSet<>(s.createNativeQuery("select channel from users_subscribed").getResultList()));
|
||||
|
||||
return subscriptions;
|
||||
}
|
||||
|
@ -72,4 +73,13 @@ public class DatabaseHelper {
|
|||
|
||||
return s.createQuery(cr).uniqueResult();
|
||||
}
|
||||
|
||||
public static final PubSub getPubSubFromId(Session s, String id) {
|
||||
CriteriaBuilder cb = s.getCriteriaBuilder();
|
||||
CriteriaQuery<PubSub> cr = cb.createQuery(PubSub.class);
|
||||
Root<PubSub> root = cr.from(PubSub.class);
|
||||
cr.select(root).where(root.get("id").in(id));
|
||||
|
||||
return s.createQuery(cr).uniqueResult();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import org.hibernate.cfg.Configuration;
|
|||
|
||||
import me.kavin.piped.consts.Constants;
|
||||
import me.kavin.piped.utils.obj.db.Channel;
|
||||
import me.kavin.piped.utils.obj.db.PubSub;
|
||||
import me.kavin.piped.utils.obj.db.User;
|
||||
import me.kavin.piped.utils.obj.db.Video;
|
||||
|
||||
|
@ -22,7 +23,7 @@ public class DatabaseSessionFactory {
|
|||
configuration.configure();
|
||||
|
||||
sessionFactory = configuration.addAnnotatedClass(User.class).addAnnotatedClass(Video.class)
|
||||
.addAnnotatedClass(Channel.class).buildSessionFactory();
|
||||
.addAnnotatedClass(Channel.class).addAnnotatedClass(PubSub.class).buildSessionFactory();
|
||||
}
|
||||
|
||||
public static final Session createSession() {
|
||||
|
|
|
@ -6,8 +6,24 @@ import java.util.concurrent.Executors;
|
|||
public class Multithreading {
|
||||
|
||||
private static final ExecutorService es = Executors.newCachedThreadPool();
|
||||
private static final ExecutorService esLimited = Executors
|
||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
private static final ExecutorService esLimitedPubSub = Executors
|
||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
|
||||
public static void runAsync(final Runnable runnable) {
|
||||
es.submit(runnable);
|
||||
}
|
||||
|
||||
public static void runAsyncLimited(final Runnable runnable) {
|
||||
esLimited.submit(runnable);
|
||||
}
|
||||
|
||||
public static void runAsyncLimitedPubSub(final Runnable runnable) {
|
||||
esLimited.submit(runnable);
|
||||
}
|
||||
|
||||
public static ExecutorService getCachedExecutor() {
|
||||
return es;
|
||||
}
|
||||
}
|
|
@ -70,6 +70,7 @@ import me.kavin.piped.utils.obj.StreamItem;
|
|||
import me.kavin.piped.utils.obj.Streams;
|
||||
import me.kavin.piped.utils.obj.StreamsPage;
|
||||
import me.kavin.piped.utils.obj.Subtitle;
|
||||
import me.kavin.piped.utils.obj.db.PubSub;
|
||||
import me.kavin.piped.utils.obj.db.User;
|
||||
import me.kavin.piped.utils.obj.db.Video;
|
||||
import me.kavin.piped.utils.obj.search.SearchChannel;
|
||||
|
@ -527,7 +528,8 @@ public class ResponseHelper {
|
|||
User newuser = new User(user, argon2PasswordEncoder.encode(pass), Collections.emptyList());
|
||||
|
||||
s.save(newuser);
|
||||
s.beginTransaction().commit();
|
||||
s.getTransaction().begin();
|
||||
s.getTransaction().commit();
|
||||
|
||||
s.close();
|
||||
|
||||
|
@ -568,15 +570,15 @@ public class ResponseHelper {
|
|||
|
||||
if (user != null) {
|
||||
if (!user.getSubscribed().contains(channelId)) {
|
||||
user.getSubscribed().add(channelId);
|
||||
s.update(user);
|
||||
s.beginTransaction().commit();
|
||||
}
|
||||
|
||||
s.getTransaction().begin();
|
||||
s.createNativeQuery("insert into users_subscribed (subscriber, channel) values (?,?)")
|
||||
.setParameter(1, user.getId()).setParameter(2, channelId).executeUpdate();
|
||||
s.getTransaction().commit();
|
||||
s.close();
|
||||
|
||||
Multithreading.runAsync(() -> {
|
||||
Session sess = DatabaseSessionFactory.createSession();
|
||||
|
||||
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(sess, channelId);
|
||||
|
||||
if (channel == null) {
|
||||
|
@ -588,13 +590,15 @@ public class ResponseHelper {
|
|||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
|
||||
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(), info.getAvatarUrl(),
|
||||
false);
|
||||
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
|
||||
info.getAvatarUrl(), false);
|
||||
sess.save(channel);
|
||||
sess.beginTransaction().commit();
|
||||
|
||||
try {
|
||||
subscribePubSub(channelId);
|
||||
Session sessSub = DatabaseSessionFactory.createSession();
|
||||
subscribePubSub(channelId, sessSub);
|
||||
sessSub.close();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
|
@ -604,12 +608,13 @@ public class ResponseHelper {
|
|||
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
|
||||
: System.currentTimeMillis();
|
||||
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(10))
|
||||
handleNewVideo(item.getUrl(), time);
|
||||
handleNewVideo(item.getUrl(), time, channel, s);
|
||||
}
|
||||
}
|
||||
|
||||
sess.close();
|
||||
});
|
||||
}
|
||||
|
||||
return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
|
||||
}
|
||||
|
@ -625,13 +630,13 @@ public class ResponseHelper {
|
|||
|
||||
Session s = DatabaseSessionFactory.createSession();
|
||||
|
||||
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session);
|
||||
User user = DatabaseHelper.getUserFromSession(s, session);
|
||||
|
||||
if (user != null) {
|
||||
if (user.getSubscribed().remove(channelId)) {
|
||||
s.update(user);
|
||||
s.beginTransaction().commit();
|
||||
}
|
||||
s.getTransaction().begin();
|
||||
s.createNativeQuery("delete from users_subscribed where subscriber = :id and channel = :channel")
|
||||
.setParameter("id", user.getId()).setParameter("channel", channelId).executeUpdate();
|
||||
s.getTransaction().commit();
|
||||
s.close();
|
||||
return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
|
||||
}
|
||||
|
@ -672,10 +677,13 @@ public class ResponseHelper {
|
|||
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session);
|
||||
|
||||
if (user != null) {
|
||||
List<Video> videos = DatabaseHelper.getVideosFromChannelIds(s, user.getSubscribed());
|
||||
|
||||
List<FeedItem> feedItems = new ObjectArrayList<>();
|
||||
|
||||
if (user.getSubscribed() != null && !user.getSubscribed().isEmpty()) {
|
||||
|
||||
List<Video> videos = DatabaseHelper.getVideosFromChannelIds(s, user.getSubscribed());
|
||||
|
||||
videos.forEach(video -> {
|
||||
feedItems.add(new FeedItem(video.getId(), video.getTitle(), rewriteURL(video.getThumbnail()),
|
||||
video.getChannel().getUploaderId(), video.getChannel().getUploader(),
|
||||
|
@ -684,6 +692,7 @@ public class ResponseHelper {
|
|||
});
|
||||
|
||||
Collections.sort(feedItems, (a, b) -> (int) (b.uploaded - a.uploaded));
|
||||
}
|
||||
|
||||
s.close();
|
||||
|
||||
|
@ -705,48 +714,73 @@ public class ResponseHelper {
|
|||
|
||||
if (user != null) {
|
||||
|
||||
for (String channelId : channelIds) {
|
||||
|
||||
if (!user.getSubscribed().contains(channelId)) {
|
||||
Multithreading.runAsync(() -> {
|
||||
for (String channelId : channelIds)
|
||||
if (!user.getSubscribed().contains(channelId))
|
||||
user.getSubscribed().add(channelId);
|
||||
|
||||
if (channelIds.length > 0) {
|
||||
s.update(user);
|
||||
s.beginTransaction().commit();
|
||||
}
|
||||
|
||||
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(s, channelId);
|
||||
s.close();
|
||||
});
|
||||
|
||||
for (String channelId : channelIds) {
|
||||
|
||||
Multithreading.runAsyncLimited(() -> {
|
||||
try {
|
||||
|
||||
Session sess = DatabaseSessionFactory.createSession();
|
||||
|
||||
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(sess, channelId);
|
||||
|
||||
if (channel == null) {
|
||||
ChannelInfo info = null;
|
||||
|
||||
try {
|
||||
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
|
||||
} catch (IOException | ExtractionException e) {
|
||||
ExceptionUtils.rethrow(e);
|
||||
} catch (Exception e) {
|
||||
return;
|
||||
}
|
||||
|
||||
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(), info.getAvatarUrl(),
|
||||
false);
|
||||
s.save(channel);
|
||||
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
|
||||
info.getAvatarUrl(), false);
|
||||
sess.save(channel);
|
||||
|
||||
Multithreading.runAsync(() -> {
|
||||
try {
|
||||
subscribePubSub(channelId);
|
||||
Session sessSub = DatabaseSessionFactory.createSession();
|
||||
subscribePubSub(channelId, sessSub);
|
||||
sessSub.close();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
ExceptionUtils.rethrow(e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
for (StreamInfoItem item : info.getRelatedItems()) {
|
||||
long time = item.getUploadDate() != null
|
||||
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
|
||||
: System.currentTimeMillis();
|
||||
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(10))
|
||||
handleNewVideo(item.getUrl(), time);
|
||||
}
|
||||
handleNewVideo(item.getUrl(), time, channel, sess);
|
||||
}
|
||||
|
||||
if (!sess.getTransaction().isActive())
|
||||
sess.getTransaction().begin();
|
||||
sess.getTransaction().commit();
|
||||
}
|
||||
|
||||
s.beginTransaction().commit();
|
||||
sess.close();
|
||||
|
||||
s.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
|
||||
}
|
||||
|
@ -776,19 +810,19 @@ public class ResponseHelper {
|
|||
|
||||
}
|
||||
|
||||
public static void handleNewVideo(String url, long time) {
|
||||
public static void handleNewVideo(String url, long time, me.kavin.piped.utils.obj.db.Channel channel, Session s) {
|
||||
try {
|
||||
handleNewVideo(StreamInfo.getInfo(url), time);
|
||||
handleNewVideo(StreamInfo.getInfo(url), time, channel, s);
|
||||
} catch (IOException | ExtractionException e) {
|
||||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void handleNewVideo(StreamInfo info, long time) {
|
||||
private static void handleNewVideo(StreamInfo info, long time, me.kavin.piped.utils.obj.db.Channel channel,
|
||||
Session s) {
|
||||
|
||||
Session s = DatabaseSessionFactory.createSession();
|
||||
|
||||
me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(s,
|
||||
if (channel == null)
|
||||
channel = DatabaseHelper.getChannelFromId(s,
|
||||
info.getUploaderUrl().substring("https://www.youtube.com/channel/".length()));
|
||||
|
||||
long infoTime = info.getUploadDate() != null ? info.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
|
||||
|
@ -804,17 +838,17 @@ public class ResponseHelper {
|
|||
|
||||
s.save(video);
|
||||
|
||||
s.beginTransaction().commit();
|
||||
if (!s.getTransaction().isActive())
|
||||
s.getTransaction().begin();
|
||||
s.getTransaction().commit();
|
||||
} else if (video != null) {
|
||||
video.setViews(info.getViewCount());
|
||||
|
||||
s.update(video);
|
||||
|
||||
s.beginTransaction().commit();
|
||||
s.getTransaction().commit();
|
||||
}
|
||||
|
||||
s.close();
|
||||
|
||||
}
|
||||
|
||||
private static void updateViews(String id, long views, long time, boolean addIfNonExistent) {
|
||||
|
@ -829,7 +863,7 @@ public class ResponseHelper {
|
|||
s.update(video);
|
||||
s.beginTransaction().commit();
|
||||
} else if (addIfNonExistent)
|
||||
handleNewVideo("https://www.youtube.com/watch?v=" + id, time);
|
||||
handleNewVideo("https://www.youtube.com/watch?v=" + id, time, null, s);
|
||||
|
||||
s.close();
|
||||
|
||||
|
@ -839,8 +873,11 @@ public class ResponseHelper {
|
|||
});
|
||||
}
|
||||
|
||||
public static void subscribePubSub(String channelId) throws IOException, InterruptedException {
|
||||
public static void subscribePubSub(String channelId, Session s) throws IOException, InterruptedException {
|
||||
|
||||
PubSub pubsub = DatabaseHelper.getPubSubFromId(s, channelId);
|
||||
|
||||
if (pubsub == null || System.currentTimeMillis() - pubsub.getSubbedAt() > TimeUnit.DAYS.toMillis(4)) {
|
||||
System.out.println(String.format("PubSub: Subscribing to %s", channelId));
|
||||
|
||||
String callback = Constants.PUBLIC_URL + "/webhooks/pubsub";
|
||||
|
@ -863,10 +900,23 @@ public class ResponseHelper {
|
|||
formBody.append(name + "=" + URLUtils.silentEncode(value) + "&");
|
||||
});
|
||||
|
||||
builder.method("POST", BodyPublishers.ofString(String.valueOf(formBody.substring(0, formBody.length() - 1))));
|
||||
builder.method("POST",
|
||||
BodyPublishers.ofString(String.valueOf(formBody.substring(0, formBody.length() - 1))));
|
||||
|
||||
Constants.h2client.send(builder.build(), BodyHandlers.ofInputStream());
|
||||
|
||||
if (pubsub == null)
|
||||
pubsub = new PubSub(channelId, System.currentTimeMillis());
|
||||
else
|
||||
pubsub.setSubbedAt(System.currentTimeMillis());
|
||||
|
||||
s.saveOrUpdate(pubsub);
|
||||
|
||||
if (!s.getTransaction().isActive())
|
||||
s.getTransaction().begin();
|
||||
s.getTransaction().commit();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final String optionalSubstring(String s, int index) {
|
||||
|
|
43
src/main/java/me/kavin/piped/utils/obj/db/PubSub.java
Normal file
43
src/main/java/me/kavin/piped/utils/obj/db/PubSub.java
Normal file
|
@ -0,0 +1,43 @@
|
|||
package me.kavin.piped.utils.obj.db;
|
||||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.Id;
|
||||
import javax.persistence.Index;
|
||||
import javax.persistence.Table;
|
||||
|
||||
@Entity
|
||||
@Table(name = "pubsub", indexes = { @Index(columnList = "id", name = "id_idx") })
|
||||
public class PubSub {
|
||||
|
||||
@Id
|
||||
@Column(name = "id", unique = true, length = 24)
|
||||
private String id;
|
||||
|
||||
@Column(name = "subbed_at")
|
||||
private long subbedAt;
|
||||
|
||||
public PubSub() {
|
||||
}
|
||||
|
||||
public PubSub(String id, long subbedAt) {
|
||||
this.id = id;
|
||||
this.subbedAt = subbedAt;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public long getSubbedAt() {
|
||||
return subbedAt;
|
||||
}
|
||||
|
||||
public void setSubbedAt(long subbedAt) {
|
||||
this.subbedAt = subbedAt;
|
||||
}
|
||||
}
|
|
@ -10,5 +10,6 @@
|
|||
<property name="hibernate.format_sql">true</property>
|
||||
|
||||
<property name="hibernate.connection.provider_class">org.hibernate.hikaricp.internal.HikariCPConnectionProvider</property>
|
||||
<property name="hibernate.connection.handling_mode">DELAYED_ACQUISITION_AND_RELEASE_AFTER_STATEMENT</property>
|
||||
</session-factory>
|
||||
</hibernate-configuration>
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue