/*
	Timelinize
	Copyright (c) 2013 Matthew Holt

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as published
	by the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/

package timeline

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"runtime"
	"runtime/debug"
	"sync"
	"time"

	"github.com/zeebo/blake3"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// maxThrottleCPUPct is the fraction of the machine's logical CPUs
// that is used to size the CPU-intensive throttle.
const maxThrottleCPUPct = 0.75

var (
	// throttleSize is maxThrottleCPUPct of runtime.NumCPU(), truncated
	// to an integer, but never less than 1.
	throttleSize = max(int(float64(runtime.NumCPU())*maxThrottleCPUPct), 1)

	// cpuIntensiveThrottle is a channel buffered to throttleSize.
	// NOTE(review): presumably used as a counting semaphore by CPU-heavy
	// job work (the JobAction docs mention "the throttle") -- confirm
	// against its users, which are not in this part of the file.
	cpuIntensiveThrottle = make(chan struct{}, throttleSize)
)

// CreateJob creates and runs a job described by action, with an estimated total units
// of work, to be repeated after a certain interval (if > 0). If an identical job is
// already running, the job will be queued. If an identical job is already queued, this
// will be a no-op. The created job ID is returned.
func (tl *Timeline) CreateJob(action JobAction, scheduled time.Time, repeat time.Duration, total int, parentJobID uint64) (uint64, error) { config, err := json.Marshal(action) if err != nil { return 0, fmt.Errorf("JSON-encoding job action: %w", err) } var jobType JobType switch action.(type) { case *ImportJob: jobType = JobTypeImport case thumbnailJob: jobType = JobTypeThumbnails case embeddingJob: jobType = JobTypeEmbeddings default: return 0, fmt.Errorf("unexpected job action: %#v", action) } var repeatPtr *time.Duration var startPtr *time.Time var totalPtr *int var parentJobIDPtr *uint64 if repeat > 0 { repeatPtr = &repeat } if total > 0 { totalPtr = &total } if parentJobID > 0 { parentJobIDPtr = &parentJobID } if !scheduled.IsZero() { startPtr = &scheduled } // this value is not the one that gets run, it is only used for storing job := Job{ Type: jobType, Config: string(config), Start: startPtr, Total: totalPtr, Repeat: repeatPtr, ParentJobID: parentJobIDPtr, } tx, err := tl.db.WritePool.BeginTx(tl.ctx, nil) if err != nil { return 0, err } defer tx.Rollback() jobID, created, err := tl.storeJob(tx, job) if err != nil { return 0, err } // TODO: This log is a bit of a hack; it's to notify the UI that a new job // has been created, so it can be shown in the UI if relevant; but the // longer-term status logger doesn't get made until runJob... so we have // to make a logger with the same name and many of the same fields... Log.Named("job.status").Info("created", zap.Object("job", Job{ RepoID: tl.ID().String(), ID: jobID, Type: job.Type, Created: created, Start: job.Start, ParentJobID: parentJobIDPtr, Total: totalPtr, })) err = tl.startJob(tl.ctx, tx, jobID) if err != nil { return 0, err } if err = tx.Commit(); err != nil { return 0, fmt.Errorf("committing new job transaction: %w", err) } return jobID, nil } // storeJob adds the job to the database. It does not start it. // TODO: Job configs could be compressed to save space in the DB... 
func (tl *Timeline) storeJob(tx *sql.Tx, job Job) (uint64, time.Time, error) { job.Hash = job.hash() hostname, err := os.Hostname() if err != nil { Log.Error("unable to lookup hostname while adding job", zap.Error(err)) } job.Hostname = &hostname // first see if any job with the same hash is already queued; if so, // no-op since this would be a duplicate var count int err = tx.QueryRowContext(tl.ctx, `SELECT count() FROM jobs WHERE hash=? AND state=? LIMIT 1`, job.Hash, JobQueued).Scan(&count) if err != nil { return 0, time.Time{}, fmt.Errorf("checking for duplicate job: %w", err) } if count > 0 { return 0, time.Time{}, nil } var start *int64 if job.Start != nil { startVal := job.Start.UnixMilli() start = &startVal } var id uint64 var createdUnix int64 err = tx.QueryRowContext(tl.ctx, ` INSERT INTO jobs (type, configuration, hash, hostname, start, total, repeat, parent_job_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id, created`, job.Type, job.Config, job.Hash, job.Hostname, start, job.Total, job.Repeat, job.ParentJobID).Scan(&id, &createdUnix) if err != nil { return 0, time.Time{}, fmt.Errorf("inserting new job row: %w", err) } created := time.UnixMilli(createdUnix) return id, created, nil } // loadJob loads a job from the database, using tx if set. A lock MUST be obtained on the // timeline database when calling this function! func (tl *Timeline) loadJob(ctx context.Context, tx *sql.Tx, jobID uint64, parentAndChildJobs bool) (Job, error) { q := `SELECT id, type, name, configuration, hash, state, hostname, created, updated, start, ended, message, total, progress, checkpoint, repeat, parent_job_id FROM jobs WHERE id=?` vals := []any{jobID} if parentAndChildJobs { // we can get child jobs now, parent job will be had in a second query q += "OR parent_job_id=? LIMIT 100" // limit just in case, I guess vals = append(vals, jobID) } else { q += " LIMIT 1" } var rows *sql.Rows var err error if tx == nil { rows, err = tl.db.ReadPool.QueryContext(ctx, q, vals...) 
} else { rows, err = tx.QueryContext(ctx, q, vals...) } if err != nil { return Job{}, err } defer rows.Close() tlID := tl.id.String() var jobs []Job // scan all the job rows into the same slice for now, we'll figure out // which is the "main"/parent job later (the result set is not usually more // than a few rows) for rows.Next() { job, err := scanJob(rows, tlID) if err != nil { return Job{}, err } jobs = append(jobs, job) } if err := rows.Err(); err != nil { return Job{}, err } if len(jobs) == 0 { return Job{}, sql.ErrNoRows } var parentJob Job for _, job := range jobs { if job.ID == jobID { parentJob = job } else { parentJob.Children = append(parentJob.Children, job) } } // load the parent of the parent, I guess if parentAndChildJobs && parentJob.ParentJobID != nil { grandparent, err := tl.loadJob(ctx, tx, *parentJob.ParentJobID, false) if err != nil { return Job{}, fmt.Errorf("loading parent job: %w", err) } parentJob.Parent = &grandparent } return parentJob, nil } // startJob makes sure an identical job is not already running, then // sets the job's state to started and syncs it with the DB, then // calls the action function in a goroutine. func (tl *Timeline) startJob(ctx context.Context, tx *sql.Tx, jobID uint64) error { // if an identical job is already running, we shouldn't start yet var count int err := tx.QueryRowContext(tl.ctx, `SELECT count() FROM jobs WHERE id!=? AND hash=(SELECT hash FROM jobs WHERE id=? LIMIT 1) AND (state=? OR state=?) 
LIMIT 1`, jobID, jobID, JobStarted, JobPaused).Scan(&count) if err != nil { return fmt.Errorf("checking for duplicate running job: %w", err) } if count > 0 { return errors.New("identical job is already running") } // verify the job is not already loaded (using UnpauseJob is suitable in that case, so we don't // double-load it) and then load the job to verify it is in a startable state tl.activeJobsMu.RLock() _, runningOrPaused := tl.activeJobs[jobID] tl.activeJobsMu.RUnlock() if runningOrPaused { return fmt.Errorf("job %d is already active/loaded and cannot be loaded and started again", jobID) } job, err := tl.loadJob(tl.ctx, tx, jobID, false) if err != nil { return fmt.Errorf("loading job %d: %w", jobID, err) } if job.State == JobStarted || job.State == JobSucceeded { // paused is an allowed state to start from, as long as it's not already loaded/active (checked above), // because the process could have stopped while the job was paused return fmt.Errorf("job %d has already %s and is not startable in this state", jobID, job.State) } start := func(tx *sql.Tx) error { // update the job's state to started now := time.Now() _, err = tx.ExecContext(ctx, `UPDATE jobs SET state=?, start=?, updated=? WHERE id=?`, // TODO: LIMIT 1 JobStarted, now.UnixMilli(), now.UnixMilli(), jobID) if err != nil { return fmt.Errorf("updating job state: %w", err) } job.State = JobStarted job.Start = &now job.Updated = &now // update the job's state as we prepare to run it err = tx.QueryRowContext(ctx, `SELECT total, progress FROM jobs WHERE id=? 
LIMIT 1`, jobID).Scan(&job.Total, &job.Progress) if err != nil { return fmt.Errorf("unable to get starting progress and total amounts: %w", err) } // run the job if err = tl.runJob(job); err != nil { return fmt.Errorf("running job: %w", err) } return nil } if job.State == JobQueued && job.Start != nil && time.Now().Before(*job.Start) { go func(scheduledStart time.Time) { select { case <-ctx.Done(): return case <-time.After(time.Until(scheduledStart)): logger := Log.Named("job") tx, err := tl.db.WritePool.BeginTx(ctx, nil) if err != nil { logger.Error("could not start transaction to start job", zap.Error(err)) return } defer tx.Rollback() // ensure job was not aborted while we were waiting for it to start var unabortedJobCount int err = tx.QueryRowContext(ctx, `SELECT count() FROM jobs WHERE id=? AND state!=? LIMIT 1`, jobID, JobAborted).Scan(&unabortedJobCount) if errors.Is(err, sql.ErrNoRows) || unabortedJobCount == 0 { logger.Error("job was aborted before it started") return } if err = start(tx); err != nil { logger.Error("could not start scheduled job", zap.Error(err)) return } if err = tx.Commit(); err != nil { logger.Error("could not commit transaction for scheduled job", zap.Error(err)) return } } }(*job.Start) } else { return start(tx) } return nil } // runJob deserializes the job's and starts a goroutine and // calls its associated action function. The goroutine then finalizes // the job and runs the next queued job, if any. This method does // not sync the job's starting state to the DB, so call startJob() // instead. It is expected that its state is running/started. 
func (tl *Timeline) runJob(row Job) error { // create the job action by deserializing the config into its assocated struct var action JobAction switch row.Type { case JobTypeImport: var importJob *ImportJob if err := json.Unmarshal([]byte(row.Config), &importJob); err != nil { return fmt.Errorf("unmarshaling import job config: %w", err) } action = importJob case JobTypeThumbnails: var thumbnailJob thumbnailJob if err := json.Unmarshal([]byte(row.Config), &thumbnailJob); err != nil { return fmt.Errorf("unmarshaling thumbnail job config: %w", err) } action = thumbnailJob case JobTypeEmbeddings: var embeddingJob embeddingJob if err := json.Unmarshal([]byte(row.Config), &embeddingJob); err != nil { return fmt.Errorf("unmarshaling embedding job config: %w", err) } action = embeddingJob default: return fmt.Errorf("unknown job type '%s'", row.Type) } baseLogger := Log.Named("job") // keep a pointer outside the ActiveJob struct because after // the job is done, we will update the logger, and it's not // thread-safe to change that field in the struct statusLog := baseLogger.Named("status") ctx, cancel := context.WithCancel(tl.ctx) job := &ActiveJob{ ctx: ctx, cancel: cancel, pause: make(chan chan struct{}), done: make(chan struct{}), id: row.ID, tl: tl, logger: baseLogger.Named("action").With(zap.Object("job", row)), statusLog: statusLog, parentJobID: row.ParentJobID, action: action, currentState: row.State, currentProgress: row.Progress, currentTotal: row.Total, currentMessage: row.Message, started: *row.Start, jobType: row.Type, } // run the job asynchronously -- never block the calling goroutine! 
go func(job *ActiveJob, logger, statusLog *zap.Logger, row Job, action JobAction) { // don't allow a job, which is doing who-knows-what and processing who-knows-what // input, to bring down the program defer func() { if r := recover(); r != nil { logger.Error("panic", zap.Any("error", r), zap.String("stack", string(debug.Stack()))) } // apparently it's best-practice to checkpoint at the end of large write operations: // https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1071755879 // "One thing I will suggest is running pragma wal_checkpoint after committing the // insert transaction, as that will merge the WAL file back into the main database file, // which will make subsequent reads faster." -rittneje // and since the default mode is PASSIVE, this page recommends using FULL or TRUNCATE // but I haven't personally seen a need for those heavier operations yet: // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/#regarding-wal-mode if _, err := tl.db.WritePool.ExecContext(ctx, "PRAGMA wal_checkpoint"); err != nil { logger.Error("could not create WAL checkpoint", zap.Error(err)) } }() // when we're done here, clean up our map of active jobs defer func() { tl.activeJobsMu.Lock() delete(tl.activeJobs, job.id) tl.activeJobsMu.Unlock() }() statusLog.Info("running", zap.Object("job", row), zap.String("state", string(row.State))) // signal to any waiters when this job action returns; we intentionally // don't signal until after we've synced the job state to the DB defer close(job.done) // TODO: we should have a way of letting jobs report errors without // having to terminate, but still marking it as failed when done, // or at least record the errors even if it gets marked as "succeeded" // (maybe "succeeded" is too specific/optimistic a term, maybe "completed" or "done" or "finished" instead?) 
// type conversion for checkpoint; it's easier for actions to use []byte, but we like using *string // for database since most DB viewers make it easier to read TEXT columns than BLOB columns var chkpt []byte if row.Checkpoint != nil && len(*row.Checkpoint) > 0 { chkpt = []byte(*row.Checkpoint) } // run the job; we'll handle the error by logging the result and updating the state actionErr := action.Run(job, chkpt) var newState JobState switch { case actionErr == nil: newState = JobSucceeded case errors.Is(actionErr, context.Canceled): newState = JobAborted default: newState = JobFailed } // add info to the logger and log the result end := time.Now() row.Ended = &end if row.Start != nil { statusLog = statusLog.With(zap.Duration("duration", end.Sub(*row.Start))) } job.mu.Lock() job.currentState = newState job.ended = end job.flushProgress(statusLog) // allow the UI to update live job progress display one last time job.mu.Unlock() // print a final message indicating the state and error, if any (TODO: could do above on that last flushProgress(), I guess) switch newState { case JobSucceeded: statusLog.Info(string(newState), zap.Object("job", row)) case JobAborted: statusLog.Warn(string(newState), zap.Object("job", row), zap.Error(actionErr)) case JobFailed: statusLog.Error(string(newState), zap.Object("job", row), zap.Error(actionErr)) } // sync to the DB, and dequeue the next job tx, err := tl.db.WritePool.BeginTx(tl.ctx, nil) // don't use the job's context, it might have been canceled! if err != nil { logger.Error("beginning transaction for ended job", zap.Error(err)) return } defer tx.Rollback() _, err = tx.ExecContext(tl.ctx, `UPDATE jobs SET state=?, ended=?, updated=? 
WHERE id=?`, // TODO: LIMIT 1 (see https://github.com/mattn/go-sqlite3/pull/802) newState, end.UnixMilli(), end.UnixMilli(), job.id) if err != nil { logger.Error("updating job state", zap.Error(err)) } if newState == JobSucceeded { // clear message and checkpoint in the DB if job succeeds job.Message("") if err = job.sync(tx, nil); err != nil { logger.Error("clearing job checkpoint and message from successful job", zap.Error(err)) } } else { // for any other termination, sync only the job state to the DB job.mu.Lock() checkpoint := job.currentCheckpoint job.mu.Unlock() err := job.sync(tx, checkpoint) if err != nil { logger.Error("syncing job state with DB", zap.Error(err)) } } hostname, err := os.Hostname() if err != nil { Log.Error("unable to lookup hostname while dequeuing next job", zap.Error(err)) } // see if there's another job queued up we should run // (only run import jobs on the same machine they were configured from, since it uses external file paths) var nextJobID uint64 err = tx.QueryRowContext(tl.ctx, `SELECT id FROM jobs WHERE (state=? OR state=?) AND (hostname=? OR type!=?) ORDER BY start, created LIMIT 1`, JobQueued, JobInterrupted, hostname, JobTypeImport).Scan(&nextJobID) if nextJobID > 0 { err := tl.startJob(tl.ctx, tx, nextJobID) if err != nil { logger.Error("starting next job", zap.Error(err)) } } if err != nil && !errors.Is(err, sql.ErrNoRows) { logger.Error("querying for next job", zap.Error(err)) } if err = tx.Commit(); err != nil { logger.Error("committing post-job transaction", zap.Error(err)) } }(job, baseLogger, statusLog, row, action) tl.activeJobsMu.Lock() tl.activeJobs[job.id] = job tl.activeJobsMu.Unlock() return nil } // ActiveJob represents a job that is being actively loaded and run. // It has methods which can manage the state of the job. // It has a context which should be honored by implementations // of JobAction. A ActiveJob value must not be copied. It maintains // current progress/etc. 
that should be as authoritative as // what is in the DB, although this struct is updated more // frequently than the DB is synced. type ActiveJob struct { ctx context.Context cancel context.CancelFunc pause chan chan struct{} // send to the outer channel to pause, receive on the inner channel to unpause done chan struct{} // signaling channel that is closed when the job action returns id uint64 tl *Timeline logger *zap.Logger statusLog *zap.Logger parentJobID *uint64 action JobAction // these fields are needed mainly for accurate real-time logging purposes jobType JobType started time.Time ended time.Time mu sync.Mutex // protected by mu paused chan struct{} // set by the job manager when pausing; closing this unpauses the job currentState JobState currentProgress *int currentTotal *int currentMessage *string // nil => unchanged, "" => clear currentCheckpoint any lastCheckpoint *time.Time // when last checkpoint was created (may be more recent than last DB sync, which is throttled) lastSync time.Time // last DB update lastFlush time.Time // last frontend update } // MarshalLogObject satisfies the zapcore.ObjectMarshaler interface. func (j *ActiveJob) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddString("repo_id", j.tl.id.String()) enc.AddUint64("id", j.id) enc.AddString("type", string(j.jobType)) enc.AddString("state", string(j.currentState)) enc.AddTime("start", j.started) if !j.ended.IsZero() { enc.AddTime("ended", j.ended) } if j.parentJobID != nil { enc.AddUint64("parent_job_id", *j.parentJobID) } if j.lastCheckpoint != nil { enc.AddTime("checkpointed", *j.lastCheckpoint) } if j.currentMessage != nil { enc.AddString("message", *j.currentMessage) } if j.currentProgress != nil { enc.AddInt("progress", *j.currentProgress) } if j.currentTotal != nil { enc.AddInt("total", *j.currentTotal) } return nil } // Context returns the context the job is being run in. It should // (usually) be the parent context of all others in the job. 
func (j *ActiveJob) Context() context.Context {
	return j.ctx
}

// ID reports the job's row ID in the database.
func (j *ActiveJob) ID() uint64 {
	return j.id
}

// Timeline reports which timeline the job belongs to.
func (j *ActiveJob) Timeline() *Timeline {
	return j.tl
}

// Logger provides the logger that the job should write to.
func (j *ActiveJob) Logger() *zap.Logger {
	return j.logger
}

// Continue is the cooperative checkpoint for pausing and canceling:
// job actions should call it often, usually at the top of their main
// loop and of any long-running inner loops. When the job is neither
// paused nor canceled, it returns nil right away. When a pause has
// been requested, it blocks until the job is unpaused. When the job's
// context is done (i.e. the job is being canceled), it returns the
// context's error, and the action should terminate.
func (j *ActiveJob) Continue() error {
	canceled := j.ctx.Done()

	select {
	case <-canceled:
		return j.ctx.Err()

	case resume := <-j.pause:
		// a pause was requested: wait here until the
		// job is either resumed or canceled
		select {
		case <-canceled:
			return j.ctx.Err()
		case <-resume:
			return nil
		}

	default:
		// nothing to wait for
		return nil
	}
}

// SetTotal records the total size of the job. Progress bars use this
// value to configure their display, so ideally it is set only once the
// total is properly known, rather than accumulated as you go. For
// instance, if the size is estimated in a preliminary pass before the
// real work starts, hold off calling SetTotal() until that estimate is
// finished (checkpoints can preserve the running count in the meantime).
// If the total shifts a little during the work, calling this a few more
// times is fine. Note that a progress bar is displayed as "indeterminate"
// for as long as the total in the DB is nil (or 0?), so if the real
// work has not quite begun, you may not want to turn off that kind of
// display until it has.
func (j *ActiveJob) SetTotal(total int) {
	j.mu.Lock()
	defer j.mu.Unlock()

	j.currentTotal = &total
	j.flushProgress(nil)
}

// FlushProgress forces a log to be written that updates the UI
// about the job.
func (j *ActiveJob) FlushProgress() { j.mu.Lock() j.flushProgress(j.statusLog) j.mu.Unlock() } // Progress updates the progress of the job in the DB by adding delta, // which is work completed since the previous update, towards the expected // total. This will update progress bars in the frontend UI. If the // current progress is nil, it will be interpreted as 0 and delta will // be the new progress value. func (j *ActiveJob) Progress(delta int) { j.mu.Lock() newProgress := delta if j.currentProgress != nil { newProgress = *j.currentProgress + delta } j.currentProgress = &newProgress j.flushProgress(nil) j.mu.Unlock() } // flushProgress writes a log that the UI uses to update the progress of the job // if it has been enough time since the last flush (we don't need to update the // UI 1000x/sec). To force a flush regardless, pass in a logger, even if it's // the default j.statusLog logger. This lets you add fields to the output when // you have a reason to force a log emission. // MUST BE CALLED IN A LOCK ON THE JOB MUTEX. func (j *ActiveJob) flushProgress(logger *zap.Logger) { if logger != nil || time.Since(j.lastFlush) > jobFlushInterval { if logger == nil { logger = j.statusLog } logger.Info("progress", zap.Object("job", j)) j.lastFlush = time.Now() _ = logger.Sync() // ensure it gets written promptly } } // Message updates the current job message or status to show the user. func (j *ActiveJob) Message(message string) { j.mu.Lock() if message == "" { j.currentMessage = nil } else { j.currentMessage = &message } j.flushProgress(nil) j.mu.Unlock() } // Checkpoint creates a checkpoint and syncs the job state with the database. // When creating a checkpoint, the total, progress, and message data should // be current such that resuming the job from this checkpoint would have the // correct total, progress, and message displayed. Usually, a checkpoint is // where to begin/resume the job, since it's often the starting index in a // loop. 
For example, if a job started at unit of work ("task"?) index 0 and // finished up through 3, the progress value would have been updated to be 4 // and the checkpoint would contain index 4. The job would then resume with // starting at index 4, which had not been started yet. // // Checkpoint values are opaque and MUST be JSON-marshallable. They will be // returned to the job action in the form of JSON bytes. func (j *ActiveJob) Checkpoint(newCheckpoint any) error { return j.sync(nil, newCheckpoint) // TODO: emit log for the frontend to update the UI? } // sync writes changes/updates to the DB. In case of rapid job progression, we avoid // writing to the DB with every call; only if enough time has passed, or if the job // is done. If tx is non-nil, it is assumed the job is done (but if that assumption // no longer holds true, we can change the signature of this method). In other words, // passing in a tx forces a flush/sync to the DB. // // This is how a checkpoint is stored as well, since it doesn't make sense to only // have a checkpoint in memory (pausing a job does not use a checkpoint since it // remains in memory) -- only resuming a job after it has been removed from memory // (usually by program exit or intentional abort, etc.) needs a checkpoint, so // creating a checkpoint and syncing to the DB are the same operation. By the same // token, the progress, message, and total fields in the DB row must also correspond // with the checkpoint, otherwise resuming a job will be out of whack. So it is // important that syncing to the DB only happens with a checkpoint to go with it // (if there is a checkpoint at all). Passing a nil checkpoint here will clear it. 
func (j *ActiveJob) sync(tx *sql.Tx, checkpoint any) error { j.mu.Lock() defer j.mu.Unlock() j.currentCheckpoint = checkpoint // no-op if last sync was too recent and the job isn't done yet (job done <==> tx!=nil) if time.Since(j.lastSync) < jobSyncInterval && tx == nil { return nil } var chkpt []byte if checkpoint != nil { var err error chkpt, err = json.Marshal(checkpoint) if err != nil { return fmt.Errorf("JSON-encoding checkpoint %#v: %w", checkpoint, err) } now := time.Now() j.lastCheckpoint = &now } q := `UPDATE jobs SET progress=?, total=?, message=?, checkpoint=?, updated=? WHERE id=?` // TODO: LIMIT 1 vals := []any{j.currentProgress, j.currentTotal, j.currentMessage, string(chkpt), time.Now().UnixMilli(), j.id} var err error if tx == nil { _, err = j.tl.db.WritePool.ExecContext(j.tl.ctx, q, vals...) } else { _, err = tx.ExecContext(j.tl.ctx, q, vals...) } if err != nil { return fmt.Errorf("syncing progress with DB: %w", err) } j.lastSync = time.Now() return nil } // GetJobs loads the jobs with the specified IDs, or by the most recent jobs, whichever is set. // Both technically can be set, but why? 
func (tl *Timeline) GetJobs(ctx context.Context, jobIDs []uint64, mostRecent int) ([]Job, error) { var jobs []Job // load most recent jobs if mostRecent > 0 { q := `SELECT id, type, name, configuration, hash, state, hostname, created, updated, start, ended, message, total, progress, checkpoint, repeat, parent_job_id FROM jobs ORDER BY created DESC LIMIT ?` var rows *sql.Rows var err error rows, err = tl.db.ReadPool.QueryContext(ctx, q, mostRecent) if err != nil { return nil, err } defer rows.Close() tlID := tl.id.String() for rows.Next() { job, err := scanJob(rows, tlID) if err != nil { return nil, err } jobs = append(jobs, job) } if err := rows.Err(); err != nil { return nil, err } if len(jobs) == 0 { return nil, sql.ErrNoRows } } // load specific jobs by ID for _, id := range jobIDs { job, err := tl.loadJob(ctx, nil, id, true) if err != nil { return nil, fmt.Errorf("loading job %d: %w", id, err) } jobs = append(jobs, job) } for i := range jobs { // if job is running, we can provide more recent information than what // was last synced to the DB, which may be out-of-date at the moment if jobs[i].State == JobStarted { tl.activeJobsMu.RLock() activeJob, ok := tl.activeJobs[jobs[i].ID] tl.activeJobsMu.RUnlock() if ok { // we don't use a RWMutex because this mutex is write-heavy activeJob.mu.Lock() jobs[i].Progress = activeJob.currentProgress jobs[i].Message = activeJob.currentMessage jobs[i].Total = activeJob.currentTotal activeJob.mu.Unlock() } } } return jobs, nil } func (tl *Timeline) CancelJob(ctx context.Context, jobID uint64) error { tl.activeJobsMu.Lock() activeJob, ok := tl.activeJobs[jobID] tl.activeJobsMu.Unlock() if ok { // job is actively running -- fun! 
we get to cancel it, // and our job is actually easier this way activeJob.cancel() // wait for job action to return; this ensures its // state has been synced to the DB and it has been // removed from the map of active jobs select { case <-ctx.Done(): return ctx.Err() case <-activeJob.done: } return nil } // the job is not actively running; it could have been paused // from a prior execution of the program, for example; or maybe // it is still queued and they just want to prevent it from // running; etc, let's check the DB tx, err := tl.db.WritePool.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() job, err := tl.loadJob(ctx, tx, jobID, false) if err != nil { return fmt.Errorf("loading job %d from database: %w", jobID, err) } switch job.State { case JobQueued, JobStarted, JobPaused, JobInterrupted: // Started and Interrupted should probably not be encountered since // running jobs should be active (taken care of above by canceling // its context), and we should resume interrupted jobs at startup, // but might as well allow them here since it's no matter job.State = JobAborted _, err = tl.db.WritePool.ExecContext(ctx, `UPDATE jobs SET state=? 
WHERE id=?`, job.State, jobID) // TODO: LIMIT 1 if err != nil { return fmt.Errorf("updating job state: %w", err) } default: return fmt.Errorf("job %d is in state %s, which cannot be canceled", jobID, job.State) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction: %w", err) } // update any UI elements that may be showing info about this inactive job statusLog := Log.Named("job.status") inactiveJob := &ActiveJob{ id: job.ID, statusLog: statusLog, parentJobID: job.ParentJobID, currentState: job.State, currentProgress: job.Progress, currentTotal: job.Total, currentMessage: job.Message, lastCheckpoint: job.Updated, } inactiveJob.flushProgress(statusLog) return nil } func (tl *Timeline) PauseJob(ctx context.Context, jobID uint64) error { tl.activeJobsMu.Lock() job, ok := tl.activeJobs[jobID] tl.activeJobsMu.Unlock() if !ok { return fmt.Errorf("job %d is either inactive or does not exist", jobID) } // create the channel that the job will block on; that is the pausing // (closing this channel will unpause the job) paused := make(chan struct{}) // What happens next is done deliberately in this order: // 1. Update the job's state in the DB. This is authoritative, so if the program // terminates before the job pauses, at least the user's intent will be preserved. // 2. Signal the job to pause. The UI is currently showing temporary "Pausing..." // text, and it can take time for the job to receive our signal and pause, // for example if it's in the middle of a batch of thumbnails being generated; // while it's finishing those, it's emitting job status updates and the UI will // update accordingly, so if we set the state to "paused" before the job has // actually paused, it looks dumb because it'll be streaming progress while paused. // 3. Update the UI's state to "paused" so that controls appear to allow resumption. // step 1. update the DB with the authoritative job state _, err := tl.db.WritePool.ExecContext(ctx, `UPDATE jobs SET state=? 
WHERE state=? AND id=?`, JobPaused, JobStarted, job.id) // TODO: LIMIT 1 if err != nil { return fmt.Errorf("job paused, but error updating job state in DB: %w", err) } // step 2. the job should soon select on the pause channel to receive this, then // immediately try to receive from the paused channel, which is effectively the // pause behavior, as it blocks the job's goroutine until we unpause job.pause <- paused // step 3. store reference to the new channel so it can be closed (to unpause) later, // and update the UI with the job's paused state job.mu.Lock() if job.paused != nil { // already paused (not an error, just a no-op) job.mu.Unlock() return nil } job.paused = paused job.currentState = JobPaused job.flushProgress(job.statusLog) // note that the UI might get the state update before this function, and thus its HTTP request, returns -- TODO: not true if this change works job.mu.Unlock() return nil } func (tl *Timeline) UnpauseJob(ctx context.Context, jobID uint64) error { tl.activeJobsMu.Lock() job, ok := tl.activeJobs[jobID] tl.activeJobsMu.Unlock() if !ok { // not active; see if it's in the DB... if so, it's the same as starting the job var state JobState err := tl.db.ReadPool.QueryRowContext(ctx, `SELECT state FROM jobs WHERE id=? LIMIT 1`, jobID).Scan(&state) if errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("job %d not found", jobID) } if err != nil { return err } if state != JobStarted && state != JobPaused && state != JobFailed { return fmt.Errorf("job %d is in %s state and cannot be resumed", jobID, state) } return tl.StartJob(ctx, jobID, false) } // update the DB first because it's authoritative for the job's state _, err := tl.db.WritePool.ExecContext(ctx, `UPDATE jobs SET state=? WHERE state=? 
AND id=?`, JobStarted, JobPaused, job.id) // TODO: LIMIT 1 if err != nil { return fmt.Errorf("job resumed, but error updating job state in DB: %w", err) } // unpause by closing the pause channel, delete the pause channel, // and then update the UI as to the job's new state job.mu.Lock() defer job.mu.Unlock() if job.paused == nil { // already not paused (not an error, just a no-op) return nil } close(job.paused) job.paused = nil job.currentState = JobStarted job.flushProgress(job.statusLog) return nil } func (tl *Timeline) StartJob(ctx context.Context, jobID uint64, startOver bool) error { tx, err := tl.db.WritePool.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if startOver { _, err = tx.ExecContext(ctx, `UPDATE jobs SET checkpoint=NULL, progress=NULL, message=NULL WHERE id=?`, jobID) // TODO: LIMIT 1 if err != nil { return fmt.Errorf("clearing checkpoint, progress, and message to start over: %w", err) } } if err = tl.startJob(ctx, tx, jobID); err != nil { return err } return tx.Commit() } func scanJob(rows *sql.Rows, repoID string) (Job, error) { var job Job var created, updated, start, end *int64 err := rows.Scan( &job.ID, &job.Type, &job.Name, &job.Config, &job.Hash, &job.State, &job.Hostname, &created, &updated, &start, &end, &job.Message, &job.Total, &job.Progress, &job.Checkpoint, &job.Repeat, &job.ParentJobID) if err != nil { return job, fmt.Errorf("scanning job fields: %w", err) } if created != nil { job.Created = time.UnixMilli(*created) } if updated != nil { ts := time.UnixMilli(*updated) job.Updated = &ts } if start != nil { ts := time.UnixMilli(*start) job.Start = &ts } if end != nil { ts := time.UnixMilli(*end) job.Ended = &ts } job.RepoID = repoID return job, nil } // Job is only to be used for shuttling job data in and out of the DB, // it should not be assumed to accurately reflect the current state of the // job. Mainly used for inserting a job row and getting a job row for the // frontend. 
type Job struct { // not a field in the DB, but useful for bookkeeping where multiple timelines are open RepoID string `json:"repo_id"` ID uint64 `json:"id"` Type JobType `json:"type"` Name *string `json:"name,omitempty"` Config string `json:"config,omitempty"` // JSON encoding of, for instance, ImportParameters, etc. Hash []byte `json:"hash,omitempty"` State JobState `json:"state"` Hostname *string `json:"hostname,omitempty"` Created time.Time `json:"created,omitempty"` Updated *time.Time `json:"updated,omitempty"` Start *time.Time `json:"start,omitempty"` // could be future, so not "started" Ended *time.Time `json:"ended,omitempty"` Message *string `json:"message,omitempty"` Total *int `json:"total,omitempty"` Progress *int `json:"progress,omitempty"` Checkpoint *string `json:"checkpoint,omitempty"` Repeat *time.Duration `json:"repeat,omitempty"` ParentJobID *uint64 `json:"parent_job_id,omitempty"` // only used when loading jobs from the DB for the frontend Parent *Job `json:"parent,omitempty"` Children []Job `json:"children,omitempty"` } // MarshalLogObject satisfies the zapcore.ObjectMarshaler interface. func (j Job) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddString("repo_id", j.RepoID) enc.AddUint64("id", j.ID) if j.ParentJobID != nil { enc.AddUint64("parent_job_id", *j.ParentJobID) } enc.AddString("type", string(j.Type)) enc.AddTime("created", j.Created) if j.Start != nil { enc.AddTime("start", *j.Start) } if j.Ended != nil { enc.AddTime("ended", *j.Ended) } if j.Name != nil { enc.AddString("name", *j.Name) } return nil } func (j Job) hash() []byte { h := blake3.New() _, _ = h.WriteString(string(j.Type)) _, _ = h.WriteString(j.Config) if j.Hostname != nil { _, _ = h.WriteString(*j.Hostname) } return h.Sum(nil) } // JobAction is a type that does actual work of a job. // It must be JSON-encodable. type JobAction interface { // Run runs or resumes a job. It should utilize the // checkpoint, if set, to resume an interrupted job. 
// CPU-intensive work should use the throttle. If // possible, the job status should be updated as it // progresses. The job has a context which should // be honored. The function should not return until // the job is done (whether successful or not); this // means waiting until any spawned goroutines have // finished (using sync.WaitGroup is good parctice). Run(job *ActiveJob, checkpoint []byte) error } type JobType string const ( JobTypeImport JobType = "import" JobTypeThumbnails JobType = "thumbnails" JobTypeEmbeddings JobType = "embeddings" ) type JobState string const ( JobQueued JobState = "queued" // on deck JobStarted JobState = "started" // currently running JobPaused JobState = "paused" // intentional suspension (don't auto-resume) (still considered "active" since its action remains blocked in memory) JobAborted JobState = "aborted" // intentional termination (don't auto-resume) (job unloaded from memory) JobInterrupted JobState = "interrupted" // unintentional, forced interruption (may be auto-resumed) JobSucceeded JobState = "succeeded" // final (cannot be restarted) // TODO: what if units of work fail, though? maybe this should be "done" or "completed" JobFailed JobState = "failed" // final (can be manually restarted) ) const ( jobSyncInterval = 2 * time.Second // how often to update the DB and save checkpoints jobFlushInterval = 100 * time.Millisecond // how often to update the frontend )