/*
	Timelinize
	Copyright (c) 2013 Matthew Holt

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as published
	by the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package timeline

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/mholt/archives"
	"github.com/ringsaturn/tzf"
	"go.uber.org/zap"
)

type importJobCheckpoint struct {
	EstimatedSize *int64 `json:"estimated_size"` // only set if currently in the estimating phase
	OuterIndex    int    `json:"outer_index"`
	InnerIndex    int    `json:"inner_index"`

	// This field remembers the count of new items that will need thumbnails.
	ThumbnailCount int64 `json:"thumbnail_count"`

	// This is passed through to the data source. We would use
	// json.RawMessage here so that, when loading a checkpoint, the
	// raw bytes are returned to the data source -- but that would mean
	// JSON-encoding the data source's checkpoint for every graph it
	// sends with a checkpoint (which is often all of them), even
	// though checkpoints only get persisted every so often; in other
	// words, lots of unnecessary json.Marshal() calls. Instead, we let
	// the job manager encode the whole checkpoint together in one call
	// when it actually persists it to the DB. It just means that when
	// we restore the checkpoint, we have to do one json.Marshal() call
	// to produce the bytes to give back to the data source, but that
	// is still far more efficient than marshaling for every item.
	//
	// As a general rule of thumb, no graphs should be sent down the
	// pipeline while resuming from a checkpoint.
	DataSourceCheckpoint any `json:"data_source_checkpoint,omitempty"`
}
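
// For illustration, a persisted import checkpoint might look like this when
// serialized (the values are hypothetical, and the shape of the data source's
// own checkpoint is entirely up to the data source):
//
//	{
//	    "estimated_size": null,
//	    "outer_index": 1,
//	    "inner_index": 3,
//	    "thumbnail_count": 42,
//	    "data_source_checkpoint": {"cursor": "abc123"}
//	}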

type ImportJob struct {
	// accessed atomically (placed first to align on 64-bit word boundary, for 32-bit systems)
	// (reset each time the job is started)
	itemCount, newItemCount, updatedItemCount, skippedItemCount *int64
	newEntityCount                                              *int64
	thumbnailCount                                              *int64

	job *ActiveJob

	// interactive jobs need access to the current processor to send graphs ready for processing
	p   *processor
	pMu *sync.Mutex

	Plan              ImportPlan        `json:"plan,omitempty"`
	ProcessingOptions ProcessingOptions `json:"processing_options,omitempty"`
	EstimateTotal     bool              `json:"estimate_total,omitempty"`
}

func (ij ImportJob) checkpoint(estimatedSize *int64, outer, inner int, ds any) error {
	return ij.job.Checkpoint(importJobCheckpoint{
		EstimatedSize:        estimatedSize,
		OuterIndex:           outer,
		InnerIndex:           inner,
		ThumbnailCount:       atomic.LoadInt64(ij.thumbnailCount),
		DataSourceCheckpoint: ds,
	})
}

// TODO: This was useful during the refactoring of the import flow, but
// I ended up with a design that doesn't require rooting file systems at
// the root of the volume. It seems like good code, and the Go standard
// library doesn't have a way of doing this, so I hesitate to delete it
// entirely. :)
//
// // splitPathAtVolume splits an absolute filename into volume (or root)
// // and filename components. On Windows, the "root" of a filepath is the
// // volume (e.g. "C:"); on Unix-compatible systems, it is "/". Windows is
// // insane. Anyway, this is useful for constructing a fs.FS that provides
// // access to the whole file system. The returned rootOrVolume can be used
// // as the root of an os.DirFS or similar FS (such as archives.DeepFS),
// // and the relativePath is an fs.FS-compatible path that can be used to
// // access the file within the FS.
// //
// // See https://github.com/golang/go/issues/44279.
// func splitPathAtVolume(absFilename string) (rootOrVolume string, relativePath string, err error) {
// 	if !filepath.IsAbs(absFilename) {
// 		err = fmt.Errorf("filename is not absolute: %s", absFilename)
// 		return
// 	}
// 	rootOrVolume = "/" // assume non-Windows
// 	if vol := filepath.VolumeName(absFilename); vol != "" {
// 		rootOrVolume = vol // okay, it's actually Windows
// 	}
// 	relativePath, err = filepath.Rel(rootOrVolume, absFilename)
// 	if err != nil {
// 		return
// 	}
// 	relativePath = filepath.ToSlash(relativePath)
// 	return
// }
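
// If this were ever revived, a usage sketch might look like the following
// (the path and the DeepFS pairing are illustrative assumptions):
//
//	// rootOrVolume, relativePath, err := splitPathAtVolume(`C:\Users\me\takeout.zip`)
//	// // intended result: rootOrVolume "C:" and an FS-style relativePath like "Users/me/takeout.zip"
//	// fsys := &archives.DeepFS{Root: rootOrVolume}
//	// f, err := fsys.Open(relativePath)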

func (ij *ImportJob) Run(job *ActiveJob, checkpoint []byte) error {
	ij.job = job
	ij.itemCount = new(int64)
	ij.newItemCount = new(int64)
	ij.updatedItemCount = new(int64)
	ij.skippedItemCount = new(int64)
	ij.newEntityCount = new(int64)
	ij.thumbnailCount = new(int64)
	ij.pMu = new(sync.Mutex)

	estimating := ij.EstimateTotal

	if ij.ProcessingOptions.Interactive != nil {
		ij.ProcessingOptions.Interactive.Graphs = make(chan *InteractiveGraph)
	}

	var chkpt importJobCheckpoint
	if checkpoint != nil {
		err := json.Unmarshal(checkpoint, &chkpt)
		if err != nil {
			return fmt.Errorf("decoding checkpoint: %w", err)
		}
		// restore thumbnail-eligible item count
		atomic.StoreInt64(ij.thumbnailCount, chkpt.ThumbnailCount)
		// In theory, a resumed job has the same configuration as before,
		// so this may take us out of "estimating" mode if that phase had
		// already completed, but it should never put us INTO "estimating" mode.
		estimating = chkpt.EstimatedSize != nil
	}

	// two iterations of the phase loop: first to estimate size if enabled, then to actually import items
	for {
		start := time.Now()

		// this should be cumulative across all the files
		var totalSizeEstimate *int64
		if estimating {
			if chkpt.EstimatedSize == nil {
				totalSizeEstimate = new(int64)
				job.Logger().Info("estimating size; this may take a bit")
			} else {
				totalSizeEstimate = chkpt.EstimatedSize
				job.Logger().Info("resuming size estimation; this may take a bit")
			}
			job.Message("Estimating total import size")
		}

		// outer loop: iterate the data source + filename combos
		for i := chkpt.OuterIndex; i < len(ij.Plan.Files); i++ {
			if err := job.Context().Err(); err != nil {
				return err
			}

			fileImport := ij.Plan.Files[i]

			// load data source tidbits
			ds, ok := dataSources[fileImport.DataSourceName]
			if !ok {
				return fmt.Errorf("file import %d: unknown data source: %s", i, fileImport.DataSourceName)
			}
			dsRowID, ok := job.tl.dataSources[fileImport.DataSourceName]
			if !ok {
				return fmt.Errorf("file import %d: data source ID is unknown: %s", i, fileImport.DataSourceName)
			}
			dsOpt, err := ds.UnmarshalOptions(fileImport.DataSourceOptions)
			if err != nil {
				return err
			}

			logger := job.Logger().With(zap.String("data_source_name", ds.Name))

			// inner loop: iterate the filenames assigned to this data source configuration
			for j := chkpt.InnerIndex; j < len(fileImport.Filenames); j++ {
				if err := job.Context().Err(); err != nil {
					return err
				}

				filename := fileImport.Filenames[j]

				if !estimating {
					job.Message("Importing " + filename)
				}

				// create a new "outer" checkpoint when arriving at a new filename to be imported,
				// as long as we are not resuming from a data source checkpoint (if the data
				// source has a checkpoint, we should not overwrite it / clear it out, in case
				// the job stops before it has a chance to checkpoint again)
				if chkpt.DataSourceCheckpoint == nil {
					if err := ij.checkpoint(totalSizeEstimate, i, j, nil); err != nil {
						job.Logger().Error("checkpointing", zap.Error(err))
					}
				}

				// check for job cancellation
				if err := job.Context().Err(); err != nil {
					return err
				}

				// if time zone inference is enabled, initialize the TZ finder,
				// which can be expensive, so only do it once
				var tzFinder tzf.F
				if ij.ProcessingOptions.InferTimeZone {
					tzFinder, err = tzf.NewDefaultFinder()
					if err != nil {
						return fmt.Errorf("initializing time zone finder: %w", err)
					}
				}

				p := processor{
					outerLoopIdx:   i,
					innerLoopIdx:   j,
					estimatedCount: totalSizeEstimate,

					ij:       ij,
					ds:       ds,
					dsRowID:  dsRowID,
					dsOpt:    dsOpt,
					tl:       job.Timeline(),
					log:      logger,
					tzFinder: tzFinder,
				}
				ij.pMu.Lock()
				ij.p = &p
				ij.pMu.Unlock()

				// Create the file system from which this file/dir will be accessed. It must be
				// a DeepFS, since the filename might refer to a path inside an archive file.
				// Rooting the FS is nuanced. If the FS is rooted at the file or directory
				// itself, data sources that walk the FS (like those that support importing from
				// a directory of similar, independent files) must root their walk at ".", and
				// then must check the filename passed into the WalkDirFunc for "." and replace
				// it with the real filename if they want to access the file's actual name
				// (e.g. to check its file extension). It also forbids access to siblings, which
				// can be a problem when sidecar files contain relevant data.
				//
				// One way to solve this is rooting the FS at the volume or real file system
				// root (e.g. "C:" or "/"), then walking starting from the full path to the
				// file. This provides important context about the file's location, which
				// some data sources may use (e.g. looking for a timestamp in the folder
				// hierarchy), and also always provides the actual filename (never ".", since
				// a walk wouldn't start from "."); however, this makes other FS operations
				// tedious. For example, accessing a specific path in a subfolder of the
				// starting filename involves convoluted logic to determine if the starting
				// filename is a directory or file, and if a file, to call path.Dir() then
				// path.Join(), just to get the desired path. This is error-prone and repetitive.
				//
				// We can improve on this by rooting the FS not at the filename or at the volume
				// root, but somewhere in between: the file itself if it's a directory, or the
				// parent dir if it's a file. That way, if the filename is a file, fs.WalkDir(filename)
				// will walk only the file as expected, and with its true filename (not ".");
				// and if it's a dir, the whole directory will be walked. (The filename of the
				// top directory in that case will be ".", but when walking a directory, the name
				// of the top directory is seldom important; it can still be retrieved using
				// another method or variable anyway.) This also makes Open()/Stat()/etc. work
				// as expected when simply passing in a relative path, without any complex lexical
				// manipulation (i.e. passing in "a/b/c.txt" will open "/fs/root/a/b/c.txt" as
				// intended, because the FS is rooted at "/fs/root" regardless of whether the
				// starting filename was "/fs/root" or "/fs/root/file.txt"). And because we root
				// the FS at the parent dir of a file, it grants access to siblings/sidecars if
				// needed. (We can't ALWAYS root the FS at the parent dir, because if the target
				// is a directory, then its parent dir is a *different* directory, making the
				// construction of subpaths more tedious to do correctly.)
				//
				// There are two main downsides I can see. One is that the filename associated
				// with this DirEntry will no longer be the full path from the volume root, thus
				// losing potentially valuable context as to the file's location. This can be
				// remedied by adding a field to the DirEntry that stores the FS root path, so
				// it can be referred to if needed. (This has been done.) The other downside is
				// that when walking the FS, one must remember to root the walk at the associated
				// Filename field in the DirEntry, and not "." (Filename will be "." in the case
				// of a directory, but for a file, the FS root will be its parent folder and
				// Filename will be the file's name in that directory, so "." would walk all
				// files adjacent to the Filename).

				// start by assuming the filename is a directory, in which case the FS is rooted
				// at the directory itself, and thus the filename within the FS is "."
				fsRoot, filenameInsideFS := filename, "."

				// if the filepath contains an archive (i.e. the path itself traverses into
				// an archive), we need to use DeepFS; otherwise, traversing into an archive
				// during import is (probably?) not desired, so use a "regular" file system
				var fsys fs.FS
				if archives.PathContainsArchive(filepath.ToSlash(fsRoot)) {
					// TODO: I'd love to know what the archive file is named, for troubleshooting multi-archive Google Takeouts
					fsys = &archives.DeepFS{Root: fsRoot, Context: job.Context()}
				} else {
					fsys, err = archives.FileSystem(job.ctx, fsRoot, nil)
					if err != nil {
						return fmt.Errorf("creating file system at %s: %w", fsRoot, err)
					}
				}
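
				// For illustration (hypothetical paths): a root like
				// "/home/me/takeout.zip/photos" traverses into an archive, so it
				// gets the DeepFS; a root like "/home/me/photos" does not, so it
				// gets a regular file system.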

				// this stat can be slow-ish, depending on whether we're in a large, compressed tar file
				// and the file happens to be at the end, but we need to know if it's not a directory;
				// and since we reuse the DeepFS, the results get amortized for later
				info, err := fs.Stat(fsys, filenameInsideFS)
				if err != nil {
					return fmt.Errorf("could not stat file to import: %s: %w", filename, err)
				}
				if !info.IsDir() {
					// as explained above, if it's NOT a directory, we root the FS at the
					// parent folder and set the filename to the file's name within the dir
					fsRoot, filenameInsideFS = filepath.Split(fsRoot)

					// since we changed the FS root, we have to update fsys to reflect that
					switch f := fsys.(type) {
					case *archives.DeepFS:
						f.Root = fsRoot
					default:
						// recreate the file system, since we might have just changed a FileFS
						// into a DirFS, for example
						fsys, err = archives.FileSystem(job.ctx, fsRoot, nil)
						if err != nil {
							return fmt.Errorf("recreating file system at %s: %w", fsRoot, err)
						}
					}
				}

				dirEntry := DirEntry{
					DirEntry: fs.FileInfoToDirEntry(info),
					FS:       fsys,
					FSRoot:   fsRoot,
					Filename: filenameInsideFS,
				}
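
				// For illustration (hypothetical data source code): per the
				// explanation above, a walk over this DirEntry must be rooted
				// at dirEntry.Filename, not ".":
				//
				//	fs.WalkDir(dirEntry.FS, dirEntry.Filename, func(fpath string, d fs.DirEntry, err error) error {
				//		// for a file, fpath begins at the file's own name;
				//		// for a directory, fpath begins at "."
				//		return err
				//	})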

				// the data source decodes its own checkpoint, so we give it its checkpoint as bytes
				var dsCheckpoint json.RawMessage
				if chkpt.DataSourceCheckpoint != nil {
					dsCheckpoint, err = json.Marshal(chkpt.DataSourceCheckpoint)
					if err != nil {
						return fmt.Errorf("re-encoding data source checkpoint to pass to DS to resume: %w", err)
					}
				}

				if err := p.process(job.Context(), dirEntry, dsCheckpoint); err != nil {
					ij.generateThumbnailsForImportedItems()
					ij.generateEmbeddingsForImportedItems()
					return fmt.Errorf("processing %s as %s: %w", filename, ds.Name, err)
				}

				// the data source checkpoint is only applicable to the starting point (the filename we resumed from)
				chkpt.DataSourceCheckpoint = nil

				// Reset the checkpoint's inner index back to 0: when we advance the outer loop,
				// we should always start its file list at 0 (otherwise we'd skip import tasks
				// entirely if it's > 0!)
				chkpt.InnerIndex = 0
			}
		}

		if estimating {
			// inform the UI of the new total count
			total := atomic.LoadInt64(totalSizeEstimate)
			job.SetTotal(int(total))
			job.FlushProgress()
			job.Logger().Info("done with size estimation",
				zap.Int64("estimated_size", total),
				zap.Duration("duration", time.Since(start)))

			// Loop once more to import items (don't estimate again); and make sure we start the
			// next phase from the beginning of the outer list (even if we resumed a checkpoint
			// and the starting outer index is > 0), because the starting index only applies to
			// that phase; we don't want to skip tasks for our import job!
			estimating = false
			chkpt.OuterIndex = 0
			continue
		}

		// only import the data once
		break
	}
	if err := job.Context().Err(); err != nil {
		return err
	}

	job.Logger().Info("import complete; cleaning up")

	if err := ij.successCleanup(); err != nil {
		job.Logger().Error("cleaning up after import job", zap.Error(err))
	}

	ij.generateThumbnailsForImportedItems()
	ij.generateEmbeddingsForImportedItems()

	// this can prevent/resolve slow queries, especially useful after (large) imports
	// TODO: maybe only necessary after *large* imports
	go job.tl.optimizeDB(job.Logger())

	return nil
}

// ImportParams specifies parameters for listing items
// from a data source. Some data sources might not be
// able to honor all fields.
type ImportParams struct {
	// Send graphs on this channel to put them through the
	// processing pipeline and add them to the timeline.
	Pipeline chan<- *Graph `json:"-"`

	// The logger to use.
	Log *zap.Logger `json:"-"`

	// Typically, job pauses are honored by the processor whenever
	// a graph is sent down the pipeline. However, if data sources
	// do a nontrivial amount of work between sending graphs, this
	// can result in a long delay before pausing. So we expose the
	// pause function here for the data source to call at each unit
	// of work -- for example, at each iteration of a loop that looks
	// for the next item within the designated timeframe. Continue()
	// blocks until the job gets unpaused, and does nothing if the
	// job is not paused. (See the sketch below this type.)
	Continue func() error `json:"-"`

	// Time bounds on which data to retrieve.
	// The respective time and item ID fields
	// which are set must never conflict if
	// both are set.
	Timeframe Timeframe `json:"timeframe,omitempty"`

	// A checkpoint from which to resume the import.
	Checkpoint json.RawMessage `json:"checkpoint,omitempty"`

	// Options specific to the data source,
	// as provided by NewOptions.
	DataSourceOptions any `json:"data_source_options,omitempty"`

	// TODO: WIP...
	// // Maximum number of items to list; useful
	// // for previews. Data sources should not
	// // checkpoint previews.
	// // TODO: still should enforce this in the processor... but this is good for the DS to know too, so it can limit its API calls, for example
	// MaxItems int `json:"max_items,omitempty"`

	// TODO: WIP...
	// // Whether the data source should prioritize precise
	// // checkpoints for fast resumption over performance.
	// // Typically, this implies that the data source
	// // operates in single-threaded mode and processes
	// // its input linearly, rather than concurrently,
	// // which can be difficult to checkpoint; whereas a
	// // checkpoint during a linear traversal of the data
	// // can be as simple as an integer index or count.
	// // Users can enable this if they are okay with
	// // potentially slower processing time but want
	// // faster job resumption.
	// PrioritizeCheckpoints bool
}
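
// A minimal sketch (hypothetical data source code) of how ImportParams is meant
// to be used; everything here other than the ImportParams fields themselves is
// an assumption for illustration:
//
//	// for _, entry := range entries { // entries is hypothetical
//	//	// honor pauses at each unit of work, not only at sends
//	//	if err := params.Continue(); err != nil {
//	//		return err
//	//	}
//	//	item := itemFromEntry(entry) // itemFromEntry is hypothetical
//	//	params.Pipeline <- &Graph{Item: item}
//	// }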

func (ij ImportJob) successCleanup() error {
	// delete empty items from this import (items with no content and no meaningful relationships)
	if err := ij.deleteEmptyItems(); err != nil {
		return fmt.Errorf("deleting empty items: %w", err)
	}
	return nil
}

// deleteEmptyItems deletes items from the given import that have no content
// and no meaningful relationships.
func (ij ImportJob) deleteEmptyItems() error {
	// We actually keep rows with no content if they are in a relationship, or if
	// they have a retrieval key, which implies that they will be completed later.
	// (Bookmark items are also a special case: they may be empty, to later be
	// populated by a snapshot, as long as they have metadata.)
	rows, err := ij.job.tl.db.ReadPool.QueryContext(ij.job.ctx, `SELECT id FROM extended_items
		WHERE job_id=?
			AND (data_text IS NULL OR data_text='')
			AND (classification_name != ? OR metadata IS NULL)
			AND data_file IS NULL
			AND longitude IS NULL
			AND latitude IS NULL
			AND altitude IS NULL
			AND retrieval_key IS NULL
			AND id NOT IN (SELECT from_item_id FROM relationships WHERE to_item_id IS NOT NULL OR to_attribute_id IS NOT NULL)`,
		ij.job.id, ClassBookmark.Name) // TODO: consider deleting regardless of relationships existing (remember the iMessage data source until we figured out why some referred-to rows were totally missing?)
	if err != nil {
		return fmt.Errorf("querying empty items: %w", err)
	}

	var emptyItems []int64
	for rows.Next() {
		var rowID int64
		err := rows.Scan(&rowID)
		if err != nil {
			rows.Close() //nolint:sqlclosecheck
			return fmt.Errorf("scanning item: %w", err)
		}
		emptyItems = append(emptyItems, rowID)
	}
	rows.Close()
	if err = rows.Err(); err != nil {
		return fmt.Errorf("iterating item rows: %w", err)
	}

	// nothing to do if no items were empty
	if len(emptyItems) == 0 {
		ij.job.Logger().Info("no empty items to delete")
		return nil
	}

	ij.job.Logger().Info("deleting empty items from this import", zap.Int("count", len(emptyItems)))

	retention := time.Duration(0)
	return ij.job.tl.deleteItemRows(ij.job.ctx, emptyItems, false, &retention)
}

// generateThumbnailsForImportedItems generates thumbnails for qualifying items
// that were part of the import associated with this job. It should be run
// after the import completes. It only creates a thumbnail job if any imported
// items qualify for a thumbnail.
func (ij ImportJob) generateThumbnailsForImportedItems() {
	// no-op if thumbnails were already generated during the import
	if ij.ProcessingOptions.Thumbnails {
		return
	}

	thumbnailCount := atomic.LoadInt64(ij.thumbnailCount)
	if thumbnailCount == 0 {
		ij.job.Logger().Info("no items qualify for thumbnails, so skipping thumbnail generation job")
		return
	}

	ij.job.Logger().Info("creating thumbnail generation job from import")

	job := thumbnailJob{
		TasksFromImportJob: ij.job.ID(),
	}

	// the thumbnail job will calculate its total size
	if _, err := ij.job.tl.CreateJob(job, time.Time{}, 0, 0, ij.job.id); err != nil {
		ij.job.Logger().Error("creating thumbnail job", zap.Error(err))
		return
	}
}

func (ij ImportJob) generateEmbeddingsForImportedItems() {
	if enabled, ok := ij.job.tl.GetProperty(ij.job.tl.ctx, "semantic_features").(bool); !ok || !enabled {
		return
	}

	ij.job.Logger().Info("creating embeddings job from import")

	job := embeddingJob{
		ItemsFromImportJob: ij.job.ID(),
	}

	// the embedding job will calculate its total size
	if _, err := ij.job.tl.CreateJob(job, time.Time{}, 0, 0, ij.job.id); err != nil {
		ij.job.Logger().Error("creating embeddings job", zap.Error(err))
		return
	}
}

func (ij ImportJob) tempGraphFolder() string {
	return filepath.Join(
		os.TempDir(),
		"timelinize",
		fmt.Sprintf("job-%d", ij.job.ID()),
		"interactive")
}

// couldBeMarkdown is a very naive Markdown detector. I'm trying to avoid regexp
// for performance, but this implementation is (admittedly) only mildly effective.
// It may have lots of false positives.
func couldBeMarkdown(input []byte) bool {
	for _, pattern := range [][]byte{
		// links, images, and banners
		[]byte("](http"),
		[]byte("](/"),
		[]byte("!["), // images
		[]byte("[!"), // (GitHub-flavored) banners/notices
		// blocks
		[]byte("\n> "),
		[]byte("\n```"),
		// lists
		[]byte("\n1. "),
		[]byte("\n- "),
		[]byte("\n* "),
		// headings; ignore the h1 "# " since it could just as likely be a code comment
		[]byte("\n## "),
		[]byte("\n### "),
		[]byte("\n=\n"),
		[]byte("\n==\n"),
		[]byte("\n==="),
		[]byte("\n-\n"),
		[]byte("\n--\n"),
		[]byte("\n---"),
	} {
		if bytes.Contains(input, pattern) {
			return true
		}
	}
	// count emphasis/code delimiters; an even, nonzero count suggests
	// balanced pairs of opening and closing tokens
	for _, pair := range [][]byte{
		{'_'},
		{'*', '*'},
		{'*'},
		{'`'},
	} {
		// this isn't perfect, because the matching "end token" could have been truncated
		if count := bytes.Count(input, pair); count > 0 && count%2 == 0 {
			return true
		}
	}
	return false
}
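
// For illustration, a hypothetical call site might use couldBeMarkdown to pick
// a media type for a note's content (the variable and field names here are
// assumptions):
//
//	// mediaType := "text/plain"
//	// if couldBeMarkdown(noteData) {
//	//	mediaType = "text/markdown"
//	// }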

// ImportPlan describes what will be imported and how.
type ImportPlan struct {
	// The list of file imports, each pairing a data source
	// configuration with the files to import using it.
	Files []FileImport `json:"files,omitempty"`
}

// FileImport represents a list of files to import with a specific
// configuration of a data source.
type FileImport struct {
	// The name of the data source to use.
	DataSourceName string `json:"data_source_name"`

	// Configuration specific to the data source.
	DataSourceOptions json.RawMessage `json:"data_source_options,omitempty"`

	// The (OS-compatible) absolute filepaths of the files or directories
	// to be imported. These paths may refer to files within archive files,
	// so the archives.DeepFS type should be used to access them.
	Filenames []string `json:"filenames"`
}
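
// For illustration, an ImportPlan with one file import might serialize like
// this (the data source name, options, and paths are all hypothetical):
//
//	{
//	    "files": [{
//	        "data_source_name": "google_photos",
//	        "data_source_options": {"use_album_metadata": true},
//	        "filenames": ["/home/me/takeout-001.zip"]
//	    }]
//	}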

// ProposedImportPlan is a suggested import plan, to be reviewed and
// tweaked by the user. It is similar to the ImportPlan type, but
// instead of grouping a list of files by specific data source
// configurations, each file is listed together with its results
// from data source recognition, to be shown to the user so they
// can tweak the results and approve a final import plan.
type ProposedImportPlan struct {
	Files []ProposedFileImport `json:"files,omitempty"`
}

type ProposedFileImport struct {
	Filename    string                  `json:"filename"`
	FileType    string                  `json:"file_type,omitempty"` // file, dir, or archive
	DataSources []DataSourceRecognition `json:"data_sources,omitempty"`
}

func (p ProposedFileImport) String() string {
	return fmt.Sprintf("{%s:%s %v}", p.FileType, p.Filename, p.DataSources)
}