1079 lines
42 KiB
Go
1079 lines
42 KiB
Go
/*
|
|
Timelinize
|
|
Copyright (c) 2013 Matthew Holt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package timeline
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"io/fs"
|
|
"maps"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
func (p *processor) pipeline(ctx context.Context, batch []*Graph) error {
|
|
// During large imports, I've found that running ANALYZE every so often
|
|
// can be helpful for improving performance, since an import is much more
|
|
// than just an INSERT, there's lots of SELECTs along the way that use
|
|
// indexes. For example, in my tests importing about a quarter million
|
|
// messages (relation-heavy, since they are sent to an attribute), which
|
|
// I repeated twice, it would take 30 minutes to import without ANALYZE.
|
|
// But when running ANALYZE every so often, it only took 23 minutes.
|
|
// (This was before the DB indexes in the import process were optimized.)
|
|
// We optimize more frequently at the beginning of large imports, and
|
|
// less often thereafter. We actually just run PRAGMA optimize to let
|
|
// sqlite decide if ANALYZE is needed.
|
|
const optimizeFrequencyThreshold = 10000
|
|
p.rootGraphCount += len(batch)
|
|
if (p.rootGraphCount <= optimizeFrequencyThreshold && p.rootGraphCount%3500 < len(batch)) ||
|
|
(p.rootGraphCount > optimizeFrequencyThreshold && p.rootGraphCount%100000 < len(batch)) {
|
|
p.tl.optimizeDB(p.log.Named("optimizer"))
|
|
}
|
|
|
|
// sanitization / normalization phase
|
|
if err := p.phase0(ctx, batch); err != nil {
|
|
return err
|
|
}
|
|
|
|
// download item datas so we know what we're dealing with
|
|
if err := p.phase1(ctx, batch); err != nil {
|
|
return err
|
|
}
|
|
|
|
// now that we have all the data, and it's sanitized, add it to the timeline
|
|
if err := p.phase2(ctx, batch); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// phase0 sanitizes graphs in the batch, and also marks items for skipping if they
|
|
// cannot be sanitized in a way that makes them processable. It also enhances the
|
|
// data if possible, for example adding time zone information, if enabled.
|
|
func (p *processor) phase0(ctx context.Context, batch []*Graph) error {
|
|
// quick input sanitization: timestamps outside a certain range are
|
|
// invalid and obviously wrong (cannot be serialized to JSON), and
|
|
// clean up any metadata as well
|
|
for _, g := range batch {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
if err := p.sanitizeAndEnhance(g); err != nil {
|
|
p.log.Error("sanitizing batch", zap.Error(err))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *processor) sanitizeAndEnhance(g *Graph) error {
|
|
if g == nil {
|
|
return nil
|
|
}
|
|
|
|
if g.Item != nil {
|
|
// TODO: Also ensure Timestamp, Timespan, and Timeframe, and all
|
|
// other time values are in the same zone as Timestamp. (If not,
|
|
// change them to that zone?)
|
|
|
|
// resolve reports of bad location data (#144) -- extremely unlikely a to actually be at (0, 0)
|
|
if g.Item.Location.Latitude != nil && *g.Item.Location.Latitude == 0 {
|
|
g.Item.Location.Latitude = nil
|
|
}
|
|
if g.Item.Location.Longitude != nil && *g.Item.Location.Longitude == 0 {
|
|
g.Item.Location.Longitude = nil
|
|
}
|
|
|
|
// yeah, right; no way 1-1-4193 is an intentional timestamp (as of 2025) (see #145)
|
|
const maxYearsFuture = 500
|
|
if g.Item.Timestamp.Year() >= time.Now().Year()+maxYearsFuture && g.Item.Timestamp.Month() == time.January && g.Item.Timestamp.Day() == 1 {
|
|
g.Item.Timestamp, g.Item.Timespan, g.Item.Timeframe = time.Time{}, time.Time{}, time.Time{}
|
|
p.log.Warn("sanitized timestamp", zap.Time("timestamp", g.Item.Timestamp))
|
|
}
|
|
|
|
// before we know whether we need to skip an item, we may need to fill
|
|
// out its time zone so our calculations are accurate
|
|
// augment timestamp with time zone if enabled, and time zone is missing
|
|
// TODO: somehow indicate that we've filled it in so it can be stored in the time_offset_origin column
|
|
if p.ij.ProcessingOptions.InferTimeZone {
|
|
timeZone := g.Item.Timestamp.Location()
|
|
if !g.Item.Timestamp.IsZero() &&
|
|
(timeZone == time.Local || timeZone == time.UTC) &&
|
|
g.Item.Location.Latitude != nil && g.Item.Location.Longitude != nil {
|
|
start := time.Now()
|
|
tzName := p.tzFinder.GetTimezoneName(*g.Item.Location.Longitude, *g.Item.Location.Latitude)
|
|
loc, err := time.LoadLocation(tzName)
|
|
duration := time.Since(start)
|
|
if err == nil {
|
|
if timeZone == time.Local {
|
|
// the time value is "wall time", or the time local to whenever it was recorded;
|
|
// we need to preserve the exact components of the wall time, while setting its
|
|
// time zone to what should be local based on the coordinates; in Go, this requires
|
|
// creating a new time.Time since we're actually changing the absolute moment in time
|
|
g.Item.Timestamp = time.Date(
|
|
g.Item.Timestamp.Year(),
|
|
g.Item.Timestamp.Month(),
|
|
g.Item.Timestamp.Day(),
|
|
g.Item.Timestamp.Hour(),
|
|
g.Item.Timestamp.Minute(),
|
|
g.Item.Timestamp.Second(),
|
|
g.Item.Timestamp.Nanosecond(),
|
|
loc,
|
|
)
|
|
} else if timeZone == time.UTC {
|
|
// the time is already an absolute moment in time, so we interpret that to be
|
|
// correct, we just need to assign a time zone for display/serialization purposes
|
|
g.Item.Timestamp = g.Item.Timestamp.In(loc)
|
|
}
|
|
g.Item.tsOffsetOrigin = tzOriginGeoLookup // record how/why we implicitly changed or set the time zone
|
|
p.log.Debug("assigned time zone from coordinates",
|
|
zap.Float64p("latitude", g.Item.Location.Latitude),
|
|
zap.Float64p("longitude", g.Item.Location.Longitude),
|
|
zap.String("time_zone", tzName),
|
|
zap.Duration("duration", duration))
|
|
}
|
|
if err != nil {
|
|
p.log.Error("could not load inferred location based on coordinates",
|
|
zap.Float64p("latitude", g.Item.Location.Latitude),
|
|
zap.Float64p("longitude", g.Item.Location.Longitude),
|
|
zap.String("time_zone", tzName),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// items outside the configured timeframe should be skipped
|
|
// (data sources should also elide these items and not even
|
|
// send them, if possible)
|
|
if p.skip(g.Item) {
|
|
return nil
|
|
}
|
|
|
|
g.Item.Metadata.Clean()
|
|
g.Item.Timestamp = validTime(g.Item.Timestamp)
|
|
g.Item.Timespan = validTime(g.Item.Timespan)
|
|
g.Item.Timeframe = validTime(g.Item.Timeframe)
|
|
}
|
|
|
|
// traverse graph nodes recursively to sanitize them
|
|
for _, edge := range g.Edges {
|
|
if err := p.sanitizeAndEnhance(edge.From); err != nil {
|
|
return err
|
|
}
|
|
if err := p.sanitizeAndEnhance(edge.To); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// skip returns true if the item is marked for skipping. Future processing
|
|
// phases should check an item's skipped field and no-op if it is true.
|
|
func (p *processor) skip(it *Item) bool {
|
|
// skip item if outside of timeframe (data source should do this for us, but
|
|
// ultimately we should enforce it: it just means the data source is being
|
|
// less efficient than it could be)
|
|
// TODO: also consider Timespan, Timeframe
|
|
if !it.Timestamp.IsZero() && !p.ij.ProcessingOptions.Timeframe.Contains(it.Timestamp) {
|
|
p.log.Warn("ignoring item outside of designated timeframe (data source should not send this item; it is probably being less efficient than it could be)",
|
|
zap.String("item_id", it.ID),
|
|
zap.Timep("tf_since", p.ij.ProcessingOptions.Timeframe.Since),
|
|
zap.Timep("tf_until", p.ij.ProcessingOptions.Timeframe.Until),
|
|
zap.Time("item_timestamp", it.Timestamp),
|
|
)
|
|
it.skip = true
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// phase1 downloads the data of each item, no lock on the DB unless thumbnail
|
|
// generation is enabled, but in that case DB isn't the bottleneck.
|
|
// The whole batch is downloaded concurrently.
|
|
func (p *processor) phase1(ctx context.Context, batch []*Graph) error {
|
|
wg := new(sync.WaitGroup)
|
|
for _, g := range batch {
|
|
if g.err != nil {
|
|
continue
|
|
}
|
|
if err := p.downloadDataFilesInGraph(ctx, g, wg); err != nil {
|
|
p.log.Error("downloading data files in graph", zap.Error(err))
|
|
g.err = err
|
|
}
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (p *processor) downloadDataFilesInGraph(ctx context.Context, g *Graph, wg *sync.WaitGroup) error {
|
|
if g == nil {
|
|
return nil
|
|
}
|
|
|
|
if g.Item != nil && !g.Item.skip {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
if err := p.downloadItemData(ctx, g.Item); err != nil {
|
|
p.log.Error("failed downloading item's data", zap.Error(err))
|
|
}
|
|
}()
|
|
}
|
|
|
|
// traverse graph and download their data files
|
|
for _, edge := range g.Edges {
|
|
if err := p.downloadDataFilesInGraph(ctx, edge.From, wg); err != nil {
|
|
return err
|
|
}
|
|
if err := p.downloadDataFilesInGraph(ctx, edge.To, wg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *processor) downloadItemData(ctx context.Context, it *Item) error {
|
|
if it == nil || it.Content.Data == nil {
|
|
return nil
|
|
}
|
|
|
|
source, err := it.Content.Data(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("getting item's data stream: %w", err)
|
|
}
|
|
if source == nil {
|
|
// this is probably not intentional? warn developers so they hopefully notice
|
|
p.log.Warn("item's data func returned no reader and no error")
|
|
return nil
|
|
}
|
|
defer source.Close()
|
|
|
|
// if we don't know the content-type, try to detect it, first by sniffing
|
|
// the start of the data (more reliable), or look at the file extension if
|
|
// needed (less reliable) -- also do this if the content-type is known to
|
|
// be plaintext or markdown, which we can store in the table, since we will
|
|
// need to see if it's small enough to comfortably fit in the table
|
|
if it.Content.MediaType == "" || it.Content.isPlainTextOrMarkdown() {
|
|
// to detect it, we have to read the file into memory (or at least part
|
|
// of it); and for plaintext data, this also tells us whether the data
|
|
// is too large for us to want to put in this table or if we'll store it
|
|
// elsewhere
|
|
|
|
// NOTE: if this code ever gets moved into a separate function, make sure
|
|
// that the buffer is not returned until we're done processing the item; or
|
|
// copy it to a new buffer first, due to buffer pooling & reuse
|
|
bufPtr := sizePeekBufPool.Get().(*[]byte)
|
|
buf := *bufPtr
|
|
defer func() {
|
|
// From the standard lib's crypto/tls package:
|
|
// "You might be tempted to simplify this by just passing &buf to Put,
|
|
// but that would make the local copy of the buf slice header escape
|
|
// to the heap, causing an allocation. Instead, we keep around the
|
|
// pointer to the slice header returned by Get, which is already on the
|
|
// heap, and overwrite and return that."
|
|
// See: https://github.com/dominikh/go-tools/issues/1336
|
|
*bufPtr = buf
|
|
sizePeekBufPool.Put(bufPtr)
|
|
}()
|
|
|
|
n, err := io.ReadFull(source, buf)
|
|
if err == io.EOF { // no content
|
|
p.log.Debug("empty data stream",
|
|
zap.String("original_id", it.ID),
|
|
zap.String("data_file_path", it.dataFilePath),
|
|
zap.String("filename", it.Content.Filename),
|
|
zap.String("intermediate_path", it.IntermediateLocation))
|
|
return nil
|
|
}
|
|
if err != nil && err != io.ErrUnexpectedEOF { // it's OK if the buffer wasn't filled, it just means the item is smaller
|
|
return fmt.Errorf("buffering item's data stream to peek size and content-type: %w", err)
|
|
}
|
|
|
|
// In case the size of the data is less than the peek buffer, we carefully use only
|
|
// the first n bytes of the buffer, since beyond that is likely data from another
|
|
// item that this pooled buffer was previously used with
|
|
bufferedContent := buf[:n]
|
|
|
|
// now that we have a sample of the data (or possibly all of it),
|
|
// we can sniff the content-type
|
|
if it.Content.MediaType == "" {
|
|
it.Content.MediaType = detectContentType(bufferedContent, it.Content.Filename)
|
|
}
|
|
|
|
if n < len(buf) && it.Content.isPlainTextOrMarkdown() {
|
|
// item's content is smaller than the buffer and is plaintext, so
|
|
// we can store this comfortably in the items table; move a string
|
|
// copy of it to the item and return
|
|
dataTextStr := strings.TrimSpace(string(bufferedContent))
|
|
it.dataText = &dataTextStr
|
|
return nil
|
|
}
|
|
|
|
// content is either binary, or too large to fit comfortably in the
|
|
// items table, so we'll need to finish downloading and processing
|
|
// the data file; we ensure the data stream first reads from our
|
|
// buffer before it finishes reading from the source stream
|
|
source = io.NopCloser(io.MultiReader(bytes.NewReader(bufferedContent), source))
|
|
}
|
|
|
|
// this might be a naive assumption, but if the item classification is
|
|
// missing, but the item is clearly a common media type, we can probably
|
|
// classify the item anyway
|
|
if it.Classification.Name == "" {
|
|
if strings.HasPrefix(it.Content.MediaType, "image/") ||
|
|
strings.HasPrefix(it.Content.MediaType, "video/") ||
|
|
strings.HasPrefix(it.Content.MediaType, "audio/") {
|
|
it.Classification = ClassMedia
|
|
}
|
|
}
|
|
|
|
// open the output file and copy the data
|
|
var destination *os.File
|
|
destination, it.dataFilePath, err = p.tl.openUniqueCanonicalItemDataFile(ctx, p.log, nil, it, p.ds.Name)
|
|
if err != nil {
|
|
return fmt.Errorf("opening output data file: %w", err)
|
|
}
|
|
defer destination.Close()
|
|
|
|
p.log.Debug("opened canonical data file for writing",
|
|
zap.String("filename", it.Content.Filename),
|
|
zap.String("intermediate_path", it.IntermediateLocation),
|
|
zap.String("data_file", it.dataFilePath))
|
|
|
|
if err := p.downloadDataFile(it, source, destination); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// downloadDataFile downloads the data file and hashes it. It attaches the
|
|
// results to the item.
|
|
func (p *processor) downloadDataFile(it *Item, source io.Reader, destination *os.File) error {
|
|
if it == nil {
|
|
return nil
|
|
}
|
|
|
|
h := newHash()
|
|
|
|
dataFileSize, err := p.downloadAndHashDataFile(source, destination, h, path.Dir(it.dataFilePath))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if dataFileSize == 0 {
|
|
// don't keep empty data files laying around
|
|
p.log.Warn("downloaded data file was empty; removing file",
|
|
zap.String("item_original_id", it.ID),
|
|
zap.String("intermediate_path", it.IntermediateLocation),
|
|
zap.String("data_file_name", it.dataFilePath),
|
|
zap.Int64("bytes_written", it.dataFileSize))
|
|
|
|
if err := p.tl.deleteRepoFile(it.dataFilePath); err != nil {
|
|
p.log.Error("could not delete empty data file",
|
|
zap.String("item_original_id", it.ID),
|
|
zap.String("data_file_name", it.dataFilePath),
|
|
zap.Error(err))
|
|
}
|
|
// make sure it doesn't look like we still have a data file
|
|
it.dataFilePath = ""
|
|
} else {
|
|
it.dataFileSize = dataFileSize
|
|
it.dataFileHash = h.Sum(nil)
|
|
|
|
// If inline thumbnail generation is enabled, do that now; this acquires a lock or two on the DB,
|
|
// so it's definitely not super efficient, but: this is an opt-in code path, and the bottleneck
|
|
// here is usually the thumbnail generation itself, not the DB. I have not noticed a huge difference
|
|
// in the runtimes of this versus import then thumbnail job after.
|
|
if p.ij.ProcessingOptions.Thumbnails && qualifiesForThumbnail(&it.Content.MediaType) {
|
|
task := thumbnailTask{
|
|
tl: p.tl,
|
|
DataFile: it.dataFilePath,
|
|
DataType: it.Content.MediaType,
|
|
ThumbType: thumbnailType(it.Content.MediaType, false),
|
|
}
|
|
start := time.Now()
|
|
if thumb, thash, err := task.thumbnailAndThumbhash(p.ij.job.ctx, true); err != nil {
|
|
p.log.Error("failed generating thumbnail and thumbhash for item",
|
|
zap.String("item_original_id", it.ID),
|
|
zap.String("item_intermediate_path", it.IntermediateLocation),
|
|
zap.String("data_file_name", it.dataFilePath),
|
|
zap.Error(err))
|
|
} else {
|
|
it.thumbhash = thash // make sure to keep this, so it gets stored in the DB with the item
|
|
verb := "generated"
|
|
if thumb.alreadyExisted {
|
|
verb = "assigned existing"
|
|
}
|
|
p.log.Info(verb+" thumbnail for item",
|
|
zap.String("item_original_id", it.ID),
|
|
zap.String("item_intermediate_path", it.IntermediateLocation),
|
|
zap.String("data_file_name", it.dataFilePath),
|
|
zap.Duration("duration", time.Since(start)))
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleDuplicateItemDataFile checks to see if the checksum of the file downloaded for the item
|
|
// already exists in the database. It returns true if the file already exists and a replacement
|
|
// occurred to use the existing file (or, if integrity check on the existing file failed, the new
|
|
// file instead). If false is returned, the downloaded file is unique.
|
|
func (p *processor) handleDuplicateItemDataFile(ctx context.Context, tx *sql.Tx, it *Item) (bool, error) {
|
|
if it.dataFilePath == "" || len(it.dataFileHash) == 0 {
|
|
return false, errors.New("missing data filename and/or hash of contents")
|
|
}
|
|
|
|
var existingDataFilePath *string
|
|
|
|
// we can reuse the existing thumbhash (if there is one) for items that share the same data file
|
|
// (the "data_text IS NULL" condition is necessary to use the "idx_items_data_text_hash" index,
|
|
// and has been observed to yield a much faster query plan, but is not strictly needed for correctness;
|
|
// it should be fine since we don't set data_text when there's a data_file)
|
|
err := tx.QueryRowContext(ctx, `SELECT data_file, thumb_hash FROM items WHERE data_text IS NULL AND data_hash=? AND data_file!=? LIMIT 1`,
|
|
it.dataFileHash, it.dataFilePath).Scan(&existingDataFilePath, &it.thumbhash)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return false, nil // file is unique; carry on
|
|
}
|
|
if err != nil {
|
|
return false, fmt.Errorf("querying DB: %w", err)
|
|
}
|
|
|
|
// file is a duplicate! by the time this function returns (if successful),
|
|
// it.dataFileName should not exist anymore and should be reassigned to
|
|
// *existingDataFilePath instead.
|
|
|
|
p.log.Info("data file is a duplicate",
|
|
zap.String("duplicate_data_file", it.dataFilePath),
|
|
zap.Stringp("existing_data_file", existingDataFilePath),
|
|
zap.Binary("checksum", it.dataFileHash))
|
|
|
|
if existingDataFilePath == nil {
|
|
// ... that's weird, how's this possible? it has a hash but no file name recorded
|
|
return false, fmt.Errorf("item with matching hash is missing data file name; hash: %x", it.dataFileHash)
|
|
}
|
|
|
|
// TODO: maybe this all should be limited to only when integrity checks are enabled? how do we know that this download has the right version/contents?
|
|
p.log.Debug("verifying existing file is still the same",
|
|
zap.Stringp("existing_data_file", existingDataFilePath),
|
|
zap.Binary("checksum", it.dataFileHash))
|
|
|
|
// ensure the existing file is still the same
|
|
h := newHash()
|
|
f, err := os.Open(p.tl.FullPath(*existingDataFilePath))
|
|
if err != nil {
|
|
return false, fmt.Errorf("opening existing file: %w", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
_, err = io.Copy(h, f)
|
|
if err != nil {
|
|
return false, fmt.Errorf("reading file for opportunistic integrity check: %w", err)
|
|
}
|
|
|
|
existingFileHash := h.Sum(nil)
|
|
|
|
if !bytes.Equal(it.dataFileHash, existingFileHash) {
|
|
// the existing file was corrupted, so restore it with
|
|
// what we just downloaded, which presumably succeeded
|
|
// (by simply renaming the file on disk, we don't have
|
|
// to update any entries in the DB)
|
|
p.log.Warn("existing data file failed integrity check (checksum on disk changed; file corrupted or modified?) - replacing existing file with this one",
|
|
zap.Stringp("existing_data_file", existingDataFilePath),
|
|
zap.Binary("expected_checksum", it.dataFileHash),
|
|
zap.Binary("actual_checksum", existingFileHash),
|
|
zap.String("replacement_data_file", it.dataFilePath))
|
|
err := os.Rename(p.tl.FullPath(it.dataFilePath), p.tl.FullPath(*existingDataFilePath))
|
|
if err != nil {
|
|
return false, fmt.Errorf("replacing modified data file: %w", err)
|
|
}
|
|
} else {
|
|
// everything checks out; delete the newly-downloaded file
|
|
// and use the existing file instead of duplicating it
|
|
p.log.Debug("existing file passed integrity check; using it instead of newly-downloaded duplicate",
|
|
zap.Stringp("existing_data_file", existingDataFilePath),
|
|
zap.String("new_data_file", it.dataFilePath),
|
|
zap.Binary("checksum", it.dataFileHash))
|
|
err = p.tl.deleteRepoFile(it.dataFilePath)
|
|
if err != nil {
|
|
return false, fmt.Errorf("removing duplicate data file: %w", err)
|
|
}
|
|
}
|
|
|
|
p.log.Info("removed duplicate data file based on integrity check",
|
|
zap.String("duplicate_data_file", it.dataFilePath),
|
|
zap.Stringp("existing_data_file", existingDataFilePath),
|
|
zap.Binary("checksum", it.dataFileHash))
|
|
|
|
it.dataFilePath = *existingDataFilePath
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// downloadAndHashDataFile writes source to destination, hashing it along the way.
|
|
// It returns the number of bytes copied.
|
|
func (p *processor) downloadAndHashDataFile(source io.Reader, destination *os.File, h hash.Hash, destinationDir string) (int64, error) {
|
|
// writing a data file consists of non-atomic steps: creating directory, writing the
|
|
// file, and cleaning up the file and its empty dir tree if the file was empty; and
|
|
// since data files are downloaded concurrently, we need to sync these steps, otherwise
|
|
// it's possible to delete a directory that another file is writing into... so we keep
|
|
// track of which directories are in use while creating data files, and when we're
|
|
// done, we can remove the "lock" on that directory
|
|
defer func() {
|
|
p.tl.dataFileWorkingDirsMu.Lock()
|
|
p.tl.dataFileWorkingDirs[destinationDir]--
|
|
if p.tl.dataFileWorkingDirs[destinationDir] == 0 {
|
|
delete(p.tl.dataFileWorkingDirs, destinationDir)
|
|
}
|
|
p.tl.dataFileWorkingDirsMu.Unlock()
|
|
}()
|
|
|
|
// give the hasher a copy of the file bytes
|
|
tr := io.TeeReader(source, h)
|
|
|
|
start := time.Now()
|
|
|
|
n, err := io.Copy(destination, tr)
|
|
if err != nil {
|
|
// TODO: The error should be highlighted as a notification of some sort. Ideally, keep what we have, but somehow indicate in the DB that it's corrupt/incomplete (like not filling out a data_hash)
|
|
_ = p.tl.deleteRepoFile(destination.Name())
|
|
return n, fmt.Errorf("copying contents: %w", err)
|
|
}
|
|
|
|
// TODO: If n == 0, should we retry? (would need to call h.Reset() first) - to help handle sporadic I/O issues maybe
|
|
|
|
// we can probably increase performance if we don't sync all the time, but that might be less reliable...
|
|
if n > 0 {
|
|
if err := destination.Sync(); err != nil {
|
|
return n, fmt.Errorf("syncing file after downloading: %w", err)
|
|
}
|
|
}
|
|
|
|
p.log.Debug("downloaded data file",
|
|
zap.String("filename", destination.Name()),
|
|
zap.Duration("duration", time.Since(start)),
|
|
zap.Int64("size", n))
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// phase2 inserts or updates the graphs in the database. It tidies up the data file, if any, according
|
|
// to information found in the database. This is the sole phase that obtains a DB lock and must come
|
|
// after the data files have been downloaded. It also emits logs for the frontend.
|
|
func (p *processor) phase2(ctx context.Context, batch []*Graph) error {
|
|
tx, err := p.tl.db.WritePool.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("beginning transaction for batch: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
for _, g := range batch {
|
|
if err = p.processGraph(ctx, tx, g); err != nil {
|
|
p.log.Error("processing graph", zap.String("graph", g.String()), zap.Error(err))
|
|
g.err = err
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("committing transaction for batch: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *processor) processGraph(ctx context.Context, tx *sql.Tx, g *Graph) error {
|
|
if g == nil {
|
|
return nil
|
|
}
|
|
|
|
// validate node type
|
|
if g.Item != nil && g.Entity != nil {
|
|
return fmt.Errorf("ambiguous node in graph is both an item and entity node (item_graph=%p)", g)
|
|
}
|
|
|
|
// this whole big thing is one huge log so the UI can stream
|
|
// a sample of live import data
|
|
defer func() {
|
|
// logging the progress every item/entity that gets processed is actually
|
|
// not super efficient, so we use this trick to only prepare the log entry
|
|
// if it will, in fact, be logged (we sample logs to increase efficiency,
|
|
// but those gains are most realized when we avoid our own processing if
|
|
// a particular log entry will be dropped too, hence the call to Check())
|
|
if checkedLog := p.log.Check(zapcore.InfoLevel, "finished graph"); checkedLog != nil {
|
|
graphType := "item"
|
|
if g.Entity != nil {
|
|
graphType = "entity"
|
|
}
|
|
|
|
fields := []zapcore.Field{
|
|
zap.String("graph", fmt.Sprintf("%p", g)),
|
|
zap.String("type", graphType),
|
|
zap.Uint64("row_id", g.rowID.id()),
|
|
zap.Int64("new_entities", atomic.LoadInt64(p.ij.newEntityCount)),
|
|
zap.Int64("new_items", atomic.LoadInt64(p.ij.newItemCount)),
|
|
zap.Int64("updated_items", atomic.LoadInt64(p.ij.updatedItemCount)),
|
|
zap.Int64("skipped_items", atomic.LoadInt64(p.ij.skippedItemCount)),
|
|
zap.Int64("total_items", atomic.LoadInt64(p.ij.itemCount)),
|
|
}
|
|
|
|
if g.Item != nil && !g.Item.Timestamp.IsZero() {
|
|
fields = append(fields, zap.Time("item_timestamp", g.Item.Timestamp))
|
|
}
|
|
|
|
entityAttr := func(e Entity) zapcore.Field {
|
|
if e.Name != "" {
|
|
return zap.String("entity", e.Name)
|
|
}
|
|
for _, attr := range e.Attributes {
|
|
if attr.Identifying || attr.Identity {
|
|
return zap.Any("entity", attr.Value)
|
|
}
|
|
}
|
|
return zap.Stringp("entity", nil)
|
|
}
|
|
item, entity := g.Item, g.Entity
|
|
if options, obfuscate := p.tl.obfuscationMode(); obfuscate {
|
|
// We tediously (shallow-ish) copy the values that are anonymized,
|
|
// since we have to log them with obfuscation mode enabled. Why
|
|
// do we need to copy them first? Because the tx isn't finished
|
|
// yet. This whole method is one iteration's call as part of a
|
|
// batch, so if we change the values right now, they'll go into
|
|
// the DB like that -- I have verified this by inspecting the DB,
|
|
// and found obfuscated values -- yikes! so we do have to copy
|
|
// the values that get anonymized, even if we don't log them.
|
|
if item != nil {
|
|
anonItem := *item
|
|
anonItem.row.Anonymize(options)
|
|
anonItem.row.Metadata = make(json.RawMessage, len(item.row.Metadata))
|
|
copy(anonItem.row.Metadata, item.row.Metadata)
|
|
anonItem.Owner.Attributes = make([]Attribute, len(item.Owner.Attributes))
|
|
copy(anonItem.Owner.Attributes, item.Owner.Attributes)
|
|
for i := range anonItem.Owner.Attributes {
|
|
anonItem.Owner.Attributes[i].Metadata = make(Metadata)
|
|
maps.Copy(anonItem.Owner.Attributes[i].Metadata, item.Owner.Attributes[i].Metadata)
|
|
}
|
|
anonItem.Owner.Anonymize(options)
|
|
item = &anonItem
|
|
}
|
|
if entity != nil {
|
|
anonEntity := *entity
|
|
anonEntity.Metadata = make(Metadata, len(entity.Metadata))
|
|
maps.Copy(anonEntity.Metadata, entity.Metadata)
|
|
anonEntity.Attributes = make([]Attribute, len(entity.Attributes))
|
|
copy(anonEntity.Attributes, entity.Attributes)
|
|
for i := range anonEntity.Attributes {
|
|
anonEntity.Attributes[i].Metadata = make(Metadata)
|
|
maps.Copy(anonEntity.Attributes[i].Metadata, entity.Attributes[i].Metadata)
|
|
}
|
|
anonEntity.ID = g.rowID.id()
|
|
anonEntity.Anonymize(options)
|
|
entity = &anonEntity
|
|
}
|
|
}
|
|
if item != nil {
|
|
size := item.dataFileSize
|
|
if item.row.DataText != nil {
|
|
size = int64(len(*item.row.DataText))
|
|
}
|
|
preview := item.row.DataText
|
|
const maxPreviewLen = 35
|
|
if preview != nil && len(*preview) > maxPreviewLen {
|
|
shortPreview := (*preview)[:maxPreviewLen]
|
|
preview = &shortPreview
|
|
}
|
|
fields = append(fields,
|
|
zap.String("status", string(item.row.howStored)),
|
|
zap.String("classification", item.Classification.Name),
|
|
zap.Stringp("preview", preview),
|
|
zap.Stringp("filename", item.row.Filename),
|
|
zap.Stringp("original_path", item.row.OriginalLocation),
|
|
zap.Stringp("intermediate_path", item.row.IntermediateLocation),
|
|
zap.Int64("size", size),
|
|
zap.Float64p("lat", item.row.Latitude),
|
|
zap.Float64p("lon", item.row.Longitude),
|
|
zap.String("media_type", item.Content.MediaType),
|
|
entityAttr(item.Owner))
|
|
} else if entity != nil {
|
|
fields = append(fields, entityAttr(*entity))
|
|
}
|
|
checkedLog.Write(fields...)
|
|
}
|
|
}()
|
|
|
|
// process root node
|
|
switch {
|
|
case g.Entity != nil:
|
|
var err error
|
|
g.rowID, err = p.processEntity(ctx, tx, *g.Entity)
|
|
if err != nil {
|
|
return fmt.Errorf("processing entity node: %w", err)
|
|
}
|
|
case g.Item != nil:
|
|
var err error
|
|
g.rowID, err = p.processItem(ctx, tx, g.Item)
|
|
if err != nil {
|
|
return fmt.Errorf("processing item node: %w", err)
|
|
}
|
|
}
|
|
|
|
// process connected nodes
|
|
for _, r := range g.Edges {
|
|
err := p.processRelationship(ctx, tx, r, g)
|
|
if err != nil {
|
|
p.log.Error("processing relationship",
|
|
zap.Uint64("item_or_attribute_row_id", g.rowID.id()),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *processor) processItem(ctx context.Context, tx *sql.Tx, it *Item) (latentID, error) {
|
|
if it.skip {
|
|
return latentID{}, nil
|
|
}
|
|
|
|
// if this item has a data file, check if it's a duplicate and handle accordingly
|
|
if it.dataFilePath != "" {
|
|
_, err := p.handleDuplicateItemDataFile(ctx, tx, it)
|
|
if err != nil {
|
|
return latentID{}, fmt.Errorf("checking if item data file was %w", err)
|
|
}
|
|
}
|
|
|
|
itemRowID, err := p.storeItem(ctx, tx, it)
|
|
if err != nil {
|
|
return latentID{itemID: itemRowID}, err
|
|
}
|
|
|
|
return latentID{itemID: itemRowID}, nil
|
|
}
|
|
|
|
// storeItem stores the item in the database, updating an existing row if pertinent.
|
|
// It returns the row ID of the item.
|
|
func (p *processor) storeItem(ctx context.Context, tx *sql.Tx, it *Item) (uint64, error) {
|
|
// keep count of number of items processed, mainly for logging
|
|
defer atomic.AddInt64(p.ij.itemCount, 1)
|
|
|
|
// prepare for DB queries to see if we have this same item already
|
|
// in some form or another
|
|
var dsName *string
|
|
if p.ds.Name != "" {
|
|
dsName = &p.ds.Name
|
|
}
|
|
it.makeIDHash(dsName)
|
|
it.makeContentHash()
|
|
|
|
// if this function returns with an error, we can avoid an orphaned data file by deleting it
|
|
// TODO: a background maintenance routine/job that regularly deletes orphaned data files
|
|
var err error
|
|
if it.dataFilePath != "" {
|
|
defer func() {
|
|
if err != nil {
|
|
if err := p.tl.deleteRepoFile(it.dataFilePath); err != nil {
|
|
p.log.Warn("could not clean up data file of failed item",
|
|
zap.String("data_file_path", it.dataFilePath),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
uniqueConstraints := it.Retrieval.finalUniqueConstraints(p.ij.ProcessingOptions.ItemUniqueConstraints)
|
|
|
|
// if the item is already in our DB, load it
|
|
var ir ItemRow
|
|
ir, err = p.tl.loadItemRow(ctx, tx, 0, 0, it, dsName, uniqueConstraints, true)
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return 0, fmt.Errorf("looking up item in database: %w", err)
|
|
}
|
|
defer func() { it.row = ir }() // used later in the pipeline for logging
|
|
|
|
// if the item is already in the DB, figure out what, if anything, will be updated
|
|
if ir.ID > 0 {
|
|
// first we need to distill user's update preferences down to update policies for the DB
|
|
if err := p.distillUpdatePolicies(it, ir); err != nil {
|
|
return 0, fmt.Errorf("distilling initial update policies: %w", err)
|
|
}
|
|
|
|
// now determine if we should process this duplicate/existing item at all
|
|
reprocessItem, _ := p.shouldProcessExistingItem(it, ir, it.dataFilePath != "")
|
|
if !reprocessItem {
|
|
atomic.AddInt64(p.ij.skippedItemCount, 1)
|
|
p.log.Debug("skipping processing of existing item",
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.String("intermediate_path", it.IntermediateLocation),
|
|
zap.String("filename", it.Content.Filename),
|
|
zap.String("classification", it.Classification.Name),
|
|
zap.String("original_id", it.ID))
|
|
// add some info for logging purposes
|
|
ir.howStored = itemSkipped
|
|
return ir.ID, nil
|
|
}
|
|
|
|
p.log.Debug("found existing item row according to configured unique constraints",
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.String("filename", it.Content.Filename),
|
|
zap.String("intermediate_path", it.IntermediateLocation),
|
|
zap.String("original_id", it.ID))
|
|
|
|
// if existing item row already has a data file, see if the incoming item changes a filename;
|
|
// this can happen if the filename differs from what is in the DB (or adds one where it was
|
|
// missing), or if there is no filename and the timestamp is different
|
|
if ir.DataFile != nil {
|
|
tsUpdatePol := it.fieldUpdatePolicies["timestamp"]
|
|
filenameUpdatePol := it.fieldUpdatePolicies["filename"]
|
|
|
|
incomingItemAddsFilename := ir.Filename == nil && it.Content.Filename != ""
|
|
incomingItemUpdatesFilename := filenameUpdatePol > 0 && (ir.Filename == nil || it.Content.Filename != *ir.Filename)
|
|
incomingItemUpdatesFilenameWithTime := ir.Filename == nil && tsUpdatePol > 0 &&
|
|
!it.Timestamp.IsZero() && (ir.Timestamp == nil || !it.Timestamp.Equal(*ir.Timestamp))
|
|
|
|
if incomingItemAddsFilename || incomingItemUpdatesFilename || incomingItemUpdatesFilenameWithTime {
|
|
newDataFilePath, err := p.renameDataFile(ctx, tx, it, *ir.DataFile, it.Content.Filename)
|
|
if err != nil {
|
|
p.log.Error("could not rename existing data file given new filename information", zap.Error(err))
|
|
} else {
|
|
p.log.Info("renamed existing data file given new filename information",
|
|
zap.String("new_filename", it.Content.Filename),
|
|
zap.String("old_data_file_path", *ir.DataFile),
|
|
zap.String("new_data_file_path", newDataFilePath))
|
|
ir.DataFile = &newDataFilePath
|
|
ir.Filename = &it.Content.Filename
|
|
}
|
|
}
|
|
}
|
|
|
|
// if a filename is already in the database, but is not in this item graph, use the one in the database
|
|
// (useful if the data source sends an item piecewise, like filename first, then actual data file in a later graph)
|
|
if ir.Filename != nil && it.Content.Filename == "" && it.dataFilePath != "" {
|
|
newDataFilePath, err := p.renameDataFile(ctx, tx, it, it.dataFilePath, *ir.Filename)
|
|
if err != nil {
|
|
p.log.Error("could not rename data file given filename information found in database", zap.Error(err))
|
|
} else {
|
|
p.log.Info("renamed data file given filename information found in database",
|
|
zap.String("discovered_filename", *ir.Filename),
|
|
zap.String("old_data_file_path", it.dataFilePath),
|
|
zap.String("new_data_file_path", newDataFilePath))
|
|
it.dataFilePath = newDataFilePath
|
|
}
|
|
}
|
|
}
|
|
|
|
// we assume we're on a case-sensitive file system; that means that the filename might collide
|
|
// with another if moved to a case-sensitive file system - that'd be really bad, so let's make
|
|
// sure the filename is unique case-insensitively too
|
|
if it.dataFilePath != "" {
|
|
// query the DB for a data_file that matches case-insensitively (data_file is defined to be COLLATE NOCASE)
|
|
// and doesn't match exactly ("COLLATE BINARY")
|
|
var count int
|
|
err = tx.QueryRowContext(ctx, `SELECT count() FROM items WHERE data_file=? AND data_file!=? COLLATE BINARY LIMIT 1`,
|
|
it.dataFilePath, it.dataFilePath).Scan(&count)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("checking DB for case-insensitive filename uniqueness: %w", err)
|
|
}
|
|
if count > 0 {
|
|
newDataFileName, err := p.renameDataFile(ctx, tx, it, it.dataFilePath, path.Base(it.dataFilePath))
|
|
if err == nil {
|
|
p.log.Info("data file renamed to be case-insensitively unique",
|
|
zap.String("old_data_file_path", it.dataFilePath),
|
|
zap.String("new_data_file_path", newDataFileName))
|
|
it.dataFilePath = newDataFileName
|
|
} else {
|
|
p.log.Error("could not rename existing data file to avoid potential case-insensitive collisions", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// make a copy of this 'cause we might use it later to clean up a data file if we ended up setting it to NULL or replacing it
|
|
startingDataFile := ir.DataFile
|
|
|
|
// convert incoming item to a row that can be inserted into the DB
|
|
err = p.fillItemRow(ctx, tx, &ir, it)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("assembling item for storage: %w", err)
|
|
}
|
|
|
|
// execute query to insert or update the item
|
|
ir.ID, ir.howStored, err = p.insertOrUpdateItem(ctx, tx, ir, it.HasContent(), it.fieldUpdatePolicies)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("storing item in database: %w (row_id=%d item_id=%v)", err, ir.ID, ir.OriginalID)
|
|
}
|
|
|
|
// items that are updated should have new embeddings generated; delete any old ones
|
|
if ir.howStored == itemUpdated {
|
|
_, err = tx.ExecContext(ctx, `DELETE FROM embeddings WHERE item_id=?`, ir.ID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("deleting old embeddings of updated item(s): %w", err)
|
|
}
|
|
}
|
|
|
|
// if the incoming data file didn't end up being used, clean it up
|
|
if it.dataFilePath != "" {
|
|
var count int
|
|
err = tx.QueryRowContext(ctx, `SELECT count() FROM items WHERE data_file=? LIMIT 1`, it.dataFilePath).Scan(&count)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("checking DB for unused incoming data file: %w", err)
|
|
}
|
|
if count == 0 {
|
|
if err = p.tl.deleteRepoFile(it.dataFilePath); err != nil {
|
|
p.log.Error("unable to clean up unused incoming data file",
|
|
zap.String("data_file_path", it.dataFilePath),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// if there's a chance that we just set the data_file to NULL, check to see if the
|
|
// old file is no longer referenced in the DB, and if not, clean it up
|
|
if startingDataFile != nil && ir.DataFile == nil {
|
|
if err := p.tl.deleteDataFileAndThumbnailIfUnreferenced(ctx, tx, *startingDataFile); err != nil {
|
|
p.log.Error("cleaning up unused data file",
|
|
zap.Uint64("item_row_id", ir.ID),
|
|
zap.Stringp("data_file_name", startingDataFile),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// if this replaced an item's previous data file, clean up the old one if it is no longer
|
|
// referenced by any other items
|
|
if it.dataFilePath != "" && startingDataFile != nil && it.dataFilePath != *startingDataFile {
|
|
if err := p.tl.deleteDataFileAndThumbnailIfUnreferenced(ctx, tx, *startingDataFile); err != nil {
|
|
p.log.Error("could not clean up old, unreferenced data file and any associated thumbnail",
|
|
zap.String("old_data_file", *startingDataFile),
|
|
zap.String("replaced_by", it.dataFilePath),
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.Error(err))
|
|
}
|
|
|
|
// additionally, if the incoming file has the same filename as the one we just deleted, let's see
|
|
// if now that filename is available, to try to preserve the item's original filename best we can
|
|
// (for example, if an item with a file named "A.JPG" is being replaced by a new "A.JPG", the
|
|
// processor will initially create "A__asdf.jpg" because "A.JPG" already exists; but once we get
|
|
// here, that old file has been deleted, so we can now restore the original name to the new file)
|
|
if it.intendedDataFileName != "" && path.Base(it.dataFilePath) != it.intendedDataFileName {
|
|
desiredFilename := path.Join(path.Dir(it.dataFilePath), it.intendedDataFileName)
|
|
desiredFilenameFullPath := p.tl.FullPath(desiredFilename)
|
|
f, err := os.OpenFile(desiredFilenameFullPath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0600)
|
|
if err == nil {
|
|
f.Close()
|
|
if err := os.Rename(p.tl.FullPath(it.dataFilePath), desiredFilenameFullPath); err != nil {
|
|
p.log.Error("could not restore data file name from temporary name",
|
|
zap.String("temporary_name", it.dataFilePath),
|
|
zap.String("name_to_restore", desiredFilename),
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.Error(err))
|
|
} else {
|
|
_, err = tx.ExecContext(ctx, "UPDATE items SET data_file=? WHERE data_file=?", desiredFilename, it.dataFilePath)
|
|
if err == nil {
|
|
it.dataFilePath = desiredFilename
|
|
} else {
|
|
p.log.Error("could not update row with renamed data file paths",
|
|
zap.String("previous_path", it.dataFilePath),
|
|
zap.String("new_path", desiredFilename),
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
} else if !errors.Is(err, fs.ErrExist) {
|
|
p.log.Error("could not create placeholder file for data file of the which original filename is being restored",
|
|
zap.String("temporary_name", it.dataFilePath),
|
|
zap.String("name_to_restore", desiredFilename),
|
|
zap.Uint64("row_id", ir.ID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count data files which are eligible for a thumbnail and which are not excluded from
|
|
// receiving a thumbnail (like live photos/motion picture sidecar files, which are
|
|
// generally an exception from normal related items). This count is not used to configure
|
|
// the thumbnail job's total size, but it was in a previous version of the code. Now the
|
|
// job calculates it when it starts. This count is mainly used to determine whether to
|
|
// even start a thumbnail job. TODO: That said, maybe we should always start a thumbnail
|
|
// job even if it does nothing? Then we don't have to count here.
|
|
if qualifiesForThumbnail(ir.DataType) && !it.skipThumb {
|
|
atomic.AddInt64(p.ij.thumbnailCount, 1)
|
|
}
|
|
|
|
return ir.ID, nil
|
|
}
|
|
|
|
func (p *processor) renameDataFile(ctx context.Context, tx *sql.Tx, incoming *Item, oldDataFilePath, newFilename string) (string, error) {
|
|
incoming.Content.Filename = newFilename
|
|
|
|
placeholderFile, newDataFilePath, err := p.tl.openUniqueCanonicalItemDataFile(ctx, p.log, tx, incoming, p.ds.Name)
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not rename data file: %w (old_data_file_name=%s new_filename=%s)", err, oldDataFilePath, newFilename)
|
|
}
|
|
_ = placeholderFile.Close()
|
|
|
|
// NOTE: if DB transaction fails, this rename does not get rolled back
|
|
err = os.Rename(p.tl.FullPath(oldDataFilePath), p.tl.FullPath(newDataFilePath))
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not rename data file: %w (old=%s new=%s)", err, oldDataFilePath, newDataFilePath)
|
|
}
|
|
|
|
logger := p.log.With(
|
|
zap.String("old_data_file_path", oldDataFilePath),
|
|
zap.String("new_data_file_path", newDataFilePath),
|
|
zap.String("new_filename", newFilename),
|
|
)
|
|
|
|
// update all rows that may point to this data file to use the new path
|
|
if _, err := tx.ExecContext(ctx, "UPDATE items SET data_file=? WHERE data_file=?", newDataFilePath, oldDataFilePath); err != nil {
|
|
logger.Error("renamed data file, but failed to update timeline database", zap.Error(err))
|
|
} else {
|
|
logger.Info("renamed data file")
|
|
}
|
|
|
|
// also update thumbnails DB in case any thumbnails were already generated for the item
|
|
_, err = p.tl.thumbs.WritePool.ExecContext(ctx, "UPDATE thumbnails SET data_file=? WHERE data_file=?", newDataFilePath, oldDataFilePath)
|
|
if err != nil {
|
|
logger.Error("renamed data file, but failed to update thumbnail database", zap.Error(err))
|
|
}
|
|
|
|
// if the rename left the folder empty, clean it up
|
|
oldDir := path.Dir(oldDataFilePath)
|
|
if err := p.tl.cleanDirs(oldDir); err != nil {
|
|
logger.Error("failed tidying potentially empty folder of old path",
|
|
zap.String("old_data_file_path", oldDir),
|
|
zap.Error(err))
|
|
}
|
|
|
|
return newDataFilePath, nil
|
|
}
|