1
0
Fork 0
timelinize/timeline/processor.go
Matthew Holt 4bd1ae8856
Optionally generate thumbnails during import
This does away with the experimental generation of thumbhashes during import. It's easier to generate the thumbnails and thumbhashes at the same time.

Does add a DB lock to phase1, but at this point the DB isn't the bottleneck in that phase.
2025-09-18 17:37:53 -06:00

191 lines
7.2 KiB
Go

/*
Timelinize
Copyright (c) 2013 Matthew Holt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package timeline
import (
"context"
"encoding/json"
"runtime/debug"
"sync/atomic"
"github.com/ringsaturn/tzf"
"go.uber.org/zap"
)
// processor carries the state used to run a data source's file importer for
// an import job and to process the graphs that the importer sends.
type processor struct {
// these three fields are used for checkpointing; the first is
// accessed atomically and 64-bit aligned; also used for estimating
// total size before running (when estimatedCount is non-nil, process
// only adds to this counter and does not import anything)
estimatedCount *int64
outerLoopIdx, innerLoopIdx int
// the import job being run; supplies the Continue flag and the
// processing options handed to the importer and the workers
ij *ImportJob
// the data source; a new file importer is obtained from it for each
// estimation or import pass
ds DataSource
// presumably the data source's row ID in the database -- TODO confirm
dsRowID uint64
// opaque data source options, forwarded as ImportParams.DataSourceOptions
dsOpt any
// the timeline being imported into; also used to load the owner entity
tl *Timeline
// base logger; process derives a per-file logger from it
log *zap.Logger
// batching inserts can greatly increase speed
batch []*Graph
batchSize int // size is at least len(batch) but edges on a graph can add to it
// counter used for periodic DB optimization during an import
// TODO: this should ideally be global per timeline, even if multiple jobs run simultaneously
rootGraphCount int
// used for guessing time zones based on geo coordinates
tzFinder tzf.F
}
// process handles a single directory entry for this processor's data source.
//
// When p.estimatedCount is non-nil the call only estimates the amount of work:
// the estimate is added atomically to that counter, estimation errors are
// logged rather than returned, and the result is always nil. Otherwise the
// entry is imported and the file importer's error (if any) is returned once
// all processing workers have finished.
//
// A panic raised below is recovered and logged together with its stack trace;
// since the result is unnamed, the function then returns nil.
// NOTE(review): on that path the done channel is never closed and the workers
// are not waited for -- confirm whether that is acceptable.
func (p processor) process(ctx context.Context, dirEntry DirEntry, dsCheckpoint json.RawMessage) error {
	// the importer is fed who-knows-what input, so a panic coming out of it
	// must not be allowed to take the whole app down
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		p.log.Error("panic",
			zap.Any("error", recovered),
			zap.String("stack", string(debug.Stack())))
	}()

	// data sources sometimes need an item owner that is otherwise unknown, so
	// expose the timeline owner through the context; if loading it fails, the
	// context is simply left as it was
	if owner, loadErr := p.tl.LoadEntity(ownerEntityID); loadErr == nil {
		ctx = context.WithValue(ctx, RepoOwnerCtxKey, owner)
	}

	params := ImportParams{
		Log:               p.log.With(zap.String("filename", dirEntry.FullPath())),
		Continue:          p.ij.job.Continue,
		Timeframe:         p.ij.ProcessingOptions.Timeframe,
		Checkpoint:        dsCheckpoint,
		DataSourceOptions: p.dsOpt,
	}

	// estimation mode: count the units of work instead of importing them
	if p.estimatedCount != nil {
		importer := p.ds.NewFileImporter()

		// faster (but probably still slow) route: the data source brings its
		// own optimized size estimator
		if sizer, ok := importer.(SizeEstimator); ok {
			size, estErr := sizer.EstimateSize(ctx, dirEntry, params)
			if estErr != nil {
				params.Log.Error("could not estimate import size", zap.Error(estErr))
			}
			// SetTotal() is deliberately not called here -- that waits until
			// counting is finished so progress bars don't mistake a partial
			// count for a completed estimate
			atomic.AddInt64(p.estimatedCount, int64(size))
			return nil
		}

		// slow route: no optimized estimator, so run the importer through the
		// pipeline and count the sizes of the graphs that come in
		stop := make(chan struct{})
		workers, pipeline := p.beginProcessing(ctx, p.ij.ProcessingOptions, true, stop)
		params.Pipeline = pipeline

		if countErr := importer.FileImport(ctx, dirEntry, params); countErr != nil {
			params.Log.Error("failed estimating size before import", zap.Error(countErr))
		}

		// the importer has returned, so nothing more will be sent on the
		// pipeline; tell the workers to exit and wait for them so the count
		// is accurate
		close(stop)
		workers.Wait()
		return nil
	}

	// import mode
	stop := make(chan struct{})
	workers, pipeline := p.beginProcessing(ctx, p.ij.ProcessingOptions, false, stop)
	params.Pipeline = pipeline

	// always start from a fresh file importer so that no state is carried
	// over from an earlier pass (TODO: necessary?)
	importErr := p.ds.NewFileImporter().FileImport(ctx, dirEntry, params)

	// the error is handed back to the caller only after the workers have
	// drained; sending on the pipeline is over, so signal them to exit
	close(stop)
	params.Log.Info("importer done sending items; waiting for processing to finish", zap.Error(importErr))
	workers.Wait()

	return importErr
}
// ProcessingOptions configures how item processing is carried out.
// The struct is JSON-serializable; every field is omitted from the
// output when empty, subject to the usual encoding/json rules.
type ProcessingOptions struct {
// Whether to perform integrity checks.
Integrity bool `json:"integrity,omitempty"`
// Constrain processed items to within a timeframe. This value is also
// handed to the data source's importer through ImportParams.Timeframe.
// NOTE(review): if Timeframe is a struct type, encoding/json's omitempty
// never omits it -- confirm whether that matters here.
Timeframe Timeframe `json:"timeframe,omitempty"`
// If true, items with manual modifications may be updated, overwriting local changes.
OverwriteLocalChanges bool `json:"overwrite_local_changes,omitempty"`
// Names of columns in the items table to check for sameness when loading an item
// that doesn't have data_source+original_id. The field/column is the same if the
// values are identical or if one of the values is NULL. If the map value is true,
// however, strict NULL comparison is applied, where NULL=NULL only. In other words,
// with a false value, it is as if the field is not in the unique constraints at
// all when applied to a field that is null.
ItemUniqueConstraints map[string]bool `json:"item_unique_constraints,omitempty"`
// How to update existing items, specified per-field.
// If not set, items that already exist will simply be skipped.
ItemUpdatePreferences []FieldUpdatePreference `json:"item_update_preferences,omitempty"`
// TODO: WIP (Should this be in importjob or processingoptions?)
Interactive *InteractiveImport `json:"interactive,omitempty"`
// How many items to process in one database transaction. This is a minimum value,
// not a maximum, due to the recursive nature of graphs. (Hopefully data sources
// do not build out graphs that are too big for available memory. Most graphs
// just have 1-2 things on them.) This setting is sensitive. Smaller batches can
// reduce performance, slowing down imports. Larger batches can be faster, allowing
// for more throughput especially when data files are large, but they reduce the
// granularity of the live progress updates, and the performance benefit gets
// smaller as the database gets larger and the items get smaller.
BatchSize int `json:"batch_size,omitempty"`
// When enabled, items that are received for processing which have coordinates and
// a timestamp but lack a time zone may have the time zone augmented by doing a
// geopoly lookup. The data set uses more memory but can help make timestamps more
// correct/complete. Since Go does not actually support time.Time values without a
// time zone, by "lack a time zone" we mean the location is time.Local (i.e. "wall
// time"), which essentially means, "time zone unknown".
InferTimeZone bool `json:"infer_time_zone,omitempty"`
// Generate thumbnails of images and videos as they are processed, rather than waiting
// until the thumbnailing job after the import finishes. This uses more memory and
// slows down the import job while media items are being processed.
Thumbnails bool `json:"thumbnails,omitempty"`
}
// ctxKey is the type of the context keys defined by this package. Using a
// dedicated type rather than a plain string keeps these keys from colliding
// with context keys set by other packages.
type ctxKey string
// RepoOwnerCtxKey is the context key under which the timeline owner entity
// is stored before a data source's importer is invoked, so that data sources
// can obtain an owner for items whose owner is otherwise unknown. The value
// is only present if the owner entity could be loaded.
var RepoOwnerCtxKey ctxKey = "repo_owner"
// ownerEntityID is the entity ID passed to LoadEntity to load the timeline
// owner.
const ownerEntityID = 1