mirror of
				https://gitea.invidious.io/iv-org/invidious-copy-2023-06-08.git
				synced 2024-08-15 00:53:38 +00:00 
			
		
		
		
	Extract RefreshChannelsJob (#1403)
This commit is contained in:
		
							parent
							
								
									cce6db4aeb
								
							
						
					
					
						commit
						989317e5d3
					
				
					 3 changed files with 60 additions and 58 deletions
				
			
		| 
						 | 
					@ -159,7 +159,7 @@ end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Start jobs
 | 
					# Start jobs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
refresh_channels(PG_DB, logger, config)
 | 
					Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config)
 | 
				
			||||||
refresh_feeds(PG_DB, logger, config)
 | 
					refresh_feeds(PG_DB, logger, config)
 | 
				
			||||||
subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
 | 
					subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,60 +1,3 @@
 | 
				
			||||||
def refresh_channels(db, logger, config)
 | 
					 | 
				
			||||||
  max_channel = Channel(Int32).new
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  spawn do
 | 
					 | 
				
			||||||
    max_threads = max_channel.receive
 | 
					 | 
				
			||||||
    lim_threads = max_threads
 | 
					 | 
				
			||||||
    active_threads = 0
 | 
					 | 
				
			||||||
    active_channel = Channel(Bool).new
 | 
					 | 
				
			||||||
    backoff = 1.seconds
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    loop do
 | 
					 | 
				
			||||||
      db.query("SELECT id FROM channels ORDER BY updated") do |rs|
 | 
					 | 
				
			||||||
        rs.each do
 | 
					 | 
				
			||||||
          id = rs.read(String)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
          if active_threads >= lim_threads
 | 
					 | 
				
			||||||
            if active_channel.receive
 | 
					 | 
				
			||||||
              active_threads -= 1
 | 
					 | 
				
			||||||
            end
 | 
					 | 
				
			||||||
          end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
          active_threads += 1
 | 
					 | 
				
			||||||
          spawn do
 | 
					 | 
				
			||||||
            begin
 | 
					 | 
				
			||||||
              channel = fetch_channel(id, db, config.full_refresh)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
              lim_threads = max_threads
 | 
					 | 
				
			||||||
              db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id)
 | 
					 | 
				
			||||||
            rescue ex
 | 
					 | 
				
			||||||
              logger.puts("#{id} : #{ex.message}")
 | 
					 | 
				
			||||||
              if ex.message == "Deleted or invalid channel"
 | 
					 | 
				
			||||||
                db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id)
 | 
					 | 
				
			||||||
              else
 | 
					 | 
				
			||||||
                lim_threads = 1
 | 
					 | 
				
			||||||
	              logger.puts("#{id} : backing off for #{backoff}s")
 | 
					 | 
				
			||||||
                sleep backoff
 | 
					 | 
				
			||||||
                if backoff < 1.days
 | 
					 | 
				
			||||||
                  backoff += backoff
 | 
					 | 
				
			||||||
                else
 | 
					 | 
				
			||||||
                  backoff = 1.days
 | 
					 | 
				
			||||||
                end
 | 
					 | 
				
			||||||
              end
 | 
					 | 
				
			||||||
            end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            active_channel.send(true)
 | 
					 | 
				
			||||||
          end
 | 
					 | 
				
			||||||
        end
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      sleep 1.minute
 | 
					 | 
				
			||||||
      Fiber.yield
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  max_channel.send(config.channel_threads)
 | 
					 | 
				
			||||||
end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def refresh_feeds(db, logger, config)
 | 
					def refresh_feeds(db, logger, config)
 | 
				
			||||||
  max_channel = Channel(Int32).new
 | 
					  max_channel = Channel(Int32).new
 | 
				
			||||||
  spawn do
 | 
					  spawn do
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										59
									
								
								src/invidious/jobs/refresh_channels_job.cr
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								src/invidious/jobs/refresh_channels_job.cr
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,59 @@
 | 
				
			||||||
 | 
					class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob
 | 
				
			||||||
 | 
					  private getter db : DB::Database
 | 
				
			||||||
 | 
					  private getter logger : Invidious::LogHandler
 | 
				
			||||||
 | 
					  private getter config : Config
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def initialize(@db, @logger, @config)
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def begin
 | 
				
			||||||
 | 
					    max_threads = config.channel_threads
 | 
				
			||||||
 | 
					    lim_threads = max_threads
 | 
				
			||||||
 | 
					    active_threads = 0
 | 
				
			||||||
 | 
					    active_channel = Channel(Bool).new
 | 
				
			||||||
 | 
					    backoff = 1.seconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    loop do
 | 
				
			||||||
 | 
					      db.query("SELECT id FROM channels ORDER BY updated") do |rs|
 | 
				
			||||||
 | 
					        rs.each do
 | 
				
			||||||
 | 
					          id = rs.read(String)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          if active_threads >= lim_threads
 | 
				
			||||||
 | 
					            if active_channel.receive
 | 
				
			||||||
 | 
					              active_threads -= 1
 | 
				
			||||||
 | 
					            end
 | 
				
			||||||
 | 
					          end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          active_threads += 1
 | 
				
			||||||
 | 
					          spawn do
 | 
				
			||||||
 | 
					            begin
 | 
				
			||||||
 | 
					              channel = fetch_channel(id, db, config.full_refresh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					              lim_threads = max_threads
 | 
				
			||||||
 | 
					              db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id)
 | 
				
			||||||
 | 
					            rescue ex
 | 
				
			||||||
 | 
					              logger.puts("#{id} : #{ex.message}")
 | 
				
			||||||
 | 
					              if ex.message == "Deleted or invalid channel"
 | 
				
			||||||
 | 
					                db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id)
 | 
				
			||||||
 | 
					              else
 | 
				
			||||||
 | 
					                lim_threads = 1
 | 
				
			||||||
 | 
					                logger.puts("#{id} : backing off for #{backoff}s")
 | 
				
			||||||
 | 
					                sleep backoff
 | 
				
			||||||
 | 
					                if backoff < 1.days
 | 
				
			||||||
 | 
					                  backoff += backoff
 | 
				
			||||||
 | 
					                else
 | 
				
			||||||
 | 
					                  backoff = 1.days
 | 
				
			||||||
 | 
					                end
 | 
				
			||||||
 | 
					              end
 | 
				
			||||||
 | 
					            end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            active_channel.send(true)
 | 
				
			||||||
 | 
					          end
 | 
				
			||||||
 | 
					        end
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      sleep 1.minute
 | 
				
			||||||
 | 
					      Fiber.yield
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue