Add 'needs_update' column for scheduling feed refresh

This commit is contained in:
Omar Roth 2019-06-01 10:19:18 -05:00
parent 701b5ea561
commit 18d66ddded
No known key found for this signature in database
GPG key ID: B8254FB7EC3D37F2
7 changed files with 46 additions and 125 deletions

View file

@ -184,7 +184,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
views: views,
)
users = db.query_all("UPDATE users SET notifications = notifications || $1 \
emails = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, ucid, as: String)
@ -198,13 +198,14 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
updated = $4, ucid = $5, author = $6, length_seconds = $7, \
live_now = $8, views = $10", video_array)
users.each do |user|
payload = {
"email" => user,
"action" => "refresh",
}.to_json
PG_DB.exec("NOTIFY feeds, E'#{payload}'")
# Update all users affected by insert
if emails.empty?
values = "'{}'"
else
values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}"
end
db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails)
end
if pull_all_videos
@ -252,7 +253,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
# We are notified of Red videos elsewhere (PubSub), which includes a correct published date,
# so since they don't provide a published date here we can safely ignore them.
if Time.now - video.published > 1.minute
users = db.query_all("UPDATE users SET notifications = notifications || $1 \
emails = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, video.ucid, as: String)
@ -266,13 +267,13 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
live_now = $8, views = $10", video_array)
# Update all users affected by insert
users.each do |user|
payload = {
"email" => user,
"action" => "refresh",
}.to_json
PG_DB.exec("NOTIFY feeds, E'#{payload}'")
if emails.empty?
values = "'{}'"
else
values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}"
end
db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails)
end
end

View file

@ -105,7 +105,6 @@ struct Config
hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions
domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required
use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key)
use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications
default_home: {type: String, default: "Top"},
feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]},
top_enabled: {type: Bool, default: true},

View file

@ -43,66 +43,6 @@ def refresh_channels(db, logger, config)
end
def refresh_feeds(db, logger, config)
# Spawn thread to handle feed events
if config.use_feed_events
case config.use_feed_events
when Bool
max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe
when Int32
max_feed_event_threads = config.use_feed_events.as(Int32)
end
max_feed_event_channel = Channel(Int32).new
spawn do
queue = Deque(String).new(30)
PG.connect_listen(PG_URL, "feeds") do |event|
if !queue.includes? event.payload
queue << event.payload
end
end
max_threads = max_feed_event_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
loop do
until queue.empty?
event = queue.shift
if active_threads >= max_threads
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
feed = JSON.parse(event)
email = feed["email"].as_s
action = feed["action"].as_s
view_name = "subscriptions_#{sha256(email)}"
case action
when "refresh"
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
end
rescue ex
end
active_channel.send(true)
end
end
sleep 5.seconds
end
end
max_feed_event_channel.send(max_feed_event_threads.as(Int32))
end
max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
@ -110,7 +50,7 @@ def refresh_feeds(db, logger, config)
active_channel = Channel(Bool).new
loop do
db.query("SELECT email FROM users") do |rs|
db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
rs.each do
email = rs.read(String)
view_name = "subscriptions_#{sha256(email)}"
@ -135,6 +75,7 @@ def refresh_feeds(db, logger, config)
end
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
rescue ex
# Rename old views
begin
@ -152,6 +93,7 @@ def refresh_feeds(db, logger, config)
SELECT * FROM channel_videos WHERE \
ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \
ORDER BY published DESC;")
db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
end
rescue ex
logger.write("REFRESH #{email} : #{ex.message}\n")
@ -164,7 +106,7 @@ def refresh_feeds(db, logger, config)
end
end
sleep 1.minute
sleep 5.seconds
end
end

View file

@ -20,9 +20,10 @@ struct User
type: Preferences,
converter: PreferencesConverter,
},
password: String?,
token: String,
watched: Array(String),
password: String?,
token: String,
watched: Array(String),
feed_needs_update: Bool?,
})
end
@ -205,7 +206,7 @@ def fetch_user(sid, headers, db)
token = Base64.urlsafe_encode(Random::Secure.random_bytes(32))
user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String)
user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String, true)
return user, sid
end
@ -213,7 +214,7 @@ def create_user(sid, email, password)
password = Crypto::Bcrypt::Password.create(password, cost: 10)
token = Base64.urlsafe_encode(Random::Secure.random_bytes(32))
user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String)
user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String, true)
return user, sid
end