mirror of
				https://gitea.invidious.io/iv-org/invidious.git
				synced 2024-08-15 00:53:41 +00:00 
			
		
		
		
	Merge pull request #1608 from saltycrys/add-subscription-traces
Add Subscription Traces
This commit is contained in:
		
						commit
						c89632d2a8
					
				
					 7 changed files with 72 additions and 48 deletions
				
			
		|  | @ -259,7 +259,7 @@ before_all do |env| | ||||||
|       headers["Cookie"] = env.request.headers["Cookie"] |       headers["Cookie"] = env.request.headers["Cookie"] | ||||||
| 
 | 
 | ||||||
|       begin |       begin | ||||||
|         user, sid = get_user(sid, headers, PG_DB, false) |         user, sid = get_user(sid, headers, PG_DB, logger, false) | ||||||
|         csrf_token = generate_response(sid, { |         csrf_token = generate_response(sid, { | ||||||
|           ":authorize_token", |           ":authorize_token", | ||||||
|           ":playlist_ajax", |           ":playlist_ajax", | ||||||
|  | @ -529,7 +529,7 @@ post "/subscription_ajax" do |env| | ||||||
|   case action |   case action | ||||||
|   when "action_create_subscription_to_channel" |   when "action_create_subscription_to_channel" | ||||||
|     if !user.subscriptions.includes? channel_id |     if !user.subscriptions.includes? channel_id | ||||||
|       get_channel(channel_id, PG_DB, false, false) |       get_channel(channel_id, PG_DB, logger, false, false) | ||||||
|       PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email) |       PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email) | ||||||
|     end |     end | ||||||
|   when "action_remove_subscriptions" |   when "action_remove_subscriptions" | ||||||
|  | @ -564,7 +564,7 @@ get "/subscription_manager" do |env| | ||||||
|     headers = HTTP::Headers.new |     headers = HTTP::Headers.new | ||||||
|     headers["Cookie"] = env.request.headers["Cookie"] |     headers["Cookie"] = env.request.headers["Cookie"] | ||||||
| 
 | 
 | ||||||
|     user, sid = get_user(sid, headers, PG_DB) |     user, sid = get_user(sid, headers, PG_DB, logger) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   action_takeout = env.params.query["action_takeout"]?.try &.to_i? |   action_takeout = env.params.query["action_takeout"]?.try &.to_i? | ||||||
|  | @ -688,7 +688,7 @@ post "/data_control" do |env| | ||||||
|           user.subscriptions += body["subscriptions"].as_a.map { |a| a.as_s } |           user.subscriptions += body["subscriptions"].as_a.map { |a| a.as_s } | ||||||
|           user.subscriptions.uniq! |           user.subscriptions.uniq! | ||||||
| 
 | 
 | ||||||
|           user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) |           user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, logger, false, false) | ||||||
| 
 | 
 | ||||||
|           PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) |           PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) | ||||||
|         end |         end | ||||||
|  | @ -757,7 +757,7 @@ post "/data_control" do |env| | ||||||
|         end |         end | ||||||
|         user.subscriptions.uniq! |         user.subscriptions.uniq! | ||||||
| 
 | 
 | ||||||
|         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) |         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, logger, false, false) | ||||||
| 
 | 
 | ||||||
|         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) |         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) | ||||||
|       when "import_freetube" |       when "import_freetube" | ||||||
|  | @ -766,7 +766,7 @@ post "/data_control" do |env| | ||||||
|         end |         end | ||||||
|         user.subscriptions.uniq! |         user.subscriptions.uniq! | ||||||
| 
 | 
 | ||||||
|         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) |         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, logger, false, false) | ||||||
| 
 | 
 | ||||||
|         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) |         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) | ||||||
|       when "import_newpipe_subscriptions" |       when "import_newpipe_subscriptions" | ||||||
|  | @ -785,7 +785,7 @@ post "/data_control" do |env| | ||||||
|         end |         end | ||||||
|         user.subscriptions.uniq! |         user.subscriptions.uniq! | ||||||
| 
 | 
 | ||||||
|         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) |         user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, logger, false, false) | ||||||
| 
 | 
 | ||||||
|         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) |         PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) | ||||||
|       when "import_newpipe" |       when "import_newpipe" | ||||||
|  | @ -804,7 +804,7 @@ post "/data_control" do |env| | ||||||
|               user.subscriptions += db.query_all("SELECT url FROM subscriptions", as: String).map { |url| url.lchop("https://www.youtube.com/channel/") } |               user.subscriptions += db.query_all("SELECT url FROM subscriptions", as: String).map { |url| url.lchop("https://www.youtube.com/channel/") } | ||||||
|               user.subscriptions.uniq! |               user.subscriptions.uniq! | ||||||
| 
 | 
 | ||||||
|               user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) |               user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, logger, false, false) | ||||||
| 
 | 
 | ||||||
|               PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) |               PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) | ||||||
| 
 | 
 | ||||||
|  | @ -1207,7 +1207,7 @@ get "/feed/subscriptions" do |env| | ||||||
|   headers["Cookie"] = env.request.headers["Cookie"] |   headers["Cookie"] = env.request.headers["Cookie"] | ||||||
| 
 | 
 | ||||||
|   if !user.password |   if !user.password | ||||||
|     user, sid = get_user(sid, headers, PG_DB) |     user, sid = get_user(sid, headers, PG_DB, logger) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   max_results = env.params.query["max_results"]?.try &.to_i?.try &.clamp(0, MAX_ITEMS_PER_PAGE) |   max_results = env.params.query["max_results"]?.try &.to_i?.try &.clamp(0, MAX_ITEMS_PER_PAGE) | ||||||
|  | @ -2827,7 +2827,7 @@ post "/api/v1/auth/subscriptions/:ucid" do |env| | ||||||
|   ucid = env.params.url["ucid"] |   ucid = env.params.url["ucid"] | ||||||
| 
 | 
 | ||||||
|   if !user.subscriptions.includes? ucid |   if !user.subscriptions.includes? ucid | ||||||
|     get_channel(ucid, PG_DB, false, false) |     get_channel(ucid, PG_DB, logger, false, false) | ||||||
|     PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email) |     PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -144,7 +144,7 @@ class ChannelRedirect < Exception | ||||||
|   end |   end | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| def get_batch_channels(channels, db, refresh = false, pull_all_videos = true, max_threads = 10) | def get_batch_channels(channels, db, logger, refresh = false, pull_all_videos = true, max_threads = 10) | ||||||
|   finished_channel = Channel(String | Nil).new |   finished_channel = Channel(String | Nil).new | ||||||
| 
 | 
 | ||||||
|   spawn do |   spawn do | ||||||
|  | @ -160,7 +160,7 @@ def get_batch_channels(channels, db, refresh = false, pull_all_videos = true, ma | ||||||
|       active_threads += 1 |       active_threads += 1 | ||||||
|       spawn do |       spawn do | ||||||
|         begin |         begin | ||||||
|           get_channel(ucid, db, refresh, pull_all_videos) |           get_channel(ucid, db, logger, refresh, pull_all_videos) | ||||||
|           finished_channel.send(ucid) |           finished_channel.send(ucid) | ||||||
|         rescue ex |         rescue ex | ||||||
|           finished_channel.send(nil) |           finished_channel.send(nil) | ||||||
|  | @ -181,10 +181,10 @@ def get_batch_channels(channels, db, refresh = false, pull_all_videos = true, ma | ||||||
|   return final |   return final | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| def get_channel(id, db, refresh = true, pull_all_videos = true) | def get_channel(id, db, logger, refresh = true, pull_all_videos = true) | ||||||
|   if channel = db.query_one?("SELECT * FROM channels WHERE id = $1", id, as: InvidiousChannel) |   if channel = db.query_one?("SELECT * FROM channels WHERE id = $1", id, as: InvidiousChannel) | ||||||
|     if refresh && Time.utc - channel.updated > 10.minutes |     if refresh && Time.utc - channel.updated > 10.minutes | ||||||
|       channel = fetch_channel(id, db, pull_all_videos: pull_all_videos) |       channel = fetch_channel(id, db, logger, pull_all_videos: pull_all_videos) | ||||||
|       channel_array = channel.to_a |       channel_array = channel.to_a | ||||||
|       args = arg_array(channel_array) |       args = arg_array(channel_array) | ||||||
| 
 | 
 | ||||||
|  | @ -192,7 +192,7 @@ def get_channel(id, db, refresh = true, pull_all_videos = true) | ||||||
|         ON CONFLICT (id) DO UPDATE SET author = $2, updated = $3", args: channel_array) |         ON CONFLICT (id) DO UPDATE SET author = $2, updated = $3", args: channel_array) | ||||||
|     end |     end | ||||||
|   else |   else | ||||||
|     channel = fetch_channel(id, db, pull_all_videos: pull_all_videos) |     channel = fetch_channel(id, db, logger, pull_all_videos: pull_all_videos) | ||||||
|     channel_array = channel.to_a |     channel_array = channel.to_a | ||||||
|     args = arg_array(channel_array) |     args = arg_array(channel_array) | ||||||
| 
 | 
 | ||||||
|  | @ -202,8 +202,12 @@ def get_channel(id, db, refresh = true, pull_all_videos = true) | ||||||
|   return channel |   return channel | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) | def fetch_channel(ucid, db, logger, pull_all_videos = true, locale = nil) | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : pull_all_videos = #{pull_all_videos}, locale = #{locale}") | ||||||
|  | 
 | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : Downloading RSS feed") | ||||||
|   rss = YT_POOL.client &.get("/feeds/videos.xml?channel_id=#{ucid}").body |   rss = YT_POOL.client &.get("/feeds/videos.xml?channel_id=#{ucid}").body | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : Parsing RSS feed") | ||||||
|   rss = XML.parse_html(rss) |   rss = XML.parse_html(rss) | ||||||
| 
 | 
 | ||||||
|   author = rss.xpath_node(%q(//feed/title)) |   author = rss.xpath_node(%q(//feed/title)) | ||||||
|  | @ -219,14 +223,19 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) | ||||||
|     auto_generated = true |     auto_generated = true | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : author = #{author}, auto_generated = #{auto_generated}") | ||||||
|  | 
 | ||||||
|   page = 1 |   page = 1 | ||||||
| 
 | 
 | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : Downloading channel videos page") | ||||||
|   response = get_channel_videos_response(ucid, page, auto_generated: auto_generated) |   response = get_channel_videos_response(ucid, page, auto_generated: auto_generated) | ||||||
| 
 | 
 | ||||||
|   videos = [] of SearchVideo |   videos = [] of SearchVideo | ||||||
|   begin |   begin | ||||||
|     initial_data = JSON.parse(response.body).as_a.find &.["response"]? |     initial_data = JSON.parse(response.body).as_a.find &.["response"]? | ||||||
|     raise InfoException.new("Could not extract channel JSON") if !initial_data |     raise InfoException.new("Could not extract channel JSON") if !initial_data | ||||||
|  | 
 | ||||||
|  |     logger.trace("fetch_channel: #{ucid} : Extracting videos from channel videos page initial_data") | ||||||
|     videos = extract_videos(initial_data.as_h, author, ucid) |     videos = extract_videos(initial_data.as_h, author, ucid) | ||||||
|   rescue ex |   rescue ex | ||||||
|     if response.body.includes?("To continue with your YouTube experience, please fill out the form below.") || |     if response.body.includes?("To continue with your YouTube experience, please fill out the form below.") || | ||||||
|  | @ -236,6 +245,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) | ||||||
|     raise ex |     raise ex | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|  |   logger.trace("fetch_channel: #{ucid} : Extracting videos from channel RSS feed") | ||||||
|   rss.xpath_nodes("//feed/entry").each do |entry| |   rss.xpath_nodes("//feed/entry").each do |entry| | ||||||
|     video_id = entry.xpath_node("videoid").not_nil!.content |     video_id = entry.xpath_node("videoid").not_nil!.content | ||||||
|     title = entry.xpath_node("title").not_nil!.content |     title = entry.xpath_node("title").not_nil!.content | ||||||
|  | @ -269,6 +279,8 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) | ||||||
|       views:              views, |       views:              views, | ||||||
|     }) |     }) | ||||||
| 
 | 
 | ||||||
|  |     logger.trace("fetch_channel: #{ucid} : video #{video_id} : Updating or inserting video") | ||||||
|  | 
 | ||||||
|     # We don't include the 'premiere_timestamp' here because channel pages don't include them, |     # We don't include the 'premiere_timestamp' here because channel pages don't include them, | ||||||
|     # meaning the above timestamp is always null |     # meaning the above timestamp is always null | ||||||
|     was_insert = db.query_one("INSERT INTO channel_videos VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ |     was_insert = db.query_one("INSERT INTO channel_videos VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ | ||||||
|  | @ -276,8 +288,13 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) | ||||||
|       updated = $4, ucid = $5, author = $6, length_seconds = $7, \ |       updated = $4, ucid = $5, author = $6, length_seconds = $7, \ | ||||||
|       live_now = $8, views = $10 returning (xmax=0) as was_insert", *video.to_tuple, as: Bool) |       live_now = $8, views = $10 returning (xmax=0) as was_insert", *video.to_tuple, as: Bool) | ||||||
| 
 | 
 | ||||||
|  |     if was_insert | ||||||
|  |       logger.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions") | ||||||
|       db.exec("UPDATE users SET notifications = array_append(notifications, $1), \ |       db.exec("UPDATE users SET notifications = array_append(notifications, $1), \ | ||||||
|       feed_needs_update = true WHERE $2 = ANY(subscriptions)", video.id, video.ucid) if was_insert |         feed_needs_update = true WHERE $2 = ANY(subscriptions)", video.id, video.ucid) | ||||||
|  |     else | ||||||
|  |       logger.trace("fetch_channel: #{ucid} : video #{video_id} : Updated") | ||||||
|  |     end | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   if pull_all_videos |   if pull_all_videos | ||||||
|  |  | ||||||
|  | @ -7,9 +7,9 @@ class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def begin |   def begin | ||||||
|     max_threads = config.channel_threads |     max_fibers = config.channel_threads | ||||||
|     lim_threads = max_threads |     lim_fibers = max_fibers | ||||||
|     active_threads = 0 |     active_fibers = 0 | ||||||
|     active_channel = Channel(Bool).new |     active_channel = Channel(Bool).new | ||||||
|     backoff = 1.seconds |     backoff = 1.seconds | ||||||
| 
 | 
 | ||||||
|  | @ -19,27 +19,32 @@ class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob | ||||||
|         rs.each do |         rs.each do | ||||||
|           id = rs.read(String) |           id = rs.read(String) | ||||||
| 
 | 
 | ||||||
|           if active_threads >= lim_threads |           if active_fibers >= lim_fibers | ||||||
|  |             logger.trace("RefreshChannelsJob: Fiber limit reached, waiting...") | ||||||
|             if active_channel.receive |             if active_channel.receive | ||||||
|               active_threads -= 1 |               logger.trace("RefreshChannelsJob: Fiber limit ok, continuing") | ||||||
|  |               active_fibers -= 1 | ||||||
|             end |             end | ||||||
|           end |           end | ||||||
| 
 | 
 | ||||||
|           active_threads += 1 |           logger.trace("RefreshChannelsJob: #{id} : Spawning fiber") | ||||||
|  |           active_fibers += 1 | ||||||
|           spawn do |           spawn do | ||||||
|             begin |             begin | ||||||
|               logger.trace("RefreshChannelsJob: Fetching channel #{id}") |               logger.trace("RefreshChannelsJob: #{id} fiber : Fetching channel") | ||||||
|               channel = fetch_channel(id, db, config.full_refresh) |               channel = fetch_channel(id, db, logger, config.full_refresh) | ||||||
| 
 | 
 | ||||||
|               lim_threads = max_threads |               lim_fibers = max_fibers | ||||||
|  | 
 | ||||||
|  |               logger.trace("RefreshChannelsJob: #{id} fiber : Updating DB") | ||||||
|               db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id) |               db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id) | ||||||
|             rescue ex |             rescue ex | ||||||
|               logger.error("RefreshChannelsJob: #{id} : #{ex.message}") |               logger.error("RefreshChannelsJob: #{id} : #{ex.message}") | ||||||
|               if ex.message == "Deleted or invalid channel" |               if ex.message == "Deleted or invalid channel" | ||||||
|                 db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id) |                 db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id) | ||||||
|               else |               else | ||||||
|                 lim_threads = 1 |                 lim_fibers = 1 | ||||||
|                 logger.error("RefreshChannelsJob: #{id} : backing off for #{backoff}s") |                 logger.error("RefreshChannelsJob: #{id} fiber : backing off for #{backoff}s") | ||||||
|                 sleep backoff |                 sleep backoff | ||||||
|                 if backoff < 1.days |                 if backoff < 1.days | ||||||
|                   backoff += backoff |                   backoff += backoff | ||||||
|  | @ -47,13 +52,15 @@ class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob | ||||||
|                   backoff = 1.days |                   backoff = 1.days | ||||||
|                 end |                 end | ||||||
|               end |               end | ||||||
|             end |             ensure | ||||||
| 
 |               logger.trace("RefreshChannelsJob: #{id} fiber : Done") | ||||||
|               active_channel.send(true) |               active_channel.send(true) | ||||||
|             end |             end | ||||||
|           end |           end | ||||||
|         end |         end | ||||||
|  |       end | ||||||
| 
 | 
 | ||||||
|  |       logger.debug("RefreshChannelsJob: Done, sleeping for one minute") | ||||||
|       sleep 1.minute |       sleep 1.minute | ||||||
|       Fiber.yield |       Fiber.yield | ||||||
|     end |     end | ||||||
|  |  | ||||||
|  | @ -7,8 +7,8 @@ class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def begin |   def begin | ||||||
|     max_threads = config.feed_threads |     max_fibers = config.feed_threads | ||||||
|     active_threads = 0 |     active_fibers = 0 | ||||||
|     active_channel = Channel(Bool).new |     active_channel = Channel(Bool).new | ||||||
| 
 | 
 | ||||||
|     loop do |     loop do | ||||||
|  | @ -17,13 +17,13 @@ class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob | ||||||
|           email = rs.read(String) |           email = rs.read(String) | ||||||
|           view_name = "subscriptions_#{sha256(email)}" |           view_name = "subscriptions_#{sha256(email)}" | ||||||
| 
 | 
 | ||||||
|           if active_threads >= max_threads |           if active_fibers >= max_fibers | ||||||
|             if active_channel.receive |             if active_channel.receive | ||||||
|               active_threads -= 1 |               active_fibers -= 1 | ||||||
|             end |             end | ||||||
|           end |           end | ||||||
| 
 | 
 | ||||||
|           active_threads += 1 |           active_fibers += 1 | ||||||
|           spawn do |           spawn do | ||||||
|             begin |             begin | ||||||
|               # Drop outdated views |               # Drop outdated views | ||||||
|  |  | ||||||
|  | @ -8,12 +8,12 @@ class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def begin |   def begin | ||||||
|     max_threads = 1 |     max_fibers = 1 | ||||||
|     if config.use_pubsub_feeds.is_a?(Int32) |     if config.use_pubsub_feeds.is_a?(Int32) | ||||||
|       max_threads = config.use_pubsub_feeds.as(Int32) |       max_fibers = config.use_pubsub_feeds.as(Int32) | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     active_threads = 0 |     active_fibers = 0 | ||||||
|     active_channel = Channel(Bool).new |     active_channel = Channel(Bool).new | ||||||
| 
 | 
 | ||||||
|     loop do |     loop do | ||||||
|  | @ -21,13 +21,13 @@ class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob | ||||||
|         rs.each do |         rs.each do | ||||||
|           ucid = rs.read(String) |           ucid = rs.read(String) | ||||||
| 
 | 
 | ||||||
|           if active_threads >= max_threads.as(Int32) |           if active_fibers >= max_fibers.as(Int32) | ||||||
|             if active_channel.receive |             if active_channel.receive | ||||||
|               active_threads -= 1 |               active_fibers -= 1 | ||||||
|             end |             end | ||||||
|           end |           end | ||||||
| 
 | 
 | ||||||
|           active_threads += 1 |           active_fibers += 1 | ||||||
| 
 | 
 | ||||||
|           spawn do |           spawn do | ||||||
|             begin |             begin | ||||||
|  |  | ||||||
|  | @ -267,7 +267,7 @@ class Invidious::Routes::Login < Invidious::Routes::BaseRoute | ||||||
|           raise "Couldn't get SID." |           raise "Couldn't get SID." | ||||||
|         end |         end | ||||||
| 
 | 
 | ||||||
|         user, sid = get_user(sid, headers, PG_DB) |         user, sid = get_user(sid, headers, PG_DB, logger) | ||||||
| 
 | 
 | ||||||
|         # We are now logged in |         # We are now logged in | ||||||
|         traceback << "done.<br/>" |         traceback << "done.<br/>" | ||||||
|  |  | ||||||
|  | @ -269,12 +269,12 @@ struct Preferences | ||||||
|   end |   end | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| def get_user(sid, headers, db, refresh = true) | def get_user(sid, headers, db, logger, refresh = true) | ||||||
|   if email = db.query_one?("SELECT email FROM session_ids WHERE id = $1", sid, as: String) |   if email = db.query_one?("SELECT email FROM session_ids WHERE id = $1", sid, as: String) | ||||||
|     user = db.query_one("SELECT * FROM users WHERE email = $1", email, as: User) |     user = db.query_one("SELECT * FROM users WHERE email = $1", email, as: User) | ||||||
| 
 | 
 | ||||||
|     if refresh && Time.utc - user.updated > 1.minute |     if refresh && Time.utc - user.updated > 1.minute | ||||||
|       user, sid = fetch_user(sid, headers, db) |       user, sid = fetch_user(sid, headers, db, logger) | ||||||
|       user_array = user.to_a |       user_array = user.to_a | ||||||
|       user_array[4] = user_array[4].to_json # User preferences |       user_array[4] = user_array[4].to_json # User preferences | ||||||
|       args = arg_array(user_array) |       args = arg_array(user_array) | ||||||
|  | @ -292,7 +292,7 @@ def get_user(sid, headers, db, refresh = true) | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
|   else |   else | ||||||
|     user, sid = fetch_user(sid, headers, db) |     user, sid = fetch_user(sid, headers, db, logger) | ||||||
|     user_array = user.to_a |     user_array = user.to_a | ||||||
|     user_array[4] = user_array[4].to_json # User preferences |     user_array[4] = user_array[4].to_json # User preferences | ||||||
|     args = arg_array(user.to_a) |     args = arg_array(user.to_a) | ||||||
|  | @ -313,7 +313,7 @@ def get_user(sid, headers, db, refresh = true) | ||||||
|   return user, sid |   return user, sid | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| def fetch_user(sid, headers, db) | def fetch_user(sid, headers, db, logger) | ||||||
|   feed = YT_POOL.client &.get("/subscription_manager?disable_polymer=1", headers) |   feed = YT_POOL.client &.get("/subscription_manager?disable_polymer=1", headers) | ||||||
|   feed = XML.parse_html(feed.body) |   feed = XML.parse_html(feed.body) | ||||||
| 
 | 
 | ||||||
|  | @ -326,7 +326,7 @@ def fetch_user(sid, headers, db) | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   channels = get_batch_channels(channels, db, false, false) |   channels = get_batch_channels(channels, db, logger, false, false) | ||||||
| 
 | 
 | ||||||
|   email = feed.xpath_node(%q(//a[@class="yt-masthead-picker-header yt-masthead-picker-active-account"])) |   email = feed.xpath_node(%q(//a[@class="yt-masthead-picker-header yt-masthead-picker-active-account"])) | ||||||
|   if email |   if email | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue