Solr indexer code cleanup
This commit is contained in:
parent
aea627b27f
commit
27c9680f5b
1 changed files with 200 additions and 202 deletions
402
archiver/fts.rkt
402
archiver/fts.rkt
|
@ -1,202 +1,200 @@
|
||||||
#lang racket
|
#lang cli
|
||||||
(require racket/function
|
(require (for-syntax racket/base))
|
||||||
racket/future
|
(require racket/format
|
||||||
racket/match
|
racket/function
|
||||||
racket/path
|
racket/future
|
||||||
racket/promise
|
racket/match
|
||||||
racket/port
|
racket/path
|
||||||
racket/string
|
racket/promise
|
||||||
file/gunzip
|
racket/port
|
||||||
db
|
racket/runtime-path
|
||||||
db/unsafe/sqlite3
|
racket/string
|
||||||
net/http-easy
|
file/gunzip
|
||||||
json
|
db
|
||||||
json-pointer
|
db/unsafe/sqlite3
|
||||||
"../lib/html-parsing/main.rkt"
|
net/http-easy
|
||||||
"../lib/xexpr-utils.rkt"
|
json
|
||||||
"../lib/tree-updater.rkt")
|
json-pointer
|
||||||
|
"../lib/html-parsing/main.rkt"
|
||||||
(define-syntax (seq stx)
|
"../lib/xexpr-utils.rkt"
|
||||||
(syntax-case stx ()
|
"../lib/tree-updater.rkt")
|
||||||
[(_ body ...)
|
|
||||||
#`(for ([op (list (lambda () body) ...)]
|
(flag (read-from-cache?)
|
||||||
[i (in-naturals)])
|
("-c" "--read-from-cache" "read from last run cache instead of rebuilding documents")
|
||||||
(define res (op))
|
(read-from-cache? #t))
|
||||||
(when (>= (response-status-code res) 400)
|
|
||||||
(error 'seq "op #~a: status code was ~a: ~v" i (response-status-code res) (response-json res)))
|
(define-runtime-path storage-path "../storage/archive")
|
||||||
(define taskuid (json-pointer-value "/taskUid" (response-json res)))
|
|
||||||
(for/or ([ticks (in-naturals)]
|
;; ***************************************************************************************************
|
||||||
[res2 (in-producer (lambda () (get (format "http://localhost:7700/tasks/~a" taskuid))))])
|
;; Progress bar display
|
||||||
(define status (json-pointer-value "/status" (response-json res2)))
|
;; ***************************************************************************************************
|
||||||
(case status
|
|
||||||
[("enqueued" "processing")
|
(struct progress^ (n max title) #:transparent)
|
||||||
(sleep 1)
|
|
||||||
#f]
|
(define (make-m-s seconds)
|
||||||
[("succeeded")
|
(define-values (eta-m eta-s) (quotient/remainder seconds 60))
|
||||||
(printf "op #~a: ~a (~a ticks)~n" i status ticks)
|
(format "~a:~a" eta-m (~a eta-s #:width 2 #:align 'right #:pad-string "0")))
|
||||||
#t]
|
|
||||||
[else
|
(define (make-progress get-p [history-size 20])
|
||||||
(error 'seq "op #~a: task status was ~a: ~v" i status res2)])))]))
|
(define update-sleep 1)
|
||||||
|
(define name-width 30)
|
||||||
(define (class-has? attributes substrs)
|
(define max-width 105)
|
||||||
(define cl (or (get-attribute 'class attributes) ""))
|
(define history (make-vector history-size 0))
|
||||||
(ormap (λ (substr) (string-contains? cl substr)) substrs))
|
(define history-pointer 0)
|
||||||
|
(define elapsed 0)
|
||||||
(define (updater element element-type attributes children)
|
(define (report-progress)
|
||||||
(cond
|
(define p (get-p))
|
||||||
[(class-has? attributes '("collapsed" "selflink" "label" "toc" "editsection" "reviews"))
|
(define history-cycle (vector-ref history history-pointer))
|
||||||
(list 'div '() '())]
|
(vector-set! history history-pointer (progress^-n p))
|
||||||
[#t
|
(set! history-pointer (modulo (add1 history-pointer) history-size))
|
||||||
(list element-type attributes children)]))
|
(set! elapsed (add1 elapsed))
|
||||||
|
(define-values (eta-display diff-per-second)
|
||||||
(define slc (sqlite3-connect #:database "../storage/fts-separate.db"))
|
(cond
|
||||||
(sqlite3-load-extension slc "fts5")
|
[((progress^-n p) . >= . (progress^-max p)) (values (format "~a **" (make-m-s elapsed)) (format "** ~a" (quotient (progress^-max p) (max elapsed 1))))]
|
||||||
|
[(= history-cycle 0) (values "-:--" "--")]
|
||||||
(define (writer tables-mode? page)
|
[else (define diff-per-second (/ (- (progress^-n p) history-cycle) (* history-size update-sleep)))
|
||||||
(define (writer-inner page)
|
(define eta-total
|
||||||
(for ([bit page])
|
(if (diff-per-second . > . 0)
|
||||||
(cond
|
(floor (round (/ (- (progress^-max p) (progress^-n p)) diff-per-second)))
|
||||||
[(and tables-mode? (pair? bit) (memq (car bit) '(h1 h2 h3 p blockquote q))) (void)]
|
0))
|
||||||
[(and (not tables-mode?) (pair? bit) (memq (car bit) '(ul ol dl table))) (void)]
|
(values (make-m-s eta-total)
|
||||||
[(memq bit '(div p li td dd dt br)) (displayln "")]
|
(round diff-per-second))]))
|
||||||
[(symbol? bit) (void)]
|
(define left (format "~a/~a ~a/s ~a ~a%"
|
||||||
[(and (pair? bit) (eq? (car bit) '*COMMENT*)) (void)]
|
(~a (progress^-n p) #:width (string-length (~a (progress^-max p))) #:align 'right #:pad-string " ")
|
||||||
[(and (pair? bit) (eq? (car bit) '@)) (void)]
|
(progress^-max p)
|
||||||
[(pair? bit) (writer-inner bit)]
|
diff-per-second
|
||||||
[(string? bit) (display bit)])))
|
eta-display
|
||||||
(writer-inner page))
|
(floor (* 100 (/ (progress^-n p) (progress^-max p))))))
|
||||||
|
(define name-display (~a (progress^-title p) #:max-width name-width #:limit-marker "..."))
|
||||||
(define (write-and-post-process tables-mode? page)
|
(define remaining-space (- max-width name-width (string-length left) 2))
|
||||||
(define text (with-output-to-string (λ () (writer tables-mode? page))))
|
(define bar-width
|
||||||
;; (define text-no-numbers (regexp-replace* #px"(?:-|[+$£€¥] *)?[0-9,.]{2,}%?\\s*" text ""))
|
(floor (* (sub1 remaining-space)
|
||||||
(define shrink-text (regexp-replace* #px"([ \t]*\r?\n+)+" text "\n"))
|
(/ (progress^-n p) (progress^-max p)))))
|
||||||
shrink-text)
|
(define bar (string-append (make-string bar-width #\=)
|
||||||
|
">"
|
||||||
(define wikiname "bloons")
|
(make-string (- remaining-space bar-width) #\ )))
|
||||||
(define tablename (format "page_~a" wikiname))
|
(printf "\e[2K\r~a~a~a" left bar name-display)
|
||||||
|
(flush-output))
|
||||||
(define ((extract f)) ; f - filename
|
(define (report-progress-loop)
|
||||||
(with-handlers
|
(sleep update-sleep)
|
||||||
([exn:fail? (λ (err) (println f) (raise err))])
|
(report-progress)
|
||||||
(define j
|
(report-progress-loop))
|
||||||
(case (path-get-extension f)
|
(define t (thread report-progress-loop))
|
||||||
[(#".json")
|
(define (quit)
|
||||||
(with-input-from-file f (λ () (read-json)))]
|
(kill-thread t)
|
||||||
[(#".gz")
|
(report-progress)
|
||||||
(define-values (in out) (make-pipe))
|
(displayln ""))
|
||||||
(with-input-from-file f (λ () (gunzip-through-ports (current-input-port) out)))
|
quit)
|
||||||
(read-json in)]
|
|
||||||
[else #f]))
|
;; ***************************************************************************************************
|
||||||
(define title (json-pointer-value "/parse/title" j))
|
;; Page text extractor
|
||||||
(define pageid (json-pointer-value "/parse/pageid" j))
|
;; ***************************************************************************************************
|
||||||
(define page-html (preprocess-html-wiki (json-pointer-value "/parse/text" j)))
|
|
||||||
(define page (update-tree updater (html->xexp page-html)))
|
(define (class-has? attributes substrs)
|
||||||
(define body (write-and-post-process #f page))
|
(define cl (or (get-attribute 'class attributes) ""))
|
||||||
(define table (write-and-post-process #t page))
|
(ormap (λ (substr) (string-contains? cl substr)) substrs))
|
||||||
(values title body table pageid)))
|
|
||||||
|
(define (updater element element-type attributes children)
|
||||||
(define results
|
(cond
|
||||||
(for/list ([f (directory-list (format "../storage/archive/~a" wikiname) #:build? #t)]
|
[(class-has? attributes '("collapsed" "selflink" "label" "toc" "editsection" "reviews"))
|
||||||
#:when (member (path-get-extension f) '(#".json" #".gz")))
|
(list 'div '() '())]
|
||||||
(extract f)))
|
[#t
|
||||||
|
(list element-type attributes children)]))
|
||||||
;; ***************************************************************************************************
|
|
||||||
;; TESTING WRITER
|
(define (writer tables-mode? page)
|
||||||
;; ***************************************************************************************************
|
(define (writer-inner page)
|
||||||
#;(for/first ([fut results]
|
(for ([bit page])
|
||||||
[i (in-naturals 1)]
|
(cond
|
||||||
#:when (i . >= . 4859))
|
[(and tables-mode? (pair? bit) (memq (car bit) '(h1 h2 h3 p blockquote q))) (void)]
|
||||||
(define-values (title body table pageid) (fut))
|
[(and (not tables-mode?) (pair? bit) (memq (car bit) '(ul ol dl table))) (void)]
|
||||||
(println title)
|
[(memq bit '(div p li td dd dt br)) (displayln "")]
|
||||||
(println body)
|
[(symbol? bit) (void)]
|
||||||
(println table))
|
[(and (pair? bit) (eq? (car bit) '*COMMENT*)) (void)]
|
||||||
|
[(and (pair? bit) (eq? (car bit) '@)) (void)]
|
||||||
(println "inserting...")
|
[(pair? bit) (writer-inner bit)]
|
||||||
|
[(string? bit) (display bit)])))
|
||||||
;; ***************************************************************************************************
|
(writer-inner page))
|
||||||
;; SQLite FTS5
|
|
||||||
;; ***************************************************************************************************
|
(define (write-and-post-process tables-mode? page)
|
||||||
#;(begin
|
(define text (with-output-to-string (λ () (writer tables-mode? page))))
|
||||||
(query-exec slc "begin transaction")
|
;; (define text-no-numbers (regexp-replace* #px"(?:-|[+$£€¥] *)?[0-9,.]{2,}%?\\s*" text ""))
|
||||||
#;(query-exec slc (format "create virtual table \"~a\" using fts5 (title, body, tokenize='porter unicode61')" wikiname))
|
(define shrink-text (regexp-replace* #px"([ \t]*\r?\n+)+" text "\n"))
|
||||||
(time
|
shrink-text)
|
||||||
(for ([fut results]
|
|
||||||
[i (in-naturals 1)])
|
(define ((extract f)) ; f - filename
|
||||||
(display "-")
|
(with-handlers
|
||||||
(when (and (> i 0) (= (modulo i 100) 0))
|
([exn:fail? (λ (err) (printf "extract: ~a: ~v~n" f err))])
|
||||||
(println i))
|
(define j
|
||||||
(define-values (title shrink-text) (fut))
|
(case (path-get-extension f)
|
||||||
(query-exec slc (format "insert into \"~a\" (title, body) values (?, ?)" tablename) title shrink-text)))
|
[(#".json")
|
||||||
|
(with-input-from-file f (λ () (read-json)))]
|
||||||
(println "running optimize...")
|
[(#".gz")
|
||||||
(query-exec slc (format "insert into \"~a\" (\"~a\") values ('optimize')" tablename tablename))
|
(define-values (in out) (make-pipe))
|
||||||
|
(with-input-from-file f (λ () (gunzip-through-ports (current-input-port) out)))
|
||||||
(println "committing...")
|
(read-json in)]
|
||||||
(query-exec slc "commit"))
|
[else #f]))
|
||||||
|
(define title (json-pointer-value "/parse/title" j))
|
||||||
;; ***************************************************************************************************
|
(define pageid (json-pointer-value "/parse/pageid" j))
|
||||||
;; Solr
|
(define page-html (preprocess-html-wiki (json-pointer-value "/parse/text" j)))
|
||||||
;; ***************************************************************************************************
|
(define page (update-tree updater (html->xexp page-html)))
|
||||||
(begin
|
(define body (write-and-post-process #f page))
|
||||||
(define data
|
(define table (write-and-post-process #t page))
|
||||||
(cond
|
(list title body table pageid)))
|
||||||
#;[(file-exists? "cache.rkt")
|
|
||||||
(println "reading in...")
|
;; ***************************************************************************************************
|
||||||
(with-input-from-file "cache.rkt" (λ () (read)))]
|
;; Program, loop, Solr APIs
|
||||||
[else
|
;; ***************************************************************************************************
|
||||||
(define data
|
|
||||||
(for/list ([fut results]
|
(program
|
||||||
[i (in-naturals 1)])
|
(start [wikiname "wikiname to download"])
|
||||||
(display "-")
|
|
||||||
(when (and (> i 0) (= (modulo i 100) 0))
|
(define results
|
||||||
(println i))
|
(for/list ([f (directory-list (build-path storage-path wikiname) #:build? #t)]
|
||||||
(define-values (title body table pageid) (fut))
|
#:when (member (path-get-extension f) '(#".gz")))
|
||||||
(define len (string-length body))
|
(extract f)))
|
||||||
`#hasheq((id . ,(number->string pageid))
|
|
||||||
(title . ,title)
|
(define data
|
||||||
(body . ,body)
|
(cond
|
||||||
(table . ,table)
|
[(and (read-from-cache?) (file-exists? "cache.rkt"))
|
||||||
(len . ,len))))
|
(displayln "Reading in...")
|
||||||
|
(with-input-from-file "cache.rkt" (λ () (read)))]
|
||||||
(println "writing out...")
|
[else
|
||||||
(with-output-to-file "cache.rkt" (λ () (write data)) #:exists 'truncate/replace)
|
(define x (box (progress^ 0 1 "...")))
|
||||||
data]))
|
(define quit (make-progress (λ () (unbox x))))
|
||||||
|
(define data
|
||||||
(println "posting...")
|
(for/list ([fut results]
|
||||||
(define res
|
[i (in-naturals 1)]
|
||||||
(post (format "http://localhost:8983/solr/~a/update?commit=true" wikiname)
|
#:do [(define page (fut))]
|
||||||
#:json data)))
|
#:when (not (void? page)))
|
||||||
|
(match-define (list title body table pageid) page)
|
||||||
;; ***************************************************************************************************
|
(define len (string-length body))
|
||||||
;; Meilisearch
|
(set-box! x (progress^ i (length results) title))
|
||||||
;; ***************************************************************************************************
|
`#hasheq((id . ,(number->string pageid))
|
||||||
#;(begin
|
(title . ,title)
|
||||||
(seq
|
(body . ,body)
|
||||||
(put (format "http://localhost:7700/indexes/~a/settings/searchable-attributes" wikiname)
|
(table . ,table)
|
||||||
#:json '("title" "body"))
|
(len . ,len))))
|
||||||
(put (format "http://localhost:7700/indexes/~a/settings/ranking-rules" wikiname)
|
(quit)
|
||||||
#:json '("words" "typo" #;"proximity" "attribute" "sort" "exactness" #;"len:desc"))
|
|
||||||
(call-with-input-file "stop-words.json"
|
(display "Writing out... ")
|
||||||
(λ (in)
|
(flush-output)
|
||||||
(put (format "http://localhost:7700/indexes/~a/settings/stop-words" wikiname)
|
(with-output-to-file "cache.rkt" (λ () (write data)) #:exists 'truncate/replace)
|
||||||
#:headers '#hasheq((Content-Type . "application/json"))
|
data]))
|
||||||
#:data in))))
|
|
||||||
(define data
|
(display "Converting... ")
|
||||||
(for/list ([fut results]
|
(flush-output)
|
||||||
[i (in-naturals 1)])
|
(define ser (jsexpr->bytes data))
|
||||||
(display "-")
|
(define ser-port (open-input-bytes ser))
|
||||||
(when (and (> i 0) (= (modulo i 100) 0))
|
(define quit (make-progress (λ () (progress^ (ceiling (/ (file-position ser-port) 64 1024))
|
||||||
(println i))
|
(ceiling (/ (bytes-length ser) 64 1024))
|
||||||
(define-values (title body pageid) (fut))
|
"Posting..."))
|
||||||
(define len (string-length body))
|
2))
|
||||||
`#hasheq((id . ,pageid)
|
(define res
|
||||||
(title . ,title)
|
(post (format "http://localhost:8983/solr/~a/update?commit=true" wikiname)
|
||||||
(body . ,body)
|
#:data ser-port
|
||||||
(len . ,len))))
|
#:headers '#hasheq((Content-Type . "application/json"))
|
||||||
(define res
|
#:timeouts (make-timeout-config #:lease 5 #:connect 5 #:request 300)))
|
||||||
(post (format "http://localhost:7700/indexes/~a/documents" wikiname)
|
(quit)
|
||||||
#:json data))
|
(displayln (response-status-line res)))
|
||||||
(seq res)
|
|
||||||
(println (response-json res)))
|
(run start)
|
||||||
|
|
||||||
(disconnect slc)
|
|
||||||
|
|
Loading…
Reference in a new issue