mirror of
				https://github.com/TeamPiped/Piped-Backend.git
				synced 2024-08-14 23:51:41 +00:00 
			
		
		
		
	Merge pull request #642 from TeamPiped/pubsub-upsert
Use upsert for handling pubsub and run on limited executors
This commit is contained in:
		
						commit
						f99aea64cf
					
				
					 2 changed files with 14 additions and 14 deletions
				
			
		| 
						 | 
				
			
			@ -76,18 +76,21 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
 | 
			
		|||
                .map(GET, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
 | 
			
		||||
                    var topic = request.getQueryParameter("hub.topic");
 | 
			
		||||
                    if (topic != null)
 | 
			
		||||
                        Multithreading.runAsync(() -> {
 | 
			
		||||
                        Multithreading.runAsyncLimited(() -> {
 | 
			
		||||
                            String channelId = StringUtils.substringAfter(topic, "channel_id=");
 | 
			
		||||
                            PubSubHelper.updatePubSub(channelId);
 | 
			
		||||
                        });
 | 
			
		||||
                    return HttpResponse.ok200().withPlainText(Objects.requireNonNull(request.getQueryParameter("hub.challenge")));
 | 
			
		||||
 | 
			
		||||
                    var challenge = request.getQueryParameter("hub.challenge");
 | 
			
		||||
                    return HttpResponse.ok200()
 | 
			
		||||
                            .withPlainText(Objects.requireNonNullElse(challenge, "ok"));
 | 
			
		||||
                })).map(POST, "/webhooks/pubsub", AsyncServlet.ofBlocking(executor, request -> {
 | 
			
		||||
                    try {
 | 
			
		||||
 | 
			
		||||
                        SyndFeed feed = new SyndFeedInput().build(
 | 
			
		||||
                                new InputSource(new ByteArrayInputStream(request.loadBody().getResult().asArray())));
 | 
			
		||||
 | 
			
		||||
                        Multithreading.runAsync(() -> {
 | 
			
		||||
                        Multithreading.runAsyncLimited(() -> {
 | 
			
		||||
                            for (var entry : feed.getEntries()) {
 | 
			
		||||
                                String url = entry.getLinks().get(0).getHref();
 | 
			
		||||
                                String videoId = StringUtils.substring(url, -11);
 | 
			
		||||
| 
						 | 
				
			
			@ -95,7 +98,7 @@ public class ServerLauncher extends MultithreadedHttpServerLauncher {
 | 
			
		|||
                                    if (DatabaseHelper.doesVideoExist(s, videoId))
 | 
			
		||||
                                        continue;
 | 
			
		||||
                                }
 | 
			
		||||
                                Multithreading.runAsync(() -> {
 | 
			
		||||
                                Multithreading.runAsyncLimited(() -> {
 | 
			
		||||
                                    try {
 | 
			
		||||
                                        Sentry.setExtra("videoId", videoId);
 | 
			
		||||
                                        var extractor = YOUTUBE_SERVICE.getStreamExtractor("https://youtube.com/watch?v=" + videoId);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -57,17 +57,14 @@ public class PubSubHelper {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    public static void updatePubSub(String channelId) {
 | 
			
		||||
        var pubsub = DatabaseHelper.getPubSubFromId(channelId);
 | 
			
		||||
        try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) {
 | 
			
		||||
            s.beginTransaction();
 | 
			
		||||
            if (pubsub == null) {
 | 
			
		||||
                pubsub = new PubSub(channelId, System.currentTimeMillis());
 | 
			
		||||
                s.insert(pubsub);
 | 
			
		||||
            } else {
 | 
			
		||||
                pubsub.setSubbedAt(System.currentTimeMillis());
 | 
			
		||||
                s.update(pubsub);
 | 
			
		||||
            }
 | 
			
		||||
            s.getTransaction().commit();
 | 
			
		||||
            var tr = s.beginTransaction();
 | 
			
		||||
            s.createNativeMutationQuery("INSERT INTO pubsub (id, subbed_at) VALUES (?, ?) " +
 | 
			
		||||
                            "ON CONFLICT (id) DO UPDATE SET subbed_at = excluded.subbed_at")
 | 
			
		||||
                    .setParameter(1, channelId)
 | 
			
		||||
                    .setParameter(2, System.currentTimeMillis())
 | 
			
		||||
                    .executeUpdate();
 | 
			
		||||
            tr.commit();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue