1
0
Fork 0
timelinize/timeline/jobs.go
2026-01-16 23:32:10 -07:00

1210 lines
39 KiB
Go

/*
Timelinize
Copyright (c) 2013 Matthew Holt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package timeline
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"runtime"
"runtime/debug"
"sync"
"time"
"github.com/zeebo/blake3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// maxThrottleCPUPct caps the fraction of logical CPUs that CPU-intensive
// background work may occupy at once.
const maxThrottleCPUPct = 0.75

var (
	// throttleSize is the number of concurrently-permitted CPU-intensive
	// tasks; always at least 1 so work can proceed on single-core machines.
	throttleSize = max(int(float64(runtime.NumCPU())*maxThrottleCPUPct), 1)

	// cpuIntensiveThrottle is a buffered channel with throttleSize capacity;
	// presumably used elsewhere as a counting semaphore for heavy work
	// (acquisition pattern not visible in this file chunk).
	cpuIntensiveThrottle = make(chan struct{}, throttleSize)
)
// CreateJob creates and runs a job described by action, with an estimated total units
// of work, to be repeated after a certain interval (if > 0). If an identical job is
// already running, the job will be queued. If an identical job is already queued, this
// is a no-op and the returned job ID is 0. Otherwise, the created job ID is returned.
func (tl *Timeline) CreateJob(action JobAction, scheduled time.Time, repeat time.Duration, total int, parentJobID uint64) (uint64, error) {
	config, err := json.Marshal(action)
	if err != nil {
		return 0, fmt.Errorf("JSON-encoding job action: %w", err)
	}

	// map the action's concrete type to the job type stored in the DB
	var jobType JobType
	switch action.(type) {
	case *ImportJob:
		jobType = JobTypeImport
	case thumbnailJob:
		jobType = JobTypeThumbnails
	case embeddingJob:
		jobType = JobTypeEmbeddings
	default:
		return 0, fmt.Errorf("unexpected job action: %#v", action)
	}

	// optional attributes are stored as NULL (nil pointer) when unset
	var repeatPtr *time.Duration
	var startPtr *time.Time
	var totalPtr *int
	var parentJobIDPtr *uint64
	if repeat > 0 {
		repeatPtr = &repeat
	}
	if total > 0 {
		totalPtr = &total
	}
	if parentJobID > 0 {
		parentJobIDPtr = &parentJobID
	}
	if !scheduled.IsZero() {
		startPtr = &scheduled
	}

	// this value is not the one that gets run, it is only used for storing
	job := Job{
		Type:        jobType,
		Config:      string(config),
		Start:       startPtr,
		Total:       totalPtr,
		Repeat:      repeatPtr,
		ParentJobID: parentJobIDPtr,
	}

	tx, err := tl.db.WritePool.BeginTx(tl.ctx, nil)
	if err != nil {
		return 0, err
	}
	defer tx.Rollback()

	jobID, created, err := tl.storeJob(tx, job)
	if err != nil {
		return 0, err
	}
	if jobID == 0 {
		// an identical job is already queued; storeJob made this a no-op, so
		// don't emit a bogus "created" event or try to start job ID 0
		return 0, nil
	}

	// TODO: This log is a bit of a hack; it's to notify the UI that a new job
	// has been created, so it can be shown in the UI if relevant; but the
	// longer-term status logger doesn't get made until runJob... so we have
	// to make a logger with the same name and many of the same fields...
	Log.Named("job.status").Info("created",
		zap.Object("job", Job{
			RepoID:      tl.ID().String(),
			ID:          jobID,
			Type:        job.Type,
			Created:     created,
			Start:       job.Start,
			ParentJobID: parentJobIDPtr,
			Total:       totalPtr,
		}))

	err = tl.startJob(tl.ctx, tx, jobID)
	if err != nil {
		return 0, err
	}

	if err = tx.Commit(); err != nil {
		return 0, fmt.Errorf("committing new job transaction: %w", err)
	}

	return jobID, nil
}
// storeJob inserts the job into the database without starting it, returning
// the new row ID and its creation time. If an identical (same-hash) job is
// already queued, nothing is inserted and a zero ID is returned.
// TODO: Job configs could be compressed to save space in the DB...
func (tl *Timeline) storeJob(tx *sql.Tx, job Job) (uint64, time.Time, error) {
	job.Hash = job.hash()

	hostname, err := os.Hostname()
	if err != nil {
		Log.Error("unable to lookup hostname while adding job", zap.Error(err))
	}
	job.Hostname = &hostname

	// a queued job with the same hash means this one would be a duplicate,
	// so treat that case as a no-op
	var duplicates int
	err = tx.QueryRowContext(tl.ctx,
		`SELECT count() FROM jobs WHERE hash=? AND state=? LIMIT 1`,
		job.Hash, JobQueued).Scan(&duplicates)
	if err != nil {
		return 0, time.Time{}, fmt.Errorf("checking for duplicate job: %w", err)
	}
	if duplicates > 0 {
		return 0, time.Time{}, nil
	}

	// scheduled start is stored as Unix milliseconds (NULL if unset)
	var startMilli *int64
	if job.Start != nil {
		ms := job.Start.UnixMilli()
		startMilli = &ms
	}

	var rowID uint64
	var createdMilli int64
	err = tx.QueryRowContext(tl.ctx, `
INSERT INTO jobs (type, configuration, hash, hostname, start, total, repeat, parent_job_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id, created`,
		job.Type, job.Config, job.Hash, job.Hostname, startMilli, job.Total, job.Repeat, job.ParentJobID).Scan(&rowID, &createdMilli)
	if err != nil {
		return 0, time.Time{}, fmt.Errorf("inserting new job row: %w", err)
	}

	return rowID, time.UnixMilli(createdMilli), nil
}
// loadJob loads a job from the database, using tx if set. A lock MUST be obtained on the
// timeline database when calling this function!
//
// If parentAndChildJobs is true, the job's children are loaded into the returned
// job's Children field, and its own parent (if any) into the Parent field.
func (tl *Timeline) loadJob(ctx context.Context, tx *sql.Tx, jobID uint64, parentAndChildJobs bool) (Job, error) {
	q := `SELECT
id, type, name, configuration, hash, state, hostname,
created, updated, start, ended,
message, total, progress, checkpoint,
repeat, parent_job_id
FROM jobs
WHERE id=?`
	vals := []any{jobID}
	if parentAndChildJobs {
		// we can get child jobs now, parent job will be had in a second query;
		// the leading space is required so the SQL doesn't fuse into "?OR"
		q += " OR parent_job_id=? LIMIT 100" // limit just in case, I guess
		vals = append(vals, jobID)
	} else {
		q += " LIMIT 1"
	}

	var rows *sql.Rows
	var err error
	if tx == nil {
		rows, err = tl.db.ReadPool.QueryContext(ctx, q, vals...)
	} else {
		rows, err = tx.QueryContext(ctx, q, vals...)
	}
	if err != nil {
		return Job{}, err
	}
	defer rows.Close()

	tlID := tl.id.String()

	var jobs []Job

	// scan all the job rows into the same slice for now, we'll figure out
	// which is the "main"/parent job later (the result set is not usually more
	// than a few rows)
	for rows.Next() {
		job, err := scanJob(rows, tlID)
		if err != nil {
			return Job{}, err
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return Job{}, err
	}
	if len(jobs) == 0 {
		return Job{}, sql.ErrNoRows
	}

	// the requested job is the "parent"; any other rows are its children
	var parentJob Job
	for _, job := range jobs {
		if job.ID == jobID {
			parentJob = job
		} else {
			parentJob.Children = append(parentJob.Children, job)
		}
	}

	// load the parent of the parent, I guess
	if parentAndChildJobs && parentJob.ParentJobID != nil {
		grandparent, err := tl.loadJob(ctx, tx, *parentJob.ParentJobID, false)
		if err != nil {
			return Job{}, fmt.Errorf("loading parent job: %w", err)
		}
		parentJob.Parent = &grandparent
	}

	return parentJob, nil
}
// startJob makes sure an identical job is not already running, then
// sets the job's state to started and syncs it with the DB, then
// calls the action function in a goroutine. If the job is queued with
// a future start time, actually starting it is deferred to a goroutine
// that sleeps until the scheduled time and uses a fresh transaction.
func (tl *Timeline) startJob(ctx context.Context, tx *sql.Tx, jobID uint64) error {
	// if an identical job is already running, we shouldn't start yet
	var count int
	err := tx.QueryRowContext(tl.ctx,
		`SELECT count()
FROM jobs
WHERE id!=?
AND hash=(SELECT hash FROM jobs WHERE id=? LIMIT 1)
AND (state=? OR state=?)
LIMIT 1`,
		jobID, jobID, JobStarted, JobPaused).Scan(&count)
	if err != nil {
		return fmt.Errorf("checking for duplicate running job: %w", err)
	}
	if count > 0 {
		return errors.New("identical job is already running")
	}

	// verify the job is not already loaded (using UnpauseJob is suitable in that case, so we don't
	// double-load it) and then load the job to verify it is in a startable state
	tl.activeJobsMu.RLock()
	_, runningOrPaused := tl.activeJobs[jobID]
	tl.activeJobsMu.RUnlock()
	if runningOrPaused {
		return fmt.Errorf("job %d is already active/loaded and cannot be loaded and started again", jobID)
	}

	job, err := tl.loadJob(tl.ctx, tx, jobID, false)
	if err != nil {
		return fmt.Errorf("loading job %d: %w", jobID, err)
	}
	if job.State == JobStarted || job.State == JobSucceeded {
		// paused is an allowed state to start from, as long as it's not already loaded/active (checked above),
		// because the process could have stopped while the job was paused
		return fmt.Errorf("job %d has already %s and is not startable in this state", jobID, job.State)
	}

	// start marks the job as started in the DB using the given transaction,
	// refreshes its total/progress, and kicks off the action via runJob
	start := func(tx *sql.Tx) error {
		// update the job's state to started
		now := time.Now()
		_, err = tx.ExecContext(ctx, `UPDATE jobs SET state=?, start=?, updated=? WHERE id=?`, // TODO: LIMIT 1
			JobStarted, now.UnixMilli(), now.UnixMilli(), jobID)
		if err != nil {
			return fmt.Errorf("updating job state: %w", err)
		}
		job.State = JobStarted
		job.Start = &now
		job.Updated = &now

		// update the job's state as we prepare to run it
		err = tx.QueryRowContext(ctx, `SELECT total, progress FROM jobs WHERE id=? LIMIT 1`, jobID).Scan(&job.Total, &job.Progress)
		if err != nil {
			return fmt.Errorf("unable to get starting progress and total amounts: %w", err)
		}

		// run the job
		if err = tl.runJob(job); err != nil {
			return fmt.Errorf("running job: %w", err)
		}

		return nil
	}

	if job.State == JobQueued && job.Start != nil && time.Now().Before(*job.Start) {
		// job is scheduled for the future: sleep until then in a goroutine,
		// then start it with a new transaction (the caller's tx is long gone)
		go func(scheduledStart time.Time) {
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Until(scheduledStart)):
				logger := Log.Named("job")

				tx, err := tl.db.WritePool.BeginTx(ctx, nil)
				if err != nil {
					logger.Error("could not start transaction to start job", zap.Error(err))
					return
				}
				defer tx.Rollback()

				// ensure job was not aborted while we were waiting for it to start
				var unabortedJobCount int
				err = tx.QueryRowContext(ctx, `SELECT count() FROM jobs WHERE id=? AND state!=? LIMIT 1`, jobID, JobAborted).Scan(&unabortedJobCount)
				if errors.Is(err, sql.ErrNoRows) || unabortedJobCount == 0 {
					logger.Error("job was aborted before it started")
					return
				}
				// NOTE(review): Scan errors other than ErrNoRows fall through
				// here and the job is started anyway -- confirm that's intended

				if err = start(tx); err != nil {
					logger.Error("could not start scheduled job", zap.Error(err))
					return
				}

				if err = tx.Commit(); err != nil {
					logger.Error("could not commit transaction for scheduled job", zap.Error(err))
					return
				}
			}
		}(*job.Start)
	} else {
		return start(tx)
	}

	return nil
}
// runJob deserializes the job's config and starts a goroutine that
// calls its associated action function. The goroutine then finalizes
// the job and runs the next queued job, if any. This method does
// not sync the job's starting state to the DB, so call startJob()
// instead. It is expected that its state is running/started.
func (tl *Timeline) runJob(row Job) error {
	// create the job action by deserializing the config into its associated struct
	var action JobAction
	switch row.Type {
	case JobTypeImport:
		var importJob *ImportJob
		if err := json.Unmarshal([]byte(row.Config), &importJob); err != nil {
			return fmt.Errorf("unmarshaling import job config: %w", err)
		}
		action = importJob
	case JobTypeThumbnails:
		var thumbnailJob thumbnailJob
		if err := json.Unmarshal([]byte(row.Config), &thumbnailJob); err != nil {
			return fmt.Errorf("unmarshaling thumbnail job config: %w", err)
		}
		action = thumbnailJob
	case JobTypeEmbeddings:
		var embeddingJob embeddingJob
		if err := json.Unmarshal([]byte(row.Config), &embeddingJob); err != nil {
			return fmt.Errorf("unmarshaling embedding job config: %w", err)
		}
		action = embeddingJob
	default:
		return fmt.Errorf("unknown job type '%s'", row.Type)
	}

	baseLogger := Log.Named("job")

	// keep a pointer outside the ActiveJob struct because after
	// the job is done, we will update the logger, and it's not
	// thread-safe to change that field in the struct
	statusLog := baseLogger.Named("status")

	// this context governs the job; canceling it aborts the job
	ctx, cancel := context.WithCancel(tl.ctx)

	job := &ActiveJob{
		ctx:             ctx,
		cancel:          cancel,
		pause:           make(chan chan struct{}),
		done:            make(chan struct{}),
		id:              row.ID,
		tl:              tl,
		logger:          baseLogger.Named("action").With(zap.Object("job", row)),
		statusLog:       statusLog,
		parentJobID:     row.ParentJobID,
		action:          action,
		currentState:    row.State,
		currentProgress: row.Progress,
		currentTotal:    row.Total,
		currentMessage:  row.Message,
		started:         *row.Start, // NOTE(review): assumes row.Start is non-nil (startJob sets it) -- confirm all callers
		jobType:         row.Type,
	}

	// run the job asynchronously -- never block the calling goroutine!
	go func(job *ActiveJob, logger, statusLog *zap.Logger, row Job, action JobAction) {
		// don't allow a job, which is doing who-knows-what and processing who-knows-what
		// input, to bring down the program
		defer func() {
			if r := recover(); r != nil {
				logger.Error("panic",
					zap.Any("error", r),
					zap.String("stack", string(debug.Stack())))
			}
			// apparently it's best-practice to checkpoint at the end of large write operations:
			// https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1071755879
			// "One thing I will suggest is running pragma wal_checkpoint after committing the
			// insert transaction, as that will merge the WAL file back into the main database file,
			// which will make subsequent reads faster." -rittneje
			// and since the default mode is PASSIVE, this page recommends using FULL or TRUNCATE
			// but I haven't personally seen a need for those heavier operations yet:
			// https://phiresky.github.io/blog/2020/sqlite-performance-tuning/#regarding-wal-mode
			if _, err := tl.db.WritePool.ExecContext(ctx, "PRAGMA wal_checkpoint"); err != nil {
				logger.Error("could not create WAL checkpoint", zap.Error(err))
			}
		}()

		// when we're done here, clean up our map of active jobs
		defer func() {
			tl.activeJobsMu.Lock()
			delete(tl.activeJobs, job.id)
			tl.activeJobsMu.Unlock()
		}()

		statusLog.Info("running",
			zap.Object("job", row),
			zap.String("state", string(row.State)))

		// signal to any waiters when this job action returns; we intentionally
		// don't signal until after we've synced the job state to the DB
		defer close(job.done)

		// TODO: we should have a way of letting jobs report errors without
		// having to terminate, but still marking it as failed when done,
		// or at least record the errors even if it gets marked as "succeeded"
		// (maybe "succeeded" is too specific/optimistic a term, maybe "completed" or "done" or "finished" instead?)

		// type conversion for checkpoint; it's easier for actions to use []byte, but we like using *string
		// for database since most DB viewers make it easier to read TEXT columns than BLOB columns
		var chkpt []byte
		if row.Checkpoint != nil && len(*row.Checkpoint) > 0 {
			chkpt = []byte(*row.Checkpoint)
		}

		// run the job; we'll handle the error by logging the result and updating the state
		actionErr := action.Run(job, chkpt)

		// map the action's outcome onto the job's terminal state
		var newState JobState
		switch {
		case actionErr == nil:
			newState = JobSucceeded
		case errors.Is(actionErr, context.Canceled):
			newState = JobAborted
		default:
			newState = JobFailed
		}

		// add info to the logger and log the result
		end := time.Now()
		row.Ended = &end
		if row.Start != nil {
			statusLog = statusLog.With(zap.Duration("duration", end.Sub(*row.Start)))
		}

		job.mu.Lock()
		job.currentState = newState
		job.ended = end
		job.flushProgress(statusLog) // allow the UI to update live job progress display one last time
		job.mu.Unlock()

		// print a final message indicating the state and error, if any (TODO: could do above on that last flushProgress(), I guess)
		switch newState {
		case JobSucceeded:
			statusLog.Info(string(newState), zap.Object("job", row))
		case JobAborted:
			statusLog.Warn(string(newState), zap.Object("job", row), zap.Error(actionErr))
		case JobFailed:
			statusLog.Error(string(newState), zap.Object("job", row), zap.Error(actionErr))
		}

		// sync to the DB, and dequeue the next job
		tx, err := tl.db.WritePool.BeginTx(tl.ctx, nil) // don't use the job's context, it might have been canceled!
		if err != nil {
			logger.Error("beginning transaction for ended job", zap.Error(err))
			return
		}
		defer tx.Rollback()

		_, err = tx.ExecContext(tl.ctx,
			`UPDATE jobs SET state=?, ended=?, updated=? WHERE id=?`, // TODO: LIMIT 1 (see https://github.com/mattn/go-sqlite3/pull/802)
			newState, end.UnixMilli(), end.UnixMilli(), job.id)
		if err != nil {
			logger.Error("updating job state", zap.Error(err))
		}

		if newState == JobSucceeded {
			// clear message and checkpoint in the DB if job succeeds
			job.Message("")
			if err = job.sync(tx, nil); err != nil {
				logger.Error("clearing job checkpoint and message from successful job", zap.Error(err))
			}
		} else {
			// for any other termination, sync only the job state to the DB
			job.mu.Lock()
			checkpoint := job.currentCheckpoint
			job.mu.Unlock()
			err := job.sync(tx, checkpoint)
			if err != nil {
				logger.Error("syncing job state with DB", zap.Error(err))
			}
		}

		hostname, err := os.Hostname()
		if err != nil {
			Log.Error("unable to lookup hostname while dequeuing next job", zap.Error(err))
		}

		// see if there's another job queued up we should run
		// (only run import jobs on the same machine they were configured from, since it uses external file paths)
		var nextJobID uint64
		err = tx.QueryRowContext(tl.ctx,
			`SELECT id
FROM jobs
WHERE (state=? OR state=?) AND (hostname=? OR type!=?)
ORDER BY start, created
LIMIT 1`,
			JobQueued, JobInterrupted, hostname, JobTypeImport).Scan(&nextJobID)
		// nextJobID stays 0 when no row was found (ErrNoRows), so this branch
		// is skipped in that case
		if nextJobID > 0 {
			err := tl.startJob(tl.ctx, tx, nextJobID)
			if err != nil {
				logger.Error("starting next job", zap.Error(err))
			}
		}
		if err != nil && !errors.Is(err, sql.ErrNoRows) {
			logger.Error("querying for next job", zap.Error(err))
		}

		if err = tx.Commit(); err != nil {
			logger.Error("committing post-job transaction", zap.Error(err))
		}
	}(job, baseLogger, statusLog, row, action)

	// register the job as active so it can be paused/canceled/inspected
	tl.activeJobsMu.Lock()
	tl.activeJobs[job.id] = job
	tl.activeJobsMu.Unlock()

	return nil
}
// ActiveJob represents a job that is being actively loaded and run.
// It has methods which can manage the state of the job.
// It has a context which should be honored by implementations
// of JobAction. A ActiveJob value must not be copied. It maintains
// current progress/etc. that should be as authoritative as
// what is in the DB, although this struct is updated more
// frequently than the DB is synced.
type ActiveJob struct {
	ctx         context.Context    // governs the job; canceling it aborts the job
	cancel      context.CancelFunc // cancels ctx (used by CancelJob)
	pause       chan chan struct{} // send to the outer channel to pause, receive on the inner channel to unpause
	done        chan struct{}      // signaling channel that is closed when the job action returns
	id          uint64             // the job's database row ID
	tl          *Timeline          // the timeline this job belongs to
	logger      *zap.Logger        // for the job action's own logging
	statusLog   *zap.Logger        // emits "progress"/state logs that drive the UI
	parentJobID *uint64            // set if this job was created by another job
	action      JobAction          // the deserialized action to run

	// these fields are needed mainly for accurate real-time logging purposes
	jobType JobType
	started time.Time
	ended   time.Time

	mu sync.Mutex // guards the fields below (and makes this struct no-copy)

	// protected by mu
	paused            chan struct{} // set by the job manager when pausing; closing this unpauses the job
	currentState      JobState
	currentProgress   *int
	currentTotal      *int
	currentMessage    *string    // nil => unchanged, "" => clear
	currentCheckpoint any
	lastCheckpoint    *time.Time // when last checkpoint was created (may be more recent than last DB sync, which is throttled)
	lastSync          time.Time  // last DB update
	lastFlush         time.Time  // last frontend update
}
// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface.
// It encodes the job's identity and current progress fields for the
// status logs that drive the UI; optional fields are omitted when unset.
// NOTE(review): the fields read here are normally guarded by j.mu; logging
// is expected to happen under the lock (as flushProgress does) -- confirm.
func (j *ActiveJob) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	enc.AddString("repo_id", j.tl.id.String())
	enc.AddUint64("id", j.id)
	enc.AddString("type", string(j.jobType))
	enc.AddString("state", string(j.currentState))
	enc.AddTime("start", j.started)
	if !j.ended.IsZero() {
		enc.AddTime("ended", j.ended)
	}
	if j.parentJobID != nil {
		enc.AddUint64("parent_job_id", *j.parentJobID)
	}
	if j.lastCheckpoint != nil {
		enc.AddTime("checkpointed", *j.lastCheckpoint)
	}
	if j.currentMessage != nil {
		enc.AddString("message", *j.currentMessage)
	}
	if j.currentProgress != nil {
		enc.AddInt("progress", *j.currentProgress)
	}
	if j.currentTotal != nil {
		enc.AddInt("total", *j.currentTotal)
	}
	return nil
}
// Context returns the context the job is being run in. It should
// (usually) be the parent context of all others in the job; job
// actions should honor its cancellation.
func (j *ActiveJob) Context() context.Context { return j.ctx }
// ID returns the database row ID of the job (the `jobs` table primary key).
func (j *ActiveJob) ID() uint64 { return j.id }
// Timeline returns the timeline associated with (running) the job.
func (j *ActiveJob) Timeline() *Timeline { return j.tl }
// Logger returns a logger for the job action's own output (distinct
// from the status logger that drives UI progress updates).
func (j *ActiveJob) Logger() *zap.Logger { return j.logger }
// Continue should be called by all job actions frequently, typically at
// the beginning of their main loop (and any longer-running inner loops).
// It blocks while the job is paused, until it is unpaused. It returns an
// error if the job's context has been canceled, meaning the job should
// terminate. When the job is neither paused nor canceled, it returns
// immediately with nil.
func (j *ActiveJob) Continue() error {
	select {
	case unpause := <-j.pause:
		// we've been paused; park here until resumed or canceled
		select {
		case <-unpause:
			return nil
		case <-j.ctx.Done():
			return j.ctx.Err()
		}
	case <-j.ctx.Done():
		return j.ctx.Err()
	default:
		return nil
	}
}
// SetTotal sets the total size (units of work) of the job. Ideally this is
// called once, after the true total is known, rather than accumulated along
// the way, since progress bars rely on it to configure their display. If you
// do up-front work to estimate the size, wait until that estimate is complete
// before calling this (use checkpoints to preserve an accumulated count). A
// few adjustments later on are fine if the total shifts slightly. While the
// stored total is nil (or 0?), the progress bar renders as "indeterminate",
// so avoid setting it until the real work is about to begin.
func (j *ActiveJob) SetTotal(total int) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.currentTotal = &total
	j.flushProgress(nil)
}
// FlushProgress forces an immediate status log so the UI refreshes
// its view of the job, bypassing the usual flush throttling.
func (j *ActiveJob) FlushProgress() {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.flushProgress(j.statusLog)
}
// Progress adds delta -- the work completed since the previous update --
// to the job's progress toward its expected total, updating progress bars
// in the frontend UI. A nil current progress is treated as 0, so delta
// becomes the new progress value.
func (j *ActiveJob) Progress(delta int) {
	j.mu.Lock()
	defer j.mu.Unlock()
	updated := delta
	if prev := j.currentProgress; prev != nil {
		updated += *prev
	}
	j.currentProgress = &updated
	j.flushProgress(nil)
}
// flushProgress emits the log line the UI watches to update the job's
// progress display, but only if enough time has passed since the last
// flush (the UI doesn't need 1000 updates/sec). Passing a non-nil logger
// (even just j.statusLog) forces the flush regardless of timing, which
// also lets callers attach extra fields for the forced emission.
// MUST BE CALLED WHILE HOLDING THE JOB MUTEX.
func (j *ActiveJob) flushProgress(logger *zap.Logger) {
	forced := logger != nil
	if !forced && time.Since(j.lastFlush) <= jobFlushInterval {
		return // throttled: too soon since the previous flush
	}
	if !forced {
		logger = j.statusLog
	}
	logger.Info("progress", zap.Object("job", j))
	j.lastFlush = time.Now()
	_ = logger.Sync() // ensure it gets written promptly
}
// Message updates the current job message/status shown to the user.
// An empty string clears the message.
func (j *ActiveJob) Message(message string) {
	j.mu.Lock()
	defer j.mu.Unlock()
	switch message {
	case "":
		j.currentMessage = nil
	default:
		j.currentMessage = &message
	}
	j.flushProgress(nil)
}
// Checkpoint creates a checkpoint and syncs the job state with the database.
// When creating a checkpoint, the total, progress, and message data should
// be current such that resuming the job from this checkpoint would have the
// correct total, progress, and message displayed. Usually, a checkpoint is
// where to begin/resume the job, since it's often the starting index in a
// loop. For example, if a job started at unit of work ("task"?) index 0 and
// finished up through 3, the progress value would have been updated to be 4
// and the checkpoint would contain index 4. The job would then resume with
// starting at index 4, which had not been started yet.
//
// Checkpoint values are opaque and MUST be JSON-marshallable. They will be
// returned to the job action in the form of JSON bytes.
//
// Note that sync throttles DB writes, so a given checkpoint may not be
// persisted immediately (see the sync method for details).
func (j *ActiveJob) Checkpoint(newCheckpoint any) error {
	return j.sync(nil, newCheckpoint)
	// TODO: emit log for the frontend to update the UI?
}
// sync writes changes/updates to the DB. In case of rapid job progression, we avoid
// writing to the DB with every call; only if enough time has passed, or if the job
// is done. If tx is non-nil, it is assumed the job is done (but if that assumption
// no longer holds true, we can change the signature of this method). In other words,
// passing in a tx forces a flush/sync to the DB.
//
// This is how a checkpoint is stored as well, since it doesn't make sense to only
// have a checkpoint in memory (pausing a job does not use a checkpoint since it
// remains in memory) -- only resuming a job after it has been removed from memory
// (usually by program exit or intentional abort, etc.) needs a checkpoint, so
// creating a checkpoint and syncing to the DB are the same operation. By the same
// token, the progress, message, and total fields in the DB row must also correspond
// with the checkpoint, otherwise resuming a job will be out of whack. So it is
// important that syncing to the DB only happens with a checkpoint to go with it
// (if there is a checkpoint at all). Passing a nil checkpoint here will clear it.
func (j *ActiveJob) sync(tx *sql.Tx, checkpoint any) error {
	j.mu.Lock()
	defer j.mu.Unlock()

	// remember the latest checkpoint even if this sync ends up throttled,
	// so a later (or final) sync persists it
	j.currentCheckpoint = checkpoint

	// no-op if last sync was too recent and the job isn't done yet (job done <==> tx!=nil)
	if time.Since(j.lastSync) < jobSyncInterval && tx == nil {
		return nil
	}

	var chkpt []byte
	if checkpoint != nil {
		var err error
		chkpt, err = json.Marshal(checkpoint)
		if err != nil {
			return fmt.Errorf("JSON-encoding checkpoint %#v: %w", checkpoint, err)
		}
		now := time.Now()
		j.lastCheckpoint = &now
	}

	// NOTE(review): a nil checkpoint stores "" (empty TEXT), not NULL; the
	// resume path treats empty the same as absent, but confirm nothing else
	// distinguishes the two
	q := `UPDATE jobs SET progress=?, total=?, message=?, checkpoint=?, updated=? WHERE id=?` // TODO: LIMIT 1
	vals := []any{j.currentProgress, j.currentTotal, j.currentMessage, string(chkpt), time.Now().UnixMilli(), j.id}

	var err error
	if tx == nil {
		_, err = j.tl.db.WritePool.ExecContext(j.tl.ctx, q, vals...)
	} else {
		_, err = tx.ExecContext(j.tl.ctx, q, vals...)
	}
	if err != nil {
		return fmt.Errorf("syncing progress with DB: %w", err)
	}

	j.lastSync = time.Now()

	return nil
}
// GetJobs loads the jobs with the specified IDs, or by the most recent jobs, whichever is set.
// Both technically can be set, but why? For jobs that are actively running, the in-memory
// progress/message/total (which may be fresher than the DB) overrides the stored values.
func (tl *Timeline) GetJobs(ctx context.Context, jobIDs []uint64, mostRecent int) ([]Job, error) {
	var jobs []Job

	// load most recent jobs
	if mostRecent > 0 {
		q := `SELECT
id, type, name, configuration, hash, state, hostname,
created, updated, start, ended,
message, total, progress, checkpoint,
repeat, parent_job_id
FROM jobs
ORDER BY created DESC
LIMIT ?`
		var rows *sql.Rows
		var err error
		rows, err = tl.db.ReadPool.QueryContext(ctx, q, mostRecent)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		tlID := tl.id.String()

		for rows.Next() {
			job, err := scanJob(rows, tlID)
			if err != nil {
				return nil, err
			}
			jobs = append(jobs, job)
		}
		if err := rows.Err(); err != nil {
			return nil, err
		}
		// NOTE(review): an empty jobs table yields sql.ErrNoRows here even
		// when jobIDs were also requested -- confirm that's intended
		if len(jobs) == 0 {
			return nil, sql.ErrNoRows
		}
	}

	// load specific jobs by ID
	for _, id := range jobIDs {
		job, err := tl.loadJob(ctx, nil, id, true)
		if err != nil {
			return nil, fmt.Errorf("loading job %d: %w", id, err)
		}
		jobs = append(jobs, job)
	}

	for i := range jobs {
		// if job is running, we can provide more recent information than what
		// was last synced to the DB, which may be out-of-date at the moment
		if jobs[i].State == JobStarted {
			tl.activeJobsMu.RLock()
			activeJob, ok := tl.activeJobs[jobs[i].ID]
			tl.activeJobsMu.RUnlock()
			if ok {
				// we don't use a RWMutex because this mutex is write-heavy
				activeJob.mu.Lock()
				jobs[i].Progress = activeJob.currentProgress
				jobs[i].Message = activeJob.currentMessage
				jobs[i].Total = activeJob.currentTotal
				activeJob.mu.Unlock()
			}
		}
	}

	return jobs, nil
}
// CancelJob cancels the job with the given ID. If the job is actively
// running, its context is canceled and this waits for the action to
// return. Otherwise the job's row in the DB is marked aborted (only
// allowed from a cancellable state) and the UI is notified.
func (tl *Timeline) CancelJob(ctx context.Context, jobID uint64) error {
	tl.activeJobsMu.Lock()
	activeJob, ok := tl.activeJobs[jobID]
	tl.activeJobsMu.Unlock()

	if ok {
		// job is actively running -- fun! we get to cancel it,
		// and our job is actually easier this way
		activeJob.cancel()

		// wait for job action to return; this ensures its
		// state has been synced to the DB and it has been
		// removed from the map of active jobs
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-activeJob.done:
		}

		return nil
	}

	// the job is not actively running; it could have been paused
	// from a prior execution of the program, for example; or maybe
	// it is still queued and they just want to prevent it from
	// running; etc, let's check the DB
	tx, err := tl.db.WritePool.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	job, err := tl.loadJob(ctx, tx, jobID, false)
	if err != nil {
		return fmt.Errorf("loading job %d from database: %w", jobID, err)
	}

	switch job.State {
	case JobQueued, JobStarted, JobPaused, JobInterrupted:
		// Started and Interrupted should probably not be encountered since
		// running jobs should be active (taken care of above by canceling
		// its context), and we should resume interrupted jobs at startup,
		// but might as well allow them here since it's no matter
		job.State = JobAborted
		// execute the update within our open transaction (previously this went
		// through the write pool directly, escaping the tx's commit/rollback)
		_, err = tx.ExecContext(ctx, `UPDATE jobs SET state=? WHERE id=?`, job.State, jobID) // TODO: LIMIT 1
		if err != nil {
			return fmt.Errorf("updating job state: %w", err)
		}
	default:
		return fmt.Errorf("job %d is in state %s, which cannot be canceled", jobID, job.State)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}

	// update any UI elements that may be showing info about this inactive job
	statusLog := Log.Named("job.status")
	inactiveJob := &ActiveJob{
		id:              job.ID,
		statusLog:       statusLog,
		parentJobID:     job.ParentJobID,
		currentState:    job.State,
		currentProgress: job.Progress,
		currentTotal:    job.Total,
		currentMessage:  job.Message,
		lastCheckpoint:  job.Updated,
	}
	inactiveJob.flushProgress(statusLog)

	return nil
}
// PauseJob pauses the actively-running job with the given ID. It is a
// no-op if the job is already paused, and an error if the job is not
// active (not loaded in memory).
func (tl *Timeline) PauseJob(ctx context.Context, jobID uint64) error {
	tl.activeJobsMu.Lock()
	job, ok := tl.activeJobs[jobID]
	tl.activeJobsMu.Unlock()
	if !ok {
		return fmt.Errorf("job %d is either inactive or does not exist", jobID)
	}

	// If the job is already paused, its goroutine is parked inside Continue()
	// waiting on the unpause channel -- it is NOT receiving on job.pause -- so
	// the send on job.pause below would block forever. Check up front and
	// no-op. (Previously this check happened after the send, which could
	// deadlock on a double-pause.)
	job.mu.Lock()
	if job.paused != nil {
		job.mu.Unlock()
		return nil
	}
	job.mu.Unlock()

	// create the channel that the job will block on; that is the pausing
	// (closing this channel will unpause the job)
	paused := make(chan struct{})

	// What happens next is done deliberately in this order:
	// 1. Update the job's state in the DB. This is authoritative, so if the program
	//    terminates before the job pauses, at least the user's intent will be preserved.
	// 2. Signal the job to pause. The UI is currently showing temporary "Pausing..."
	//    text, and it can take time for the job to receive our signal and pause,
	//    for example if it's in the middle of a batch of thumbnails being generated;
	//    while it's finishing those, it's emitting job status updates and the UI will
	//    update accordingly, so if we set the state to "paused" before the job has
	//    actually paused, it looks dumb because it'll be streaming progress while paused.
	// 3. Update the UI's state to "paused" so that controls appear to allow resumption.

	// step 1. update the DB with the authoritative job state
	_, err := tl.db.WritePool.ExecContext(ctx, `UPDATE jobs SET state=? WHERE state=? AND id=?`, JobPaused, JobStarted, job.id) // TODO: LIMIT 1
	if err != nil {
		return fmt.Errorf("job paused, but error updating job state in DB: %w", err)
	}

	// step 2. the job should soon select on the pause channel to receive this, then
	// immediately try to receive from the paused channel, which is effectively the
	// pause behavior, as it blocks the job's goroutine until we unpause
	job.pause <- paused

	// step 3. store reference to the new channel so it can be closed (to unpause) later,
	// and update the UI with the job's paused state
	job.mu.Lock()
	job.paused = paused
	job.currentState = JobPaused
	job.flushProgress(job.statusLog) // note that the UI might get the state update before this function, and thus its HTTP request, returns -- TODO: not true if this change works
	job.mu.Unlock()

	return nil
}
// UnpauseJob resumes the paused job with the given ID. If the job is not
// active in memory (e.g. it was paused in a prior run of the program),
// resuming it is equivalent to starting it, provided its stored state
// allows resumption. Already-unpaused active jobs are a no-op.
func (tl *Timeline) UnpauseJob(ctx context.Context, jobID uint64) error {
	tl.activeJobsMu.Lock()
	job, ok := tl.activeJobs[jobID]
	tl.activeJobsMu.Unlock()
	if !ok {
		// not active; see if it's in the DB... if so, it's the same as starting the job
		var state JobState
		err := tl.db.ReadPool.QueryRowContext(ctx, `SELECT state FROM jobs WHERE id=? LIMIT 1`, jobID).Scan(&state)
		if errors.Is(err, sql.ErrNoRows) {
			return fmt.Errorf("job %d not found", jobID)
		}
		if err != nil {
			return err
		}
		if state != JobStarted && state != JobPaused && state != JobFailed {
			return fmt.Errorf("job %d is in %s state and cannot be resumed", jobID, state)
		}
		return tl.StartJob(ctx, jobID, false)
	}

	// update the DB first because it's authoritative for the job's state
	_, err := tl.db.WritePool.ExecContext(ctx, `UPDATE jobs SET state=? WHERE state=? AND id=?`, JobStarted, JobPaused, job.id) // TODO: LIMIT 1
	if err != nil {
		return fmt.Errorf("job resumed, but error updating job state in DB: %w", err)
	}

	// unpause by closing the pause channel, delete the pause channel,
	// and then update the UI as to the job's new state
	job.mu.Lock()
	defer job.mu.Unlock()
	if job.paused == nil {
		// already not paused (not an error, just a no-op)
		return nil
	}
	close(job.paused)
	job.paused = nil
	job.currentState = JobStarted
	job.flushProgress(job.statusLog)

	return nil
}
// StartJob (re)starts the job with the given ID within a single DB
// transaction. When startOver is true, any saved checkpoint, progress,
// and message are wiped first so the job begins from scratch instead of
// resuming. The transaction is committed only if the job starts cleanly.
func (tl *Timeline) StartJob(ctx context.Context, jobID uint64, startOver bool) error {
	tx, err := tl.db.WritePool.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // harmless no-op after a successful commit

	if startOver {
		// clear resume state so the job action starts fresh
		if _, err := tx.ExecContext(ctx, `UPDATE jobs SET checkpoint=NULL, progress=NULL, message=NULL WHERE id=?`, jobID); err != nil { // TODO: LIMIT 1
			return fmt.Errorf("clearing checkpoint, progress, and message to start over: %w", err)
		}
	}

	if err := tl.startJob(ctx, tx, jobID); err != nil {
		return err
	}

	return tx.Commit()
}
// scanJob reads the current row of a jobs-table query into a Job value.
// The query's column order must match the Scan call below exactly.
// Timestamps are stored in the DB as nullable Unix-millisecond integers
// and are converted here to time.Time values (nil columns stay nil).
// repoID is attached for bookkeeping; it is not a DB column.
func scanJob(rows *sql.Rows, repoID string) (Job, error) {
	var job Job
	var created, updated, start, end *int64
	err := rows.Scan(
		&job.ID, &job.Type, &job.Name, &job.Config, &job.Hash, &job.State, &job.Hostname,
		&created, &updated, &start, &end,
		&job.Message, &job.Total, &job.Progress, &job.Checkpoint,
		&job.Repeat, &job.ParentJobID)
	if err != nil {
		return job, fmt.Errorf("scanning job fields: %w", err)
	}
	if created != nil {
		job.Created = time.UnixMilli(*created)
	}
	job.Updated = unixMilliPtr(updated)
	job.Start = unixMilliPtr(start)
	job.Ended = unixMilliPtr(end)
	job.RepoID = repoID
	return job, nil
}

// unixMilliPtr converts a nullable Unix-millisecond timestamp to a
// *time.Time, preserving nil.
func unixMilliPtr(ms *int64) *time.Time {
	if ms == nil {
		return nil
	}
	t := time.UnixMilli(*ms)
	return &t
}
// Job is only to be used for shuttling job data in and out of the DB,
// it should not be assumed to accurately reflect the current state of the
// job. Mainly used for inserting a job row and getting a job row for the
// frontend.
type Job struct {
	// not a field in the DB, but useful for bookkeeping where multiple timelines are open
	RepoID string `json:"repo_id"`

	ID          uint64         `json:"id"`
	Type        JobType        `json:"type"`
	Name        *string        `json:"name,omitempty"`
	Config      string         `json:"config,omitempty"` // JSON encoding of, for instance, ImportParameters, etc.
	Hash        []byte         `json:"hash,omitempty"`   // identifies equivalent jobs; see the hash() method
	State       JobState       `json:"state"`
	Hostname    *string        `json:"hostname,omitempty"`
	Created     time.Time      `json:"created,omitempty"`
	Updated     *time.Time     `json:"updated,omitempty"`
	Start       *time.Time     `json:"start,omitempty"` // could be future, so not "started"
	Ended       *time.Time     `json:"ended,omitempty"`
	Message     *string        `json:"message,omitempty"`
	Total       *int           `json:"total,omitempty"`      // estimated total units of work
	Progress    *int           `json:"progress,omitempty"`   // units of work completed so far
	Checkpoint  *string        `json:"checkpoint,omitempty"` // opaque resume state consumed by JobAction.Run
	Repeat      *time.Duration `json:"repeat,omitempty"`     // if set, interval at which the job recurs
	ParentJobID *uint64        `json:"parent_job_id,omitempty"`

	// only used when loading jobs from the DB for the frontend
	Parent   *Job  `json:"parent,omitempty"`
	Children []Job `json:"children,omitempty"`
}
// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface so a
// Job can be emitted as a structured object in log output. Optional
// (pointer) fields are included only when set.
func (j Job) MarshalLogObject(oe zapcore.ObjectEncoder) error {
	oe.AddString("repo_id", j.RepoID)
	oe.AddUint64("id", j.ID)
	if parent := j.ParentJobID; parent != nil {
		oe.AddUint64("parent_job_id", *parent)
	}
	oe.AddString("type", string(j.Type))
	oe.AddTime("created", j.Created)
	if start := j.Start; start != nil {
		oe.AddTime("start", *start)
	}
	if ended := j.Ended; ended != nil {
		oe.AddTime("ended", *ended)
	}
	if name := j.Name; name != nil {
		oe.AddString("name", *name)
	}
	return nil
}
// hash computes a digest over the fields that identify an equivalent
// job (type, config, and hostname when set), used to detect duplicate
// jobs that are already running or queued.
func (j Job) hash() []byte {
	hasher := blake3.New()
	_, _ = hasher.WriteString(string(j.Type))
	_, _ = hasher.WriteString(j.Config)
	if host := j.Hostname; host != nil {
		_, _ = hasher.WriteString(*host)
	}
	return hasher.Sum(nil)
}
// JobAction is a type that does actual work of a job.
// It must be JSON-encodable.
type JobAction interface {
	// Run runs or resumes a job. It should utilize the
	// checkpoint, if set, to resume an interrupted job.
	// CPU-intensive work should use the throttle. If
	// possible, the job status should be updated as it
	// progresses. The job has a context which should
	// be honored. The function should not return until
	// the job is done (whether successful or not); this
	// means waiting until any spawned goroutines have
	// finished (using sync.WaitGroup is good practice).
	Run(job *ActiveJob, checkpoint []byte) error
}
// JobType identifies the kind of work a job performs; it determines
// which JobAction implementation the job's config is decoded into.
type JobType string

// The supported job types.
const (
	JobTypeImport     JobType = "import"
	JobTypeThumbnails JobType = "thumbnails"
	JobTypeEmbeddings JobType = "embeddings"
)
// JobState describes where a job is in its lifecycle; it is stored in
// the job's DB row, which is authoritative for the job's state.
type JobState string

// The possible job states.
const (
	JobQueued      JobState = "queued"      // on deck
	JobStarted     JobState = "started"     // currently running
	JobPaused      JobState = "paused"      // intentional suspension (don't auto-resume) (still considered "active" since its action remains blocked in memory)
	JobAborted     JobState = "aborted"     // intentional termination (don't auto-resume) (job unloaded from memory)
	JobInterrupted JobState = "interrupted" // unintentional, forced interruption (may be auto-resumed)
	JobSucceeded   JobState = "succeeded"   // final (cannot be restarted) // TODO: what if units of work fail, though? maybe this should be "done" or "completed"
	JobFailed      JobState = "failed"      // final (can be manually restarted)
)
// Intervals governing how often running jobs persist and report progress.
const (
	jobSyncInterval  = 2 * time.Second        // how often to update the DB and save checkpoints
	jobFlushInterval = 100 * time.Millisecond // how often to update the frontend
)