Update QUICPool

This commit is contained in:
Omar Roth 2019-11-24 13:41:47 -05:00
parent 276bf09238
commit 0e3a48ff76
No known key found for this signature in database
GPG key ID: B8254FB7EC3D37F2
4 changed files with 44 additions and 100 deletions

View file

@ -26,7 +26,7 @@ dependencies:
version: ~> 0.1.2
lsquic:
github: omarroth/lsquic.cr
version: ~> 0.1.3
version: ~> 0.1.4
crystal: 0.31.1

View file

@ -53,8 +53,8 @@ CHARS_SAFE = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz012345
TEST_IDS = {"AgbeGFYluEA", "BaW_jenozKc", "a9LDPn-MO4I", "ddFvjfvPnqk", "iqKdEhx-dD4"}
MAX_ITEMS_PER_PAGE = 1500
REQUEST_HEADERS_WHITELIST = {"Accept", "Accept-Encoding", "Cache-Control", "Connection", "Content-Length", "If-None-Match", "Range"}
RESPONSE_HEADERS_BLACKLIST = {"Access-Control-Allow-Origin", "Alt-Svc", "Server"}
REQUEST_HEADERS_WHITELIST = {"accept", "accept-encoding", "cache-control", "content-length", "if-none-match", "range"}
RESPONSE_HEADERS_BLACKLIST = {"access-control-allow-origin", "alt-svc", "server"}
HTTP_CHUNK_SIZE = 10485760 # ~10MB
CURRENT_BRANCH = {{ "#{`git branch | sed -n '/\* /s///p'`.strip}" }}
@ -95,7 +95,7 @@ LOCALES = {
}
YT_POOL = QUICPool.new(YT_URL, capacity: CONFIG.pool_size, timeout: 0.05)
YT_IMG_POOL = HTTPPool.new(YT_IMG_URL, capacity: CONFIG.pool_size, timeout: 0.05)
YT_IMG_POOL = QUICPool.new(YT_IMG_URL, capacity: CONFIG.pool_size, timeout: 0.05)
config = CONFIG
logger = Invidious::LogHandler.new
@ -1448,7 +1448,7 @@ post "/login" do |env|
# See https://github.com/ytdl-org/youtube-dl/blob/2019.04.07/youtube_dl/extractor/youtube.py#L82
# TODO: Convert to QUIC
begin
client = make_client(LOGIN_URL)
client = QUIC::Client.new(LOGIN_URL)
headers = HTTP::Headers.new
login_page = client.get("/ServiceLogin")
@ -1471,7 +1471,6 @@ post "/login" do |env|
headers["Content-Type"] = "application/x-www-form-urlencoded;charset=utf-8"
headers["Google-Accounts-XSRF"] = "1"
headers["User-Agent"] = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
response = client.post("/_/signin/sl/lookup", headers, login_req(lookup_req))
lookup_results = JSON.parse(response.body[5..-1])
@ -1645,28 +1644,31 @@ post "/login" do |env|
traceback << "Logging in..."
location = challenge_results[0][-1][2].to_s
location = URI.parse(challenge_results[0][-1][2].to_s)
cookies = HTTP::Cookies.from_headers(headers)
headers.delete("Content-Type")
headers.delete("Google-Accounts-XSRF")
loop do
if !location || location.includes? "/ManageAccount"
if !location || location.path == "/ManageAccount"
break
end
# Occasionally there will be a second page after login confirming
# the user's phone number ("/b/0/SmsAuthInterstitial"), which we currently don't handle.
if location.includes? "/b/0/SmsAuthInterstitial"
if location.path.starts_with? "/b/0/SmsAuthInterstitial"
traceback << "Unhandled dialog /b/0/SmsAuthInterstitial."
end
login = client.get(location, headers)
headers = login.cookies.add_request_headers(headers)
login = client.get(location.full_path, headers)
cookies = HTTP::Cookies.from_headers(headers)
location = login.headers["Location"]?
headers = login.cookies.add_request_headers(headers)
location = login.headers["Location"]?.try { |u| URI.parse(u) }
end
cookies = HTTP::Cookies.from_headers(headers)
sid = cookies["SID"]?.try &.value
if !sid
raise "Couldn't get SID."
@ -5534,7 +5536,7 @@ get "/videoplayback" do |env|
client = make_client(URI.parse(host), region)
client.get(url, headers) do |response|
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes?(key)
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end
@ -5602,7 +5604,7 @@ get "/videoplayback" do |env|
end
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes?(key) && key != "Content-Range"
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase) && key.downcase != "content-range"
env.response.headers[key] = value
end
end
@ -5666,7 +5668,7 @@ get "/ggpht/*" do |env|
client.get(url, headers) do |response|
env.response.status_code = response.status_code
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes? key
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end
@ -5716,7 +5718,7 @@ get "/sb/:id/:storyboard/:index" do |env|
client.get(url, headers) do |response|
env.response.status_code = response.status_code
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes? key
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end
@ -5753,7 +5755,7 @@ get "/s_p/:id/:name" do |env|
client.get(url, headers) do |response|
env.response.status_code = response.status_code
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes? key
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end
@ -5783,7 +5785,7 @@ get "/yts/img/:name" do |env|
YT_POOL.client &.get(env.request.resource, headers) do |response|
env.response.status_code = response.status_code
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes? key
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end
@ -5826,7 +5828,7 @@ get "/vi/:id/:name" do |env|
YT_IMG_POOL.client &.get(url, headers) do |response|
env.response.status_code = response.status_code
response.headers.each do |key, value|
if !RESPONSE_HEADERS_BLACKLIST.includes? key
if !RESPONSE_HEADERS_BLACKLIST.includes?(key.downcase)
env.response.headers[key] = value
end
end

View file

@ -127,8 +127,6 @@ def subscribe_to_feeds(db, logger, key, config)
end
max_channel = Channel(Int32).new
client_pool = HTTPPool.new(PUBSUB_URL, capacity: max_threads, timeout: 0.05)
spawn do
max_threads = max_channel.receive
active_threads = 0
@ -149,7 +147,7 @@ def subscribe_to_feeds(db, logger, key, config)
spawn do
begin
response = subscribe_pubsub(ucid, key, config, client_pool)
response = subscribe_pubsub(ucid, key, config)
if response.status_code >= 400
logger.puts("#{ucid} : #{response.body}")

View file

@ -11,11 +11,11 @@ def add_yt_headers(request)
request.headers["cookie"] = "#{(CONFIG.cookies.map { |c| "#{c.name}=#{c.value}" }).join("; ")}; #{request.headers["cookie"]?}"
end
struct HTTPPool
struct QUICPool
property! url : URI
property! capacity : Int32
property! timeout : Float64
property pool : ConnectionPool(HTTPClient)
property pool : ConnectionPool(QUIC::Client)
def initialize(url : URI, @capacity = 5, @timeout = 5.0)
@url = url
@ -23,91 +23,35 @@ struct HTTPPool
end
def client(region = nil, &block)
conn = pool.checkout
begin
if region
PROXY_LIST[region]?.try &.sample(40).each do |proxy|
begin
proxy = HTTPProxy.new(proxy_host: proxy[:ip], proxy_port: proxy[:port])
conn.set_proxy(proxy)
break
rescue ex
end
end
end
if region
conn = make_client(url, region)
response = yield conn
if region
conn.unset_proxy
else
conn = pool.checkout
begin
response = yield conn
rescue ex
conn.destroy_engine
conn = QUIC::Client.new(url)
conn.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
response = yield conn
ensure
pool.checkin(conn)
end
response
rescue ex
conn = HTTPClient.new(url)
conn.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
conn.family = (url.host == "www.youtube.com" || url.host == "suggestqueries.google.com") ? CONFIG.force_resolve : Socket::Family::UNSPEC
conn.read_timeout = 10.seconds
conn.connect_timeout = 10.seconds
yield conn
ensure
pool.checkin(conn)
end
response
end
private def build_pool
ConnectionPool(HTTPClient).new(capacity: capacity, timeout: timeout) do
client = HTTPClient.new(url)
ConnectionPool(QUIC::Client).new(capacity: capacity, timeout: timeout) do
client = QUIC::Client.new(url)
client.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
client.family = (url.host == "www.youtube.com" || url.host == "suggestqueries.google.com") ? CONFIG.force_resolve : Socket::Family::UNSPEC
client.read_timeout = 10.seconds
client.connect_timeout = 10.seconds
client
end
end
end
# Pool-like wrapper handing out clients connected to a fixed upstream URL.
# NOTE(review): despite the name, this version keeps no actual pool — every
# call to #client constructs a fresh client; `capacity` and `timeout` are
# stored but unused in the code visible here (confirm against callers).
struct QUICPool
# Base URI each client connects to.
property! url : URI
# Intended maximum number of pooled clients (unused in this implementation).
property! capacity : Int32
# Intended checkout timeout in seconds (unused in this implementation).
property! timeout : Float64
def initialize(url : URI, @capacity = 5, @timeout = 5.0)
@url = url
end
# Yields a client for @url to the given block and returns the block's result.
#
# With a `region`, a TCP `HTTPClient` is used so a regional proxy can be
# attached: up to 40 proxies sampled from PROXY_LIST[region] are tried and
# the first one that applies cleanly wins (failed attempts are ignored).
# Without a region, a `QUIC::Client` is used instead. If the block raises,
# one retry is made with a brand-new QUIC client.
def client(region = nil, &block)
begin
if region
client = HTTPClient.new(url)
# YouTube requests need the project's standard headers/cookies attached.
client.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
client.read_timeout = 10.seconds
client.connect_timeout = 10.seconds
PROXY_LIST[region]?.try &.sample(40).each do |proxy|
begin
proxy = HTTPProxy.new(proxy_host: proxy[:ip], proxy_port: proxy[:port])
client.set_proxy(proxy)
break
rescue ex
# Proxy failed to apply — silently try the next candidate.
end
end
yield client
else
conn = QUIC::Client.new(url)
conn.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
yield conn
end
rescue ex
# Any failure above (connect error, mid-request error): retry once
# with a fresh QUIC client before letting the exception propagate.
conn = QUIC::Client.new(url)
conn.before_request { |r| add_yt_headers(r) } if url.host == "www.youtube.com"
yield conn
end
end
end
# See http://www.evanmiller.org/how-not-to-sort-by-average-rating.html
def ci_lower_bound(pos, n)
if n == 0
@ -419,7 +363,7 @@ def sha256(text)
return digest.hexdigest
end
def subscribe_pubsub(topic, key, config, client_pool)
def subscribe_pubsub(topic, key, config)
case topic
when .match(/^UC[A-Za-z0-9_-]{22}$/)
topic = "channel_id=#{topic}"
@ -446,7 +390,7 @@ def subscribe_pubsub(topic, key, config, client_pool)
"hub.secret" => key.to_s,
}
return client_pool.client &.post("/subscribe", form: body)
return make_client(PUBSUB_URL).post("/subscribe", form: body)
end
def parse_range(range)