limit feeds and delete materialized views

This commit is contained in:
Émilien Devos 2021-10-19 07:12:15 +00:00 committed by GitHub
parent 7f445f6167
commit cf51c5e98f
11 changed files with 41 additions and 140 deletions

View file

@ -0,0 +1,6 @@
CREATE INDEX channel_videos_ucid_published_idx
ON public.channel_videos
USING btree
(ucid COLLATE pg_catalog."default", published);
DROP INDEX channel_videos_ucid_idx;

View file

@ -19,12 +19,12 @@ CREATE TABLE IF NOT EXISTS public.channel_videos
GRANT ALL ON TABLE public.channel_videos TO current_user; GRANT ALL ON TABLE public.channel_videos TO current_user;
-- Index: public.channel_videos_ucid_idx -- Index: public.channel_videos_ucid_published_idx
-- DROP INDEX public.channel_videos_ucid_idx; -- DROP INDEX public.channel_videos_ucid_published_idx;
CREATE INDEX IF NOT EXISTS channel_videos_ucid_idx CREATE INDEX IF NOT EXISTS channel_videos_ucid_published_idx
ON public.channel_videos ON public.channel_videos
USING btree USING btree
(ucid COLLATE pg_catalog."default"); (ucid COLLATE pg_catalog."default", published);

View file

@ -44,7 +44,6 @@ postgresql:
# Adapted from ../config/config.yml # Adapted from ../config/config.yml
config: config:
channel_threads: 1 channel_threads: 1
feed_threads: 1
db: db:
user: kemal user: kemal
password: kemal password: kemal

View file

@ -84,14 +84,6 @@ Kemal.config.extra_options do |parser|
exit exit
end end
end end
parser.on("-f THREADS", "--feed-threads=THREADS", "Number of threads for refreshing feeds (default: #{CONFIG.feed_threads})") do |number|
begin
CONFIG.feed_threads = number.to_i
rescue ex
puts "THREADS must be integer"
exit
end
end
parser.on("-o OUTPUT", "--output=OUTPUT", "Redirect output (default: #{CONFIG.output})") do |output| parser.on("-o OUTPUT", "--output=OUTPUT", "Redirect output (default: #{CONFIG.output})") do |output|
CONFIG.output = output CONFIG.output = output
end end
@ -135,10 +127,6 @@ if CONFIG.channel_threads > 0
Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB) Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB)
end end
if CONFIG.feed_threads > 0
Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB)
end
DECRYPT_FUNCTION = DecryptFunction.new(CONFIG.decrypt_polling) DECRYPT_FUNCTION = DecryptFunction.new(CONFIG.decrypt_polling)
if CONFIG.decrypt_polling if CONFIG.decrypt_polling
Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new

View file

@ -62,8 +62,6 @@ class Config
# Time interval between two executions of the job that crawls channel videos (subscriptions update). # Time interval between two executions of the job that crawls channel videos (subscriptions update).
@[YAML::Field(converter: Preferences::TimeSpanConverter)] @[YAML::Field(converter: Preferences::TimeSpanConverter)]
property channel_refresh_interval : Time::Span = 30.minutes property channel_refresh_interval : Time::Span = 30.minutes
# Number of threads to use for updating feeds
property feed_threads : Int32 = 1
# Log file path or STDOUT # Log file path or STDOUT
property output : String = "STDOUT" property output : String = "STDOUT"
# Default log level, valid YAML values are ints and strings, see src/invidious/helpers/logger.cr # Default log level, valid YAML values are ints and strings, see src/invidious/helpers/logger.cr

View file

@ -1,75 +0,0 @@
class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob
private getter db : DB::Database
def initialize(@db)
end
def begin
max_fibers = CONFIG.feed_threads
active_fibers = 0
active_channel = Channel(Bool).new
loop do
db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
rs.each do
email = rs.read(String)
view_name = "subscriptions_#{sha256(email)}"
if active_fibers >= max_fibers
if active_channel.receive
active_fibers -= 1
end
end
active_fibers += 1
spawn do
begin
# Drop outdated views
column_array = Invidious::Database.get_column_array(db, view_name)
ChannelVideo.type_array.each_with_index do |name, i|
if name != column_array[i]?
LOGGER.info("RefreshFeedsJob: DROP MATERIALIZED VIEW #{view_name}")
db.exec("DROP MATERIALIZED VIEW #{view_name}")
raise "view does not exist"
end
end
if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))"
LOGGER.info("RefreshFeedsJob: Materialized view #{view_name} is out-of-date, recreating...")
db.exec("DROP MATERIALIZED VIEW #{view_name}")
end
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
rescue ex
# Rename old views
begin
legacy_view_name = "subscriptions_#{sha256(email)[0..7]}"
db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0")
LOGGER.info("RefreshFeedsJob: RENAME MATERIALIZED VIEW #{legacy_view_name}")
db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}")
rescue ex
begin
# While iterating through, we may have an email stored from a deleted account
if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
LOGGER.info("RefreshFeedsJob: CREATE #{view_name}")
db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}")
db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
end
rescue ex
LOGGER.error("RefreshFeedJobs: REFRESH #{email} : #{ex.message}")
end
end
end
active_channel.send(true)
end
end
end
sleep 5.seconds
Fiber.yield
end
end
end

View file

@ -128,10 +128,8 @@ module Invidious::Routes::Account
return error_template(400, ex) return error_template(400, ex)
end end
view_name = "subscriptions_#{sha256(user.email)}"
Invidious::Database::Users.delete(user) Invidious::Database::Users.delete(user)
Invidious::Database::SessionIDs.delete(email: user.email) Invidious::Database::SessionIDs.delete(email: user.email)
PG_DB.exec("DROP MATERIALIZED VIEW #{view_name}")
env.request.cookies.each do |cookie| env.request.cookies.each do |cookie|
cookie.expires = Time.utc(1990, 1, 1) cookie.expires = Time.utc(1990, 1, 1)

View file

@ -430,9 +430,6 @@ module Invidious::Routes::Login
Invidious::Database::Users.insert(user) Invidious::Database::Users.insert(user)
Invidious::Database::SessionIDs.insert(sid, email) Invidious::Database::SessionIDs.insert(sid, email)
view_name = "subscriptions_#{sha256(user.email)}"
PG_DB.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(user.email)}")
env.response.cookies["SID"] = Invidious::User::Cookies.sid(CONFIG.domain, sid) env.response.cookies["SID"] = Invidious::User::Cookies.sid(CONFIG.domain, sid)
if env.request.cookies["PREFS"]? if env.request.cookies["PREFS"]?

View file

@ -53,6 +53,8 @@ module Invidious::Routes::Search
user = env.get? "user" user = env.get? "user"
user = user ? user.as(User) : nil
begin begin
search_query, videos, operators = process_search_query(query, page, user, region: region) search_query, videos, operators = process_search_query(query, page, user, region: region)
rescue ex : ChannelSearchException rescue ex : ChannelSearchException

View file

@ -175,11 +175,6 @@ def produce_channel_search_continuation(ucid, query, page)
end end
def process_search_query(query, page, user, region) def process_search_query(query, page, user, region)
if user
user = user.as(Invidious::User)
view_name = "subscriptions_#{sha256(user.email)}"
end
channel = nil channel = nil
content_type = "all" content_type = "all"
date = "" date = ""
@ -217,14 +212,14 @@ def process_search_query(query, page, user, region)
if channel if channel
items = channel_search(search_query, page, channel) items = channel_search(search_query, page, channel)
elsif subscriptions elsif subscriptions
if view_name if user
items = PG_DB.query_all("SELECT id,title,published,updated,ucid,author,length_seconds FROM ( items = PG_DB.query_all("SELECT id,title,published,updated,ucid,author,length_seconds FROM (
SELECT *, SELECT cv.*, to_tsvector(cv.title) || to_tsvector(cv.author) AS document
to_tsvector(#{view_name}.title) || FROM channel_videos cv
to_tsvector(#{view_name}.author) JOIN users ON cv.ucid = any(users.subscriptions)
as document WHERE users.email = $1 AND published > now() - interval '1 month'
FROM #{view_name} ORDER BY published
) v_search WHERE v_search.document @@ plainto_tsquery($1) LIMIT 20 OFFSET $2;", search_query, (page - 1) * 20, as: ChannelVideo) ) v_search WHERE v_search.document @@ plainto_tsquery($2) LIMIT 20 OFFSET $3;", user.email, search_query, (page - 1) * 20, as: ChannelVideo)
else else
items = [] of ChannelVideo items = [] of ChannelVideo
end end

View file

@ -12,24 +12,12 @@ def get_user(sid, headers, refresh = true)
Invidious::Database::Users.insert(user, update_on_conflict: true) Invidious::Database::Users.insert(user, update_on_conflict: true)
Invidious::Database::SessionIDs.insert(sid, user.email, handle_conflicts: true) Invidious::Database::SessionIDs.insert(sid, user.email, handle_conflicts: true)
begin
view_name = "subscriptions_#{sha256(user.email)}"
PG_DB.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(user.email)}")
rescue ex
end
end end
else else
user, sid = fetch_user(sid, headers) user, sid = fetch_user(sid, headers)
Invidious::Database::Users.insert(user, update_on_conflict: true) Invidious::Database::Users.insert(user, update_on_conflict: true)
Invidious::Database::SessionIDs.insert(sid, user.email, handle_conflicts: true) Invidious::Database::SessionIDs.insert(sid, user.email, handle_conflicts: true)
begin
view_name = "subscriptions_#{sha256(user.email)}"
PG_DB.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(user.email)}")
rescue ex
end
end end
return user, sid return user, sid
@ -128,7 +116,6 @@ def get_subscription_feed(user, max_results = 40, page = 1)
offset = (page - 1) * limit offset = (page - 1) * limit
notifications = Invidious::Database::Users.select_notifications(user) notifications = Invidious::Database::Users.select_notifications(user)
view_name = "subscriptions_#{sha256(user.email)}"
if user.preferences.notifications_only && !notifications.empty? if user.preferences.notifications_only && !notifications.empty?
# Only show notifications # Only show notifications
@ -154,33 +141,39 @@ def get_subscription_feed(user, max_results = 40, page = 1)
# Show latest video from a channel that a user hasn't watched # Show latest video from a channel that a user hasn't watched
# "unseen_only" isn't really correct here, more accurate would be "unwatched_only" # "unseen_only" isn't really correct here, more accurate would be "unwatched_only"
if user.watched.empty? # "SELECT cv.* FROM channel_videos cv JOIN users ON cv.ucid = any(users.subscriptions) WHERE users.email = $1 AND published > now() - interval '1 month' ORDER BY published DESC"
values = "'{}'" # "SELECT DISTINCT ON (cv.ucid) cv.* FROM channel_videos cv JOIN users ON cv.ucid = any(users.subscriptions) WHERE users.email = ? AND NOT cv.id = any(users.watched) AND published > now() - interval '1 month' ORDER BY ucid, published DESC"
else videos = PG_DB.query_all("SELECT DISTINCT ON (cv.ucid) cv.* " \
values = "VALUES #{user.watched.map { |id| %(('#{id}')) }.join(",")}" "FROM channel_videos cv " \
end "JOIN users ON cv.ucid = any(users.subscriptions) " \
videos = PG_DB.query_all("SELECT DISTINCT ON (ucid) * FROM #{view_name} WHERE NOT id = ANY (#{values}) ORDER BY ucid, published DESC", as: ChannelVideo) "WHERE users.email = $1 AND NOT cv.id = any(users.watched) AND published > now() - interval '1 month' " \
"ORDER BY ucid, published DESC", user.email, as: ChannelVideo)
else else
# Show latest video from each channel # Show latest video from each channel
videos = PG_DB.query_all("SELECT DISTINCT ON (ucid) * FROM #{view_name} ORDER BY ucid, published DESC", as: ChannelVideo) videos = PG_DB.query_all("SELECT DISTINCT ON (cv.ucid) cv.* " \
"FROM channel_videos cv " \
"JOIN users ON cv.ucid = any(users.subscriptions) " \
"WHERE users.email = $1 AND published > now() - interval '1 month' " \
"ORDER BY ucid, published DESC", user.email, as: ChannelVideo)
end end
videos.sort_by!(&.published).reverse! videos.sort_by!(&.published).reverse!
else else
if user.preferences.unseen_only if user.preferences.unseen_only
# Only show unwatched # Only show unwatched
videos = PG_DB.query_all("SELECT cv.* " \
if user.watched.empty? "FROM channel_videos cv " \
values = "'{}'" "JOIN users ON cv.ucid = any(users.subscriptions) " \
else "WHERE users.email = $1 AND NOT cv.id = any(users.watched) AND published > now() - interval '1 month' " \
values = "VALUES #{user.watched.map { |id| %(('#{id}')) }.join(",")}" "ORDER BY published DESC LIMIT $2 OFFSET $3", user.email, limit, offset, as: ChannelVideo)
end
videos = PG_DB.query_all("SELECT * FROM #{view_name} WHERE NOT id = ANY (#{values}) ORDER BY published DESC LIMIT $1 OFFSET $2", limit, offset, as: ChannelVideo)
else else
# Sort subscriptions as normal # Sort subscriptions as normal
videos = PG_DB.query_all("SELECT cv.* " \
videos = PG_DB.query_all("SELECT * FROM #{view_name} ORDER BY published DESC LIMIT $1 OFFSET $2", limit, offset, as: ChannelVideo) "FROM channel_videos cv " \
"JOIN users ON cv.ucid = any(users.subscriptions) " \
"WHERE users.email = $1 AND published > now() - interval '1 month' " \
"ORDER BY published DESC LIMIT $2 OFFSET $3", user.email, limit, offset, as: ChannelVideo)
end end
end end