
/*
Timelinize
Copyright (c) 2013 Matthew Holt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package timeline
import (
"bytes"
"encoding/json"
"fmt"
"io/fs"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/mholt/archives"
"github.com/ringsaturn/tzf"
"go.uber.org/zap"
)
type importJobCheckpoint struct {
EstimatedSize *int64 `json:"estimated_size"` // only set if currently in the estimating phase
OuterIndex int `json:"outer_index"`
InnerIndex int `json:"inner_index"`
// this field remembers the count of new items that will need thumbnails
ThumbnailCount int64 `json:"thumbnail_count"`
// This is passed through to the data source. We would use
// json.RawMessage here so that, when loading a checkpoint, the
// raw bytes could be handed back to the data source as-is, but
// that would mean JSON-encoding the data source's checkpoint for
// every graph it sends with a checkpoint -- which is often all of
// them -- even though checkpoints only get persisted every so
// often; in other words, lots of unnecessary json.Marshal() calls.
// Instead, we let the job manager encode the whole checkpoint
// together in one call when it actually persists it to the DB. It
// just means that when we restore the checkpoint, we have to do one
// json.Marshal call to produce the bytes to give to the data source,
// but that is still way more efficient than marshaling for every item.
//
// As a general rule of thumb, no graphs should be sent down the
// pipeline while resuming from a checkpoint.
DataSourceCheckpoint any `json:"data_source_checkpoint,omitempty"`
}
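
// To illustrate the round trip described above (a sketch; "persisted" is a
// hypothetical variable holding the checkpoint bytes read from the DB): the
// job manager encodes the whole checkpoint with a single json.Marshal when
// persisting it, and on resume the data source's portion is re-encoded
// exactly once before being handed back as raw bytes (see Run below):
//
//	var chkpt importJobCheckpoint
//	if err := json.Unmarshal(persisted, &chkpt); err != nil {
//		return fmt.Errorf("decoding checkpoint: %w", err)
//	}
//	// one Marshal per resume, not one per graph:
//	dsBytes, err := json.Marshal(chkpt.DataSourceCheckpoint)
//	// dsBytes is then given to the data source as a json.RawMessage
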
type ImportJob struct {
// accessed atomically (placed first to align on 64-bit word boundary, for 32-bit systems)
// (reset each time the job is started)
itemCount, newItemCount, updatedItemCount, skippedItemCount *int64
newEntityCount *int64
thumbnailCount *int64
job *ActiveJob
// interactive jobs need access to the current processor to send graphs ready for processing
p *processor
pMu *sync.Mutex
Plan ImportPlan `json:"plan,omitempty"`
ProcessingOptions ProcessingOptions `json:"processing_options,omitempty"`
EstimateTotal bool `json:"estimate_total,omitempty"`
}
func (ij ImportJob) checkpoint(estimatedSize *int64, outer, inner int, ds any) error {
return ij.job.Checkpoint(importJobCheckpoint{
EstimatedSize: estimatedSize,
OuterIndex: outer,
InnerIndex: inner,
ThumbnailCount: atomic.LoadInt64(ij.thumbnailCount),
DataSourceCheckpoint: ds,
})
}
// TODO: This was useful during the refactoring of the import flow, but
// I ended up with a design that doesn't require rooting file systems at
// the root of the volume. It seems like good code, though, and the Go
// standard library doesn't have a way of doing this, so I hesitate to
// delete it entirely. :)
//
// // splitPathAtVolume splits an absolute filename into volume (or root)
// // and filename components. On Windows, the "root" of a filepath is the
// // volume (e.g. "C:"); on Unix-compatible systems, it is "/". Windows is
// // insane. Anyway, this is useful for constructing a fs.FS that provides
// // access to the whole file system. The returned rootOrVolume can be used
// // as the root of an os.DirFS or similar FS (such as archives.DeepFS),
// // and the relativePath is an fs.FS-compatible path that can be used to
// // access the file within the FS.
// //
// // See https://github.com/golang/go/issues/44279.
// func splitPathAtVolume(absFilename string) (rootOrVolume string, relativePath string, err error) {
// if !filepath.IsAbs(absFilename) {
// err = fmt.Errorf("filename is not absolute: %s", absFilename)
// return
// }
// rootOrVolume = "/" // assume non-Windows
// if vol := filepath.VolumeName(absFilename); vol != "" {
// rootOrVolume = vol // okay, it's actually Windows
// }
// relativePath, err = filepath.Rel(rootOrVolume, absFilename)
// if err != nil {
// return
// }
// relativePath = filepath.ToSlash(relativePath)
// return
// }
func (ij *ImportJob) Run(job *ActiveJob, checkpoint []byte) error {
ij.job = job
ij.itemCount = new(int64)
ij.newItemCount = new(int64)
ij.updatedItemCount = new(int64)
ij.skippedItemCount = new(int64)
ij.newEntityCount = new(int64)
ij.thumbnailCount = new(int64)
ij.pMu = new(sync.Mutex)
estimating := ij.EstimateTotal
if ij.ProcessingOptions.Interactive != nil {
ij.ProcessingOptions.Interactive.Graphs = make(chan *InteractiveGraph)
}
var chkpt importJobCheckpoint
if checkpoint != nil {
err := json.Unmarshal(checkpoint, &chkpt)
if err != nil {
return fmt.Errorf("decoding checkpoint: %w", err)
}
// restore thumbnail-eligible item count
atomic.StoreInt64(ij.thumbnailCount, chkpt.ThumbnailCount)
// in theory, a resumed job has the same configuration as before,
// so this may take us out of "estimating" mode if that phase had
// already completed, but I don't think it should ever put us
// INTO "estimating" mode
estimating = chkpt.EstimatedSize != nil
}
// two iterations of the phase loop: first to estimate size if enabled, then to actually import items
for {
start := time.Now()
// this should be cumulative across all the files
var totalSizeEstimate *int64
if estimating {
if chkpt.EstimatedSize == nil {
totalSizeEstimate = new(int64)
job.Logger().Info("estimating size; this may take a bit")
} else {
totalSizeEstimate = chkpt.EstimatedSize
job.Logger().Info("resuming size estimation; this may take a bit")
}
job.Message("Estimating total import size")
}
// outer loop: iterate the datasource+filename combos
for i := chkpt.OuterIndex; i < len(ij.Plan.Files); i++ {
if err := job.Context().Err(); err != nil {
return err
}
fileImport := ij.Plan.Files[i]
// load data source tidbits
ds, ok := dataSources[fileImport.DataSourceName]
if !ok {
return fmt.Errorf("file import %d: unknown data source: %s", i, fileImport.DataSourceName)
}
dsRowID, ok := job.tl.dataSources[fileImport.DataSourceName]
if !ok {
return fmt.Errorf("file import %d: data source ID is unknown: %s", i, fileImport.DataSourceName)
}
dsOpt, err := ds.UnmarshalOptions(fileImport.DataSourceOptions)
if err != nil {
return err
}
logger := job.Logger().With(zap.String("data_source_name", ds.Name))
// inner loop: iterate the filenames assigned to this data source configuration
for j := chkpt.InnerIndex; j < len(fileImport.Filenames); j++ {
if err := job.Context().Err(); err != nil {
return err
}
filename := fileImport.Filenames[j]
if !estimating {
job.Message("Importing " + filename)
}
// create a new "outer" checkpoint when arriving at a new filename to be imported
// as long as we are not resuming from a data source checkpoint (if the data
// source has a checkpoint, we should not overwrite it / clear it out, in case
// the job stops before it has a chance to checkpoint again)
if chkpt.DataSourceCheckpoint == nil {
if err := ij.checkpoint(totalSizeEstimate, i, j, nil); err != nil {
job.Logger().Error("checkpointing", zap.Error(err))
}
}
// check for job cancellation
if err := job.Context().Err(); err != nil {
return err
}
// if time zone inference is enabled, initialize the TZ finder
// (creating the finder can be expensive, so only do it if needed)
var tzFinder tzf.F
if ij.ProcessingOptions.InferTimeZone {
tzFinder, err = tzf.NewDefaultFinder()
if err != nil {
return fmt.Errorf("initializing time zone finder: %w", err)
}
}
p := processor{
outerLoopIdx: i,
innerLoopIdx: j,
estimatedCount: totalSizeEstimate,
ij: ij,
ds: ds,
dsRowID: dsRowID,
dsOpt: dsOpt,
tl: job.Timeline(),
log: logger,
tzFinder: tzFinder,
}
ij.pMu.Lock()
ij.p = &p
ij.pMu.Unlock()
// Create the file system from which this file/dir will be accessed. It must be
// a DeepFS since the filename might refer to a path inside an archive file.
// Rooting the FS is nuanced. If the FS is rooted at the file or directory
// itself, data sources that walk the FS (like if they support importing from a
// directory of similar independent files) must root their walk at ".", and
// then must check the filename passed into the WalkDirFunc for "." and replace
// it with the real filename, if they want to access the file's actual name
// (e.g. to check its file extension). It also forbids access to siblings,
// which can be a problem when sidecar files contain relevant data.
//
// One way to solve this is rooting the FS at the volume or real file system
// root (e.g. "C:" or "/"), then walking starting from the full path to the
// file. This provides important context about the file's location, which
// some data sources may use (e.g. looking for a timestamp in the folder
// hierarchy), and also always provides the actual filename (never "." since
// a walk wouldn't start from "."); however, this makes other FS operations
// tedious. For example, accessing a specific path in a subfolder of the
// starting filename involves convoluted logic to determine if the starting
// filename is a directory or file, and if a file, to call path.Dir() then
// path.Join(), just to get the desired path. This is error-prone and repetitive.
//
// We can improve on this by rooting the FS not at the filename or at the volume
// root, but somewhere in between: the file if it's a directory, or the parent
// dir if it's a file. That way, if the filename is a file, fs.WalkDir(filename)
// will walk only the file as expected, and with its true filename (not ".");
// and if it's a dir, the whole directory will be walked. (The filename of the
// top directory in that case will be ".", but when walking a directory, the name
// of the top directory is seldom important; it can still be retrieved using
// another method or variable anyway). This also makes Open()/Stat()/etc work
// as expected when simply passing in a relative path, without any complex lexical
// manipulation (i.e. passing in "a/b/c.txt" will open "/fs/root/a/b/c.txt" as
// intended because the fs is rooted at "/fs/root" regardless if the starting
// filename was "/fs/root" or "/fs/root/file.txt"). And because we root the FS
// at the parent dir of a file, it grants access to siblings/sidecars if needed.
// (We can't ALWAYS root the FS at the parent dir, because if the target is
// a directory, then its parent dir is a *different* directory, making the
// construction of subpaths more tedious to do correctly.)
//
// There are two main downsides I can see. One is that the filename associated
// with this DirEntry will no longer be the full path from the volume root, thus
// losing potentially valuable context as to the file's location. This can be
// remedied by adding a field to the DirEntry that stores the FS root path, so
// it can be referred to if needed. This has been done. The other downside is
// that when walking the FS, one must remember to root the walk at the associated
// Filename field in the DirEntry, and not "." (Filename will be "." in the case
// of a directory, but for a file, the FS root will be its parent folder and
// Filename will be the file's name in that directory, so "." would walk all
// files adjacent to the Filename).
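// To make the two cases concrete (hypothetical paths):
//
//	// target is the directory "/photos/takeout":
//	//   fsRoot = "/photos/takeout", filenameInsideFS = "."
//	// target is the file "/photos/takeout/album.zip":
//	//   fsRoot = "/photos/takeout", filenameInsideFS = "album.zip"
//
// Either way the FS is rooted at "/photos/takeout", so relative paths like
// "album.zip" or a sidecar "metadata.json" open the expected files.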
// start by assuming the filename is a directory, in which case the FS is rooted
// at the directory itself, and thus the filename within the FS is "."
fsRoot, filenameInsideFS := filename, "."
// if the filepath contains an archive (i.e. the path itself traverses into
// an archive), we need to use DeepFS; otherwise, traversing into an archive
// during import is (probably?) not desired, so use a "regular" file system
var fsys fs.FS
if archives.PathContainsArchive(filepath.ToSlash(fsRoot)) {
// TODO: I'd love to know what the archive file is named, for troubleshooting multi-archive Google Takeouts
fsys = &archives.DeepFS{Root: fsRoot, Context: job.Context()}
} else {
fsys, err = archives.FileSystem(job.ctx, fsRoot, nil)
if err != nil {
return fmt.Errorf("creating file system at %s: %w", fsRoot, err)
}
}
// this stat can be slow-ish (e.g. if we're in a large, compressed tar file
// and the file happens to be at the end), but we need to know whether it's
// a directory; and since we reuse the DeepFS, the results get amortized for later
info, err := fs.Stat(fsys, filenameInsideFS)
if err != nil {
return fmt.Errorf("could not stat file to import: %s: %w", filename, err)
}
if !info.IsDir() {
// as explained above, if it's NOT a directory, we root the FS at the
// parent folder and set the filename to the file's name in the dir.
fsRoot, filenameInsideFS = filepath.Split(fsRoot)
// since we changed the FS root, we have to update the fsys to reflect that
switch f := fsys.(type) {
case *archives.DeepFS:
f.Root = fsRoot
default:
// recreate the file system, since we might have just changed a FileFS
// into a DirFS, for example
fsys, err = archives.FileSystem(job.ctx, fsRoot, nil)
if err != nil {
return fmt.Errorf("recreating file system at %s: %w", fsRoot, err)
}
}
}
dirEntry := DirEntry{
DirEntry: fs.FileInfoToDirEntry(info),
FS: fsys,
FSRoot: fsRoot,
Filename: filenameInsideFS,
}
// the data source decodes its own checkpoint, so we give it its checkpoint as bytes
var dsCheckpoint json.RawMessage
if chkpt.DataSourceCheckpoint != nil {
dsCheckpoint, err = json.Marshal(chkpt.DataSourceCheckpoint)
if err != nil {
return fmt.Errorf("re-encoding data source checkpoint to pass to DS to resume: %w", err)
}
}
if err := p.process(job.Context(), dirEntry, dsCheckpoint); err != nil {
ij.generateThumbnailsForImportedItems()
ij.generateEmbeddingsForImportedItems()
return fmt.Errorf("processing %s as %s: %w", filename, ds.Name, err)
}
// the data source checkpoint is only applicable to the starting point (the filename we resumed from)
chkpt.DataSourceCheckpoint = nil
// reset the checkpoint's inner index back to 0, since when we advance the outer loop
// we should always start its file list at 0 (otherwise we skip import tasks completely
// if it's > 0!)
chkpt.InnerIndex = 0
}
}
if estimating {
// inform the UI of the new total count
total := atomic.LoadInt64(totalSizeEstimate)
job.SetTotal(int(total))
job.FlushProgress()
job.Logger().Info("done with size estimation",
zap.Int64("estimated_size", total),
zap.Duration("duration", time.Since(start)))
// loop once more to import items (don't estimate again); and make sure we start the
// next phase from the beginning of the outer list (even if we resumed a checkpoint
// and the starting outer index is > 0), because the starting index only applies to
// that phase; we don't want to skip tasks for our import job!
estimating = false
chkpt.OuterIndex = 0
continue
}
// only import the data once
break
}
if err := job.Context().Err(); err != nil {
return err
}
job.Logger().Info("import complete; cleaning up")
if err := ij.successCleanup(); err != nil {
job.Logger().Error("cleaning up after import job", zap.Error(err))
}
ij.generateThumbnailsForImportedItems()
ij.generateEmbeddingsForImportedItems()
// this can prevent/resolve slow queries, especially useful after (large) imports
// TODO: maybe only necessary after *large* imports
go job.tl.optimizeDB(job.Logger())
return nil
}
// ImportParams specifies parameters for listing items
// from a data source. Some data sources might not be
// able to honor all fields.
type ImportParams struct {
// Send graphs on this channel to put them through the
// processing pipeline and add them to the timeline.
Pipeline chan<- *Graph `json:"-"`
// The logger to use.
Log *zap.Logger `json:"-"`
// Typically, job pauses are honored by the processor whenever
// a graph is sent down the pipeline. However, if a data source
// does a nontrivial amount of work between sending graphs, this
// can result in a long delay before pausing. So we expose the
// pause function here for the data source to call at each unit
// of work; for example, each iteration of a loop that searches
// for the next item within the designated timeframe. Continue()
// blocks until the job gets unpaused (and does nothing if the
// job is not paused).
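//
// A typical call site inside a data source's listing loop might look
// like this (a sketch; the loop and variables are hypothetical):
//
//	for _, f := range files {
//		if err := params.Continue(); err != nil {
//			return err // e.g. the job was canceled while paused
//		}
//		// ... build a graph and send it on params.Pipeline ...
//	}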
Continue func() error `json:"-"`
// Time bounds on which data to retrieve.
// If both a time field and its corresponding
// item ID field are set, they must not
// conflict with each other.
Timeframe Timeframe `json:"timeframe,omitempty"`
// A checkpoint from which to resume the import.
Checkpoint json.RawMessage `json:"checkpoint,omitempty"`
// Options specific to the data source,
// as provided by NewOptions.
DataSourceOptions any `json:"data_source_options,omitempty"`
// TODO: WIP...
// // Maximum number of items to list; useful
// // for previews. Data sources should not
// // checkpoint previews.
// // TODO: still should enforce this in the processor... but this is good for the DS to know too, so it can limit its API calls, for example
// MaxItems int `json:"max_items,omitempty"`
// TODO: WIP...
// // Whether the data source should prioritize precise
// // checkpoints for fast resumption over performance.
// // Typically, this implies that the data source
// // operates in single-threaded mode, and processes
// // its input linearly, rather than concurrently,
// // which can be difficult to checkpoint; whereas a
// // checkpoint during a linear traversal of the data
// // can be as simple as an integer index or count.
// // User can enable this if they are okay with
// // potentially slower processing time, but want
// // faster job resumption.
// PrioritizeCheckpoints bool
}
func (ij ImportJob) successCleanup() error {
// delete empty items from this import (items with no content and no meaningful relationships)
if err := ij.deleteEmptyItems(); err != nil {
return fmt.Errorf("deleting empty items: %w", err)
}
return nil
}
// deleteEmptyItems deletes items that have no content and no meaningful relationships,
// from the given import.
func (ij ImportJob) deleteEmptyItems() error {
// we actually keep rows with no content if they are in a relationship, or if
// they have a retrieval key, which implies that they will be completed later
// (bookmark items are also a special case: they may be empty, to later be populated
// by a snapshot, as long as they have metadata)
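// For example (illustrative): a contentless row that anchors a relationship
// like "X is a reply to Y" is kept because it appears in relationships; a
// contentless row with a retrieval key is kept because a later import is
// expected to complete it; and a bookmark row is kept if it has metadata.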
rows, err := ij.job.tl.db.ReadPool.QueryContext(ij.job.ctx, `SELECT id FROM extended_items
WHERE job_id=?
AND (data_text IS NULL OR data_text='')
AND (classification_name != ? OR metadata IS NULL)
AND data_file IS NULL
AND longitude IS NULL
AND latitude IS NULL
AND altitude IS NULL
AND retrieval_key IS NULL
AND id NOT IN (SELECT from_item_id FROM relationships WHERE to_item_id IS NOT NULL OR to_attribute_id IS NOT NULL)`,
ij.job.id, ClassBookmark.Name) // TODO: consider deleting regardless of relationships existing (remember the iMessage data source until we figured out why some referred-to rows were totally missing?)
if err != nil {
return fmt.Errorf("querying empty items: %w", err)
}
var emptyItems []int64
for rows.Next() {
var rowID int64
err := rows.Scan(&rowID)
if err != nil {
rows.Close() //nolint:sqlclosecheck
return fmt.Errorf("scanning item: %w", err)
}
emptyItems = append(emptyItems, rowID)
}
rows.Close()
if err = rows.Err(); err != nil {
return fmt.Errorf("iterating item rows: %w", err)
}
// nothing to do if no items were empty
if len(emptyItems) == 0 {
ij.job.Logger().Info("no empty items to delete")
return nil
}
ij.job.Logger().Info("deleting empty items from this import", zap.Int("count", len(emptyItems)))
retention := time.Duration(0)
return ij.job.tl.deleteItemRows(ij.job.ctx, emptyItems, false, &retention)
}
// generateThumbnailsForImportedItems generates thumbnails for qualifying items
// that were part of the import associated with this job. It should be
// run after the import completes. It only creates a thumbnail job if
// any imported items qualify for a thumbnail.
func (ij ImportJob) generateThumbnailsForImportedItems() {
// no-op if thumbnails were generated during the import
if ij.ProcessingOptions.Thumbnails {
return
}
thumbnailCount := atomic.LoadInt64(ij.thumbnailCount)
if thumbnailCount == 0 {
ij.job.Logger().Info("no items qualify for thumbnail, so skipping thumbnail generation job")
return
}
ij.job.Logger().Info("creating thumbnail generation job from import")
job := thumbnailJob{
TasksFromImportJob: ij.job.ID(),
}
// thumbnail job will calculate its total size
if _, err := ij.job.tl.CreateJob(job, time.Time{}, 0, 0, ij.job.id); err != nil {
ij.job.Logger().Error("creating thumbnail job", zap.Error(err))
return
}
}
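
// generateEmbeddingsForImportedItems creates a job to generate embeddings
// for items from this import. It is a no-op unless the timeline's
// "semantic_features" property is enabled.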
func (ij ImportJob) generateEmbeddingsForImportedItems() {
if enabled, ok := ij.job.tl.GetProperty(ij.job.tl.ctx, "semantic_features").(bool); !ok || !enabled {
return
}
ij.job.Logger().Info("creating embeddings job from import")
job := embeddingJob{
ItemsFromImportJob: ij.job.ID(),
}
// embedding job will calculate its total size
if _, err := ij.job.tl.CreateJob(job, time.Time{}, 0, 0, ij.job.id); err != nil {
ij.job.Logger().Error("creating embeddings job", zap.Error(err))
return
}
}
func (ij ImportJob) tempGraphFolder() string {
return filepath.Join(
os.TempDir(),
"timelinize",
fmt.Sprintf("job-%d", ij.job.ID()),
"interactive")
}
// couldBeMarkdown is a very naive Markdown detector. I'm trying to avoid regexp
// for performance, but this implementation is (admittedly) only mildly effective.
// It may have lots of false positives.
func couldBeMarkdown(input []byte) bool {
for _, pattern := range [][]byte{
// links, images, and banners
[]byte("](http"),
[]byte("](/"),
[]byte("!["), // images
[]byte("[!"), // (GitHub-flavored) banners/notices
// blocks
[]byte("\n> "),
[]byte("\n```"),
// lists
[]byte("\n1. "),
[]byte("\n- "),
[]byte("\n* "),
// headings; ignore the h1 "# " since it could just as likely be a code comment
[]byte("\n## "),
[]byte("\n### "),
[]byte("\n=\n"),
[]byte("\n==\n"),
[]byte("\n==="),
[]byte("\n-\n"),
[]byte("\n--\n"),
[]byte("\n---"),
} {
if bytes.Contains(input, pattern) {
return true
}
}
for _, pair := range [][]byte{
{'_'},
{'*', '*'},
{'*'},
{'`'},
} {
// this isn't perfect, because the matching "end token" could have been truncated
if count := bytes.Count(input, pair); count > 0 && count%2 == 0 {
return true
}
}
return false
}
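
// For example (a sketch), input containing a subheading and a list item
// matches the "\n## " and "\n- " patterns above:
//
//	if couldBeMarkdown([]byte("intro\n## Notes\n- first point\n")) {
//		// likely Markdown (false positives are possible)
//	}
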
// ImportPlan describes what will be imported and how.
type ImportPlan struct {
Files []FileImport `json:"files,omitempty"` // each element pairs a data source configuration with the files to import using it
}
// FileImport represents a list of files to import with a specific
// configuration from a data source.
type FileImport struct {
// The name of the data source to use.
DataSourceName string `json:"data_source_name"`
// Configuration specific to the data source.
DataSourceOptions json.RawMessage `json:"data_source_options,omitempty"`
// The (OS-compatible) absolute filepaths to the files or directories
// to be imported. These paths may refer to files within archive files,
// so the archives.DeepFS type should be used to access them.
Filenames []string `json:"filenames"`
}
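
// A minimal plan with one FileImport might look like this in JSON form
// (the data source name and path are hypothetical):
//
//	{
//	  "files": [
//	    {
//	      "data_source_name": "google_photos",
//	      "filenames": ["/home/me/takeout-001.zip"]
//	    }
//	  ]
//	}
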
// ProposedImportPlan is a suggested import plan, to be reviewed and
// tweaked by the user. It is similar to the ImportPlan type, but
// instead of grouping a list of files by specific data source
// configurations, each file is listed together with its results
// from data source recognition, to be shown to the user so they
// can tweak the results and approve a final import plan.
type ProposedImportPlan struct {
Files []ProposedFileImport `json:"files,omitempty"`
}
type ProposedFileImport struct {
Filename string `json:"filename"`
FileType string `json:"file_type,omitempty"` // file, dir, or archive
DataSources []DataSourceRecognition `json:"data_sources,omitempty"`
}
func (p ProposedFileImport) String() string {
return fmt.Sprintf("{%s:%s %v}", p.FileType, p.Filename, p.DataSources)
}