Fix subscription imports and improve session handling. (#228)

This commit is contained in:
Kavin 2022-03-19 18:57:00 +00:00 committed by GitHub
parent d1789ace9f
commit b1e875bcf5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 83 deletions

View file

@ -13,23 +13,27 @@ import javax.persistence.criteria.Root;
public class DatabaseHelper { public class DatabaseHelper {
public static final User getUserFromSession(Session s, String session) { public static final User getUserFromSession(String session) {
CriteriaBuilder cb = s.getCriteriaBuilder(); try (Session s = DatabaseSessionFactory.createSession()) {
CriteriaQuery<User> cr = cb.createQuery(User.class); CriteriaBuilder cb = s.getCriteriaBuilder();
Root<User> root = cr.from(User.class); CriteriaQuery<User> cr = cb.createQuery(User.class);
cr.select(root).where(cb.equal(root.get("sessionId"), session)); Root<User> root = cr.from(User.class);
cr.select(root).where(cb.equal(root.get("sessionId"), session));
return s.createQuery(cr).uniqueResult(); return s.createQuery(cr).uniqueResult();
}
} }
public static final User getUserFromSessionWithSubscribed(Session s, String session) { public static final User getUserFromSessionWithSubscribed(String session) {
CriteriaBuilder cb = s.getCriteriaBuilder(); try (Session s = DatabaseSessionFactory.createSession()) {
CriteriaQuery<User> cr = cb.createQuery(User.class); CriteriaBuilder cb = s.getCriteriaBuilder();
Root<User> root = cr.from(User.class); CriteriaQuery<User> cr = cb.createQuery(User.class);
root.fetch("subscribed_ids", JoinType.LEFT); Root<User> root = cr.from(User.class);
cr.select(root).where(cb.equal(root.get("sessionId"), session)); root.fetch("subscribed_ids", JoinType.LEFT);
cr.select(root).where(cb.equal(root.get("sessionId"), session));
return s.createQuery(cr).uniqueResult(); return s.createQuery(cr).uniqueResult();
}
} }
public static final Channel getChannelFromId(Session s, String id) { public static final Channel getChannelFromId(Session s, String id) {

View file

@ -597,7 +597,7 @@ public class ResponseHelper {
try (Session s = DatabaseSessionFactory.createSession()) { try (Session s = DatabaseSessionFactory.createSession()) {
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session); User user = DatabaseHelper.getUserFromSessionWithSubscribed(session);
if (user != null) { if (user != null) {
if (!user.getSubscribed().contains(channelId)) { if (!user.getSubscribed().contains(channelId)) {
@ -657,16 +657,17 @@ public class ResponseHelper {
public static byte[] unsubscribeResponse(String session, String channelId) public static byte[] unsubscribeResponse(String session, String channelId)
throws IOException { throws IOException {
try (Session s = DatabaseSessionFactory.createSession()) { User user = DatabaseHelper.getUserFromSession(session);
User user = DatabaseHelper.getUserFromSession(s, session);
if (user != null) { if (user != null) {
try (Session s = DatabaseSessionFactory.createSession()) {
s.getTransaction().begin(); s.getTransaction().begin();
s.createNativeQuery("delete from users_subscribed where subscriber = :id and channel = :channel") s.createNativeQuery("delete from users_subscribed where subscriber = :id and channel = :channel")
.setParameter("id", user.getId()).setParameter("channel", channelId).executeUpdate(); .setParameter("id", user.getId()).setParameter("channel", channelId).executeUpdate();
s.getTransaction().commit(); s.getTransaction().commit();
return Constants.mapper.writeValueAsBytes(new AcceptedResponse()); return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
} }
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse()); return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
@ -690,11 +691,11 @@ public class ResponseHelper {
} }
public static byte[] feedResponse(String session) throws IOException { public static byte[] feedResponse(String session) throws IOException {
try (Session s = DatabaseSessionFactory.createSession()) {
User user = DatabaseHelper.getUserFromSession(s, session); User user = DatabaseHelper.getUserFromSession(session);
if (user != null) { if (user != null) {
try (Session s = DatabaseSessionFactory.createSession()) {
CriteriaBuilder cb = s.getCriteriaBuilder(); CriteriaBuilder cb = s.getCriteriaBuilder();
@ -727,19 +728,18 @@ public class ResponseHelper {
return Constants.mapper.writeValueAsBytes(feedItems); return Constants.mapper.writeValueAsBytes(feedItems);
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
public static byte[] feedResponseRSS(String session) throws IOException, FeedException { public static byte[] feedResponseRSS(String session) throws IOException, FeedException {
try (Session s = DatabaseSessionFactory.createSession()) { User user = DatabaseHelper.getUserFromSession(session);
User user = DatabaseHelper.getUserFromSession(s, session); if (user != null) {
if (user != null) { try (Session s = DatabaseSessionFactory.createSession()) {
SyndFeed feed = new SyndFeedImpl(); SyndFeed feed = new SyndFeedImpl();
feed.setFeedType("atom_1.0"); feed.setFeedType("atom_1.0");
@ -789,20 +789,20 @@ public class ResponseHelper {
return new SyndFeedOutput().outputString(feed).getBytes(UTF_8); return new SyndFeedOutput().outputString(feed).getBytes(UTF_8);
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
public static byte[] importResponse(String session, String[] channelIds, boolean override) throws IOException { public static byte[] importResponse(String session, String[] channelIds, boolean override) throws IOException {
try (Session s = DatabaseSessionFactory.createSession()) {
User user = DatabaseHelper.getUserFromSessionWithSubscribed(s, session); User user = DatabaseHelper.getUserFromSessionWithSubscribed(session);
if (user != null) { if (user != null) {
Multithreading.runAsync(() -> { Multithreading.runAsync(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) {
if (override) if (override)
user.setSubscribed(Arrays.asList(channelIds)); user.setSubscribed(Arrays.asList(channelIds));
else else
@ -811,75 +811,74 @@ public class ResponseHelper {
user.getSubscribed().add(channelId); user.getSubscribed().add(channelId);
if (channelIds.length > 0) { if (channelIds.length > 0) {
s.update(user); sess.update(user);
s.beginTransaction().commit(); sess.beginTransaction().commit();
} }
}); }
});
for (String channelId : channelIds) { for (String channelId : channelIds) {
Multithreading.runAsyncLimited(() -> { Multithreading.runAsyncLimited(() -> {
try (Session sess = DatabaseSessionFactory.createSession()) { try (Session sess = DatabaseSessionFactory.createSession()) {
var channel = DatabaseHelper.getChannelFromId(sess, channelId); var channel = DatabaseHelper.getChannelFromId(sess, channelId);
if (channel == null) { if (channel == null) {
ChannelInfo info; ChannelInfo info;
try { try {
info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId); info = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId);
} catch (Exception e) { } catch (Exception e) {
return; return;
}
channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
info.getAvatarUrl(), info.isVerified());
sess.save(channel);
Multithreading.runAsync(() -> {
try (Session sessSub = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, sessSub);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
});
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
handleNewVideo(item.getUrl(), time, channel, sess);
}
if (!sess.getTransaction().isActive())
sess.getTransaction().begin();
sess.getTransaction().commit();
} }
} catch (Exception e) { channel = new me.kavin.piped.utils.obj.db.Channel(channelId, info.getName(),
ExceptionHandler.handle(e); info.getAvatarUrl(), info.isVerified());
sess.save(channel);
Multithreading.runAsync(() -> {
try (Session sessSub = DatabaseSessionFactory.createSession()) {
subscribePubSub(channelId, sessSub);
} catch (Exception e) {
ExceptionHandler.handle(e);
}
});
for (StreamInfoItem item : info.getRelatedItems()) {
long time = item.getUploadDate() != null
? item.getUploadDate().offsetDateTime().toInstant().toEpochMilli()
: System.currentTimeMillis();
if ((System.currentTimeMillis() - time) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))
handleNewVideo(item.getUrl(), time, channel, sess);
}
if (!sess.getTransaction().isActive())
sess.getTransaction().begin();
sess.getTransaction().commit();
} }
}); } catch (Exception e) {
ExceptionHandler.handle(e);
}
} });
return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse()); return Constants.mapper.writeValueAsBytes(new AcceptedResponse());
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
public static byte[] subscriptionsResponse(String session) public static byte[] subscriptionsResponse(String session)
throws IOException { throws IOException {
try (Session s = DatabaseSessionFactory.createSession()) { User user = DatabaseHelper.getUserFromSession(session);
User user = DatabaseHelper.getUserFromSession(s, session); if (user != null) {
try (Session s = DatabaseSessionFactory.createSession()) {
if (user != null) {
List<SubscriptionChannel> subscriptionItems = new ObjectArrayList<>(); List<SubscriptionChannel> subscriptionItems = new ObjectArrayList<>();
@ -902,11 +901,10 @@ public class ResponseHelper {
return Constants.mapper.writeValueAsBytes(subscriptionItems); return Constants.mapper.writeValueAsBytes(subscriptionItems);
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
return Constants.mapper.writeValueAsBytes(new AuthenticationFailureResponse());
} }
public static String registeredBadgeRedirect() { public static String registeredBadgeRedirect() {