mirror of
				https://gitea.invidious.io/iv-org/invidious.git
				synced 2024-08-15 00:53:41 +00:00 
			
		
		
		
	Add multithreading to pubsub job
This commit is contained in:
		
							parent
							
								
									31312747e9
								
							
						
					
					
						commit
						305d636217
					
				
					 3 changed files with 43 additions and 11 deletions
				
			
		|  | @ -2344,7 +2344,8 @@ get "/feed/webhook/:token" do |env| | ||||||
|     data = "#{time}" |     data = "#{time}" | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   # The hub will sometimes check if we're still subscribed after delivery errors |   # The hub will sometimes check if we're still subscribed after delivery errors, | ||||||
|  |   # so we reply with a 200 as long as the request hasn't expired | ||||||
|   if Time.now.to_unix - time.to_i > 432000 |   if Time.now.to_unix - time.to_i > 432000 | ||||||
|     env.response.status_code = 400 |     env.response.status_code = 400 | ||||||
|     next |     next | ||||||
|  | @ -2377,11 +2378,12 @@ post "/feed/webhook/:token" do |env| | ||||||
|     rss = XML.parse_html(body) |     rss = XML.parse_html(body) | ||||||
|     rss.xpath_nodes("//feed/entry").each do |entry| |     rss.xpath_nodes("//feed/entry").each do |entry| | ||||||
|       id = entry.xpath_node("videoid").not_nil!.content |       id = entry.xpath_node("videoid").not_nil!.content | ||||||
|  |       author = entry.xpath_node("author/name").not_nil!.content | ||||||
|       published = Time.parse_rfc3339(entry.xpath_node("published").not_nil!.content) |       published = Time.parse_rfc3339(entry.xpath_node("published").not_nil!.content) | ||||||
|       updated = Time.parse_rfc3339(entry.xpath_node("updated").not_nil!.content) |       updated = Time.parse_rfc3339(entry.xpath_node("updated").not_nil!.content) | ||||||
| 
 | 
 | ||||||
|       video = get_video(id, PG_DB, proxies, region: nil) |       video = get_video(id, PG_DB, proxies, region: nil) | ||||||
|       video = ChannelVideo.new(id, video.title, published, updated, video.ucid, video.author, video.length_seconds, video.live_now, video.premiere_timestamp) |       video = ChannelVideo.new(id, video.title, published, updated, video.ucid, author, video.length_seconds, video.live_now, video.premiere_timestamp) | ||||||
| 
 | 
 | ||||||
|       PG_DB.exec("UPDATE users SET notifications = notifications || $1 \ |       PG_DB.exec("UPDATE users SET notifications = notifications || $1 \ | ||||||
|       WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) |       WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) | ||||||
|  |  | ||||||
|  | @ -85,7 +85,7 @@ user: String, | ||||||
|     https_only:               Bool?,                                # Used to tell Invidious it is behind a proxy, so links to resources should be https:// |     https_only:               Bool?,                                # Used to tell Invidious it is behind a proxy, so links to resources should be https:// | ||||||
|     hmac_key:                 String?,                              # HMAC signing key for CSRF tokens and verifying pubsub subscriptions |     hmac_key:                 String?,                              # HMAC signing key for CSRF tokens and verifying pubsub subscriptions | ||||||
|     domain:                   String?,                              # Domain to be used for links to resources on the site where an absolute URL is required |     domain:                   String?,                              # Domain to be used for links to resources on the site where an absolute URL is required | ||||||
|     use_pubsub_feeds:         {type: Bool, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) |     use_pubsub_feeds:         {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) | ||||||
|     default_home:             {type: String, default: "Top"}, |     default_home:             {type: String, default: "Top"}, | ||||||
|     feed_menu:                {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, |     feed_menu:                {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, | ||||||
|     top_enabled:              {type: Bool, default: true}, |     top_enabled:              {type: Bool, default: true}, | ||||||
|  |  | ||||||
|  | @ -104,22 +104,52 @@ end | ||||||
| 
 | 
 | ||||||
| def subscribe_to_feeds(db, logger, key, config) | def subscribe_to_feeds(db, logger, key, config) | ||||||
|   if config.use_pubsub_feeds |   if config.use_pubsub_feeds | ||||||
|  |     case config.use_pubsub_feeds | ||||||
|  |     when Bool | ||||||
|  |       max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe | ||||||
|  |     when Int32 | ||||||
|  |       max_threads = config.use_pubsub_feeds.as(Int32) | ||||||
|  |     end | ||||||
|  |     max_channel = Channel(Int32).new | ||||||
|  | 
 | ||||||
|     spawn do |     spawn do | ||||||
|  |       max_threads = max_channel.receive | ||||||
|  |       active_threads = 0 | ||||||
|  |       active_channel = Channel(Bool).new | ||||||
|  | 
 | ||||||
|       loop do |       loop do | ||||||
|         db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > '4 days' OR subscribed IS NULL") do |rs| |         db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| | ||||||
|           rs.each do |           rs.each do | ||||||
|             ucid = rs.read(String) |             ucid = rs.read(String) | ||||||
|  | 
 | ||||||
|  |             if active_threads >= max_threads.as(Int32) | ||||||
|  |               if active_channel.receive | ||||||
|  |                 active_threads -= 1 | ||||||
|  |               end | ||||||
|  |             end | ||||||
|  | 
 | ||||||
|  |             active_threads += 1 | ||||||
|  | 
 | ||||||
|  |             spawn do | ||||||
|  |               begin | ||||||
|                 response = subscribe_pubsub(ucid, key, config) |                 response = subscribe_pubsub(ucid, key, config) | ||||||
| 
 | 
 | ||||||
|                 if response.status_code >= 400 |                 if response.status_code >= 400 | ||||||
|                   logger.write("#{ucid} : #{response.body}\n") |                   logger.write("#{ucid} : #{response.body}\n") | ||||||
|                 end |                 end | ||||||
|  |               rescue ex | ||||||
|  |               end | ||||||
|  | 
 | ||||||
|  |               active_channel.send(true) | ||||||
|  |             end | ||||||
|           end |           end | ||||||
|         end |         end | ||||||
| 
 | 
 | ||||||
|         sleep 1.minute |         sleep 1.minute | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
|  | 
 | ||||||
|  |     max_channel.send(max_threads.as(Int32)) | ||||||
|   end |   end | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue