Merge branch 'main' into feature-parity-with-ytproxy

This commit is contained in:
Andrea Spacca 2023-12-02 09:24:10 +09:00 committed by GitHub
commit 1a3884cad2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1779 additions and 646 deletions

View file

@ -14,7 +14,16 @@ jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2 - uses: Swatinem/rust-cache@v2
- uses: rui314/setup-mold@v1
- name: Set up NASM
uses: ilammy/setup-nasm@v1.4.0
- name: Build - name: Build
run: cargo build --release run: RUSTFLAGS='-C target-feature=+crt-static' cargo build --release --target x86_64-unknown-linux-gnu
- run: mv target/x86_64-unknown-linux-gnu/release/piped-proxy piped-proxy
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: piped-proxy
path: piped-proxy

16
.github/workflows/reviewdog.yml vendored Normal file
View file

@ -0,0 +1,16 @@
name: reviewdog / clippy
on: pull_request
jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install clippy
uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: Swatinem/rust-cache@v2
- uses: sksat/action-clippy@main
with:
reporter: github-pr-review

1716
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,11 +6,41 @@ version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix-web = "4.3.1" # Web Requests & Async Runtime
image = "0.24.5" tokio = { version = "1.34.0", features = ["full"] }
lazy_static = "1.4.0" actix-web = "4.4.0"
reqwest = { version = "0.11.22", features = ["stream", "brotli", "gzip", "socks"], default-features = false }
qstring = "0.7.2" qstring = "0.7.2"
regex = "1.7.1"
reqwest = {version = "0.11.14", features = ["rustls-tls", "stream", "brotli", "gzip"], default-features = false} # Alternate Allocator
tokio = {version = "1.25.0", features = ["full"]} mimalloc = { version = "0.1.39", optional = true }
webp = "0.2.2"
# Transcoding Images to WebP/AVIF to save bandwidth
image = { version = "0.24.7", features = ["jpeg", "jpeg_rayon", "webp"], default-features = false, optional = true }
libwebp-sys = { version = "0.9.4", optional = true }
ravif = { version = "0.11.3", optional = true }
rgb = { version = "0.8.37", optional = true }
once_cell = "1.18.0"
regex = "1.10.2"
blake3 = { version = "1.5.0", optional = true }
bytes = "1.5.0"
futures-util = "0.3.29"
[features]
default = ["webp", "mimalloc", "reqwest-rustls", "qhash"]
reqwest-rustls = ["reqwest/rustls-tls"]
reqwest-native-tls = ["reqwest/default-tls"]
avif = ["dep:ravif", "dep:rgb", "dep:image"]
webp = ["dep:libwebp-sys", "dep:image"]
mimalloc = ["dep:mimalloc"]
optimized = ["libwebp-sys?/sse41", "libwebp-sys?/avx2", "libwebp-sys?/neon"]
qhash = ["blake3"]
[profile.release]
lto = true

View file

@ -4,6 +4,12 @@ WORKDIR /app/
COPY . . COPY . .
RUN --mount=type=cache,target=/var/cache/apt \
apt-get update && \
apt-get install -y --no-install-recommends \
nasm && \
rm -rf /var/lib/apt/lists/*
RUN --mount=type=cache,target=/usr/local/cargo/registry \ RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/app/target/ \ --mount=type=cache,target=/app/target/ \
cargo build --release && \ cargo build --release && \
@ -20,7 +26,6 @@ RUN --mount=type=cache,target=/var/cache/apt \
WORKDIR /app/ WORKDIR /app/
COPY --from=BUILD /app/piped-proxy . COPY --from=BUILD /app/piped-proxy .
RUN mkdir -p /app/socket/
EXPOSE 8080 EXPOSE 8080

View file

@ -7,11 +7,15 @@
"ignorePresets": [ "ignorePresets": [
":prHourlyLimit2" ":prHourlyLimit2"
], ],
"platformAutomerge": true,
"packageRules": [ "packageRules": [
{ {
"matchUpdateTypes": ["minor", "patch", "pin", "digest"], "matchUpdateTypes": ["minor", "patch", "pin", "digest"],
"automerge": true, "automerge": true
"platformAutomerge": true
} }
] ],
"lockFileMaintenance": {
"enabled": true,
"automerge": true
}
} }

View file

@ -1,15 +1,28 @@
use std::env;
use std::error::Error;
use std::string::ToString;
use actix_web::{App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, web};
use actix_web::dev::ResourcePath;
use actix_web::http::Method; use actix_web::http::Method;
use image::EncodableLayout; use actix_web::{web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer};
use lazy_static::lazy_static; use once_cell::sync::Lazy;
use qstring::QString; use qstring::QString;
use regex::Regex; use regex::Regex;
use reqwest::{Client, Request, Url}; use reqwest::Error as ReqwestError;
use reqwest::{Body, Client, Request, Url};
use std::collections::BTreeMap;
use std::error::Error;
use std::io::ErrorKind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, io};
#[cfg(not(any(feature = "reqwest-native-tls", feature = "reqwest-rustls")))]
compile_error!("feature \"reqwest-native-tls\" or \"reqwest-rustls\" must be set for proxy to have TLS support");
use bytes::{Bytes, BytesMut};
use futures_util::Stream;
#[cfg(any(feature = "webp", feature = "avif", feature = "qhash"))]
use tokio::task::spawn_blocking;
#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
@ -17,45 +30,64 @@ async fn main() -> std::io::Result<()> {
let server = HttpServer::new(|| { let server = HttpServer::new(|| {
// match all requests // match all requests
App::new() App::new().default_service(web::to(index))
.default_service(web::to(index))
}); });
// get port from env
if env::var("UDS").is_ok() { // get socket/port from env
server.bind_uds("./socket/actix.sock")? // backwards compat when only UDS is set
if get_env_bool("UDS") {
let socket_path =
env::var("BIND_UNIX").unwrap_or_else(|_| "./socket/actix.sock".to_string());
server.bind_uds(socket_path)?
} else { } else {
let bind = env::var("BIND").unwrap_or_else(|_| "0.0.0.0:8080".to_string()); let bind = env::var("BIND").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
server.bind(bind)? server.bind(bind)?
}.run().await }
.run()
.await
} }
lazy_static!( static PREFIX_PATH: Lazy<String> = Lazy::new(|| String = env::var("PREFIX_PATH").unwrap_or_else(|_| "".to_string());
static ref RE_DOMAIN: Regex = Regex::new(r"^(?:[a-z\d\.-]*\.)?((?:[a-z\d-]*)\.(?:[a-z\d-]*))$").unwrap(); static RE_DOMAIN: Lazy<Regex> =
static ref RE_MANIFEST: Regex = Regex::new("(?m)URI=\"([^\"]+)\"").unwrap(); Lazy::new(|| Regex::new(r"^(?:[a-z\d.-]*\.)?([a-z\d-]*\.[a-z\d-]*)$").unwrap());
static ref RE_DASH_MANIFEST: Regex = Regex::new("BaseURL>(https://[^<]+)</BaseURL").unwrap(); static RE_MANIFEST: Lazy<Regex> = Lazy::new(|| Regex::new("(?m)URI=\"([^\"]+)\"").unwrap());
); static RE_DASH_MANIFEST: Lazy<Regex> =
Lazy::new(|| Regex::new("BaseURL>(https://[^<]+)</BaseURL").unwrap());
lazy_static!( static CLIENT: Lazy<Client> = Lazy::new(|| {
static ref CLIENT: Client = { let builder = Client::builder()
let builder = Client::builder() .user_agent("Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0");
.user_agent("Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0");
if env::var("IPV4_ONLY").is_ok() { let proxy = if let Ok(proxy) = env::var("PROXY") {
builder Some(reqwest::Proxy::all(proxy).unwrap())
} else {
None
};
let builder = if let Some(proxy) = proxy {
// proxy basic auth
if let Ok(proxy_auth_user) = env::var("PROXY_USER") {
let proxy_auth_pass = env::var("PROXY_PASS").unwrap_or_default();
builder.proxy(proxy.basic_auth(&proxy_auth_user, &proxy_auth_pass))
} else {
builder.proxy(proxy)
}
} else {
builder
};
if get_env_bool("IPV4_ONLY") {
builder
.local_address(Some("0.0.0.0".parse().unwrap())) .local_address(Some("0.0.0.0".parse().unwrap()))
.build() .build()
.unwrap() .unwrap()
} else { } else {
builder.build().unwrap() builder.build().unwrap()
} }
}; });
);
lazy_static!( const ANDROID_USER_AGENT: &str = "com.google.android.youtube/1537338816 (Linux; U; Android 13; en_US; ; Build/TQ2A.230505.002; Cronet/113.0.5672.24)";
static ref PREFIX_PATH: String = env::var("PREFIX_PATH").unwrap_or_else(|_| "".to_string()); const ALLOWED_DOMAINS: [&str; 8] = [
);
const ALLOWED_DOMAINS: [&str; 7] = [
"youtube.com", "youtube.com",
"googlevideo.com", "googlevideo.com",
"ytimg.com", "ytimg.com",
@ -63,6 +95,7 @@ const ALLOWED_DOMAINS: [&str; 7] = [
"googleusercontent.com", "googleusercontent.com",
"lbryplayer.xyz", "lbryplayer.xyz",
"odycdn.com", "odycdn.com",
"ajay.app",
]; ];
@ -79,19 +112,29 @@ fn is_header_allowed(header: &str) -> bool {
return false; return false;
} }
!matches!(header, "host" | !matches!(
"authorization" | header,
"origin" | | "host"
"referer" | | "authorization"
"cookie" | | "origin"
"etag" | | "referer"
"content-length" | | "cookie"
"set-cookie" | | "etag"
"alt-svc" | | "content-length"
"accept-ch" | | "set-cookie"
"report-to" | | "alt-svc"
"strict-transport-security" | | "accept-ch"
"user-agent") | "report-to"
| "strict-transport-security"
| "user-agent"
)
}
fn get_env_bool(key: &str) -> bool {
match env::var(key) {
Ok(val) => val.to_lowercase() == "true" || val == "1",
Err(_) => false,
}
} }
async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> { async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
@ -106,24 +149,82 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
} }
// parse query string // parse query string
let query = QString::from(req.query_string()); let mut query = QString::from(req.query_string());
#[cfg(feature = "qhash")]
{
use std::collections::BTreeSet;
let secret = env::var("HASH_SECRET");
if let Ok(secret) = secret {
let qhash = query.get("qhash");
if qhash.is_none() {
return Err("No qhash provided".into());
}
let qhash = qhash.unwrap();
if qhash.len() != 8 {
return Err("Invalid qhash provided".into());
}
let path = req.path().as_bytes().to_owned();
// Store sorted key-value pairs
let mut set = BTreeSet::new();
{
let pairs = query.to_pairs();
for (key, value) in &pairs {
if matches!(*key, "qhash" | "range" | "rewrite") {
continue;
}
set.insert((key.as_bytes().to_owned(), value.as_bytes().to_owned()));
}
}
let hash = spawn_blocking(move || {
let mut hasher = blake3::Hasher::new();
for (key, value) in set {
hasher.update(&key);
hasher.update(&value);
}
hasher.update(&path);
hasher.update(secret.as_bytes());
let hash = hasher.finalize().to_hex();
hash[..8].to_owned()
})
.await
.unwrap();
if hash != qhash {
return Err("Invalid qhash provided".into());
}
}
}
let res = query.get("host"); let res = query.get("host");
let res = res.map(|s| s.to_string());
let rewrite = {
if let Some(rewrite) = query.get("rewrite") {
rewrite == "true"
} else {
true
}
};
if res.is_none() { if res.is_none() {
return Err("No host provided".into()); return Err("No host provided".into());
} }
#[cfg(any(feature = "webp", feature = "avif"))]
let disallow_image_transcoding = get_env_bool("DISALLOW_IMAGE_TRANSCODING");
let rewrite = query.get("rewrite") != Some("false");
#[cfg(feature = "avif")]
let avif = query.get("avif") == Some("true");
let host = res.unwrap(); let host = res.unwrap();
let domain = RE_DOMAIN.captures(host); let domain = RE_DOMAIN.captures(host.as_str());
if domain.is_none() { if domain.is_none() {
return Err("Invalid host provided".into()); return Err("Invalid host provided".into());
@ -131,24 +232,30 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
let domain = domain.unwrap().get(1).unwrap().as_str(); let domain = domain.unwrap().get(1).unwrap().as_str();
let mut allowed = false; if !ALLOWED_DOMAINS.contains(&domain) {
for allowed_domain in ALLOWED_DOMAINS.iter() {
if &domain == allowed_domain {
allowed = true;
break;
}
}
if !allowed {
return Err("Domain not allowed".into()); return Err("Domain not allowed".into());
} }
let video_playback = req.path().eq("/videoplayback");
let is_android = video_playback && query.get("c").unwrap_or("").eq("ANDROID");
let is_ump = video_playback && query.get("ump").is_some();
let mime_type = query.get("mime").map(|s| s.to_string());
if is_ump && !query.has("range") {
if let Some(range) = req.headers().get("range") {
let range = range.to_str().unwrap();
let range = range.replace("bytes=", "");
query.add_pair(("range", range));
}
}
let qs = { let qs = {
let qs = query.clone(); let collected = query
let collected = qs.into_pairs() .into_pairs()
.into_iter() .into_iter()
.filter(|(key, _)| key != "host" && key != "rewrite") .filter(|(key, _)| !matches!(key.as_str(), "host" | "rewrite" | "qhash"))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
QString::new(collected) QString::new(collected)
}; };
@ -156,26 +263,39 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
let mut url = Url::parse(&format!("https://{}{}", host, req.path()))?; let mut url = Url::parse(&format!("https://{}{}", host, req.path()))?;
url.set_query(Some(qs.to_string().as_str())); url.set_query(Some(qs.to_string().as_str()));
let mut request = Request::new( let method = {
req.method().clone(), if !is_android && video_playback {
url, Method::POST
); } else {
req.method().clone()
}
};
let mut request = Request::new(method, url);
if !is_android && video_playback {
request.body_mut().replace(Body::from("x\0"));
}
let request_headers = request.headers_mut(); let request_headers = request.headers_mut();
for (key, value) in req.headers() { for (key, value) in req.headers() {
if is_header_allowed(key.as_str()) { if is_header_allowed(key.as_str()) {
request_headers.insert(key.clone(), value.clone()); request_headers.insert(key, value.clone());
} }
} }
if is_android {
request_headers.insert("User-Agent", ANDROID_USER_AGENT.parse().unwrap());
}
let resp = CLIENT.execute(request).await; let resp = CLIENT.execute(request).await;
if resp.is_err() { if resp.is_err() {
return Err(resp.err().unwrap().into()); return Err(resp.err().unwrap().into());
} }
let resp = resp.unwrap(); let resp = resp?;
let mut response = HttpResponse::build(resp.status()); let mut response = HttpResponse::build(resp.status());
@ -183,56 +303,123 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
for (key, value) in resp.headers() { for (key, value) in resp.headers() {
if is_header_allowed(key.as_str()) { if is_header_allowed(key.as_str()) {
response.append_header((key.as_str(), value.to_str().unwrap())); response.append_header((key.as_str(), value.as_bytes()));
} }
} }
if rewrite { if rewrite {
if let Some(content_type) = resp.headers().get("content-type") { if let Some(content_type) = resp.headers().get("content-type") {
if content_type == "image/jpeg" { #[cfg(feature = "avif")]
if !disallow_image_transcoding
&& (content_type == "image/webp" || content_type == "image/jpeg" && avif)
{
let resp_bytes = resp.bytes().await.unwrap(); let resp_bytes = resp.bytes().await.unwrap();
let (body, content_type) = spawn_blocking(|| {
use ravif::{Encoder, Img};
use rgb::FromSlice;
let image = image::load_from_memory(&resp_bytes).unwrap(); let image = image::load_from_memory(&resp_bytes).unwrap();
let encoder = webp::Encoder::from_image(&image).unwrap(); let width = image.width() as usize;
let height = image.height() as usize;
let encoded = encoder.encode(85f32); let buf = image.into_rgb8();
let bytes = encoded.as_bytes().to_vec(); let buf = buf.as_raw().as_rgb();
if bytes.len() < resp_bytes.len() { let buffer = Img::new(buf, width, height);
response.content_type("image/webp");
return Ok(response.body(bytes));
}
return Ok(response.body(resp_bytes)); let res = Encoder::new()
.with_quality(80f32)
.with_speed(7)
.encode_rgb(buffer);
if let Ok(res) = res {
(res.avif_file.to_vec(), "image/avif")
} else {
(resp_bytes.into(), "image/jpeg")
}
})
.await
.unwrap();
response.content_type(content_type);
return Ok(response.body(body));
} }
if content_type == "application/x-mpegurl" || content_type == "application/vnd.apple.mpegurl" {
#[cfg(feature = "webp")]
if !disallow_image_transcoding && content_type == "image/jpeg" {
let resp_bytes = resp.bytes().await.unwrap();
let (body, content_type) = spawn_blocking(|| {
use libwebp_sys::{WebPEncodeRGB, WebPFree};
let image = image::load_from_memory(&resp_bytes).unwrap();
let width = image.width();
let height = image.height();
let quality = 85;
let data = image.as_rgb8().unwrap().as_raw();
let bytes: Vec<u8> = unsafe {
let mut out_buf = std::ptr::null_mut();
let stride = width as i32 * 3;
let len: usize = WebPEncodeRGB(
data.as_ptr(),
width as i32,
height as i32,
stride,
quality as f32,
&mut out_buf,
);
let vec = std::slice::from_raw_parts(out_buf, len).into();
WebPFree(out_buf as *mut _);
vec
};
if bytes.len() < resp_bytes.len() {
(bytes, "image/webp")
} else {
(resp_bytes.into(), "image/jpeg")
}
})
.await
.unwrap();
response.content_type(content_type);
return Ok(response.body(body));
}
if content_type == "application/x-mpegurl"
|| content_type == "application/vnd.apple.mpegurl"
{
let resp_str = resp.text().await.unwrap(); let resp_str = resp.text().await.unwrap();
let modified = resp_str.lines().map(|line| { let modified = resp_str
let captures = RE_MANIFEST.captures(line); .lines()
if let Some(captures) = captures { .map(|line| {
let url = captures.get(1).unwrap().as_str(); let captures = RE_MANIFEST.captures(line);
if url.starts_with("https://") { if let Some(captures) = captures {
return line.replace(url, localize_url(url, host).as_str()); let url = captures.get(1).unwrap().as_str();
if url.starts_with("https://") {
return line
.replace(url, localize_url(url, host.as_str()).as_str());
}
} }
} localize_url(line, host.as_str())
localize_url(line, host) })
}).collect::<Vec<String>>().join("\n"); .collect::<Vec<String>>()
.join("\n");
return Ok(response.body(modified)); return Ok(response.body(modified));
} }
if content_type == "video/vnd.mpeg.dash.mpd" || content_type == "application/dash+xml" { if content_type == "video/vnd.mpeg.dash.mpd" || content_type == "application/dash+xml" {
let mut resp_str = resp.text().await.unwrap(); let resp_str = resp.text().await.unwrap();
let clone_resp = resp_str.clone(); let mut new_resp = resp_str.clone();
let captures = RE_DASH_MANIFEST.captures_iter(&clone_resp); let captures = RE_DASH_MANIFEST.captures_iter(&resp_str);
for capture in captures { for capture in captures {
let url = capture.get(1).unwrap().as_str(); let url = capture.get(1).unwrap().as_str();
let new_url = localize_url(url, host); let new_url = localize_url(url, host.as_str());
resp_str = resp_str.replace(url, new_url.as_str()) new_resp = new_resp.replace(url, new_url.as_str());
.clone();
} }
return Ok(response.body(resp_str)); return Ok(response.body(new_resp));
} }
} }
} }
@ -241,26 +428,224 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
response.append_header(("content-length", content_length)); response.append_header(("content-length", content_length));
} }
let resp = resp.bytes_stream();
if is_ump {
if let Some(mime_type) = mime_type {
response.content_type(mime_type);
}
let transformed_stream = UmpTransformStream::new(resp);
return Ok(response.streaming(transformed_stream));
}
// Stream response // Stream response
Ok(response.streaming(resp.bytes_stream())) Ok(response.streaming(resp))
}
fn read_buf(buf: &[u8], pos: &mut usize) -> u8 {
let byte = buf[*pos];
*pos += 1;
byte
}
fn read_variable_integer(buf: &[u8], offset: usize) -> io::Result<(i32, usize)> {
let mut pos = offset;
let prefix = read_buf(buf, &mut pos);
let mut size = 0;
for shift in 1..=5 {
if prefix & (128 >> (shift - 1)) == 0 {
size = shift;
break;
}
}
if !(1..=5).contains(&size) {
return Err(io::Error::new(
ErrorKind::InvalidData,
format!("Invalid integer size {} at position {}", size, offset),
));
}
match size {
1 => Ok((prefix as i32, size)),
2 => {
let value = ((read_buf(buf, &mut pos) as i32) << 6) | (prefix as i32 & 0b111111);
Ok((value, size))
}
3 => {
let value =
(((read_buf(buf, &mut pos) as i32) | ((read_buf(buf, &mut pos) as i32) << 8)) << 5)
| (prefix as i32 & 0b11111);
Ok((value, size))
}
4 => {
let value = (((read_buf(buf, &mut pos) as i32)
| ((read_buf(buf, &mut pos) as i32) << 8)
| ((read_buf(buf, &mut pos) as i32) << 16))
<< 4)
| (prefix as i32 & 0b1111);
Ok((value, size))
}
_ => {
let value = (read_buf(buf, &mut pos) as i32)
| ((read_buf(buf, &mut pos) as i32) << 8)
| ((read_buf(buf, &mut pos) as i32) << 16)
| ((read_buf(buf, &mut pos) as i32) << 24);
Ok((value, size))
}
}
}
struct UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, ReqwestError>> + Unpin,
{
inner: S,
buffer: BytesMut,
found_stream: bool,
remaining: usize,
}
impl<S> UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, ReqwestError>> + Unpin,
{
pub fn new(stream: S) -> Self {
UmpTransformStream {
inner: stream,
buffer: BytesMut::new(),
found_stream: false,
remaining: 0,
}
}
}
impl<S> Stream for UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, ReqwestError>> + Unpin,
{
type Item = Result<Bytes, ReqwestError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
while let Poll::Ready(item) = Pin::new(&mut this.inner).poll_next(cx) {
match item {
Some(Ok(bytes)) => {
if this.found_stream {
if this.remaining > 0 {
let len = std::cmp::min(this.remaining, bytes.len());
this.remaining -= len;
if this.remaining == 0 {
this.buffer.clear();
this.buffer.extend_from_slice(&bytes[len..]);
this.found_stream = false;
}
return Poll::Ready(Some(Ok(bytes.slice(0..len))));
} else {
this.found_stream = false;
this.buffer.clear();
this.buffer.extend_from_slice(&bytes);
};
} else {
this.buffer.extend_from_slice(&bytes);
}
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
return Poll::Ready(None);
}
}
}
if !this.found_stream && !this.buffer.is_empty() {
let (segment_type, s1) = read_variable_integer(&this.buffer, 0).unwrap();
let (segment_length, s2) = read_variable_integer(&this.buffer, s1).unwrap();
if segment_type != 21 {
// Not the stream
if this.buffer.len() > s1 + s2 + segment_length as usize {
let _ = this.buffer.split_to(s1 + s2 + segment_length as usize);
}
} else {
this.remaining = segment_length as usize - 1;
let _ = this.buffer.split_to(s1 + s2 + 1);
if this.buffer.len() > segment_length as usize {
let len = std::cmp::min(this.remaining, this.buffer.len());
this.remaining -= len;
return Poll::Ready(Some(Ok(this.buffer.split_to(len).into())));
} else {
this.remaining -= this.buffer.len();
this.found_stream = true;
return Poll::Ready(Some(Ok(this.buffer.to_vec().into())));
}
}
}
Poll::Pending
}
}
fn finalize_url(path: &str, query: BTreeMap<String, String>) -> String {
#[cfg(feature = "qhash")]
{
use std::collections::BTreeSet;
let qhash = {
let secret = env::var("HASH_SECRET");
if let Ok(secret) = secret {
let set = query
.iter()
.filter(|(key, _)| !matches!(key.as_str(), "qhash" | "range" | "rewrite"))
.map(|(key, value)| (key.as_bytes().to_owned(), value.as_bytes().to_owned()))
.collect::<BTreeSet<_>>();
let mut hasher = blake3::Hasher::new();
for (key, value) in set {
hasher.update(&key);
hasher.update(&value);
}
hasher.update(path.as_bytes());
hasher.update(secret.as_bytes());
let hash = hasher.finalize().to_hex();
Some(hash[..8].to_owned())
} else {
None
}
};
if qhash.is_some() {
let mut query = QString::new(query.into_iter().collect::<Vec<_>>());
query.add_pair(("qhash", qhash.unwrap()));
return format!("{}?{}", path, query);
}
}
let query = QString::new(query.into_iter().collect::<Vec<_>>());
format!("{}{}?{}", PREFIX_PATH.as_str(), path, query);
} }
fn localize_url(url: &str, host: &str) -> String { fn localize_url(url: &str, host: &str) -> String {
if url.starts_with("https://") { if url.starts_with("https://") {
let mut url = Url::parse(url).unwrap(); let url = Url::parse(url).unwrap();
let host = url.host().unwrap().to_string(); let host = url.host().unwrap().to_string();
// set host query param let mut query = url.query_pairs().into_owned().collect::<BTreeMap<_, _>>();
url.query_pairs_mut() query.insert("host".to_string(), host.clone());
.append_pair("host", &host);
return format!("{}{}?{}", PREFIX_PATH.as_str(), url.path(), url.query().unwrap()); return finalize_url(url.path(), query);
} else if url.ends_with(".m3u8") || url.ends_with(".ts") { } else if url.ends_with(".m3u8") || url.ends_with(".ts") {
return if url.contains('?') { let mut query = BTreeMap::new();
format!("{}&host={}", url, host) query.insert("host".to_string(), host.to_string());
} else {
format!("{}?host={}", url, host) return finalize_url(url, query);
};
} }
url.to_string() url.to_string()