Use snappy for rss compression, refactor
This commit is contained in:
parent
68a5ac20b6
commit
181ef3bca7
5 changed files with 73 additions and 53 deletions
|
@ -21,6 +21,7 @@ requires "https://github.com/zedeus/redis#head"
|
||||||
requires "redpool#head"
|
requires "redpool#head"
|
||||||
requires "msgpack4nim >= 0.3.1"
|
requires "msgpack4nim >= 0.3.1"
|
||||||
requires "packedjson"
|
requires "packedjson"
|
||||||
|
requires "snappy#head"
|
||||||
|
|
||||||
|
|
||||||
# Tasks
|
# Tasks
|
||||||
|
|
|
@ -16,7 +16,7 @@ updateDefaultPrefs(fullCfg)
|
||||||
setCacheTimes(cfg)
|
setCacheTimes(cfg)
|
||||||
setHmacKey(cfg.hmacKey)
|
setHmacKey(cfg.hmacKey)
|
||||||
|
|
||||||
initRedisPool(cfg)
|
waitFor initRedisPool(cfg)
|
||||||
asyncCheck initTokenPool(cfg)
|
asyncCheck initTokenPool(cfg)
|
||||||
|
|
||||||
createUnsupportedRouter(cfg)
|
createUnsupportedRouter(cfg)
|
||||||
|
|
|
@ -16,10 +16,20 @@ proc setCacheTimes*(cfg: Config) =
|
||||||
rssCacheTime = cfg.rssCacheTime * 60
|
rssCacheTime = cfg.rssCacheTime * 60
|
||||||
listCacheTime = cfg.listCacheTime * 60
|
listCacheTime = cfg.listCacheTime * 60
|
||||||
|
|
||||||
proc initRedisPool*(cfg: Config) =
|
proc initRedisPool*(cfg: Config) {.async.} =
|
||||||
try:
|
try:
|
||||||
pool = waitFor newRedisPool(cfg.redisConns, maxConns=cfg.redisMaxConns,
|
pool = await newRedisPool(cfg.redisConns, maxConns=cfg.redisMaxConns,
|
||||||
host=cfg.redisHost, port=cfg.redisPort)
|
host=cfg.redisHost, port=cfg.redisPort)
|
||||||
|
|
||||||
|
pool.withAcquire(r):
|
||||||
|
let snappyRss = await r.get("snappyRss")
|
||||||
|
if snappyRss == redisNil:
|
||||||
|
let list = await r.scan(newCursor(0), "rss:*", 10000)
|
||||||
|
r.startPipelining()
|
||||||
|
for rss in list:
|
||||||
|
discard await r.del(rss)
|
||||||
|
discard await r.flushPipeline()
|
||||||
|
await r.setk("snappyRss", "true")
|
||||||
except OSError:
|
except OSError:
|
||||||
echo "Failed to connect to Redis."
|
echo "Failed to connect to Redis."
|
||||||
quit(1)
|
quit(1)
|
||||||
|
@ -60,12 +70,12 @@ proc cacheProfileId*(username, id: string) {.async.} =
|
||||||
pool.withAcquire(r):
|
pool.withAcquire(r):
|
||||||
discard await r.hset("p:", toLower(username), id)
|
discard await r.hset("p:", toLower(username), id)
|
||||||
|
|
||||||
proc cacheRss*(query, rss, cursor: string) {.async.} =
|
proc cacheRss*(query: string; rss: Rss) {.async.} =
|
||||||
let key = "rss:" & query
|
let key = "rss:" & query
|
||||||
pool.withAcquire(r):
|
pool.withAcquire(r):
|
||||||
r.startPipelining()
|
r.startPipelining()
|
||||||
discard await r.hset(key, "rss", rss)
|
discard await r.hset(key, "rss", rss.feed)
|
||||||
discard await r.hset(key, "min", cursor)
|
discard await r.hset(key, "min", rss.cursor)
|
||||||
discard await r.expire(key, rssCacheTime)
|
discard await r.expire(key, rssCacheTime)
|
||||||
discard await r.flushPipeline()
|
discard await r.flushPipeline()
|
||||||
|
|
||||||
|
@ -104,10 +114,11 @@ proc getCachedList*(username=""; name=""; id=""): Future[List] {.async.} =
|
||||||
result = await getGraphList(username, name)
|
result = await getGraphList(username, name)
|
||||||
await cache(result)
|
await cache(result)
|
||||||
|
|
||||||
proc getCachedRss*(key: string): Future[(string, string)] {.async.} =
|
proc getCachedRss*(key: string): Future[Rss] {.async.} =
|
||||||
var res: Table[string, string]
|
let k = "rss:" & key
|
||||||
pool.withAcquire(r):
|
pool.withAcquire(r):
|
||||||
res = await r.hgetall("rss:" & key)
|
result.cursor = await r.hget(k, "min")
|
||||||
|
if result.cursor.len > 2:
|
||||||
if "rss" in res:
|
result.feed = await r.hget(k, "rss")
|
||||||
result = (res["rss"], res.getOrDefault("min"))
|
else:
|
||||||
|
result.cursor.setLen 0
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
import asyncdispatch, strutils, tables, times, sequtils, hashes
|
import asyncdispatch, strutils, tables, times, sequtils, hashes, snappy
|
||||||
|
|
||||||
import jester
|
import jester
|
||||||
|
|
||||||
import router_utils, timeline
|
import router_utils, timeline
|
||||||
import ".."/[redis_cache, query], ../views/general
|
import ../query, ../views/general
|
||||||
|
|
||||||
include "../views/rss.nimf"
|
include "../views/rss.nimf"
|
||||||
|
|
||||||
export times, hashes
|
export times, hashes, snappy
|
||||||
|
|
||||||
proc showRss*(req: Request; hostname: string; query: Query): Future[(string, string)] {.async.} =
|
proc showRss*(req: Request; hostname: string; query: Query): Future[Rss] {.async.} =
|
||||||
var profile: Profile
|
var profile: Profile
|
||||||
var timeline: Timeline
|
var timeline: Timeline
|
||||||
let
|
let
|
||||||
|
@ -31,19 +31,21 @@ proc showRss*(req: Request; hostname: string; query: Query): Future[(string, str
|
||||||
)
|
)
|
||||||
|
|
||||||
if profile.suspended:
|
if profile.suspended:
|
||||||
return (profile.username, "suspended")
|
return Rss(feed: profile.username, cursor: "suspended")
|
||||||
|
|
||||||
if profile.fullname.len > 0:
|
if profile.fullname.len > 0:
|
||||||
let rss = renderTimelineRss(timeline, profile, hostname, multi=(names.len > 1))
|
let rss = compress renderTimelineRss(timeline, profile, hostname,
|
||||||
return (rss, timeline.bottom)
|
multi=(names.len > 1))
|
||||||
|
return Rss(feed: rss, cursor: timeline.bottom)
|
||||||
|
|
||||||
template respRss*(rss, minId) =
|
template respRss*(rss) =
|
||||||
if rss.len == 0:
|
if rss.cursor.len == 0:
|
||||||
resp Http404, showError("User \"" & @"name" & "\" not found", cfg)
|
resp Http404, showError("User \"" & @"name" & "\" not found", cfg)
|
||||||
elif minId == "suspended":
|
elif rss.cursor.len == 9 and rss.cursor == "suspended":
|
||||||
resp Http404, showError(getSuspended(rss), cfg)
|
resp Http404, showError(getSuspended(rss.feed), cfg)
|
||||||
let headers = {"Content-Type": "application/rss+xml; charset=utf-8", "Min-Id": minId}
|
let headers = {"Content-Type": "application/rss+xml; charset=utf-8",
|
||||||
resp Http200, headers, rss
|
"Min-Id": rss.cursor}
|
||||||
|
resp Http200, headers, uncompress rss.feed
|
||||||
|
|
||||||
proc createRssRouter*(cfg: Config) =
|
proc createRssRouter*(cfg: Config) =
|
||||||
router rss:
|
router rss:
|
||||||
|
@ -58,33 +60,34 @@ proc createRssRouter*(cfg: Config) =
|
||||||
let
|
let
|
||||||
cursor = getCursor()
|
cursor = getCursor()
|
||||||
key = $hash(genQueryUrl(query)) & cursor
|
key = $hash(genQueryUrl(query)) & cursor
|
||||||
(cRss, cCursor) = await getCachedRss(key)
|
|
||||||
|
|
||||||
if cRss.len > 0:
|
var rss = await getCachedRss(key)
|
||||||
respRss(cRss, cCursor)
|
if rss.cursor.len > 0:
|
||||||
|
respRss(rss)
|
||||||
|
|
||||||
let
|
let tweets = await getSearch[Tweet](query, cursor)
|
||||||
tweets = await getSearch[Tweet](query, cursor)
|
rss.cursor = tweets.bottom
|
||||||
rss = renderSearchRss(tweets.content, query.text, genQueryUrl(query), cfg.hostname)
|
rss.feed = compress renderSearchRss(tweets.content, query.text,
|
||||||
|
genQueryUrl(query), cfg.hostname)
|
||||||
|
|
||||||
await cacheRss(key, rss, tweets.bottom)
|
await cacheRss(key, rss)
|
||||||
respRss(rss, tweets.bottom)
|
respRss(rss)
|
||||||
|
|
||||||
get "/@name/rss":
|
get "/@name/rss":
|
||||||
cond '.' notin @"name"
|
cond '.' notin @"name"
|
||||||
let
|
let
|
||||||
cursor = getCursor()
|
cursor = getCursor()
|
||||||
name = @"name"
|
name = @"name"
|
||||||
(cRss, cCursor) = await getCachedRss(name & cursor)
|
key = name & cursor
|
||||||
|
|
||||||
if cRss.len > 0:
|
var rss = await getCachedRss(key)
|
||||||
respRss(cRss, cCursor)
|
if rss.cursor.len > 0:
|
||||||
|
respRss(rss)
|
||||||
|
|
||||||
let (rss, rssCursor) = await showRss(request, cfg.hostname,
|
rss = await showRss(request, cfg.hostname, Query(fromUser: @[name]))
|
||||||
Query(fromUser: @[name]))
|
|
||||||
|
|
||||||
await cacheRss(name & cursor, rss, rssCursor)
|
await cacheRss(key, rss)
|
||||||
respRss(rss, rssCursor)
|
respRss(rss)
|
||||||
|
|
||||||
get "/@name/@tab/rss":
|
get "/@name/@tab/rss":
|
||||||
cond '.' notin @"name"
|
cond '.' notin @"name"
|
||||||
|
@ -102,28 +105,30 @@ proc createRssRouter*(cfg: Config) =
|
||||||
key &= hash(genQueryUrl(query))
|
key &= hash(genQueryUrl(query))
|
||||||
key &= getCursor()
|
key &= getCursor()
|
||||||
|
|
||||||
let (cRss, cCursor) = await getCachedRss(key)
|
var rss = await getCachedRss(key)
|
||||||
if cRss.len > 0:
|
if rss.cursor.len > 0:
|
||||||
respRss(cRss, cCursor)
|
respRss(rss)
|
||||||
|
|
||||||
let (rss, rssCursor) = await showRss(request, cfg.hostname, query)
|
rss = await showRss(request, cfg.hostname, query)
|
||||||
await cacheRss(key, rss, rssCursor)
|
|
||||||
respRss(rss, rssCursor)
|
await cacheRss(key, rss)
|
||||||
|
respRss(rss)
|
||||||
|
|
||||||
get "/@name/lists/@list/rss":
|
get "/@name/lists/@list/rss":
|
||||||
cond '.' notin @"name"
|
cond '.' notin @"name"
|
||||||
let
|
let
|
||||||
cursor = getCursor()
|
cursor = getCursor()
|
||||||
key = @"name" & "/" & @"list" & cursor
|
key = @"name" & "/" & @"list" & cursor
|
||||||
(cRss, cCursor) = await getCachedRss(key)
|
|
||||||
|
|
||||||
if cRss.len > 0:
|
var rss = await getCachedRss(key)
|
||||||
respRss(cRss, cCursor)
|
if rss.cursor.len > 0:
|
||||||
|
respRss(rss)
|
||||||
|
|
||||||
let
|
let
|
||||||
list = await getCachedList(@"name", @"list")
|
list = await getCachedList(@"name", @"list")
|
||||||
timeline = await getListTimeline(list.id, cursor)
|
timeline = await getListTimeline(list.id, cursor)
|
||||||
rss = renderListRss(timeline.content, list, cfg.hostname)
|
rss.cursor = timeline.bottom
|
||||||
|
rss.feed = compress renderListRss(timeline.content, list, cfg.hostname)
|
||||||
|
|
||||||
await cacheRss(key, rss, timeline.bottom)
|
await cacheRss(key, rss)
|
||||||
respRss(rss, timeline.bottom)
|
respRss(rss)
|
||||||
|
|
|
@ -212,5 +212,8 @@ type
|
||||||
redisConns*: int
|
redisConns*: int
|
||||||
redisMaxConns*: int
|
redisMaxConns*: int
|
||||||
|
|
||||||
|
Rss* = object
|
||||||
|
feed*, cursor*: string
|
||||||
|
|
||||||
proc contains*(thread: Chain; tweet: Tweet): bool =
|
proc contains*(thread: Chain; tweet: Tweet): bool =
|
||||||
thread.content.anyIt(it.id == tweet.id)
|
thread.content.anyIt(it.id == tweet.id)
|
||||||
|
|
Loading…
Reference in a new issue