diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index 4dedab6..b18a8df 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -1,5 +1,7 @@ package me.kavin.piped; +import java.io.IOException; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; @@ -14,6 +16,7 @@ import io.activej.inject.Injector; import me.kavin.piped.utils.DatabaseHelper; import me.kavin.piped.utils.DatabaseSessionFactory; import me.kavin.piped.utils.DownloaderImpl; +import me.kavin.piped.utils.Multithreading; import me.kavin.piped.utils.ResponseHelper; public class Main { @@ -24,18 +27,24 @@ public class Main { Injector.useSpecializer(); - new Thread(() -> { - DatabaseSessionFactory.createSession().close(); - }).start(); - new Timer().scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { Session s = DatabaseSessionFactory.createSession(); - for (String channelId : DatabaseHelper.getGlobalSubscribedChannelIds(s)) - ResponseHelper.subscribePubSub(channelId); + List channels = DatabaseHelper.getGlobalSubscribedChannelIds(s); + + for (String channelId : channels) + Multithreading.runAsyncLimitedPubSub(() -> { + Session sess = DatabaseSessionFactory.createSession(); + try { + ResponseHelper.subscribePubSub(channelId, sess); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + sess.close(); + }); s.close(); } catch (Exception e) { diff --git a/src/main/java/me/kavin/piped/ServerLauncher.java b/src/main/java/me/kavin/piped/ServerLauncher.java index 22e17a8..4336d30 100644 --- a/src/main/java/me/kavin/piped/ServerLauncher.java +++ b/src/main/java/me/kavin/piped/ServerLauncher.java @@ -12,9 +12,9 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.hibernate.Session; import org.jetbrains.annotations.NotNull; import org.schabi.newpipe.extractor.exceptions.AgeRestrictedContentException; import org.schabi.newpipe.extractor.exceptions.ContentNotAvailableException; @@ -35,6 +35,7 @@ import io.activej.inject.module.Module; import io.activej.launchers.http.MultithreadedHttpServerLauncher; import me.kavin.piped.consts.Constants; import me.kavin.piped.utils.CustomServletDecorator; +import me.kavin.piped.utils.DatabaseSessionFactory; import me.kavin.piped.utils.Multithreading; import me.kavin.piped.utils.ResponseHelper; import me.kavin.piped.utils.SponsorBlockUtils; @@ -46,7 +47,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { @Provides Executor executor() { - return Executors.newCachedThreadPool(); + return Multithreading.getCachedExecutor(); } @Provides @@ -63,11 +64,13 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher { new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray()))); Multithreading.runAsync(() -> { + Session s = DatabaseSessionFactory.createSession(); feed.getEntries().forEach(entry -> { System.out.println(entry.getLinks().get(0).getHref()); ResponseHelper.handleNewVideo(entry.getLinks().get(0).getHref(), - entry.getPublishedDate().getTime()); + entry.getPublishedDate().getTime(), null, s); }); + s.close(); }); return HttpResponse.ofCode(204); diff --git a/src/main/java/me/kavin/piped/utils/DatabaseHelper.java b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java index 1b47bee..5872a49 100644 --- a/src/main/java/me/kavin/piped/utils/DatabaseHelper.java +++ b/src/main/java/me/kavin/piped/utils/DatabaseHelper.java @@ -12,6 +12,7 @@ import org.hibernate.Session; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import me.kavin.piped.utils.obj.db.Channel; +import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.User; import me.kavin.piped.utils.obj.db.Video; @@ -59,7 +60,7 @@ public class DatabaseHelper { @SuppressWarnings("unchecked") List subscriptions = new ObjectArrayList<>( - new LinkedHashSet<>(s.createNativeQuery("Select channel from users_subscribed").getResultList())); + new LinkedHashSet<>(s.createNativeQuery("select channel from users_subscribed").getResultList())); return subscriptions; } @@ -72,4 +73,13 @@ public class DatabaseHelper { return s.createQuery(cr).uniqueResult(); } + + public static final PubSub getPubSubFromId(Session s, String id) { + CriteriaBuilder cb = s.getCriteriaBuilder(); + CriteriaQuery cr = cb.createQuery(PubSub.class); + Root root = cr.from(PubSub.class); + cr.select(root).where(root.get("id").in(id)); + + return s.createQuery(cr).uniqueResult(); + } } diff --git a/src/main/java/me/kavin/piped/utils/DatabaseSessionFactory.java b/src/main/java/me/kavin/piped/utils/DatabaseSessionFactory.java index 5a8fffd..6603f42 100644 --- a/src/main/java/me/kavin/piped/utils/DatabaseSessionFactory.java +++ b/src/main/java/me/kavin/piped/utils/DatabaseSessionFactory.java @@ -6,6 +6,7 @@ import org.hibernate.cfg.Configuration; import me.kavin.piped.consts.Constants; import me.kavin.piped.utils.obj.db.Channel; +import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.User; import me.kavin.piped.utils.obj.db.Video; @@ -22,7 +23,7 @@ public class DatabaseSessionFactory { configuration.configure(); sessionFactory = configuration.addAnnotatedClass(User.class).addAnnotatedClass(Video.class) - .addAnnotatedClass(Channel.class).buildSessionFactory(); + .addAnnotatedClass(Channel.class).addAnnotatedClass(PubSub.class).buildSessionFactory(); } public static final Session createSession() { diff --git a/src/main/java/me/kavin/piped/utils/Multithreading.java b/src/main/java/me/kavin/piped/utils/Multithreading.java index 90ff150..7ede7a2 100644 --- a/src/main/java/me/kavin/piped/utils/Multithreading.java +++ b/src/main/java/me/kavin/piped/utils/Multithreading.java @@ -6,8 +6,24 @@ import java.util.concurrent.Executors; public class Multithreading { private static final ExecutorService es = Executors.newCachedThreadPool(); + private static final ExecutorService esLimited = Executors + .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + private static final ExecutorService esLimitedPubSub = Executors + .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public static void runAsync(final Runnable runnable) { es.submit(runnable); } + + public static void runAsyncLimited(final Runnable runnable) { + esLimited.submit(runnable); + } + + public static void runAsyncLimitedPubSub(final Runnable runnable) { + esLimited.submit(runnable); + } + + public static ExecutorService getCachedExecutor() { + return es; + } } \ No newline at end of file diff --git a/src/main/java/me/kavin/piped/utils/ResponseHelper.java b/src/main/java/me/kavin/piped/utils/ResponseHelper.java index 53076ef..d634796 100644 --- a/src/main/java/me/kavin/piped/utils/ResponseHelper.java +++ b/src/main/java/me/kavin/piped/utils/ResponseHelper.java @@ -70,6 +70,7 @@ import me.kavin.piped.utils.obj.StreamItem; import me.kavin.piped.utils.obj.Streams; import me.kavin.piped.utils.obj.StreamsPage; import me.kavin.piped.utils.obj.Subtitle; +import me.kavin.piped.utils.obj.db.PubSub; import me.kavin.piped.utils.obj.db.User; import me.kavin.piped.utils.obj.db.Video; import me.kavin.piped.utils.obj.search.SearchChannel; @@ -527,7 +528,8 @@ public class ResponseHelper { User newuser = new User(user, argon2PasswordEncoder.encode(pass), Collections.emptyList()); s.save(newuser); - s.beginTransaction().commit(); + s.getTransaction().begin(); + s.getTransaction().commit(); s.close(); @@ -568,48 +570,51 @@ public class ResponseHelper { if (user != null) { if (!user.getSubscribed().contains(channelId)) { - user.getSubscribed().add(channelId); - s.update(user); - s.beginTransaction().commit(); + + s.getTransaction().begin(); + s.createNativeQuery("insert into users_subscribed (subscriber, channel) values (?,?)") + .setParameter(1, user.getId()).setParameter(2, channelId).executeUpdate(); + s.getTransaction().commit(); + s.close(); + + Multithreading.runAsync(() -> { + Session sess = DatabaseSessionFactory.createSession(); + me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(sess, channelId); + + if (channel == null) { + ChannelInfo info = null; + + try { + info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId); + } catch (IOException | ExtractionException e) { + ExceptionUtils.rethrow(e); + } + + channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(), + info.getAvatarUrl(), false); + sess.save(channel); + sess.beginTransaction().commit(); + + try { + Session sessSub = DatabaseSessionFactory.createSession(); + subscribePubSub(channelId, sessSub); + sessSub.close(); + } catch (IOException | InterruptedException e) { + ExceptionUtils.rethrow(e); + } + + for (StreamInfoItem item : info.getRelatedItems()) { + long time = item.getUploadDate() != null + ? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli() + : System.currentTimeMillis(); + if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(10)) + handleNewVideo(item.getUrl(), time, channel, s); + } + } + + sess.close(); + }); } - s.close(); - - Multithreading.runAsync(() -> { - Session sess = DatabaseSessionFactory.createSession(); - - me.kavin.piped.utils.obj.db.Channel channel = DatabaseHelper.getChannelFromId(sess, channelId); - - if (channel == null) { - ChannelInfo info = null; - - try { - info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId); - } catch (IOException | ExtractionException e) { - ExceptionUtils.rethrow(e); - } - - channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(), info.getAvatarUrl(), - false); - sess.save(channel); - sess.beginTransaction().commit(); - - try { - subscribePubSub(channelId); - } catch (IOException | InterruptedException e) { - ExceptionUtils.rethrow(e); - } - - for (StreamInfoItem item : info.getRelatedItems()) { - long time = item.getUploadDate() != null - ? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli() - : System.currentTimeMillis(); - if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(10)) - handleNewVideo(item.getUrl(), time); - } - } - - sess.close(); - }); return Constants.mapper.writeValueAsBytes(new AcceptedResponse()); } @@ -625,13 +630,13 @@ public class ResponseHelper { Session s = DatabaseSessionFactory.createSession(); - User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session); + User user = DatabaseHelper.getUserFromSession(s, session); if (user != null) { - if (user.getSubscribed().remove(channelId)) { - s.update(user); - s.beginTransaction().commit(); - } + s.getTransaction().begin(); + s.createNativeQuery("delete from users_subscribed where subscriber = :id and channel = :channel") + .setParameter("id", user.getId()).setParameter("channel", channelId).executeUpdate(); + s.getTransaction().commit(); s.close(); return Constants.mapper.writeValueAsBytes(new AcceptedResponse()); } @@ -672,18 +677,22 @@ public class ResponseHelper { User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session); if (user != null) { - List