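// The following lines are repository web-page metadata captured with the
// file; they are not part of the program:
//
//	1
//	0
//	Fork 0
//	timelinize/datasources/email/email.go
//	2025-10-27 15:49:29 -06:00
//
//	464 lines
//	13 KiB
//	Go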

/*
Timelinize
Copyright (c) 2013 Matthew Holt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// Package email implements a data source for emails (mbox and eml files).
package email
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/mail"
"path"
"strings"
"time"
"github.com/jhillyerd/enmime/v2"
"github.com/timelinize/timelinize/timeline"
"go.uber.org/zap"
)
// init registers the "email" data source with the timeline package. If the
// registration returns an error, it is reported through timeline.Log.Fatal.
func init() {
	ds := timeline.DataSource{
		Name:            "email",
		Title:           "Email",
		Icon:            "email.png",
		NewOptions:      func() any { return new(Options) },
		NewFileImporter: func() timeline.FileImporter { return new(FileImporter) },
	}
	if err := timeline.RegisterDataSource(ds); err != nil {
		timeline.Log.Fatal("registering data source", zap.Error(err))
	}
}
// FileImporter can import the data from a file.
// It has no fields; its methods receive everything they need as arguments.
type FileImporter struct{}
// Recognize reports how confident this data source is that it can import
// dirEntry. Detection currently looks only at the file name: an entry that is
// not a directory and whose extension is .mbox or .eml (compared
// case-insensitively) gets a confidence of 1; anything else keeps the zero
// confidence. The returned error is always nil.
func (fi FileImporter) Recognize(_ context.Context, dirEntry timeline.DirEntry, _ timeline.RecognizeParams) (timeline.Recognition, error) {
	// TODO: proper detection, not just filename
	result := timeline.Recognition{DirThreshold: 0.9}
	if dirEntry.IsDir() {
		return result, nil
	}
	switch strings.ToLower(path.Ext(dirEntry.Name())) {
	case extMbox, extEml:
		result.Confidence = 1
	}
	return result, nil
}
// Options configures the data source.
type Options struct {
// GmailSkipLabels lists Gmail labels to skip: a message is not imported
// when a value in its X-Gmail-Labels header equals one of these labels,
// compared case-insensitively.
GmailSkipLabels []string `json:"gmail_skip_labels"`
// TODO: We can use enmime.NewParser() to set custom options
}
// FileImport imports data from a file.
//
// It walks dirEntry.Filename within dirEntry.FS (a single file or a whole
// directory tree), skipping hidden files and folders and any file whose
// extension is not .eml or .mbox. An .eml file is processed as one message;
// an .mbox file is split into messages at its "From " separator lines.
//
// If params.Checkpoint is set, files encountered before the checkpointed file
// are skipped, and within the checkpointed file the messages before the
// checkpointed message index are skipped. The message index applies to that
// file only; later files are always imported from their first message.
//
// Failures to parse an individual message are logged by processMessage and do
// not stop the import. The returned error is non-nil only if the checkpoint
// cannot be decoded, ctx is done, or walking, opening, or reading fails.
//
// params.DataSourceOptions must hold a *Options; the type assertion panics
// otherwise.
func (fi FileImporter) FileImport(ctx context.Context, dirEntry timeline.DirEntry, params timeline.ImportParams) error {
	dsOpt := params.DataSourceOptions.(*Options)

	// load prior checkpoint, if set
	var chkpt checkpoint
	if params.Checkpoint != nil {
		if err := json.Unmarshal(params.Checkpoint, &chkpt); err != nil {
			return fmt.Errorf("decoding checkpoint: %w", err)
		}
	}

	return fs.WalkDir(dirEntry.FS, dirEntry.Filename, func(fpath string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}

		// skip hidden files & folders; a walk root of "." is the current
		// directory, not a hidden entry, so it must not be skipped
		if name := d.Name(); name != "." && strings.HasPrefix(name, ".") {
			if d.IsDir() {
				return fs.SkipDir
			}
			return nil
		}
		if d.IsDir() {
			return nil // traverse into subdirectories
		}

		// skip unsupported file types
		ext := path.Ext(strings.ToLower(d.Name()))
		if ext != extEml && ext != extMbox {
			return nil
		}

		// catch up to checkpoint
		resumeIndex := 0
		if chkpt.File != "" {
			if fpath != chkpt.File {
				return nil
			}
			// at checkpointed file; consume the whole checkpoint here so that
			// neither the file nor the message index causes anything in the
			// remaining files to be skipped
			resumeIndex = chkpt.MessageIndex
			chkpt.File = ""
			chkpt.MessageIndex = 0
		}

		file, err := dirEntry.FS.Open(fpath)
		if err != nil {
			return err
		}
		defer file.Close()

		// .eml files are easy: should be just a single message in them
		if ext == extEml {
			fi.processMessage(file, message{mboxPath: fpath}, params, dsOpt)
			return nil
		}

		// .mbox files contain multiple messages
		bufr := bufio.NewReader(file)

		// we gradually fill buf with every line we read,
		// and we'll keep current message state in msg
		buf := new(bytes.Buffer)
		msg := message{mboxPath: fpath}

		// emit processes the buffered message, unless it comes before the
		// checkpointed message index, and then empties the buffer
		emit := func() {
			if msg.index >= resumeIndex {
				fi.processMessage(buf, msg, params, dsOpt)
			}
			buf.Reset()
		}

		// read each line of the mbox file, looking for boundary/separator
		// lines that start with "From ", and fill the buffer up to each one
		for {
			// check for context cancellation
			if err := ctx.Err(); err != nil {
				return err
			}

			line, readErr := bufr.ReadBytes('\n')
			atEOF := errors.Is(readErr, io.EOF)
			if readErr != nil && !atEOF {
				return readErr
			}

			// ReadBytes returns the data read before EOF along with io.EOF,
			// so a final line without a trailing newline arrives here too
			// and must be handled like any other line
			if len(line) > 0 {
				if isBoundary(line, buf) {
					// reached message boundary; process buffered
					// message and reset for next one
					if buf.Len() > 0 {
						emit()
						msg = message{mboxPath: fpath, index: msg.index + 1}
					}

					// boundary lines are anything goes, but generally we see
					// a gibberish email address followed by a timestamp
					if err := parseFromLine(&msg, line); err != nil {
						params.Log.Warn("invalid or unrecognized 'From ' boundary line fields", zap.Error(err))
					}
				} else {
					buf.Write(line)
				}
			}

			if atEOF {
				// don't forget to process last message in file; there is
				// nothing to process if the buffer is empty (e.g. empty file)
				if buf.Len() > 0 {
					emit()
				}
				return nil
			}
		}
	})
}
// processMessage turns the message read from r into an item graph and, when a
// graph is produced, sends it on params.Pipeline. A failure to build the graph
// is logged together with the message index and is not returned to the caller.
func (fi FileImporter) processMessage(r io.Reader, msg message, params timeline.ImportParams, dsOpt *Options) {
	ig, buildErr := fi.messageToGraph(r, msg, params, dsOpt)
	if buildErr != nil {
		params.Log.Error("building item graph from envelope",
			zap.Error(buildErr),
			zap.Int("message_index", msg.index))
	}
	if ig == nil {
		// nothing to send: the message failed to parse or was skipped
		return
	}
	params.Pipeline <- ig
}
// messageToGraph parses the message in r with enmime, attaches the resulting
// envelope to msg, and builds the item graph for it. Errors from either step
// are wrapped and annotated with the message index. The graph may be nil with
// a nil error when itemGraphFromEnvelope decides the message is to be skipped.
func (FileImporter) messageToGraph(r io.Reader, msg message, opt timeline.ImportParams, dsOpt *Options) (*timeline.Graph, error) {
	// parse message
	envelope, parseErr := enmime.ReadEnvelope(r)
	if parseErr != nil {
		return nil, fmt.Errorf("reading envelope: %w (message_index=%d)", parseErr, msg.index)
	}
	msg.Envelope = envelope

	// process the result
	graph, buildErr := itemGraphFromEnvelope(msg, opt, dsOpt)
	if buildErr != nil {
		return nil, fmt.Errorf("building item graph from envelope: %w (message_index=%d)", buildErr, msg.index)
	}
	return graph, nil
}
// parseFromLine extracts what it can from the 'From ' boundary line of a mbox
// file and stores it in msg. The line is split on whitespace: the field after
// "From" is recorded as the email, and, when the line has at least eight
// fields, the six fields after the email are parsed as a timestamp of the form
// "Mon Jan 02 15:04:05 -0700 2006". Boundary lines are "anything goes", so a
// line with fewer fields is accepted without a timestamp; an error is returned
// only when a timestamp is present but cannot be parsed.
func parseFromLine(msg *message, line []byte) error {
	const (
		emailField        = 1 // fields[0] is the literal "From"
		timestampStart    = 2
		minFieldsRequired = 8
	)

	parts := bytes.Fields(line)
	if len(parts) > emailField {
		msg.FromLineEmail = string(parts[emailField])
	}
	if len(parts) < minFieldsRequired {
		return nil
	}

	stamp := bytes.Join(parts[timestampStart:minFieldsRequired], spaceBytes)
	parsed, err := time.Parse("Mon Jan 02 15:04:05 -0700 2006", string(stamp))
	if err != nil {
		return fmt.Errorf("parsing timestamp: %w", err)
	}
	msg.FromLineTimestamp = parsed
	return nil
}
// itemGraphFromEnvelope builds the message's item graph.
//
// The root item holds the message's plain-text body (or its HTML body when
// there is no plain text), is related to every "To" and "Cc" recipient, and
// has one attachment item per attachment. The graph's checkpoint records the
// message's file and index.
//
// It returns a nil graph and a nil error when the message is to be skipped
// because one of its X-Gmail-Labels values matches dsOpt.GmailSkipLabels. It
// returns a nil graph and a non-nil error when the root part has a severe
// parse error. Non-severe part errors are logged as warnings, and an
// attachment with a severe error is logged and left out of the graph.
func itemGraphFromEnvelope(m message, opt timeline.ImportParams, dsOpt *Options) (*timeline.Graph, error) {
	// checkErrors returns the first severe error; the non-severe errors
	// that precede it (or all of them, if none is severe) are logged.
	checkErrors := func(part *enmime.Part) error {
		for _, err := range part.Errors {
			if err.Severe {
				return err
			}
			opt.Log.Warn(err.Name, zap.String("detail", err.Detail))
		}
		return nil
	}

	// skip message if there is a severe error at the root
	if err := checkErrors(m.Root); err != nil {
		return nil, err
	}

	// skip desired labels; the header is a comma-separated list, and a label
	// may carry whitespace next to the comma (for example when a long header
	// was folded across lines), so trim each label before comparing
	labels := strings.Split(m.GetHeader("X-Gmail-Labels"), ",")
	for i := range labels {
		labels[i] = strings.TrimSpace(labels[i])
	}
	for _, skipLabel := range dsOpt.GmailSkipLabels {
		for _, label := range labels {
			if strings.EqualFold(label, skipLabel) {
				return nil, nil
			}
		}
	}

	// TODO: make this configurable if user wants to prefer HTML...
	rootDataText := m.Text
	rootMediaType := "text/plain"
	if rootDataText == "" {
		rootDataText = m.HTML
		rootMediaType = "text/html"
	}

	// the message timestamp is the same for the root item and every
	// attachment, so work it out once
	ts := m.timestamp()

	item := &timeline.Item{
		Classification: timeline.ClassEmail,
		Timestamp:      ts,
		Owner:          m.firstFrom(),
		Content: timeline.ItemData{
			Filename:  "", // TODO:...?
			MediaType: rootMediaType,
			Data:      timeline.StringData(rootDataText),
		},
		Metadata: timeline.Metadata{}, // TODO: lots of metadata in headers, probably!
	}

	// create graph and relate recipients to it
	ig := &timeline.Graph{Item: item}
	for _, recipient := range m.to("To") {
		recipCopy := recipient
		ig.ToEntity(timeline.RelSent, &recipCopy)
	}
	for _, cc := range m.to("Cc") {
		ccCopy := cc
		ig.ToEntity(timeline.RelCCed, &ccCopy)
	}

	// add attachments to graph
	for i, attach := range m.Attachments {
		// skip part if there are any severe errors
		if err := checkErrors(attach); err != nil {
			opt.Log.Error("parsing attachment",
				zap.Error(err),
				zap.Int("message_index", m.index),
				zap.Int("attachment_index", i))
			continue
		}

		attachItem := &timeline.Item{
			Classification: timeline.ClassEmail,
			Timestamp:      ts, // TODO: if this is an image, could we try to get TS from exif?
			Owner:          m.firstFrom(),
			Content: timeline.ItemData{
				Filename:  attach.FileName,
				MediaType: attach.ContentType,
				Data:      timeline.ByteData(attach.Content),
			},
			Metadata: timeline.Metadata{}, // TODO: lots of metadata in headers, probably!
		}
		ig.ToItem(timeline.RelAttachment, attachItem)
	}

	ig.Checkpoint = checkpoint{
		File:         m.mboxPath,
		MessageIndex: m.index,
	}

	return ig, nil
}
// isBoundary reports whether line separates two messages in a mbox file: it
// must start with "From ", and either nothing has been buffered yet (as at the
// beginning of the file) or the buffered content in buf ends with an empty
// line, using either LF or CRLF line endings.
func isBoundary(line []byte, buf *bytes.Buffer) bool {
	if !bytes.HasPrefix(line, nextMailboxMessage) {
		return false
	}
	if buf.Len() == 0 {
		return true
	}
	preceding := buf.Bytes()
	return bytes.HasSuffix(preceding, doubleLFbytes) ||
		bytes.HasSuffix(preceding, doubleCRLFbytes)
}
// message holds information about a single message/entry in a mailbox (.mbox) file.
// It is also used for a standalone .eml file, in which case index stays 0 and
// the From-line fields are left unset.
type message struct {
mboxPath string // the path to the file (mbox or eml) the message was read from
index int // the position of the message in the mbox file (starting at 0)
FromLineEmail string // the field right after "From" on the "From " separator line
FromLineTimestamp time.Time // timestamp following the email on the separator line; zero if missing or unparseable
*enmime.Envelope // parsed message contents
}
// timestamp returns the best known timestamp for the message.
//
// Sources are tried in order of preference: the Date header; then the
// Received headers, from the last one to the first, using the date that
// follows the final "; " in the header value; and finally the timestamp from
// the mbox "From " separator line. The zero time is returned if none of these
// yields a timestamp.
func (m message) timestamp() time.Time {
	// prefer Date header
	if ts, err := mail.ParseDate(m.Root.Header.Get("Date")); err == nil {
		return ts
	}

	// next, try Received headers... (there's also X-Received; not sure which to use...)
	// prefer last Received header; these aren't great to rely on, but maybe better than nothing
	recvHeaders := m.Root.Header["Received"]
	for i := len(recvHeaders) - 1; i >= 0; i-- {
		recvHeader := recvHeaders[i]

		// date usually appears at the end, after a semicolon
		semiColonPos := strings.LastIndex(recvHeader, "; ")
		if semiColonPos < 0 {
			continue
		}
		end := strings.TrimSpace(recvHeader[semiColonPos+2:])
		ts, err := time.Parse("Mon, 02 Jan 2006 15:04:05 -0700 (MST)", end)
		if err != nil {
			// unusable date in this header; fall back to an earlier one
			continue
		}
		return ts
	}

	// last resort, maybe we can use the date in the starting
	// line of this mailbox database entry
	if !m.FromLineTimestamp.IsZero() {
		return m.FromLineTimestamp
	}

	return time.Time{}
}
// firstFrom returns an entity for the first address in the "From" header,
// identified by its email address. The entity's name is left empty when the
// display name merely repeats the address. An empty entity is returned if the
// header cannot be parsed or contains no addresses.
func (m message) firstFrom() timeline.Entity {
	senders, err := m.AddressList("From")
	if err != nil || len(senders) == 0 {
		return timeline.Entity{}
	}

	sender := senders[0]
	displayName := sender.Name
	if displayName == sender.Address {
		// very common for email to be repeated; leave this empty so a potential
		// future import can fill in this information automatically
		displayName = ""
	}

	emailAttr := timeline.Attribute{
		Name:     timeline.AttributeEmail,
		Value:    sender.Address,
		Identity: true,
	}
	return timeline.Entity{
		Name:       displayName,
		Attributes: []timeline.Attribute{emailAttr},
	}
}
// to returns one entity per address listed in the header named fieldName
// (for example "To" or "Cc"), each identified by its email address. An
// entity's name is left empty when the display name merely repeats the
// address. It returns nil if the header's address list cannot be obtained.
func (m message) to(fieldName string) []timeline.Entity {
	addrs, err := m.AddressList(fieldName)
	if err != nil {
		return nil
	}

	entities := make([]timeline.Entity, 0, len(addrs))
	for _, addr := range addrs {
		displayName := addr.Name
		if displayName == addr.Address {
			// very common for email to be repeated; leave this empty so a potential
			// future import can fill in this information automatically
			displayName = ""
		}
		emailAttr := timeline.Attribute{
			Name:     timeline.AttributeEmail,
			Value:    addr.Address,
			Identity: true,
		}
		entities = append(entities, timeline.Entity{
			Name:       displayName,
			Attributes: []timeline.Attribute{emailAttr},
		})
	}
	return entities
}
// checkpoint records the position of a message within an import: the file it
// was read from and its index in that file. It is attached to each message's
// graph and decoded from params.Checkpoint to skip ahead on a later import.
type checkpoint struct {
File string `json:"file"` // path of the file the message was read from
MessageIndex int `json:"message_index,omitempty"` // index of the message within File; 0 is omitted from the JSON
}
// Byte sequences used while splitting an mbox file into messages.
var (
nextMailboxMessage = []byte("From ") // prefix of line that separates messages in mailbox files
spaceBytes = []byte{' '} // separator used to re-join the timestamp fields of a "From " line
doubleLFbytes = []byte("\n\n") // an empty line after a line ending, with LF line endings
doubleCRLFbytes = []byte("\r\n\r\n") // an empty line after a line ending, with CRLF line endings
)
// File extensions of the supported file types, lowercase and including the
// leading dot; file names are lowercased before being compared against them.
const (
extMbox = ".mbox" // mailbox file, which may contain multiple messages
extEml = ".eml" // file expected to contain a single message
)