Significantly speed up import process. (#242)

This commit is contained in:
Kavin 2022-04-11 00:02:25 +01:00 committed by GitHub
parent ce50b7523f
commit 9c1c165fb2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 115 deletions

View file

@ -49,13 +49,7 @@ public class Main {
for (PubSub pubsub : pubSubList) for (PubSub pubsub : pubSubList)
if (pubsub != null) if (pubsub != null)
Multithreading.runAsyncLimitedPubSub(() -> { Multithreading.runAsyncLimitedPubSub(() -> ResponseHelper.subscribePubSub(pubsub.getId()));
try (Session sess = DatabaseSessionFactory.createSession()) {
ResponseHelper.subscribePubSub(pubsub.getId(), sess);
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();

View file

@ -1,18 +1,21 @@
package me.kavin.piped.utils; package me.kavin.piped.utils;
import me.kavin.piped.utils.obj.db.*; import me.kavin.piped.utils.obj.db.*;
import org.hibernate.FlushMode;
import org.hibernate.Session; import org.hibernate.Session;
import javax.persistence.criteria.CriteriaBuilder; import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery; import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.JoinType; import javax.persistence.criteria.JoinType;
import javax.persistence.criteria.Root; import javax.persistence.criteria.Root;
import java.util.List;
import java.util.UUID; import java.util.UUID;
public class DatabaseHelper { public class DatabaseHelper {
public static User getUserFromSession(String session) { public static User getUserFromSession(String session) {
try (Session s = DatabaseSessionFactory.createSession()) { try (Session s = DatabaseSessionFactory.createSession()) {
s.setHibernateFlushMode(FlushMode.MANUAL);
CriteriaBuilder cb = s.getCriteriaBuilder(); CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<User> cr = cb.createQuery(User.class); CriteriaQuery<User> cr = cb.createQuery(User.class);
Root<User> root = cr.from(User.class); Root<User> root = cr.from(User.class);
@ -24,6 +27,7 @@ public class DatabaseHelper {
public static User getUserFromSessionWithSubscribed(String session) { public static User getUserFromSessionWithSubscribed(String session) {
try (Session s = DatabaseSessionFactory.createSession()) { try (Session s = DatabaseSessionFactory.createSession()) {
s.setHibernateFlushMode(FlushMode.MANUAL);
CriteriaBuilder cb = s.getCriteriaBuilder(); CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<User> cr = cb.createQuery(User.class); CriteriaQuery<User> cr = cb.createQuery(User.class);
Root<User> root = cr.from(User.class); Root<User> root = cr.from(User.class);
@ -43,6 +47,15 @@ public class DatabaseHelper {
return s.createQuery(cr).uniqueResult(); return s.createQuery(cr).uniqueResult();
} }
public static List<Channel> getChannelsFromIds(Session s, List<String> id) {
CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<Channel> cr = cb.createQuery(Channel.class);
Root<Channel> root = cr.from(Channel.class);
cr.select(root).where(root.get("uploader_id").in(id));
return s.createQuery(cr).list();
}
public static Video getVideoFromId(Session s, String id) { public static Video getVideoFromId(Session s, String id) {
CriteriaBuilder cb = s.getCriteriaBuilder(); CriteriaBuilder cb = s.getCriteriaBuilder();
CriteriaQuery<Video> cr = cb.createQuery(Video.class); CriteriaQuery<Video> cr = cb.createQuery(Video.class);

View file

@ -9,7 +9,7 @@ public class Multithreading {
private static final ExecutorService es = Executors.newCachedThreadPool(); private static final ExecutorService es = Executors.newCachedThreadPool();
private static final ExecutorService esLimited = Executors private static final ExecutorService esLimited = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8);
private static final ExecutorService esLimitedPubSub = Executors private static final ExecutorService esLimitedPubSub = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

View file

@ -658,7 +658,7 @@ public class ResponseHelper {
return Constants.mapper.writeValueAsBytes(new CompromisedPasswordResponse()); return Constants.mapper.writeValueAsBytes(new CompromisedPasswordResponse());
} }
User newuser = new User(user, argon2PasswordEncoder.encode(pass), Collections.emptyList()); User newuser = new User(user, argon2PasswordEncoder.encode(pass), Set.of());
s.save(newuser); s.save(newuser);
s.getTransaction().begin(); s.getTransaction().begin();
@ -712,44 +712,16 @@ public class ResponseHelper {
if (user != null) { if (user != null) {
if (!user.getSubscribed().contains(channelId)) { if (!user.getSubscribed().contains(channelId)) {
user.getSubscribed().add(channelId);
s.update(user);
s.getTransaction().begin(); s.getTransaction().begin();
s.createNativeQuery("insert into users_subscribed (subscriber, channel) values (?,?)")
.setParameter(1, user.getId()).setParameter(2, channelId).executeUpdate();
s.getTransaction().commit(); s.getTransaction().commit();
Multithreading.runAsync(() -> { Multithreading.runAsync(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) { try (Session s2 = DatabaseSessionFactory.createSession()) {
var channel = DatabaseHelper.getChannelFromId(sess, channelId); var channel = DatabaseHelper.getChannelFromId(s2, channelId);
if (channel == null) { if (channel == null) {
ChannelInfo info = null; Multithreading.runAsync(() -> saveChannel(channelId));
try {
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
} catch (IOException | ExtractionException e) {
ExceptionUtils.rethrow(e);
}
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
info.getAvatarUrl(), info.isVerified());
sess.save(channel);
sess.beginTransaction().commit();
Multithreading.runAsync(() -> {
try (Session sessSub = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, sessSub);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
});
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
handleNewVideo(item.getUrl(), time, channel, sess);
}
} }
} }
}); });
@ -913,68 +885,33 @@ public class ResponseHelper {
Multithreading.runAsync(() -> { Multithreading.runAsync(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) { try (Session sess = DatabaseSessionFactory.createSession()) {
if (override) if (override) {
user.setSubscribed(Arrays.asList(channelIds)); user.setSubscribed(Set.of(channelIds));
else } else {
for (String channelId : channelIds) for (String channelId : channelIds)
if (!user.getSubscribed().contains(channelId))
user.getSubscribed().add(channelId); user.getSubscribed().add(channelId);
}
if (channelIds.length > 0) { if (channelIds.length > 0) {
sess.update(user);
sess.beginTransaction().commit();
}
}
});
for (String channelId : channelIds) {
Multithreading.runAsyncLimited(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) {
var channel = DatabaseHelper.getChannelFromId(sess, channelId);
if (channel == null) {
ChannelInfo info;
try {
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
} catch (Exception e) {
return;
}
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
info.getAvatarUrl(), info.isVerified());
sess.save(channel);
Multithreading.runAsync(() -> {
try (Session sessSub = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, sessSub);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
});
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
handleNewVideo(item.getUrl(), time, channel, sess);
}
if (!sess.getTransaction().isActive())
sess.getTransaction().begin(); sess.getTransaction().begin();
sess.update(user);
sess.getTransaction().commit(); sess.getTransaction().commit();
} }
} catch (Exception e) {
ExceptionHandler.handle(e);
} }
}); });
Multithreading.runAsync(() -> {
try (Session s = DatabaseSessionFactory.createSession()) {
var channels = DatabaseHelper.getChannelsFromIds(s, Arrays.asList(channelIds));
outer:
for (String channelId : channelIds) {
for (var channel : channels)
if (channel.getUploaderId().equals(channelId))
continue outer;
Multithreading.runAsyncLimited(() -> saveChannel(channelId));
} }
}
});
return Constants.mapper.writeValueAsBytes(new AcceptedResponse()); return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
} }
@ -1308,7 +1245,43 @@ public class ResponseHelper {
} }
} }
public static void subscribePubSub(String channelId, Session s) throws IOException { private static void saveChannel(String channelId) {
try (Session s = DatabaseSessionFactory.createSession()) {
ChannelInfo info = null;
try {
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
} catch (IOException | ExtractionException e) {
ExceptionUtils.rethrow(e);
}
var channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
info.getAvatarUrl(), info.isVerified());
s.save(channel);
s.beginTransaction().commit();
Multithreading.runAsync(() -> subscribePubSub(channelId));
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
handleNewVideo(item.getUrl(), time, channel, s);
}
}
}
public static void subscribePubSub(String channelId) {
try (Session s = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, s);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
}
private static void subscribePubSub(String channelId, Session s) throws IOException {
PubSub pubsub = DatabaseHelper.getPubSubFromId(s, channelId); PubSub pubsub = DatabaseHelper.getPubSubFromId(s, channelId);

View file

@ -1,24 +1,14 @@
package me.kavin.piped.utils.obj.db; package me.kavin.piped.utils.obj.db;
import javax.persistence.*;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import javax.persistence.CollectionTable;
import javax.persistence.Column;
import javax.persistence.ElementCollection;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.JoinColumn;
import javax.persistence.Table;
@Entity @Entity
@Table(name = "users", indexes = { @Index(columnList = "id", name = "users_id_idx"), @Table(name = "users", indexes = {@Index(columnList = "id", name = "users_id_idx"),
@Index(columnList = "username", name = "username_idx"), @Index(columnList = "username", name = "username_idx"),
@Index(columnList = "session_id", name = "users_session_id_idx") }) @Index(columnList = "session_id", name = "users_session_id_idx")})
public class User implements Serializable { public class User implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -40,14 +30,14 @@ public class User implements Serializable {
@ElementCollection @ElementCollection
@CollectionTable(name = "users_subscribed", joinColumns = @JoinColumn(name = "subscriber"), indexes = { @CollectionTable(name = "users_subscribed", joinColumns = @JoinColumn(name = "subscriber"), indexes = {
@Index(columnList = "subscriber", name = "users_subscribed_subscriber_idx"), @Index(columnList = "subscriber", name = "users_subscribed_subscriber_idx"),
@Index(columnList = "channel", name = "users_subscribed_channel_idx") }) @Index(columnList = "channel", name = "users_subscribed_channel_idx")})
@Column(name = "channel", length = 30) @Column(name = "channel", length = 30)
private List<String> subscribed_ids; private Set<String> subscribed_ids;
public User() { public User() {
} }
public User(String username, String password, List<String> subscribed_ids) { public User(String username, String password, Set<String> subscribed_ids) {
this.username = username; this.username = username;
this.password = password; this.password = password;
this.subscribed_ids = subscribed_ids; this.subscribed_ids = subscribed_ids;
@ -86,11 +76,11 @@ public class User implements Serializable {
this.password = password; this.password = password;
} }
public List<String> getSubscribed() { public Set<String> getSubscribed() {
return subscribed_ids; return subscribed_ids;
} }
public void setSubscribed(List<String> subscribed_ids) { public void setSubscribed(Set<String> subscribed_ids) {
this.subscribed_ids = subscribed_ids; this.subscribed_ids = subscribed_ids;
} }
} }