diff --git a/src/invidious.cr b/src/invidious.cr index ad4401a7..f487b0e9 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -129,8 +129,8 @@ end # Start jobs -refresh_channels(PG_DB, logger, config.channel_threads, config.full_refresh) -refresh_feeds(PG_DB, logger, config.feed_threads, config.use_feed_events) +refresh_channels(PG_DB, logger, config) +refresh_feeds(PG_DB, logger, config) subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) statistics = { diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 983d3d1e..3155cb67 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -105,7 +105,7 @@ struct Config hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) - use_feed_events: {type: Bool, default: false}, # Update feeds on receiving notifications + use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications default_home: {type: String, default: "Top"}, feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, top_enabled: {type: Bool, default: true}, diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr index d725a023..b9f9a86f 100644 --- a/src/invidious/helpers/jobs.cr +++ b/src/invidious/helpers/jobs.cr @@ -1,4 +1,4 @@ -def refresh_channels(db, logger, max_threads = 1, full_refresh = false) +def refresh_channels(db, logger, config) max_channel = Channel(Int32).new spawn do @@ -20,7 +20,7 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false) active_threads += 1 spawn do begin - channel = fetch_channel(id, db, full_refresh) + channel = fetch_channel(id, db, config.full_refresh) db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id) rescue ex @@ -39,47 +39,71 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false) end end - max_channel.send(max_threads) + max_channel.send(config.channel_threads) end -def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false) - max_channel = Channel(Int32).new - +def refresh_feeds(db, logger, config) # Spawn thread to handle feed events - if use_feed_events + if config.use_feed_events + case config.use_feed_events + when Bool + max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe + when Int32 + max_feed_event_threads = config.use_feed_events.as(Int32) + end + max_feed_event_channel = Channel(Int32).new + spawn do queue = Deque(String).new(30) - - spawn do - loop do - if event = queue.shift? - feed = JSON.parse(event) - email = feed["email"].as_s - action = feed["action"].as_s - - view_name = "subscriptions_#{sha256(email)}" - - case action - when "refresh" - db.exec("REFRESH MATERIALIZED VIEW #{view_name}") - end - - # Delete any future events that we just processed - queue.delete(event) - else - sleep 1.second - end - - Fiber.yield + PG.connect_listen(PG_URL, "feeds") do |event| + if !queue.includes? event.payload + queue << event.payload end end - PG.connect_listen(PG_URL, "feeds") do |event| - queue << event.payload + max_threads = max_feed_event_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + until queue.empty? + event = queue.shift + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + + spawn do + begin + feed = JSON.parse(event) + email = feed["email"].as_s + action = feed["action"].as_s + + view_name = "subscriptions_#{sha256(email)}" + + case action + when "refresh" + db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + end + rescue ex + end + + active_channel.send(true) + end + end + + sleep 5.seconds end end + + max_feed_event_channel.send(max_feed_event_threads.as(Int32)) end + max_channel = Channel(Int32).new spawn do max_threads = max_channel.receive active_threads = 0 @@ -144,7 +168,7 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false) end end - max_channel.send(max_threads) + max_channel.send(config.feed_threads) end def subscribe_to_feeds(db, logger, key, config)