/*
	Timelinize
	Copyright (c) 2013 Matthew Holt

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as published
	by the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/
|
|
|
|
package timeline
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// Conversation represents a conversation, or items that are sent from
|
|
// one entity to others.
|
|
type Conversation struct {
|
|
Entities []Entity `json:"entities"`
|
|
RecentMessages []ItemRow `json:"messages"`
|
|
|
|
// the set of entities defines a unique conversation
|
|
entities uint64Slice
|
|
}
|
|
|
|
// hasAllEntities returns true if the conversation has all the entities in entityIDs.
|
|
func (c Conversation) hasAllEntities(entityIDs []uint64) bool {
|
|
outer:
|
|
for _, findID := range entityIDs {
|
|
for _, ent := range c.Entities {
|
|
if ent.ID == findID {
|
|
continue outer
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// RecentConversations loads recent conversations from the DB. It honors a select number of ItemSearchParams fields.
|
|
func (tl *Timeline) RecentConversations(ctx context.Context, params ItemSearchParams) ([]*Conversation, error) {
|
|
tx, err := tl.db.ReadPool.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// start by loading the bare conversations, which gives us entities involved and at least one message for each
|
|
convosMap, err := tl.loadRecentConversations(ctx, tx, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO: we might want to try to fill each convo preview with as many messages as a preview allows,
|
|
// rather than only the messages we serendipitously discovered during our iterating
|
|
|
|
// convert convo map to slice for returning
|
|
conversations := make([]*Conversation, 0, len(convosMap))
|
|
for _, c := range convosMap {
|
|
conversations = append(conversations, c)
|
|
}
|
|
|
|
sort.Slice(conversations, func(i, j int) bool {
|
|
if conversations[j].RecentMessages[0].Timestamp == nil || conversations[i].RecentMessages[0].Timestamp == nil {
|
|
return false
|
|
}
|
|
return conversations[j].RecentMessages[0].Timestamp.Before(*conversations[i].RecentMessages[0].Timestamp)
|
|
})
|
|
|
|
// TODO: do we need to commit read-only transactions?
|
|
return conversations, tx.Commit()
|
|
}
|
|
|
|
func (tl *Timeline) loadRecentConversations(ctx context.Context, tx *sql.Tx, params ItemSearchParams) (map[string]*Conversation, error) {
|
|
// keyed by hash of entities, as a unique set of entities is what
|
|
// distinguishes conversations from one another (TODO: or attributes rather than entities? i.e. specific phone numbers instead of people?)
|
|
convosMap := make(map[string]*Conversation)
|
|
|
|
tl.convertNamesToIDs(¶ms)
|
|
|
|
// this search is O(n) so make sure it doesn't get too big
|
|
if params.Limit > 1000 || params.Limit <= 0 {
|
|
params.Limit = 30
|
|
}
|
|
|
|
// if item classification is unspecified (as opposed to empty), use default
|
|
if params.Classification == nil {
|
|
tl.cachesMu.RLock()
|
|
params.Classification = []string{
|
|
ClassMessage.Name,
|
|
ClassEmail.Name,
|
|
ClassSocial.Name,
|
|
}
|
|
tl.cachesMu.RUnlock()
|
|
}
|
|
|
|
const (
|
|
rowLimit = 1000
|
|
previewSize = 3
|
|
maxQueries = 100
|
|
)
|
|
|
|
// make sure our state is preserved across different query pages
|
|
currentConvo := new(Conversation) // aggregate, as convos with 3+ attributes involved span multiple rows
|
|
var lastItemID uint64 // to help us know when we've reached a new message
|
|
|
|
// make paging more efficient than using OFFSET; this way we just use
|
|
// the DB index to skip results with timestamps we've already traversed
|
|
var untilUnixMs int64
|
|
|
|
// worst case scenario is tens of thousands of items that pertain only to
|
|
// a few conversations that doesn't fill our quota... avoid super long
|
|
// operation by limiting how many queries we make
|
|
var queries int
|
|
|
|
saveConvoAndReset := func() bool {
|
|
if len(currentConvo.RecentMessages) == 0 {
|
|
return true // nothing to do, but keep going
|
|
}
|
|
|
|
// TODO: we have to filter in code if filtering by entities, because the query
|
|
// treats them like "any" instead of "all" have to be in the conversation (I bet
|
|
// we could use an INTERSECT query like we do for messages... maybe?)
|
|
convoQualifies := len(params.EntityID) <= 1 || currentConvo.hasAllEntities((params.EntityID))
|
|
|
|
if convoQualifies {
|
|
hash := currentConvo.entities.hash()
|
|
if conv, exists := convosMap[hash]; exists {
|
|
for i := 0; i < len(currentConvo.RecentMessages) && len(conv.RecentMessages) < previewSize; i++ {
|
|
conv.RecentMessages = append(conv.RecentMessages, currentConvo.RecentMessages[i])
|
|
}
|
|
} else {
|
|
convosMap[hash] = currentConvo
|
|
}
|
|
}
|
|
|
|
// reset aggregate value
|
|
currentConvo = new(Conversation)
|
|
|
|
// return false if we haven't filled our conversation limit yet
|
|
return len(convosMap) < params.Limit
|
|
}
|
|
|
|
for len(convosMap) < params.Limit && queries < maxQueries {
|
|
queries++
|
|
|
|
var whereClause strings.Builder
|
|
whereClause.WriteString(`WHERE relationships.to_attribute_id IS NOT NULL`)
|
|
var args []any
|
|
|
|
// include only conversational items and relations; otherwise we get results like
|
|
// media where people are depicted, or any other items that point to entities...
|
|
whereClause.WriteString(" AND (relationships.relation_id IN (?, ?) OR items.classification_id IN (?, ?, ?))")
|
|
args = append(args,
|
|
tl.relations[RelSent.Label],
|
|
tl.relations[RelReply.Label],
|
|
tl.classifications[ClassEmail.Name],
|
|
tl.classifications[ClassMessage.Name],
|
|
tl.classifications[ClassSocial.Name],
|
|
)
|
|
|
|
if len(params.classificationIDs) > 0 {
|
|
whereClause.WriteString(" AND (")
|
|
for i, classID := range params.classificationIDs {
|
|
if i > 0 {
|
|
whereClause.WriteString(" OR ")
|
|
}
|
|
whereClause.WriteString("items.classification_id=?")
|
|
args = append(args, classID)
|
|
}
|
|
whereClause.WriteString(")")
|
|
}
|
|
if len(params.AttributeID) > 0 {
|
|
whereClause.WriteString(" AND (")
|
|
for i, attrID := range params.AttributeID {
|
|
if i > 0 {
|
|
whereClause.WriteString(" OR ")
|
|
}
|
|
whereClause.WriteString("items.attribute_id=?")
|
|
args = append(args, attrID)
|
|
}
|
|
whereClause.WriteString(")")
|
|
}
|
|
if len(params.ToAttributeID) > 0 {
|
|
whereClause.WriteString(" AND (")
|
|
for i, attrID := range params.ToAttributeID {
|
|
if i > 0 {
|
|
whereClause.WriteString(" OR ")
|
|
}
|
|
whereClause.WriteString("relationships.to_attribute_id=?")
|
|
args = append(args, attrID)
|
|
}
|
|
whereClause.WriteString(")")
|
|
}
|
|
if len(params.EntityID) > 0 {
|
|
whereClause.WriteString(" AND (")
|
|
for i, entityID := range params.EntityID {
|
|
if i > 0 {
|
|
whereClause.WriteString(" OR ")
|
|
}
|
|
whereClause.WriteString("from_ent.id=? OR to_ent.id=?")
|
|
args = append(args, entityID, entityID)
|
|
}
|
|
whereClause.WriteString(")")
|
|
}
|
|
if len(params.DataText) > 0 {
|
|
whereClause.WriteString(" AND (")
|
|
for i, txt := range params.DataText {
|
|
if i > 0 {
|
|
whereClause.WriteString(" OR ")
|
|
}
|
|
whereClause.WriteString("items.data_text LIKE '%' || ? || '%'")
|
|
args = append(args, txt)
|
|
}
|
|
whereClause.WriteString(")")
|
|
}
|
|
if params.StartTimestamp != nil {
|
|
whereClause.WriteString(" AND items.timestamp > ?")
|
|
args = append(args, params.StartTimestamp.UnixMilli())
|
|
}
|
|
if params.EndTimestamp != nil && (untilUnixMs == 0 || untilUnixMs > params.EndTimestamp.UnixMilli()) {
|
|
whereClause.WriteString(andItemsTimestampLessThanArg)
|
|
args = append(args, params.EndTimestamp.UnixMilli())
|
|
} else if untilUnixMs > 0 {
|
|
whereClause.WriteString(andItemsTimestampLessThanArg)
|
|
args = append(args, untilUnixMs)
|
|
}
|
|
|
|
args = append(args, rowLimit)
|
|
|
|
//nolint:gosec
|
|
q := `SELECT ` + itemDBColumns + `,
|
|
relationships.to_attribute_id,
|
|
from_attr.name, to_attr.name,
|
|
from_attr.value, to_attr.value,
|
|
from_ent.id, from_ent.name, from_ent.picture_file,
|
|
to_ent.id, to_ent.name, to_ent.picture_file
|
|
FROM extended_items AS items
|
|
LEFT JOIN relationships ON relationships.from_item_id = items.id
|
|
LEFT JOIN entity_attributes from_ea ON from_ea.attribute_id = items.attribute_id
|
|
LEFT JOIN entity_attributes to_ea ON to_ea.attribute_id = relationships.to_attribute_id
|
|
LEFT JOIN entities AS from_ent ON from_ent.id = from_ea.entity_id
|
|
LEFT JOIN entities AS to_ent ON to_ent.id = to_ea.entity_id
|
|
LEFT JOIN attributes AS from_attr ON from_attr.id = from_ea.attribute_id
|
|
LEFT JOIN attributes AS to_attr ON to_attr.id = to_ea.attribute_id
|
|
` + whereClause.String() + `
|
|
ORDER BY items.timestamp DESC
|
|
LIMIT ?`
|
|
|
|
rows, err := tx.QueryContext(ctx, q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close() // FIXME: This is a bug since it's in a for loop; extract into own function
|
|
|
|
var count int
|
|
|
|
for rows.Next() {
|
|
var fromAttr, toAttr nullableAttribute
|
|
var fromEntity, toEntity Entity
|
|
ir, err := scanItemRow(rows, []any{&toAttr.ID,
|
|
&fromAttr.Name, &toAttr.Name,
|
|
&fromAttr.Value, &toAttr.Value,
|
|
&fromEntity.id, &fromEntity.name, &fromEntity.Picture,
|
|
&toEntity.id, &toEntity.name, &toEntity.Picture})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading recent conversations: %w", err)
|
|
}
|
|
count++
|
|
|
|
if ir.ID != lastItemID {
|
|
// this row is a new item, so what we have aggregated so far represents a single conversation
|
|
if !saveConvoAndReset() {
|
|
break
|
|
}
|
|
}
|
|
|
|
// remember this item ID for the next row so we can know if we've moved to the next item
|
|
lastItemID = ir.ID
|
|
|
|
// append this message to the current conversation aggregate
|
|
if len(currentConvo.RecentMessages) < previewSize {
|
|
currentConvo.RecentMessages = append(currentConvo.RecentMessages, ir)
|
|
}
|
|
|
|
// move nullable ints into non-nullable fields that are read from (and which are easier and safer to work with)
|
|
if fromEntity.id != nil {
|
|
fromEntity.ID = *fromEntity.id
|
|
}
|
|
if toEntity.id != nil {
|
|
toEntity.ID = *toEntity.id
|
|
}
|
|
|
|
// record the sender and receiver as part of this conversation
|
|
if fromEntity.ID > 0 {
|
|
currentConvo.entities.appendIfUnique(fromEntity.ID)
|
|
}
|
|
if toEntity.ID > 0 {
|
|
currentConvo.entities.appendIfUnique(toEntity.ID)
|
|
}
|
|
|
|
// if entity not added to conversation yet, do so now
|
|
// TODO: a more correct algorithm is to add the entity if it doesn't exist, but if it does, add the attribute to it if it isn't already
|
|
var fromEntFound, toEntFound bool
|
|
for _, ent := range currentConvo.Entities {
|
|
if fromEntFound && toEntFound {
|
|
break
|
|
}
|
|
if ent.ID == fromEntity.ID {
|
|
fromEntFound = true
|
|
continue
|
|
}
|
|
if ent.ID == toEntity.ID {
|
|
toEntFound = true
|
|
continue
|
|
}
|
|
}
|
|
if !fromEntFound {
|
|
if fromEntity.name != nil {
|
|
fromEntity.Name = *fromEntity.name
|
|
}
|
|
fromEntity.Attributes = append(fromEntity.Attributes, fromAttr.attribute())
|
|
currentConvo.Entities = append(currentConvo.Entities, fromEntity)
|
|
}
|
|
if !toEntFound {
|
|
if toEntity.name != nil {
|
|
toEntity.Name = *toEntity.name
|
|
}
|
|
toEntity.Attributes = append(toEntity.Attributes, toAttr.attribute())
|
|
currentConvo.Entities = append(currentConvo.Entities, toEntity)
|
|
}
|
|
|
|
if ir.Timestamp != nil {
|
|
untilUnixMs = ir.Timestamp.UnixMilli()
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if count < rowLimit {
|
|
break // no more rows
|
|
}
|
|
}
|
|
|
|
// save last aggregate
|
|
saveConvoAndReset()
|
|
|
|
return convosMap, nil
|
|
}
|
|
|
|
// LoadConversation loads the conversation according to select search parameters.
|
|
func (tl *Timeline) LoadConversation(ctx context.Context, params ItemSearchParams) (SearchResults, error) {
|
|
// for now, I don't think it makes sense to support both entities and attributes...
|
|
// I mean, you either are looking up a conversation between people or a conversation
|
|
// between phone numbers/email addresses/etc, right? but if the need arises we can
|
|
// probably make it work -- our code does support both but I haven't tested both together
|
|
if len(params.EntityID) > 0 && len(params.AttributeID) > 0 {
|
|
return SearchResults{}, errors.New("lookup by both entity and attribute not currently supported")
|
|
}
|
|
|
|
tx, err := tl.db.ReadPool.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return SearchResults{}, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// start by loading the bare conversations, which gives us entities involved and at least one message for each
|
|
results, err := tl.loadConversation(ctx, tx, params)
|
|
if err != nil {
|
|
return SearchResults{}, err
|
|
}
|
|
|
|
for _, sr := range results {
|
|
err = tl.expandRelationships(ctx, tx, params.Related, sr)
|
|
if err != nil {
|
|
return SearchResults{}, err
|
|
}
|
|
}
|
|
|
|
sr := SearchResults{
|
|
Items: results,
|
|
}
|
|
|
|
// TODO: do we need to commit read-only transactions?
|
|
return sr, tx.Commit()
|
|
}
|
|
|
|
func (tl *Timeline) loadConversation(ctx context.Context, tx *sql.Tx, params ItemSearchParams) ([]*SearchResult, error) {
|
|
// if no entities or attributes specified, no conversation can be found
|
|
if len(params.EntityID) == 0 && len(params.AttributeID) == 0 {
|
|
return []*SearchResult{}, nil
|
|
}
|
|
|
|
q, args, err := tl.prepareConversationQuery(params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rows, err := tx.QueryContext(ctx, q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var results []*SearchResult
|
|
for rows.Next() {
|
|
var re relatedEntity
|
|
itemRow, err := scanItemRow(rows, []any{&re.ID, &re.Name, &re.Picture})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading conversation: %w", err)
|
|
}
|
|
results = append(results, &SearchResult{RepoID: tl.id.String(), ItemRow: itemRow, Entity: &re})
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// conversation messages are retrieved most-recent-first (timestamp DESC) but they'll
|
|
// generally be rendered in ASC order because that's more natural/intuitive
|
|
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
|
|
results[i], results[j] = results[j], results[i]
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (tl *Timeline) prepareConversationQuery(params ItemSearchParams) (string, []any, error) {
|
|
tl.convertNamesToIDs(¶ms)
|
|
|
|
if params.Limit > 1000 || params.Limit <= 0 {
|
|
params.Limit = 100
|
|
}
|
|
|
|
sortDir := strings.ToUpper(string(params.Sort))
|
|
if sortDir == "" {
|
|
sortDir = string(SortDesc)
|
|
}
|
|
if sortDir != string(SortAsc) && sortDir != string(SortDesc) {
|
|
return "", nil, fmt.Errorf("invalid sort direction: %s", sortDir)
|
|
}
|
|
|
|
// if item classification is unspecified (as opposed to empty), use default
|
|
if params.Classification == nil {
|
|
tl.cachesMu.RLock()
|
|
params.Classification = []string{
|
|
ClassMessage.Name,
|
|
ClassEmail.Name,
|
|
ClassSocial.Name,
|
|
}
|
|
tl.cachesMu.RUnlock()
|
|
}
|
|
|
|
entityIDsArray, entityIDs := sqlArray(params.EntityID)
|
|
|
|
var sb strings.Builder
|
|
var args []any
|
|
|
|
sb.WriteString("WITH participant_list AS (\n\t")
|
|
for i, entityID := range params.EntityID {
|
|
if i > 0 {
|
|
sb.WriteString(" UNION ALL\n\t")
|
|
}
|
|
sb.WriteString("SELECT ?")
|
|
if i == 0 {
|
|
sb.WriteString(" AS entity_id")
|
|
}
|
|
args = append(args, entityID)
|
|
}
|
|
|
|
sb.WriteString(`
|
|
),
|
|
-- Get all participants for each message (both senders and receivers)
|
|
message_all_participants AS (
|
|
SELECT
|
|
items.id AS item_id,
|
|
from_ea.entity_id AS participant_id,
|
|
'from' AS participant_role
|
|
FROM extended_items AS items
|
|
JOIN relationships ON relationships.from_item_id = items.id
|
|
JOIN entity_attributes from_ea ON from_ea.attribute_id = items.attribute_id
|
|
|
|
UNION
|
|
|
|
SELECT
|
|
items.id AS item_id,
|
|
to_ea.entity_id AS participant_id,
|
|
'to' AS participant_role
|
|
FROM extended_items AS items
|
|
JOIN relationships ON relationships.from_item_id = items.id
|
|
JOIN entity_attributes to_ea ON to_ea.attribute_id = relationships.to_attribute_id
|
|
),
|
|
-- Find messages with exactly our participant set
|
|
exact_match_messages AS (
|
|
SELECT
|
|
item_id,
|
|
COUNT(DISTINCT CASE
|
|
WHEN participant_id IN `)
|
|
|
|
sb.WriteString(entityIDsArray)
|
|
args = append(args, entityIDs...)
|
|
|
|
sb.WriteString(`
|
|
THEN participant_id
|
|
END) AS matching_count,
|
|
COUNT(DISTINCT CASE
|
|
WHEN participant_id NOT IN `)
|
|
|
|
sb.WriteString(entityIDsArray)
|
|
args = append(args, entityIDs...)
|
|
|
|
sb.WriteString(`
|
|
THEN participant_id
|
|
END) AS other_count
|
|
FROM message_all_participants
|
|
GROUP BY item_id
|
|
HAVING matching_count = ? -- Exactly these participants present
|
|
AND other_count = 0 -- No other participants
|
|
)
|
|
-- Get full message details
|
|
SELECT DISTINCT
|
|
items.id, items.data_source_id, items.job_id, items.modified_job_id,
|
|
items.attribute_id, items.classification_id, items.original_id,
|
|
items.original_location, items.intermediate_location, items.filename,
|
|
items.timestamp, items.timespan, items.timeframe, items.time_offset,
|
|
items.time_offset_origin, items.time_uncertainty, items.stored,
|
|
items.modified, items.data_id, items.data_type, items.data_text,
|
|
items.data_file, items.data_hash, items.metadata, items.longitude,
|
|
items.latitude, items.altitude, items.coordinate_system,
|
|
items.coordinate_uncertainty, items.note, items.starred, items.thumb_hash,
|
|
items.original_id_hash, items.initial_content_hash, items.retrieval_key,
|
|
items.hidden, items.deleted, data_source_name, data_source_title,
|
|
classification_name, entities.id AS entity_id, entities.name, entities.picture_file
|
|
FROM extended_items AS items
|
|
JOIN exact_match_messages emm ON emm.item_id = items.id
|
|
JOIN relationships ON relationships.from_item_id = items.id
|
|
JOIN entity_attributes from_ea ON from_ea.attribute_id = items.attribute_id
|
|
JOIN entities ON from_ea.entity_id = entities.id`)
|
|
args = append(args, len(entityIDs))
|
|
|
|
var hasWhere bool
|
|
|
|
if len(params.classificationIDs) > 0 {
|
|
if !hasWhere {
|
|
sb.WriteString("\nWHERE (")
|
|
hasWhere = true
|
|
} else {
|
|
sb.WriteString(" AND (")
|
|
}
|
|
for i, classID := range params.classificationIDs {
|
|
if i > 0 {
|
|
sb.WriteString(" OR ")
|
|
}
|
|
sb.WriteString("items.classification_id=?")
|
|
args = append(args, classID)
|
|
}
|
|
sb.WriteRune(')')
|
|
}
|
|
if len(params.DataSourceName) > 0 {
|
|
if !hasWhere {
|
|
sb.WriteString("\nWHERE (")
|
|
hasWhere = true
|
|
} else {
|
|
sb.WriteString(" AND (")
|
|
}
|
|
for i, dsn := range params.DataSourceName {
|
|
if i > 0 {
|
|
sb.WriteString(" OR ")
|
|
}
|
|
sb.WriteString("data_source_name=?")
|
|
args = append(args, dsn)
|
|
}
|
|
sb.WriteRune(')')
|
|
}
|
|
if len(params.DataText) > 0 {
|
|
if !hasWhere {
|
|
sb.WriteString("\nWHERE (")
|
|
hasWhere = true
|
|
} else {
|
|
sb.WriteString(" AND (")
|
|
}
|
|
for i, txt := range params.DataText {
|
|
if i > 0 {
|
|
sb.WriteString(" OR ")
|
|
}
|
|
sb.WriteString("items.data_text LIKE '%' || ? || '%'")
|
|
args = append(args, txt)
|
|
}
|
|
sb.WriteRune(')')
|
|
}
|
|
if params.StartTimestamp != nil {
|
|
if !hasWhere {
|
|
sb.WriteString("\nWHERE ")
|
|
hasWhere = true
|
|
} else {
|
|
sb.WriteString(" AND ")
|
|
}
|
|
sb.WriteString("items.timestamp > ?")
|
|
args = append(args, params.StartTimestamp.UnixMilli())
|
|
}
|
|
if params.EndTimestamp != nil {
|
|
if !hasWhere {
|
|
sb.WriteString("\nWHERE ")
|
|
} else {
|
|
sb.WriteString(" AND ")
|
|
}
|
|
sb.WriteString("items.timestamp < ?")
|
|
args = append(args, params.EndTimestamp.UnixMilli())
|
|
}
|
|
|
|
sb.WriteString("\nORDER BY items.timestamp ")
|
|
sb.WriteString(sortDir)
|
|
sb.WriteString("\nLIMIT ?")
|
|
args = append(args, params.Limit)
|
|
|
|
return sb.String(), args, nil
|
|
}
|
|
|
|
// uint64Slice is a list of unsigned IDs that implements sort.Interface
// (ascending order) and can be reduced to a string key with hash.
type uint64Slice []uint64

// hash returns a key for the values in s: the values in ascending order,
// written in base 10, each followed by a comma. Slices holding the same
// values in a different order therefore produce the same key. An empty
// slice yields the empty string. The receiver is left untouched.
func (s uint64Slice) hash() string {
	// sort a copy; sorting s directly would reorder the caller's backing array
	sorted := make(uint64Slice, len(s))
	copy(sorted, s)
	sort.Sort(sorted)

	var sb strings.Builder
	for _, v := range sorted {
		sb.WriteString(strconv.FormatUint(v, 10))
		sb.WriteRune(',')
	}
	return sb.String()
}

// appendIfUnique appends v to the slice unless an equal value is already in it.
func (s *uint64Slice) appendIfUnique(v uint64) {
	for _, elem := range *s {
		if elem == v {
			return
		}
	}
	*s = append(*s, v)
}

const andItemsTimestampLessThanArg = " AND items.timestamp < ?"

// Implement sort.Interface
func (s uint64Slice) Len() int           { return len(s) }
func (s uint64Slice) Less(i, j int) bool { return s[i] < s[j] }
func (s uint64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
|