Queue for tag downloads
This commit is contained in:
parent
be489e9a18
commit
5e9ea6db66
6 changed files with 58 additions and 8 deletions
|
@ -82,7 +82,7 @@ async function loadCollection(inputUsername) {
|
|||
|
||||
// load full tracks/tags immediately if there's not too many
|
||||
const downloader = loadTags.downloadManager.check(account)
|
||||
if (downloader.total <= 5) loadTags.downloadManager.start(account)
|
||||
if (downloader.total > 0 && downloader.total <= 20 && loadTags.downloadManager.queue === 0) loadTags.downloadManager.start(account)
|
||||
|
||||
return {
|
||||
storedItemCount,
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
const domino = require("domino")
|
||||
const {getValidatedQuery, readValidatedBody, defineEventHandler} = require("h3")
|
||||
/** @type {import("@chriscdn/promise-semaphore")["default"]} */ // @ts-ignore
|
||||
const Semaphore = require("@chriscdn/promise-semaphore")
|
||||
|
||||
const {sync, db, router} = require("../passthrough")
|
||||
|
||||
|
@ -22,6 +24,7 @@ class TagDownloader extends sync.reloadClassMethods(() => TagDownloader) {
|
|||
this.untaggedItems = []
|
||||
this.total = this.untaggedItems.length
|
||||
this.running = false
|
||||
this.queuePosition = 0
|
||||
this.outcome = null
|
||||
this.check()
|
||||
}
|
||||
|
@ -32,8 +35,14 @@ class TagDownloader extends sync.reloadClassMethods(() => TagDownloader) {
|
|||
this.total = this.untaggedItems.length
|
||||
}
|
||||
|
||||
_setQueued(queuePosition) {
|
||||
if (this.running) return
|
||||
this.queuePosition = queuePosition
|
||||
}
|
||||
|
||||
async _start() {
|
||||
if (this.running) return
|
||||
this.queuePosition = 0
|
||||
this.running = true
|
||||
this.outcome = null
|
||||
this.processed = 0
|
||||
|
@ -79,7 +88,7 @@ class TagDownloader extends sync.reloadClassMethods(() => TagDownloader) {
|
|||
}
|
||||
this.outcome = "Success"
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
console.error(`error downloading tags for ${this.account} - ${e}`)
|
||||
this.outcome = e.toString()
|
||||
} finally {
|
||||
this.running = false
|
||||
|
@ -94,6 +103,8 @@ class TagDownloader extends sync.reloadClassMethods(() => TagDownloader) {
|
|||
const downloadManager = new class {
|
||||
/** @type {Map<string, TagDownloader>} */
|
||||
inProgressTagDownloads = sync.remember(() => new Map())
|
||||
semaphore = sync.remember(() => new Semaphore(1))
|
||||
queue = 0
|
||||
|
||||
/** @param {string} account */
|
||||
check(account) {
|
||||
|
@ -109,14 +120,21 @@ const downloadManager = new class {
|
|||
/** @param {string} account */
|
||||
start(account) {
|
||||
const downloader = this.check(account)
|
||||
downloader._start()
|
||||
downloader._setQueued(++this.queue)
|
||||
console.log(`requested tag download for ${account} - ${this.queue} in queue`)
|
||||
this.semaphore.request(() => downloader._start()).finally(() => {
|
||||
this.queue--
|
||||
for (const otherDownloader of this.inProgressTagDownloads.values()) {
|
||||
otherDownloader.queuePosition--
|
||||
}
|
||||
})
|
||||
return downloader
|
||||
}
|
||||
|
||||
/** @param {string} account */
|
||||
resolve(account) {
|
||||
const downloader = this.check(account)
|
||||
if (!downloader.running) {
|
||||
if (!downloader.running && downloader.queuePosition <= 0) {
|
||||
this.inProgressTagDownloads.delete(account)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue