/*
Timelinize
Copyright (c) 2013 Matthew Holt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package timeline
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"hash"
"io"
"io/fs"
"maps"
"os"
"path"
"strings"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
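// pipeline runs a batch of graphs through the processing phases: sanitization/enhancement,
// data file download, and database insertion. It also periodically asks SQLite to optimize
// itself during large imports.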
func (p *processor) pipeline(ctx context.Context, batch []*Graph) error {
// During large imports, I've found that running ANALYZE every so often
// can be helpful for improving performance, since an import is much more
// than just an INSERT; there are lots of SELECTs along the way that use
// indexes. For example, in my tests importing about a quarter million
// messages (relation-heavy, since they are sent to an attribute), which
// I repeated twice, it would take 30 minutes to import without ANALYZE.
// But when running ANALYZE every so often, it only took 23 minutes.
// (This was before the DB indexes in the import process were optimized.)
// We optimize more frequently at the beginning of large imports, and
// less often thereafter. We actually just run PRAGMA optimize to let
// sqlite decide if ANALYZE is needed.
const optimizeFrequencyThreshold = 10000
p.rootGraphCount += len(batch)
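// the modulo-vs-batch-size comparison fires once each time the running count crosses
// a multiple of 3500 (or 100000 once past the threshold), regardless of the batch size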
if (p.rootGraphCount <= optimizeFrequencyThreshold && p.rootGraphCount%3500 < len(batch)) ||
(p.rootGraphCount > optimizeFrequencyThreshold && p.rootGraphCount%100000 < len(batch)) {
p.tl.optimizeDB(p.log.Named("optimizer"))
}
// sanitization / normalization phase
if err := p.phase0(ctx, batch); err != nil {
return err
}
// download item datas so we know what we're dealing with
if err := p.phase1(ctx, batch); err != nil {
return err
}
// now that we have all the data, and it's sanitized, add it to the timeline
if err := p.phase2(ctx, batch); err != nil {
return err
}
return nil
}
// phase0 sanitizes graphs in the batch, and also marks items for skipping if they
// cannot be sanitized in a way that makes them processable. It also enhances the
// data if possible, for example adding time zone information, if enabled.
func (p *processor) phase0(ctx context.Context, batch []*Graph) error {
// quick input sanitization: timestamps outside a certain range are
// invalid and obviously wrong (they cannot be serialized to JSON);
// we also clean up any metadata
for _, g := range batch {
if err := ctx.Err(); err != nil {
return err
}
if err := p.sanitizeAndEnhance(g); err != nil {
p.log.Error("sanitizing batch", zap.Error(err))
}
}
return nil
}
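// sanitizeAndEnhance recursively sanitizes the graph's item and all connected nodes: it drops
// (0, 0) coordinates, clears absurd far-future timestamps, optionally infers a time zone from
// coordinates, marks items outside the configured timeframe for skipping, and normalizes
// metadata and time values.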
func (p *processor) sanitizeAndEnhance(g *Graph) error {
if g == nil {
return nil
}
if g.Item != nil {
// TODO: Also ensure Timespan, Timeframe, and all other time values
// are in the same zone as Timestamp. (If not, change them to that
// zone?)
// resolve reports of bad location data (#144) -- extremely unlikely to actually be at (0, 0)
if g.Item.Location.Latitude != nil && *g.Item.Location.Latitude == 0 {
g.Item.Location.Latitude = nil
}
if g.Item.Location.Longitude != nil && *g.Item.Location.Longitude == 0 {
g.Item.Location.Longitude = nil
}
// yeah, right; no way 1-1-4193 is an intentional timestamp (as of 2025) (see #145)
const maxYearsFuture = 500
if g.Item.Timestamp.Year() >= time.Now().Year()+maxYearsFuture && g.Item.Timestamp.Month() == time.January && g.Item.Timestamp.Day() == 1 {
badTimestamp := g.Item.Timestamp
g.Item.Timestamp, g.Item.Timespan, g.Item.Timeframe = time.Time{}, time.Time{}, time.Time{}
p.log.Warn("sanitized timestamp", zap.Time("timestamp", badTimestamp))
}
// before we know whether we need to skip an item, we may need to fill
// out its time zone so our calculations are accurate
// augment timestamp with time zone if enabled, and time zone is missing
// TODO: somehow indicate that we've filled it in so it can be stored in the time_offset_origin column
if p.ij.ProcessingOptions.InferTimeZone {
timeZone := g.Item.Timestamp.Location()
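// a zone of Local or UTC is treated as "missing" zone info: data sources typically end up
// with one of those defaults when the input had no explicit offset, whereas any other named
// zone suggests the offset was actually known, so we leave it alone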
if !g.Item.Timestamp.IsZero() &&
(timeZone == time.Local || timeZone == time.UTC) &&
g.Item.Location.Latitude != nil && g.Item.Location.Longitude != nil {
start := time.Now()
tzName := p.tzFinder.GetTimezoneName(*g.Item.Location.Longitude, *g.Item.Location.Latitude)
loc, err := time.LoadLocation(tzName)
duration := time.Since(start)
if err == nil {
if timeZone == time.Local {
// the time value is "wall time", or the time local to whenever it was recorded;
// we need to preserve the exact components of the wall time, while setting its
// time zone to what should be local based on the coordinates; in Go, this requires
// creating a new time.Time since we're actually changing the absolute moment in time
g.Item.Timestamp = time.Date(
g.Item.Timestamp.Year(),
g.Item.Timestamp.Month(),
g.Item.Timestamp.Day(),
g.Item.Timestamp.Hour(),
g.Item.Timestamp.Minute(),
g.Item.Timestamp.Second(),
g.Item.Timestamp.Nanosecond(),
loc,
)
} else if timeZone == time.UTC {
// the time is already an absolute moment in time, so we interpret that to be
// correct; we just need to assign a time zone for display/serialization purposes
g.Item.Timestamp = g.Item.Timestamp.In(loc)
}
g.Item.tsOffsetOrigin = tzOriginGeoLookup // record how/why we implicitly changed or set the time zone
p.log.Debug("assigned time zone from coordinates",
zap.Float64p("latitude", g.Item.Location.Latitude),
zap.Float64p("longitude", g.Item.Location.Longitude),
zap.String("time_zone", tzName),
zap.Duration("duration", duration))
}
if err != nil {
p.log.Error("could not load inferred location based on coordinates",
zap.Float64p("latitude", g.Item.Location.Latitude),
zap.Float64p("longitude", g.Item.Location.Longitude),
zap.String("time_zone", tzName),
zap.Error(err))
}
}
}
// items outside the configured timeframe should be skipped
// (data sources should also elide these items and not even
// send them, if possible)
if p.skip(g.Item) {
return nil
}
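// the item survives the timeframe check; normalize its metadata and
// ensure all of its time values are within a serializable range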
g.Item.Metadata.Clean()
g.Item.Timestamp = validTime(g.Item.Timestamp)
g.Item.Timespan = validTime(g.Item.Timespan)
g.Item.Timeframe = validTime(g.Item.Timeframe)
}
// traverse graph nodes recursively to sanitize them
for _, edge := range g.Edges {
if err := p.sanitizeAndEnhance(edge.From); err != nil {
return err
}
if err := p.sanitizeAndEnhance(edge.To); err != nil {
return err
}
}
return nil
}
// skip returns true if the item should be skipped, and marks it as such. Future
// processing phases should check an item's skip field and no-op if it is true.
func (p *processor) skip(it *Item) bool {
// skip item if outside of timeframe (data source should do this for us, but
// ultimately we should enforce it: it just means the data source is being
// less efficient than it could be)
// TODO: also consider Timespan, Timeframe
if !it.Timestamp.IsZero() && !p.ij.ProcessingOptions.Timeframe.Contains(it.Timestamp) {
p.log.Warn("ignoring item outside of designated timeframe (data source should not send this item; it is probably being less efficient than it could be)",
zap.String("item_id", it.ID),
zap.Timep("tf_since", p.ij.ProcessingOptions.Timeframe.Since),
zap.Timep("tf_until", p.ij.ProcessingOptions.Timeframe.Until),
zap.Time("item_timestamp", it.Timestamp),
)
it.skip = true
return true
}
return false
}
// phase1 downloads the data of each item. It takes no lock on the DB unless thumbnail
// generation is enabled, and even then the DB isn't the bottleneck.
// The whole batch is downloaded concurrently.
func (p *processor) phase1(ctx context.Context, batch []*Graph) error {
wg := new(sync.WaitGroup)
for _, g := range batch {
if g.err != nil {
continue
}
if err := p.downloadDataFilesInGraph(ctx, g, wg); err != nil {
p.log.Error("downloading data files in graph", zap.Error(err))
g.err = err
}
}
wg.Wait()
return nil
}
func (p *processor) downloadDataFilesInGraph(ctx context.Context, g *Graph, wg *sync.WaitGroup) error {
if g == nil {
return nil
}
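// each item's data is downloaded in its own goroutine; phase1 waits
// on the shared WaitGroup once the whole batch has been traversed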
if g.Item != nil && !g.Item.skip {
wg.Add(1)
go func() {
defer wg.Done()
if err := p.downloadItemData(ctx, g.Item); err != nil {
p.log.Error("failed downloading item's data", zap.Error(err))
}
}()
}
// traverse connected nodes and download their data files
for _, edge := range g.Edges {
if err := p.downloadDataFilesInGraph(ctx, edge.From, wg); err != nil {
return err
}
if err := p.downloadDataFilesInGraph(ctx, edge.To, wg); err != nil {
return err
}
}
return nil
}
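// downloadItemData pulls the item's content from its Data func. It sniffs the content-type
// if needed, keeps small plaintext/markdown content in memory for storage in the items table,
// and streams everything else into a canonical data file in the repo.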
func (p *processor) downloadItemData(ctx context.Context, it *Item) error {
if it == nil || it.Content.Data == nil {
return nil
}
source, err := it.Content.Data(ctx)
if err != nil {
return fmt.Errorf("getting item's data stream: %w", err)
}
if source == nil {
// this is probably not intentional? warn developers so they hopefully notice
p.log.Warn("item's data func returned no reader and no error")
return nil
}
defer source.Close()
// if we don't know the content-type, try to detect it: first by sniffing
// the start of the data (more reliable), then by looking at the file
// extension if needed (less reliable) -- also do this if the content-type
// is known to be plaintext or markdown, which we may store directly in the
// items table, since we need to see whether it's small enough to comfortably fit there
if it.Content.MediaType == "" || it.Content.isPlainTextOrMarkdown() {
// to detect it, we have to read the file into memory (or at least part
// of it); and for plaintext data, this also tells us whether the data
// is too large for us to want to put in this table or if we'll store it
// elsewhere
// NOTE: if this code ever gets moved into a separate function, make sure
// that the buffer is not returned until we're done processing the item; or
// copy it to a new buffer first, due to buffer pooling & reuse
bufPtr := sizePeekBufPool.Get().(*[]byte)
buf := *bufPtr
defer func() {
// From the standard lib's crypto/tls package:
// "You might be tempted to simplify this by just passing &buf to Put,
// but that would make the local copy of the buf slice header escape
// to the heap, causing an allocation. Instead, we keep around the
// pointer to the slice header returned by Get, which is already on the
// heap, and overwrite and return that."
// See: https://github.com/dominikh/go-tools/issues/1336
*bufPtr = buf
sizePeekBufPool.Put(bufPtr)
}()
n, err := io.ReadFull(source, buf)
if err == io.EOF { // no content
p.log.Debug("empty data stream",
zap.String("original_id", it.ID),
zap.String("data_file_path", it.dataFilePath),
zap.String("filename", it.Content.Filename),
zap.String("intermediate_path", it.IntermediateLocation))
return nil
}
if err != nil && err != io.ErrUnexpectedEOF { // it's OK if the buffer wasn't filled, it just means the item is smaller
return fmt.Errorf("buffering item's data stream to peek size and content-type: %w", err)
}
// In case the size of the data is less than the peek buffer, we carefully use only
// the first n bytes of the buffer, since beyond that is likely data from another
// item that this pooled buffer was previously used with
bufferedContent := buf[:n]
// now that we have a sample of the data (or possibly all of it),
// we can sniff the content-type
if it.Content.MediaType == "" {
it.Content.MediaType = detectContentType(bufferedContent, it.Content.Filename)
}
if n < len(buf) && it.Content.isPlainTextOrMarkdown() {
// item's content is smaller than the buffer and is plaintext, so
// we can store this comfortably in the items table; move a string
// copy of it to the item and return
dataTextStr := strings.TrimSpace(string(bufferedContent))
it.dataText = &dataTextStr
return nil
}
// content is either binary, or too large to fit comfortably in the
// items table, so we'll need to finish downloading and processing
// the data file; we ensure the data stream first reads from our
// buffer before it finishes reading from the source stream
source = io.NopCloser(io.MultiReader(bytes.NewReader(bufferedContent), source))
}
// this might be a naive assumption, but if the item classification is
// missing and the item is clearly a common media type, we can probably
// classify the item anyway
if it.Classification.Name == "" {
if strings.HasPrefix(it.Content.MediaType, "image/") ||
strings.HasPrefix(it.Content.MediaType, "video/") ||
strings.HasPrefix(it.Content.MediaType, "audio/") {
it.Classification = ClassMedia
}
}
// open the output file and copy the data
var destination *os.File
destination, it.dataFilePath, err = p.tl.openUniqueCanonicalItemDataFile(ctx, p.log, nil, it, p.ds.Name)
if err != nil {
return fmt.Errorf("opening output data file: %w", err)
}
defer destination.Close()
p.log.Debug("opened canonical data file for writing",
zap.String("filename", it.Content.Filename),
zap.String("intermediate_path", it.IntermediateLocation),
zap.String("data_file", it.dataFilePath))
if err := p.downloadDataFile(it, source, destination); err != nil {
return err
}
return nil
}
// downloadDataFile downloads the data file and hashes it. It attaches the
// results to the item.
func (p *processor) downloadDataFile(it *Item, source io.Reader, destination *os.File) error {
if it == nil {
return nil
}
h := newHash()
dataFileSize, err := p.downloadAndHashDataFile(source, destination, h, path.Dir(it.dataFilePath))
if err != nil {
return err
}
if dataFileSize == 0 {
// don't keep empty data files laying around
p.log.Warn("downloaded data file was empty; removing file",
zap.String("item_original_id", it.ID),
zap.String("intermediate_path", it.IntermediateLocation),
zap.String("data_file_name", it.dataFilePath),
zap.Int64("bytes_written", it.dataFileSize))
if err := p.tl.deleteRepoFile(it.dataFilePath); err != nil {
p.log.Error("could not delete empty data file",
zap.String("item_original_id", it.ID),
zap.String("data_file_name", it.dataFilePath),
zap.Error(err))
}
// make sure it doesn't look like we still have a data file
it.dataFilePath = ""
} else {
it.dataFileSize = dataFileSize
it.dataFileHash = h.Sum(nil)
// If inline thumbnail generation is enabled, do that now; this acquires a lock or two on the DB,
// so it's definitely not super efficient, but: this is an opt-in code path, and the bottleneck
// here is usually the thumbnail generation itself, not the DB. I have not noticed a huge difference
// in the runtimes of this versus importing first and running the thumbnail job afterward.
if p.ij.ProcessingOptions.Thumbnails && qualifiesForThumbnail(&it.Content.MediaType) {
task := thumbnailTask{
tl: p.tl,
DataFile: it.dataFilePath,
DataType: it.Content.MediaType,
ThumbType: thumbnailType(it.Content.MediaType, false),
}
start := time.Now()
if thumb, thash, err := task.thumbnailAndThumbhash(p.ij.job.ctx, true); err != nil {
p.log.Error("failed generating thumbnail and thumbhash for item",
zap.String("item_original_id", it.ID),
zap.String("item_intermediate_path", it.IntermediateLocation),
zap.String("data_file_name", it.dataFilePath),
zap.Error(err))
} else {
it.thumbhash = thash // make sure to keep this, so it gets stored in the DB with the item
verb := "generated"
if thumb.alreadyExisted {
verb = "assigned existing"
}
p.log.Info(verb+" thumbnail for item",
zap.String("item_original_id", it.ID),
zap.String("item_intermediate_path", it.IntermediateLocation),
zap.String("data_file_name", it.dataFilePath),
zap.Duration("duration", time.Since(start)))
}
}
}
return nil
}
// handleDuplicateItemDataFile checks to see if the checksum of the file downloaded for the item
// already exists in the database. It returns true if the file already exists and a replacement
// occurred to use the existing file (or, if integrity check on the existing file failed, the new
// file instead). If false is returned, the downloaded file is unique.
func (p *processor) handleDuplicateItemDataFile(ctx context.Context, tx *sql.Tx, it *Item) (bool, error) {
if it.dataFilePath == "" || len(it.dataFileHash) == 0 {
return false, errors.New("missing data filename and/or hash of contents")
}
var existingDataFilePath *string
// we can reuse the existing thumbhash (if there is one) for items that share the same data file
// (the "data_text IS NULL" condition is necessary to use the "idx_items_data_text_hash" index,
// and has been observed to yield a much faster query plan, but is not strictly needed for correctness;
// it should be fine since we don't set data_text when there's a data_file)
err := tx.QueryRowContext(ctx, `SELECT data_file, thumb_hash FROM items WHERE data_text IS NULL AND data_hash=? AND data_file!=? LIMIT 1`,
it.dataFileHash, it.dataFilePath).Scan(&existingDataFilePath, &it.thumbhash)
if errors.Is(err, sql.ErrNoRows) {
return false, nil // file is unique; carry on
}
if err != nil {
return false, fmt.Errorf("querying DB: %w", err)
}
// file is a duplicate! by the time this function returns (if successful),
// the file at it.dataFilePath should not exist anymore, and the field should
// be reassigned to *existingDataFilePath instead.
p.log.Info("data file is a duplicate",
zap.String("duplicate_data_file", it.dataFilePath),
zap.Stringp("existing_data_file", existingDataFilePath),
zap.Binary("checksum", it.dataFileHash))
if existingDataFilePath == nil {
// ... that's weird, how's this possible? it has a hash but no file name recorded
return false, fmt.Errorf("item with matching hash is missing data file name; hash: %x", it.dataFileHash)
}
// TODO: maybe this all should be limited to only when integrity checks are enabled? how do we know that this download has the right version/contents?
p.log.Debug("verifying existing file is still the same",
zap.Stringp("existing_data_file", existingDataFilePath),
zap.Binary("checksum", it.dataFileHash))
// ensure the existing file is still the same
h := newHash()
f, err := os.Open(p.tl.FullPath(*existingDataFilePath))
if err != nil {
return false, fmt.Errorf("opening existing file: %w", err)
}
defer f.Close()
_, err = io.Copy(h, f)
if err != nil {
return false, fmt.Errorf("reading file for opportunistic integrity check: %w", err)
}
existingFileHash := h.Sum(nil)
if !bytes.Equal(it.dataFileHash, existingFileHash) {
// the existing file was corrupted, so restore it with
// what we just downloaded, which presumably succeeded
// (by simply renaming the file on disk, we don't have
// to update any entries in the DB)
p.log.Warn("existing data file failed integrity check (checksum on disk changed; file corrupted or modified?) - replacing existing file with this one",
zap.Stringp("existing_data_file", existingDataFilePath),
zap.Binary("expected_checksum", it.dataFileHash),
zap.Binary("actual_checksum", existingFileHash),
zap.String("replacement_data_file", it.dataFilePath))
err := os.Rename(p.tl.FullPath(it.dataFilePath), p.tl.FullPath(*existingDataFilePath))
if err != nil {
return false, fmt.Errorf("replacing modified data file: %w", err)
}
} else {
// everything checks out; delete the newly-downloaded file
// and use the existing file instead of duplicating it
p.log.Debug("existing file passed integrity check; using it instead of newly-downloaded duplicate",
zap.Stringp("existing_data_file", existingDataFilePath),
zap.String("new_data_file", it.dataFilePath),
zap.Binary("checksum", it.dataFileHash))
err = p.tl.deleteRepoFile(it.dataFilePath)
if err != nil {
return false, fmt.Errorf("removing duplicate data file: %w", err)
}
}
p.log.Info("removed duplicate data file based on integrity check",
zap.String("duplicate_data_file", it.dataFilePath),
zap.Stringp("existing_data_file", existingDataFilePath),
zap.Binary("checksum", it.dataFileHash))
it.dataFilePath = *existingDataFilePath
return true, nil
}
// downloadAndHashDataFile writes source to destination, hashing it along the way.
// It returns the number of bytes copied.
func (p *processor) downloadAndHashDataFile(source io.Reader, destination *os.File, h hash.Hash, destinationDir string) (int64, error) {
// writing a data file consists of non-atomic steps: creating directory, writing the
// file, and cleaning up the file and its empty dir tree if the file was empty; and
// since data files are downloaded concurrently, we need to sync these steps, otherwise
// it's possible to delete a directory that another file is writing into... so we keep
// track of which directories are in use while creating data files, and when we're
// done, we can remove the "lock" on that directory
defer func() {
p.tl.dataFileWorkingDirsMu.Lock()
p.tl.dataFileWorkingDirs[destinationDir]--
if p.tl.dataFileWorkingDirs[destinationDir] == 0 {
delete(p.tl.dataFileWorkingDirs, destinationDir)
}
p.tl.dataFileWorkingDirsMu.Unlock()
}()
// give the hasher a copy of the file bytes
tr := io.TeeReader(source, h)
start := time.Now()
n, err := io.Copy(destination, tr)
if err != nil {
// TODO: The error should be highlighted as a notification of some sort. Ideally, keep what we have, but somehow indicate in the DB that it's corrupt/incomplete (like not filling out a data_hash)
_ = p.tl.deleteRepoFile(destination.Name())
return n, fmt.Errorf("copying contents: %w", err)
}
// TODO: If n == 0, should we retry? (would need to call h.Reset() first) - to help handle sporadic I/O issues maybe
// we can probably increase performance if we don't sync all the time, but that might be less reliable...
if n > 0 {
if err := destination.Sync(); err != nil {
return n, fmt.Errorf("syncing file after downloading: %w", err)
}
}
p.log.Debug("downloaded data file",
zap.String("filename", destination.Name()),
zap.Duration("duration", time.Since(start)),
zap.Int64("size", n))
return n, nil
}
// phase2 inserts or updates the graphs in the database. It tidies up the data file, if any, according
// to information found in the database. This is the sole phase that obtains a DB lock and must come
// after the data files have been downloaded. It also emits logs for the frontend.
func (p *processor) phase2(ctx context.Context, batch []*Graph) error {
tx, err := p.tl.db.WritePool.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning transaction for batch: %w", err)
}
defer tx.Rollback()
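// a deferred Rollback is a no-op once Commit has succeeded, so it safely covers only the error paths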
for _, g := range batch {
if err = p.processGraph(ctx, tx, g); err != nil {
p.log.Error("processing graph", zap.String("graph", g.String()), zap.Error(err))
g.err = err
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction for batch: %w", err)
}
return nil
}
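// processGraph stores the graph's root node (either an item or an entity) and its relationships
// within the given transaction, and emits a detailed log entry that the frontend uses to show
// live import activity.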
func (p *processor) processGraph(ctx context.Context, tx *sql.Tx, g *Graph) error {
if g == nil {
return nil
}
// validate node type
if g.Item != nil && g.Entity != nil {
return fmt.Errorf("ambiguous node in graph is both an item and entity node (item_graph=%p)", g)
}
// this whole big thing is one huge log so the UI can stream
// a sample of live import data
defer func() {
// logging the progress for every item/entity that gets processed is actually
// not super efficient, so we use this trick to only prepare the log entry
// if it will, in fact, be logged (we sample logs to increase efficiency,
// but those gains are most realized when we avoid our own processing if
// a particular log entry will be dropped too, hence the call to Check())
if checkedLog := p.log.Check(zapcore.InfoLevel, "finished graph"); checkedLog != nil {
graphType := "item"
if g.Entity != nil {
graphType = "entity"
}
fields := []zapcore.Field{
zap.String("graph", fmt.Sprintf("%p", g)),
zap.String("type", graphType),
zap.Uint64("row_id", g.rowID.id()),
zap.Int64("new_entities", atomic.LoadInt64(p.ij.newEntityCount)),
zap.Int64("new_items", atomic.LoadInt64(p.ij.newItemCount)),
zap.Int64("updated_items", atomic.LoadInt64(p.ij.updatedItemCount)),
zap.Int64("skipped_items", atomic.LoadInt64(p.ij.skippedItemCount)),
zap.Int64("total_items", atomic.LoadInt64(p.ij.itemCount)),
}
if g.Item != nil && !g.Item.Timestamp.IsZero() {
fields = append(fields, zap.Time("item_timestamp", g.Item.Timestamp))
}
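// entityAttr chooses the most useful identifier to log for an entity: its name if set,
// otherwise the value of its first identifying attribute, otherwise nil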
entityAttr := func(e Entity) zapcore.Field {
if e.Name != "" {
return zap.String("entity", e.Name)
}
for _, attr := range e.Attributes {
if attr.Identifying || attr.Identity {
return zap.Any("entity", attr.Value)
}
}
return zap.Stringp("entity", nil)
}
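// keep local pointers: when obfuscation is enabled, these get redirected to anonymized
// copies below, so the originals (which this tx will commit) are never mutated just for logging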
item, entity := g.Item, g.Entity
if options, obfuscate := p.tl.obfuscationMode(); obfuscate {
// We tediously (shallow-ish) copy the values that are anonymized,
// since we have to log them with obfuscation mode enabled. Why
// do we need to copy them first? Because the tx isn't finished
// yet. This whole method is one iteration's call as part of a
// batch, so if we change the values right now, they'll go into
// the DB like that -- I have verified this by inspecting the DB,
// and found obfuscated values -- yikes! so we do have to copy
// the values that get anonymized, even if we don't log them.
if item != nil {
anonItem := *item
anonItem.row.Anonymize(options)
anonItem.row.Metadata = make(json.RawMessage, len(item.row.Metadata))
copy(anonItem.row.Metadata, item.row.Metadata)
anonItem.Owner.Attributes = make([]Attribute, len(item.Owner.Attributes))
copy(anonItem.Owner.Attributes, item.Owner.Attributes)
for i := range anonItem.Owner.Attributes {
anonItem.Owner.Attributes[i].Metadata = make(Metadata)
maps.Copy(anonItem.Owner.Attributes[i].Metadata, item.Owner.Attributes[i].Metadata)
}
anonItem.Owner.Anonymize(options)
item = &anonItem
}
if entity != nil {
anonEntity := *entity
anonEntity.Metadata = make(Metadata, len(entity.Metadata))
maps.Copy(anonEntity.Metadata, entity.Metadata)
anonEntity.Attributes = make([]Attribute, len(entity.Attributes))
copy(anonEntity.Attributes, entity.Attributes)
for i := range anonEntity.Attributes {
anonEntity.Attributes[i].Metadata = make(Metadata)
maps.Copy(anonEntity.Attributes[i].Metadata, entity.Attributes[i].Metadata)
}
anonEntity.ID = g.rowID.id()
anonEntity.Anonymize(options)
entity = &anonEntity
}
}
if item != nil {
size := item.dataFileSize
if item.row.DataText != nil {
size = int64(len(*item.row.DataText))
}
preview := item.row.DataText
const maxPreviewLen = 35
if preview != nil && len(*preview) > maxPreviewLen {
shortPreview := (*preview)[:maxPreviewLen]
preview = &shortPreview
}
fields = append(fields,
zap.String("status", string(item.row.howStored)),
zap.String("classification", item.Classification.Name),
zap.Stringp("preview", preview),
zap.Stringp("filename", item.row.Filename),
zap.Stringp("original_path", item.row.OriginalLocation),
zap.Stringp("intermediate_path", item.row.IntermediateLocation),
zap.Int64("size", size),
zap.Float64p("lat", item.row.Latitude),
zap.Float64p("lon", item.row.Longitude),
zap.String("media_type", item.Content.MediaType),
entityAttr(item.Owner))
} else if entity != nil {
fields = append(fields, entityAttr(*entity))
}
checkedLog.Write(fields...)
}
}()
// process root node
switch {
case g.Entity != nil:
var err error
g.rowID, err = p.processEntity(ctx, tx, *g.Entity)
if err != nil {
return fmt.Errorf("processing entity node: %w", err)
}
case g.Item != nil:
var err error
g.rowID, err = p.processItem(ctx, tx, g.Item)
if err != nil {
return fmt.Errorf("processing item node: %w", err)
}
}
// process connected nodes
for _, r := range g.Edges {
err := p.processRelationship(ctx, tx, r, g)
if err != nil {
p.log.Error("processing relationship",
zap.Uint64("item_or_attribute_row_id", g.rowID.id()),
zap.Error(err))
}
}
return nil
}
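// processItem resolves any duplicate data file for the item and then stores the item,
// returning its row ID wrapped in a latentID.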
func (p *processor) processItem(ctx context.Context, tx *sql.Tx, it *Item) (latentID, error) {
if it.skip {
return latentID{}, nil
}
// if this item has a data file, check if it's a duplicate and handle accordingly
if it.dataFilePath != "" {
_, err := p.handleDuplicateItemDataFile(ctx, tx, it)
if err != nil {
return latentID{}, fmt.Errorf("checking if item data file was %w", err)
}
}
itemRowID, err := p.storeItem(ctx, tx, it)
if err != nil {
return latentID{itemID: itemRowID}, err
}
return latentID{itemID: itemRowID}, nil
}
// storeItem stores the item in the database, updating an existing row if pertinent.
// It returns the row ID of the item.
func (p *processor) storeItem(ctx context.Context, tx *sql.Tx, it *Item) (uint64, error) {
// keep count of number of items processed, mainly for logging
defer atomic.AddInt64(p.ij.itemCount, 1)
// prepare for DB queries to see if we have this same item already
// in some form or another
var dsName *string
if p.ds.Name != "" {
dsName = &p.ds.Name
}
it.makeIDHash(dsName)
it.makeContentHash()
// if this function returns with an error, we can avoid an orphaned data file by deleting it
// TODO: a background maintenance routine/job that regularly deletes orphaned data files
var err error
if it.dataFilePath != "" {
defer func() {
if err != nil {
if err := p.tl.deleteRepoFile(it.dataFilePath); err != nil {
p.log.Warn("could not clean up data file of failed item",
zap.String("data_file_path", it.dataFilePath),
zap.Error(err))
}
}
}()
}
uniqueConstraints := it.Retrieval.finalUniqueConstraints(p.ij.ProcessingOptions.ItemUniqueConstraints)
// if the item is already in our DB, load it
var ir ItemRow
ir, err = p.tl.loadItemRow(ctx, tx, 0, 0, it, dsName, uniqueConstraints, true)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("looking up item in database: %w", err)
}
defer func() { it.row = ir }() // used later in the pipeline for logging
// if the item is already in the DB, figure out what, if anything, will be updated
if ir.ID > 0 {
// first we need to distill the user's update preferences down to update policies for the DB
if err := p.distillUpdatePolicies(it, ir); err != nil {
return 0, fmt.Errorf("distilling initial update policies: %w", err)
}
// now determine if we should process this duplicate/existing item at all
reprocessItem, _ := p.shouldProcessExistingItem(it, ir, it.dataFilePath != "")
if !reprocessItem {
atomic.AddInt64(p.ij.skippedItemCount, 1)
p.log.Debug("skipping processing of existing item",
zap.Uint64("row_id", ir.ID),
zap.String("intermediate_path", it.IntermediateLocation),
zap.String("filename", it.Content.Filename),
zap.String("classification", it.Classification.Name),
zap.String("original_id", it.ID))
// add some info for logging purposes
ir.howStored = itemSkipped
return ir.ID, nil
}
p.log.Debug("found existing item row according to configured unique constraints",
zap.Uint64("row_id", ir.ID),
zap.String("filename", it.Content.Filename),
zap.String("intermediate_path", it.IntermediateLocation),
zap.String("original_id", it.ID))
// if existing item row already has a data file, see if the incoming item changes a filename;
// this can happen if the filename differs from what is in the DB (or adds one where it was
// missing), or if there is no filename and the timestamp is different
if ir.DataFile != nil {
tsUpdatePol := it.fieldUpdatePolicies["timestamp"]
filenameUpdatePol := it.fieldUpdatePolicies["filename"]
incomingItemAddsFilename := ir.Filename == nil && it.Content.Filename != ""
incomingItemUpdatesFilename := filenameUpdatePol > 0 && (ir.Filename == nil || it.Content.Filename != *ir.Filename)
incomingItemUpdatesFilenameWithTime := ir.Filename == nil && tsUpdatePol > 0 &&
!it.Timestamp.IsZero() && (ir.Timestamp == nil || !it.Timestamp.Equal(*ir.Timestamp))
if incomingItemAddsFilename || incomingItemUpdatesFilename || incomingItemUpdatesFilenameWithTime {
newDataFilePath, err := p.renameDataFile(ctx, tx, it, *ir.DataFile, it.Content.Filename)
if err != nil {
p.log.Error("could not rename existing data file given new filename information", zap.Error(err))
} else {
p.log.Info("renamed existing data file given new filename information",
zap.String("new_filename", it.Content.Filename),
zap.String("old_data_file_path", *ir.DataFile),
zap.String("new_data_file_path", newDataFilePath))
ir.DataFile = &newDataFilePath
ir.Filename = &it.Content.Filename
}
}
}
// if a filename is already in the database, but is not in this item graph, use the one in the database
// (useful if the data source sends an item piecewise, like filename first, then actual data file in a later graph)
if ir.Filename != nil && it.Content.Filename == "" && it.dataFilePath != "" {
newDataFilePath, err := p.renameDataFile(ctx, tx, it, it.dataFilePath, *ir.Filename)
if err != nil {
p.log.Error("could not rename data file given filename information found in database", zap.Error(err))
} else {
p.log.Info("renamed data file given filename information found in database",
zap.String("discovered_filename", *ir.Filename),
zap.String("old_data_file_path", it.dataFilePath),
zap.String("new_data_file_path", newDataFilePath))
it.dataFilePath = newDataFilePath
}
}
}
// we assume we're on a case-sensitive file system; that means that the filename might collide
// with another if moved to a case-insensitive file system - that'd be really bad, so let's make
// sure the filename is unique case-insensitively too
if it.dataFilePath != "" {
// query the DB for a data_file that matches case-insensitively (data_file is defined to be COLLATE NOCASE)
// and doesn't match exactly ("COLLATE BINARY")
var count int
err = tx.QueryRowContext(ctx, `SELECT count() FROM items WHERE data_file=? AND data_file!=? COLLATE BINARY LIMIT 1`,
it.dataFilePath, it.dataFilePath).Scan(&count)
if err != nil {
return 0, fmt.Errorf("checking DB for case-insensitive filename uniqueness: %w", err)
}
if count > 0 {
newDataFileName, err := p.renameDataFile(ctx, tx, it, it.dataFilePath, path.Base(it.dataFilePath))
if err == nil {
p.log.Info("data file renamed to be case-insensitively unique",
zap.String("old_data_file_path", it.dataFilePath),
zap.String("new_data_file_path", newDataFileName))
it.dataFilePath = newDataFileName
} else {
p.log.Error("could not rename existing data file to avoid potential case-insensitive collisions", zap.Error(err))
}
}
}
// make a copy of this 'cause we might use it later to clean up a data file if we ended up setting it to NULL or replacing it
startingDataFile := ir.DataFile
// convert incoming item to a row that can be inserted into the DB
err = p.fillItemRow(ctx, tx, &ir, it)
if err != nil {
return 0, fmt.Errorf("assembling item for storage: %w", err)
}
// execute query to insert or update the item
ir.ID, ir.howStored, err = p.insertOrUpdateItem(ctx, tx, ir, it.HasContent(), it.fieldUpdatePolicies)
if err != nil {
return 0, fmt.Errorf("storing item in database: %w (row_id=%d item_id=%v)", err, ir.ID, ir.OriginalID)
}
// items that are updated should have new embeddings generated; delete any old ones
if ir.howStored == itemUpdated {
_, err = tx.ExecContext(ctx, `DELETE FROM embeddings WHERE item_id=?`, ir.ID)
if err != nil {
return 0, fmt.Errorf("deleting old embeddings of updated item(s): %w", err)
}
}
// if the incoming data file didn't end up being used, clean it up
if it.dataFilePath != "" {
var count int
err = tx.QueryRowContext(ctx, `SELECT count() FROM items WHERE data_file=? LIMIT 1`, it.dataFilePath).Scan(&count)
if err != nil {
return 0, fmt.Errorf("checking DB for unused incoming data file: %w", err)
}
if count == 0 {
if err = p.tl.deleteRepoFile(it.dataFilePath); err != nil {
p.log.Error("unable to clean up unused incoming data file",
zap.String("data_file_path", it.dataFilePath),
zap.Error(err))
}
}
}
// if there's a chance that we just set the data_file to NULL, check to see if the
// old file is no longer referenced in the DB, and if not, clean it up
if startingDataFile != nil && ir.DataFile == nil {
if err := p.tl.deleteDataFileAndThumbnailIfUnreferenced(ctx, tx, *startingDataFile); err != nil {
p.log.Error("cleaning up unused data file",
zap.Uint64("item_row_id", ir.ID),
zap.Stringp("data_file_name", startingDataFile),
zap.Error(err))
}
}
// if this replaced an item's previous data file, clean up the old one if it is no longer
// referenced by any other items
if it.dataFilePath != "" && startingDataFile != nil && it.dataFilePath != *startingDataFile {
if err := p.tl.deleteDataFileAndThumbnailIfUnreferenced(ctx, tx, *startingDataFile); err != nil {
p.log.Error("could not clean up old, unreferenced data file and any associated thumbnail",
zap.String("old_data_file", *startingDataFile),
zap.String("replaced_by", it.dataFilePath),
zap.Uint64("row_id", ir.ID),
zap.Error(err))
}
// additionally, if the incoming file has the same filename as the one we just deleted, let's see
// if that filename is now available, to try to preserve the item's original filename as best we can
// (for example, if an item with a file named "A.JPG" is being replaced by a new "A.JPG", the
// processor will initially create "A__asdf.jpg" because "A.JPG" already exists; but once we get
// here, that old file has been deleted, so we can now restore the original name to the new file)
if it.intendedDataFileName != "" && path.Base(it.dataFilePath) != it.intendedDataFileName {
desiredFilename := path.Join(path.Dir(it.dataFilePath), it.intendedDataFileName)
desiredFilenameFullPath := p.tl.FullPath(desiredFilename)
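// creating with O_EXCL is an atomic claim on the desired name: it only succeeds if no file currently occupies it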
f, err := os.OpenFile(desiredFilenameFullPath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0600)
if err == nil {
f.Close()
if err := os.Rename(p.tl.FullPath(it.dataFilePath), desiredFilenameFullPath); err != nil {
p.log.Error("could not restore data file name from temporary name",
zap.String("temporary_name", it.dataFilePath),
zap.String("name_to_restore", desiredFilename),
zap.Uint64("row_id", ir.ID),
zap.Error(err))
} else {
_, err = tx.ExecContext(ctx, "UPDATE items SET data_file=? WHERE data_file=?", desiredFilename, it.dataFilePath)
if err == nil {
it.dataFilePath = desiredFilename
} else {
p.log.Error("could not update row with renamed data file paths",
zap.String("previous_path", it.dataFilePath),
zap.String("new_path", desiredFilename),
zap.Uint64("row_id", ir.ID),
zap.Error(err))
}
}
} else if !errors.Is(err, fs.ErrExist) {
p.log.Error("could not create placeholder file for data file of the which original filename is being restored",
zap.String("temporary_name", it.dataFilePath),
zap.String("name_to_restore", desiredFilename),
zap.Uint64("row_id", ir.ID),
zap.Error(err))
}
}
}
// Count data files which are eligible for a thumbnail and which are not excluded from
// receiving a thumbnail (like live photos/motion picture sidecar files, which are
// generally an exception from normal related items). This count is not used to configure
// the thumbnail job's total size, but it was in a previous version of the code. Now the
// job calculates it when it starts. This count is mainly used to determine whether to
// even start a thumbnail job. TODO: That said, maybe we should always start a thumbnail
// job even if it does nothing? Then we don't have to count here.
if qualifiesForThumbnail(ir.DataType) && !it.skipThumb {
atomic.AddInt64(p.ij.thumbnailCount, 1)
}
return ir.ID, nil
}
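// renameDataFile gives the data file at oldDataFilePath a new canonical path based on newFilename,
// updates rows in the items and thumbnails databases to point at the new path, and tidies the old
// directory if the rename left it empty. It returns the new path.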
func (p *processor) renameDataFile(ctx context.Context, tx *sql.Tx, incoming *Item, oldDataFilePath, newFilename string) (string, error) {
incoming.Content.Filename = newFilename
placeholderFile, newDataFilePath, err := p.tl.openUniqueCanonicalItemDataFile(ctx, p.log, tx, incoming, p.ds.Name)
if err != nil {
return "", fmt.Errorf("could not rename data file: %w (old_data_file_name=%s new_filename=%s)", err, oldDataFilePath, newFilename)
}
_ = placeholderFile.Close()
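// the placeholder merely reserves the new path; the rename below moves the actual contents into place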
// NOTE: if DB transaction fails, this rename does not get rolled back
err = os.Rename(p.tl.FullPath(oldDataFilePath), p.tl.FullPath(newDataFilePath))
if err != nil {
return "", fmt.Errorf("could not rename data file: %w (old=%s new=%s)", err, oldDataFilePath, newDataFilePath)
}
logger := p.log.With(
zap.String("old_data_file_path", oldDataFilePath),
zap.String("new_data_file_path", newDataFilePath),
zap.String("new_filename", newFilename),
)
// update all rows that may point to this data file to use the new path
if _, err := tx.ExecContext(ctx, "UPDATE items SET data_file=? WHERE data_file=?", newDataFilePath, oldDataFilePath); err != nil {
logger.Error("renamed data file, but failed to update timeline database", zap.Error(err))
} else {
logger.Info("renamed data file")
}
// also update thumbnails DB in case any thumbnails were already generated for the item
_, err = p.tl.thumbs.WritePool.ExecContext(ctx, "UPDATE thumbnails SET data_file=? WHERE data_file=?", newDataFilePath, oldDataFilePath)
if err != nil {
logger.Error("renamed data file, but failed to update thumbnail database", zap.Error(err))
}
// if the rename left the folder empty, clean it up
oldDir := path.Dir(oldDataFilePath)
if err := p.tl.cleanDirs(oldDir); err != nil {
logger.Error("failed tidying potentially empty folder of old path",
zap.String("old_data_file_path", oldDir),
zap.Error(err))
}
return newDataFilePath, nil
}