taskylizard 2024-05-19 07:05:01 +00:00
parent af1d873de0
commit a451755d2b
No known key found for this signature in database
GPG key ID: 1820131ED1A24120
8 changed files with 333 additions and 273 deletions

proxy/.prettierignore Normal file

@@ -0,0 +1,2 @@
**/*.md
pnpm-lock.yaml

proxy/.prettierrc.yaml Normal file

@@ -0,0 +1,6 @@
proseWrap: always
semi: false
singleQuote: true
printWidth: 80
trailingComma: none
htmlWhitespaceSensitivity: ignore
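
Taken together, these options format TypeScript with single quotes, no semicolons, no trailing commas, and an 80-column print width; the reformatted hunks further down follow exactly this shape, and the format script added to package.json in the next hunk applies it across the project. A minimal illustration of the resulting style (the function and values are made up for the example):

const greet = (name: string) => {
  return `hello, ${name}`
}

console.log(greet('nitter'))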


@@ -4,6 +4,7 @@
"scripts": {
"clean": "rm -rf build",
"prebuild": "npm run clean",
"format": "prettier -w --cache --check .",
"build": "tsc --build"
},
"author": "",
@@ -22,6 +23,7 @@
},
"devDependencies": {
"@types/node": "^20.12.12",
"dotenv": "^16.4.4"
"dotenv": "^16.4.4",
"prettier": "^3.2.5"
}
}
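
With prettier added as a devDependency and wired into the scripts block, formatting can be run through the package manager this project uses (pnpm, per the lockfile changes below), e.g. pnpm run format; the -w flag writes changes in place and --cache reuses results from previous runs.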


@@ -45,6 +45,9 @@ importers:
dotenv:
specifier: ^16.4.4
version: 16.4.4
prettier:
specifier: ^3.2.5
version: 3.2.5
packages:
@@ -292,6 +295,11 @@ packages:
resolution: {integrity: sha512-Mz/gKiRyuXu4HnpHgi1YWdHQCoWMufapzooisvFn78zl4dZciAxS+YeRkUxXl1ee/SzU80YCz1zpECCh4oC6Aw==}
hasBin: true
prettier@3.2.5:
resolution: {integrity: sha512-3/GWa9aOC0YeD7LUfvOG2NiDyhOWRvt1k+rcKhOuYnMY24iiCphgneUfJDyFXd6rZCAnuLBv6UeAULtrhT/F4A==}
engines: {node: '>=14'}
hasBin: true
process-warning@2.3.2:
resolution: {integrity: sha512-n9wh8tvBe5sFmsqlg+XQhaQLumwpqoAUruLwjCopgTmUBjJ/fjtBsJzKleCaIGBOMXYEhp1YfKl4d7rJ5ZKJGA==}
@@ -668,6 +676,8 @@ snapshots:
sonic-boom: 3.8.0
thread-stream: 2.4.1
prettier@3.2.5: {}
process-warning@2.3.2: {}
process-warning@3.0.0: {}


@@ -1,89 +1,116 @@
import fastify, {
FastifyInstance,
FastifyListenOptions,
FastifyReply,
FastifyRequest,
} from "fastify"
import { PinoLoggerOptions } from "fastify/types/logger"
import { Proxy } from "./proxy"
import { Logger } from "pino"
FastifyRequest
} from 'fastify'
import { PinoLoggerOptions } from 'fastify/types/logger'
import { Proxy } from './proxy'
import { Logger } from 'pino'
import 'dotenv/config'
const host = process.env.HOST
const port = parseInt(process.env.PORT ?? "8080", 10)
const port = parseInt(process.env.PORT ?? '8080', 10)
const baseUrl = process.env.NITTER_BASE_URL
const concurrency = parseInt(process.env.CONCURRENCY ?? "1", 10)
const retryAfterMillis = process.env.RETRY_AFTER_MILLIS ? parseInt(process.env.RETRY_AFTER_MILLIS, 10) : null
const maxCacheSize = parseInt(process.env.MAX_CACHE_SIZE ?? "100000", 10)
const logLevel = process.env.LOG_LEVEL ?? "debug"
const concurrency = parseInt(process.env.CONCURRENCY ?? '1', 10)
const retryAfterMillis = process.env.RETRY_AFTER_MILLIS
? parseInt(process.env.RETRY_AFTER_MILLIS, 10)
: null
const maxCacheSize = parseInt(process.env.MAX_CACHE_SIZE ?? '100000', 10)
const logLevel = process.env.LOG_LEVEL ?? 'debug'
const server = fastify({
logger: {
name: "app",
name: 'app',
level: logLevel,
...( logLevel == "trace" ? { transport: { target: 'pino-pretty' } } : {})
...(logLevel == 'trace' ? { transport: { target: 'pino-pretty' } } : {})
} as PinoLoggerOptions
})
const log = server.log as Logger
const proxy = new Proxy(log, baseUrl, concurrency, retryAfterMillis, maxCacheSize)
const proxy = new Proxy(
log,
baseUrl,
concurrency,
retryAfterMillis,
maxCacheSize
)
async function main() {
server.register((fastify: FastifyInstance, opts, done) => {
fastify.get(`/user/:username`, {},
server.register(
(fastify: FastifyInstance, opts, done) => {
fastify.get(
`/user/:username`,
{},
async (request: FastifyRequest, reply: FastifyReply) => {
log.debug({
log.debug(
{
headers: request.headers,
reqId: request.id,
params: request.params }, 'incoming request /user/:username')
params: request.params
},
'incoming request /user/:username'
)
const { username } = request.params as any
const { status, data } = await proxy.getUser(username, { reqId: request.id })
const { status, data } = await proxy.getUser(username, {
reqId: request.id
})
reply.status(status).send(data)
});
}
)
fastify.get(`/user/:userId/tweets`, {},
fastify.get(
`/user/:userId/tweets`,
{},
async (request: FastifyRequest, reply: FastifyReply) => {
const { userId } = request.params as any
const { cursor } = request.query as any
const { status, data } = await proxy.getUserTweets(userId, cursor, { reqId: request.id })
const { status, data } = await proxy.getUserTweets(userId, cursor, {
reqId: request.id
})
reply.status(status).send(data)
});
}
)
fastify.get(`/tweet/:id`, {},
fastify.get(
`/tweet/:id`,
{},
async (request: FastifyRequest, reply: FastifyReply) => {
const { id } = request.params as any
const { status, data } = await proxy.getTweetById(id, { reqId: request.id })
const { status, data } = await proxy.getTweetById(id, {
reqId: request.id
})
reply.status(status).send(data)
});
}
)
done()
}, { prefix: '/api' })
},
{ prefix: '/api' }
)
server.setNotFoundHandler((request: FastifyRequest, reply: FastifyReply) => {
reply.status(404)
.send({ message: `Method not found` })
reply.status(404).send({ message: `Method not found` })
})
server.setErrorHandler((err: Error, request: FastifyRequest, reply: FastifyReply) => {
server.setErrorHandler(
(err: Error, request: FastifyRequest, reply: FastifyReply) => {
const { log } = request
log.error(err)
// Send error response
reply.status(500).send({ message: `Internal server error` })
})
}
)
// await server.register(import('@fastify/rate-limit'), {
// max: 100,
// timeWindow: '1 minute'
// })
await server.listen({ port, host } as FastifyListenOptions);
await server.listen({ port, host } as FastifyListenOptions)
}
main().catch(err => {
main().catch((err) => {
log.fatal(err)
process.exit(1)
})
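
For reference, the three routes registered above live under the /api prefix once the server is listening. A minimal sketch of a caller, assuming the default port of 8080 from this file and placeholder identifiers:

const base = 'http://localhost:8080/api'

// Look up a user by username (the username is a placeholder)
const user = await fetch(`${base}/user/jack`).then((r) => r.json())

// Page through a user's tweets; cursor is an optional query parameter
const tweets = await fetch(`${base}/user/123/tweets`).then((r) => r.json())

// Fetch a single tweet by id (the id is a placeholder)
const tweet = await fetch(`${base}/tweet/456`).then((r) => r.json())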


@@ -1,10 +1,10 @@
// noinspection TypeScriptUnresolvedReference
import axios from "axios"
import { AxiosInstance, AxiosRequestConfig } from "axios"
import fastq from "fastq"
import { Logger } from "pino"
import retry from "axios-retry-after"
import axios from 'axios'
import { AxiosInstance, AxiosRequestConfig } from 'axios'
import fastq from 'fastq'
import { Logger } from 'pino'
import retry from 'axios-retry-after'
import { LRUCache } from 'lru-cache'
const GET_USER_POSITIVE_TTL_MS = process.env.GET_USER_POSITIVE_TTL
@@ -33,12 +33,11 @@ export interface Job {
}
export interface JobResponse {
status: number,
status: number
data: any
}
export class Proxy {
private readonly cache: LRUCache<string, JobResponse>
private readonly client: AxiosInstance
private readonly queue: fastq.queueAsPromised<Job, JobResponse>
@@ -64,11 +63,19 @@ export class Proxy {
this.counter.requests = 0
}, this.timeWindowMillis)
if ( retryAfterMillis ) {
this.client.interceptors.response.use(null, retry(this.client, {
if (retryAfterMillis) {
this.client.interceptors.response.use(
null,
retry(this.client, {
// Determine when we should attempt to retry
isRetryable (error) {
log.debug({ status: error.response?.status, headers: error.response?.headers }, 'checking retryable')
isRetryable(error) {
log.debug(
{
status: error.response?.status,
headers: error.response?.headers
},
'checking retryable'
)
return (
error.response && error.response.status === 429
// Use X-Retry-After rather than Retry-After, and cap retry delay at 60 seconds
@@ -76,57 +83,68 @@ export class Proxy {
)
},
// Customize the wait behavior
wait (error) {
log.debug({ status: error.response?.status, headers: error.response?.headers }, 'waiting for retry')
wait(error) {
log.debug(
{
status: error.response?.status,
headers: error.response?.headers
},
'waiting for retry'
)
return new Promise(
// Use X-Retry-After rather than Retry-After
// resolve => setTimeout(resolve, error.response.headers['x-retry-after'])
resolve => setTimeout(resolve, retryAfterMillis)
(resolve) => setTimeout(resolve, retryAfterMillis)
)
}
}))
})
)
}
}
async getUser(username: string, options?: { reqId?: string }) {
const key = `usernames:${username}`
if ( this.cache.has(key)) {
if (this.cache.has(key)) {
return this.cache.get(key)
}
const result = await this.queue.push({
url: `/api/user/${ username }`,
url: `/api/user/${username}`,
reqId: options?.reqId
})
if ( result.status === 200 ) {
if (result.status === 200) {
this.cache.set(key, result, { ttl: GET_USER_POSITIVE_TTL_MS })
}
if ( result.status === 404 ) {
if (result.status === 404) {
this.cache.set(key, result, { ttl: GET_USER_NEGATIVE_TTL_MS })
}
return result
}
async getUserTweets(userId: string, cursor?: string, options?: { reqId?: string }) {
async getUserTweets(
userId: string,
cursor?: string,
options?: { reqId?: string }
) {
const key = `users:${userId}:tweets:${cursor ?? 'last'}`
if ( this.cache.has(key) ) {
if (this.cache.has(key)) {
return this.cache.get(key)
}
const result = await this.queue.push({
url: `/api/user/${ userId }/tweets`,
url: `/api/user/${userId}/tweets`,
params: { cursor },
reqId: options?.reqId
})
if ( result.status === 200 ) {
if (result.status === 200) {
this.cache.set(key, result, { ttl: GET_TWEETS_POSITIVE_TTL_MS })
}
if ( result.status === 404 ) {
if (result.status === 404) {
this.cache.set(key, result, { ttl: GET_TWEETS_NEGATIVE_TTL_MS })
}
@@ -136,19 +154,19 @@ export class Proxy {
async getTweetById(tweetId: string, options?: { reqId?: string }) {
const key = `tweets:${tweetId}`
if ( this.cache.has(key) ) {
if (this.cache.has(key)) {
return this.cache.get(key)
}
const result = await this.queue.push({
url: `/api/tweet/${ tweetId }`,
url: `/api/tweet/${tweetId}`,
reqId: options?.reqId
})
if ( result.status === 200 ) {
if (result.status === 200) {
this.cache.set(key, result, { ttl: GET_TWEET_POSITIVE_TTL_MS })
}
if ( result.status === 404 ) {
if (result.status === 404) {
this.cache.set(key, result, { ttl: GET_TWEET_NEGATIVE_TTL_MS })
}
@@ -156,10 +174,9 @@ export class Proxy {
}
private async sendRequest(job: Job): Promise<any> {
const { reqId, url, params } = job
if ( this.counter.requests > this.concurrency * this.maxRequestsPerAccount ) {
if (this.counter.requests > this.concurrency * this.maxRequestsPerAccount) {
return {
status: 429
}
@@ -167,9 +184,9 @@ export class Proxy {
let config = {
url,
method: "get",
method: 'get',
baseURL: this.baseUrl,
params,
params
} as AxiosRequestConfig
this.log.trace({ config, reqId: reqId }, 'sending request to nitter')
@@ -177,23 +194,23 @@ export class Proxy {
try {
const response = await this.client.request(config)
this.log.trace({
this.log.trace(
{
status: response.status,
data: response.data,
reqId: reqId
}, 'nitter response')
},
'nitter response'
)
return {
status: response.status,
data: response.data,
data: response.data
} as JobResponse
} catch(err) {
} catch (err) {
this.log.warn({ err, reqId }, 'nitter error')
if ( err.name === "AxiosError" ) {
if (err.name === 'AxiosError') {
this.counter.requests = Number.MAX_SAFE_INTEGER
return {
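
For orientation, a hedged sketch of how the Proxy class in this file is consumed, mirroring the constructor call in the server entry point above (the base URL and username are placeholders, and pino stands in for the fastify logger the entry point normally passes in):

import pino from 'pino'
import { Proxy } from './proxy'

const log = pino({ name: 'example' })
// concurrency 1, no retry-after override, cache up to 100000 responses
const proxy = new Proxy(log, 'http://localhost:8081', 1, null, 100000)

// 200 and 404 responses are cached with separate TTLs, so repeated lookups skip the queue
const { status, data } = await proxy.getUser('jack')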

proxy/src/types.d.ts vendored

@@ -1,16 +1,15 @@
declare module 'axios-retry-after' {
import { AxiosError, AxiosInstance } from "axios";
import { AxiosError, AxiosInstance } from 'axios'
/**
* Function to enhance Axios instance with retry-after functionality.
* @param axios Axios instance to be enhanced.
* @param options Configuration options for retry behavior.
*/
export default function(
export default function (
axios: AxiosInstance,
options?: AxiosRetryAfterOptions
): (error: AxiosError) => Promise<void>;
): (error: AxiosError) => Promise<void>
/**
* Configuration options for axios-retry-after.
@@ -20,20 +19,19 @@ declare module 'axios-retry-after' {
* Function to determine if an error response is retryable.
* @param error The Axios error to evaluate.
*/
isRetryable?: (error: AxiosError) => boolean;
isRetryable?: (error: AxiosError) => boolean
/**
* Function to wait for a specified amount of time.
* @param error The Axios error that contains retry-after header.
*/
wait?: (error: AxiosError) => Promise<void>;
wait?: (error: AxiosError) => Promise<void>
/**
* Function to retry the original request.
* @param axios The Axios instance used for retrying the request.
* @param error The Axios error to retry.
*/
retry?: (axios: AxiosInstance, error: AxiosError) => Promise<any>;
retry?: (axios: AxiosInstance, error: AxiosError) => Promise<any>
}
}
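
This declaration mirrors how proxy.ts wires axios-retry-after into its axios instance; a minimal standalone sketch under the same assumptions (the base URL and the fixed 5000 ms delay are example values):

import axios from 'axios'
import retry from 'axios-retry-after'

const client = axios.create({ baseURL: 'http://localhost:8081' })

client.interceptors.response.use(
  null,
  retry(client, {
    // Only retry on explicit rate limiting
    isRetryable: (error) => error.response?.status === 429,
    // Wait a fixed delay instead of honoring a Retry-After header
    wait: () => new Promise<void>((resolve) => setTimeout(resolve, 5000))
  })
)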


@@ -12,7 +12,5 @@
"outDir": "./build"
},
"include": ["src/**/*"],
"exclude": [
"node_modules"
]
"exclude": ["node_modules"]
}