727 lines
28 KiB
Go
727 lines
28 KiB
Go
/*
|
|
Timelinize
|
|
Copyright (c) 2013 Matthew Holt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package googlelocation
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/timelinize/timelinize/timeline"
|
|
)
|
|
|
|
// LocationSource is a type that can get the next location to process.
|
|
type LocationSource interface {
|
|
// NextLocation returns the next location to process.
|
|
// When there are no more locations, it should return
|
|
// (nil, nil).
|
|
//
|
|
// Implementations must honor context cancellation.
|
|
NextLocation(ctx context.Context) (*Location, error)
|
|
}
|
|
|
|
type LocationProcessingOptions struct {
|
|
// Set to a value 1-10 to enable path simplification.
|
|
// 10 means very aggressive simplification (skip many
|
|
// points, leave practically only clusters or endpoints)
|
|
// and 1 means to only drop points on the straightest paths.
|
|
// (My preferred is ~2 when scaled to between 1000 and 50000; i.e. about epsilon=6-7k)
|
|
Simplification float64 `json:"simplification,omitempty"`
|
|
|
|
// Points are clustered together if the mean change of the centroid of
|
|
// points in a sliding window is less than the standard deviation of
|
|
// the points times some coefficient based on the size of the window.
|
|
// This coefficient can be scaled up or down to make clustering more
|
|
// or less aggressive. Must be a non-negative value. Zero disables
|
|
// clustering; the default behavior is enabled with a value of 1.0.
|
|
// TODO: Experimental. Clustering algorithm WIP. Likely to change.
|
|
ClusteringCoefficient float64 `json:"clustering_coefficient,omitempty"`
|
|
}
|
|
|
|
// NewLocationProcessor returns a new location source that filters, refines, clusters,
|
|
// and possibly simplifies the locations it reads from source.
|
|
//
|
|
// The returned locations may have new or different values/information from what
|
|
// the input values were. The fields on the outputted locations should be preferred
|
|
// over the inputted locations for filling in item structs. Of special note are
|
|
// timespan and metadata fields: timespan and metadata will be set if the point
|
|
// represents a cluster, so those fields should be added to the resulting item.
|
|
func NewLocationProcessor(source LocationSource, options LocationProcessingOptions) (LocationSource, error) {
|
|
if options.Simplification < 0 || options.Simplification > 10 {
|
|
return nil, fmt.Errorf("invalid simplification factor; must be in [1,10]: %f", options.Simplification)
|
|
}
|
|
if options.ClusteringCoefficient < 0 {
|
|
return nil, fmt.Errorf("invalid clustering coefficient; must be >= 0: %f", options.ClusteringCoefficient)
|
|
}
|
|
|
|
locProc := &locationProcessor{
|
|
source: source,
|
|
clusteringBuffer: make([]*Location, 0, clusterBufferSize),
|
|
clusteringCoeffScale: options.ClusteringCoefficient,
|
|
}
|
|
|
|
if options.Simplification != 0 {
|
|
// To scale a number x into range [a,b]:
|
|
// x_scaled = (b-a) * ((x - x_min) / (x_max - x_min)) + a
|
|
//
|
|
// 10,000 is quite a high epsilon and, indeed, it does thin out the path quite significantly,
|
|
// but the algorithm is quite amazing in that it does preserve the essence of the path.
|
|
// I recommend no higher than ~1000 for a default epsilon, for most data sets. That'd be
|
|
// an input simplification of ~1.0. Users can set this lower if they want more points
|
|
// or higher if they want less. The high range is somewhat necessary to accommodate
|
|
// different spreads of data.
|
|
const xMin, xMax = 0.0, 10.0
|
|
const epsMin, epsMax = 10.0, 10000.0
|
|
locProc.epsilon = (epsMax-epsMin)*((options.Simplification-xMin)/(xMax-xMin)) + epsMin
|
|
}
|
|
|
|
return locProc, nil
|
|
}
|
|
|
|
// locationProcessor sets up a pipeline for processing location points/items:
|
|
//
|
|
// decoding -> deduplicating -> denoising -> clustering -> simplifying (optional)
|
|
//
|
|
// The output of the last phase can be used as the next data point.
|
|
//
|
|
// Assign one locationProcessor per goroutine.
|
|
type locationProcessor struct {
|
|
source LocationSource
|
|
sourceDrained bool // if true, no more input points remain from the source
|
|
|
|
// de-duplicating (TODO: Make more customizable?)
|
|
previous *Location
|
|
// minTemporalSpacing time.Duration
|
|
// minDistanceMeters int
|
|
|
|
// denoising
|
|
denoiseWindow []*Location
|
|
|
|
// clustering
|
|
clusteringBuffer []*Location
|
|
startOfNextTrack *Location
|
|
clusteringCoeffScale float64
|
|
|
|
// simplifiying
|
|
epsilon float64 // enables RDP path simplification (https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm)
|
|
batch []*Location
|
|
}
|
|
|
|
// NextLocation returns the next location point that can be stored as an item.
|
|
func (lp *locationProcessor) NextLocation(ctx context.Context) (*Location, error) {
|
|
if lp.epsilon > 0 {
|
|
return lp.simplifiedNext(ctx)
|
|
}
|
|
return lp.clusteredNext(ctx)
|
|
}
|
|
|
|
func (lp *locationProcessor) simplifiedNext(ctx context.Context) (*Location, error) {
|
|
// ensure simplification is only performed on points coming from the same device/path!
|
|
// mixing points from different devices is asking for bad results
|
|
|
|
if len(lp.batch) == 0 {
|
|
const batchSize = 1000
|
|
lp.batch = make([]*Location, 0, batchSize)
|
|
|
|
for len(lp.batch) < batchSize {
|
|
next, err := lp.clusteredNext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if next == nil {
|
|
break
|
|
}
|
|
lp.batch = append(lp.batch, next)
|
|
}
|
|
|
|
lp.batch = simplifyPath(lp.batch, lp.epsilon)
|
|
}
|
|
|
|
if len(lp.batch) > 0 {
|
|
next := lp.batch[0]
|
|
lp.batch = lp.batch[1:]
|
|
return next, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func (lp *locationProcessor) clusteredNext(ctx context.Context) (*Location, error) {
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if we have processed points left in the buffer, pop off the next one and return it
|
|
// (leave at least clusterWindowSize points in the buffer to roll-over to the next
|
|
// batch, so that we don't break clusters across arbitrary batch boundaries - but if
|
|
// there are no more points, or if there is a new track, then we must completely drain
|
|
// the buffer)
|
|
if len(lp.clusteringBuffer) > clusterWindowSize ||
|
|
((lp.sourceDrained || lp.startOfNextTrack != nil) && len(lp.clusteringBuffer) > 0) {
|
|
next := lp.clusteringBuffer[0]
|
|
lp.clusteringBuffer = lp.clusteringBuffer[1:]
|
|
return next, nil
|
|
}
|
|
|
|
// no more points here or there, no more points anywhere!
|
|
if lp.sourceDrained && lp.startOfNextTrack == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// refill buffer and process new batch
|
|
// (start with the first point of new/next track, if any)
|
|
if lp.startOfNextTrack != nil {
|
|
lp.clusteringBuffer = append(lp.clusteringBuffer, lp.startOfNextTrack)
|
|
lp.startOfNextTrack = nil
|
|
}
|
|
for len(lp.clusteringBuffer) < clusterBufferSize {
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
nextPoint, err := lp.denoiseNext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if nextPoint == nil {
|
|
break
|
|
}
|
|
if nextPoint.NewTrack {
|
|
// we just read a point that must not be clustered with any of the previous ones,
|
|
// so instead of appending it to the buffer to be processed along with them, we
|
|
// hold onto it and it will be the first point in the next buffer that we process
|
|
lp.startOfNextTrack = nextPoint
|
|
break
|
|
}
|
|
lp.clusteringBuffer = append(lp.clusteringBuffer, nextPoint)
|
|
}
|
|
lp.clusteringBuffer = lp.clusterBatch(ctx, lp.clusteringBuffer)
|
|
|
|
// buffer has been refilled and processed, so pop its first point off
|
|
return lp.clusteredNext(ctx)
|
|
}
|
|
|
|
// clusterBatch processes the points in the batch into clusters. It returns the processed batch.
|
|
func (lp *locationProcessor) clusterBatch(ctx context.Context, batch []*Location) []*Location {
|
|
if len(batch) == 0 {
|
|
return batch
|
|
}
|
|
|
|
// Our clustering algorithm is "recursive" in that it can add clusters to clusters.
|
|
// We slide a window across the batch and make clusters as we go, then repeat once
|
|
// we reach the end of the batch until this loop has an iteration where no new
|
|
// clusters have been created. In practice it tends to converge quite quickly.
|
|
// However, I have found some cases where, even though it converges quickly, it
|
|
// creates clusters that are TOO big. So we limit the "recursion" (iterations of
|
|
// this loop). Otherwise, I've seen it create clusters of clusters of clusters,
|
|
// which end up being hundreds of points big, and spanning an entire week...
|
|
// that's not usually necessary/useful. In practice, I have found that two scans
|
|
// over the batch is sufficient to cluster effectively.
|
|
for range 2 {
|
|
// We keep track of the centroid of the sliding window.
|
|
var latAvg, lonAvg int64
|
|
|
|
// As we slide along, if we are in a cluster, we move points into this list.
|
|
// We keep track of where the cluster started because the resulting cluster
|
|
// point will replace all the points in the batch starting at that same spot
|
|
// the cluster started. The cluster must consist of consecutive points.
|
|
var cluster []*Location
|
|
var clusterStart int
|
|
var hadCluster bool
|
|
|
|
// finalizeCluster just creates the cluster point, resets the cluster, and
|
|
// inserts the cluster point into the batch.
|
|
finalizeCluster := func() {
|
|
clusterPoint := lp.endClusterFunc(cluster)
|
|
cluster = nil
|
|
batch = slices.Insert(batch, clusterStart, clusterPoint)
|
|
}
|
|
|
|
// slide the window over the batch; the current index is the "latest" point,
|
|
// which is the end of the clustering window (the beginning is i-clusterWindowSize
|
|
// or 0, whichever is greater).
|
|
for i := 0; i < len(batch); i++ {
|
|
if ctx.Err() != nil {
|
|
return batch
|
|
}
|
|
|
|
latest := batch[i] // end of the sliding window
|
|
|
|
// for the first point in a batch, we need to initialize our mean for
|
|
// the statistical analysis
|
|
if i == 0 {
|
|
latAvg = latest.LatitudeE7
|
|
lonAvg = latest.LongitudeE7
|
|
continue // this prevents NaNs at the beginning due to not having a starting lat/lon
|
|
}
|
|
|
|
// compute the centroid of the current window, and compare to centroid of previous window
|
|
|
|
oldLatAvg, oldLonAvg := latAvg, lonAvg
|
|
|
|
oldestIdx := max(i-clusterWindowSize, 0) // beginning of the clustering window (i is the end of it)
|
|
oldest := batch[oldestIdx]
|
|
oldestLat, oldestLon := oldest.LatitudeE7, oldest.LongitudeE7
|
|
latAvg += (latest.LatitudeE7 - oldestLat) / clusterWindowSize
|
|
lonAvg += (latest.LongitudeE7 - oldestLon) / clusterWindowSize
|
|
|
|
// convert to meters to reduce sensitivity to floating point imprecision
|
|
latest.changeToCentroid = haversineDistanceEarth(oldLatAvg, oldLonAvg, latAvg, lonAvg) * kmToMeters
|
|
|
|
// size of the window may be smaller at the beginning
|
|
count := min(clusterWindowSize, i+1)
|
|
|
|
// we don't actually use Welford's online algorithm here, because
|
|
// it is not numerically stable when applied to a sliding window
|
|
// (thankfully, our window size is usually small that these
|
|
// inner loops aren't that expensive in practice)
|
|
|
|
window := batch[oldestIdx : i+1]
|
|
|
|
var windowDistance float64
|
|
|
|
// First pass: compute mean (and distance traveled within the window)
|
|
// (Note that our mean is not the centroid itself but the average DELTA/CHANGE of the centroid in the window.)
|
|
var meanChangeToCentroid float64
|
|
for i, point := range window {
|
|
if i > 0 { // skip first point in window since the distance between it and its previous is not in the window
|
|
windowDistance += point.distanceFromPrev
|
|
}
|
|
meanChangeToCentroid += point.changeToCentroid
|
|
}
|
|
meanChangeToCentroid /= float64(count)
|
|
|
|
// Second pass: compute sum of squared differences
|
|
var M2 float64
|
|
for _, point := range window {
|
|
diff := point.changeToCentroid - meanChangeToCentroid
|
|
M2 += diff * diff
|
|
}
|
|
|
|
// sample variance (population variance just doesn't have the -1)
|
|
variance := M2 / (float64(count) - 1)
|
|
|
|
// compute distance from the previous point to this one
|
|
prev := batch[i-1]
|
|
latest.distanceFromPrev = haversineDistanceEarth(prev.LatitudeE7, prev.LongitudeE7, latest.LatitudeE7, latest.LongitudeE7) * kmToMeters
|
|
|
|
// standard deviation is in same units as the data
|
|
stdDev := math.Sqrt(variance)
|
|
|
|
// We need to decide a coefficient at which a point is part of a cluster, based on what we observe
|
|
// in the window. A simple way to do this is if the mean change of the centroid is less than a
|
|
// certain amount (i.e. there isn't much overall movement within the window), then it's part of
|
|
// a cluster. The question is, what amount should that be? Well this used to be a hard-coded value
|
|
// of 20 meters, but that doesn't work well with different kinds of data (e.g. walking data, too
|
|
// many of the points get eaten up into clusters because walking has higher data density than driving).
|
|
// So we make it relative to the data set by computing its standard deviation and then seeing if
|
|
// that mean centroid delta is less than some multiple of the standard deviation. (Remember that
|
|
// std dev is in the same units as the data.) This works better, but the coefficient now becomes a
|
|
// parameter to tune. Because again, we may want to express dense data differently than sparse data.
|
|
// After some fiddling I've landed on setting the coefficient based on the distance traveled in the
|
|
// window. Based on some sample data, it FEELS LIKE the more distance that is traveled, the tighter
|
|
// the threshold should be for a cluster (i.e. more distance traveled == centroid needs to move less).
|
|
// I don't have a smooth formula for this coefficient yet, so I chose some discrete points at which
|
|
// we determine the coefficient. It's kind of a sensitive parameter. This approach could probably be
|
|
// improved.
|
|
var coefficient float64
|
|
switch {
|
|
case windowDistance < 75:
|
|
coefficient = 2.5 // seems to work well when points are close together
|
|
case windowDistance < 300:
|
|
coefficient = 1.9 // 1.9 seemed good on sample Arc tracks from urban walks
|
|
default:
|
|
coefficient = 1.15 // 1.15 seems good for most driving+destination data
|
|
}
|
|
coefficient *= lp.clusteringCoeffScale
|
|
|
|
// it's important that we're at least a window-size into the buffer, because the oldest
|
|
// point in the window is appended to the cluster; if we did this before we were one full
|
|
// window into the buffer, the oldest point would repeat multiple times as the window grows!
|
|
if count >= clusterWindowSize && meanChangeToCentroid < coefficient*stdDev {
|
|
// we are in a cluster!
|
|
|
|
// if this is the start of a cluster, we need to mark where it started
|
|
// so we know where to put the cluster point
|
|
if len(cluster) == 0 {
|
|
clusterStart = oldestIdx
|
|
} else {
|
|
// a cluster of size 1 ends up just going back into the buffer as itself,
|
|
// not actually as a cluster, so don't count cluster sizes of 1, otherwise
|
|
// it would loop infinitely
|
|
hadCluster = true
|
|
}
|
|
|
|
// add this point to the cluster, remove it from the batch,
|
|
// and decrement the index since we deleted it from the batch
|
|
cluster = append(cluster, oldest)
|
|
batch = slices.Delete(batch, oldestIdx, oldestIdx+1)
|
|
i--
|
|
|
|
// while in a cluster, we need to continue looping until we find
|
|
// the end of the cluster
|
|
//
|
|
// "what about if we're at the end of the batch and we're still
|
|
// in the cluster -- i.e. the cluster goes into the next batch?"
|
|
// you may ask -- well, the simplest thing, which seems to work,
|
|
// is to just end the cluster like normal after the loop finishes,
|
|
// and since we roll-over 1 window size of overlap between batches,
|
|
// that cluster point will be considered at the start of the next
|
|
// batch, so it can be appended to then
|
|
continue
|
|
} else if clusterSize := len(cluster); clusterSize > 0 {
|
|
// not in cluster, but cluster points have accumulated, so must be end of cluster
|
|
finalizeCluster()
|
|
i++ // since we added a cluster point to the batch, increment i to skip over it
|
|
continue
|
|
}
|
|
}
|
|
|
|
// if we finished the batch in a cluster, make sure to create the cluster point
|
|
if len(cluster) > 0 {
|
|
finalizeCluster()
|
|
}
|
|
|
|
// if this scan of the batch had no new clusters, we're done!
|
|
if !hadCluster {
|
|
break
|
|
}
|
|
}
|
|
|
|
return batch
|
|
}
|
|
|
|
// endClusterFunc ends the cluster by combining all the clustered
|
|
// points into a single location and returns it.
|
|
func (lp *locationProcessor) endClusterFunc(cluster []*Location) *Location {
|
|
if len(cluster) == 1 {
|
|
return cluster[0]
|
|
}
|
|
|
|
// expand cluster points (points that are already clusters themselves)
|
|
for i := 0; i < len(cluster); i++ {
|
|
clusterPoints := cluster[i].clusterPoints
|
|
cluster = slices.Insert(cluster, i, clusterPoints...)
|
|
i += len(clusterPoints)
|
|
}
|
|
|
|
clusterSize := float64(len(cluster))
|
|
|
|
// combine all the points in the cluster into a single point;
|
|
// to do this, aggregate some statistics and other values
|
|
// as part of the reduction
|
|
var uncertaintySum, centerLat, centerLon, m2lat, m2lon float64
|
|
|
|
for i, l := range cluster {
|
|
uncertaintySum += float64(l.Uncertainty)
|
|
|
|
deltaLat := float64(l.LatitudeE7) - centerLat
|
|
centerLat += deltaLat / float64(i+1)
|
|
deltaLat2 := float64(l.LatitudeE7) - centerLat
|
|
m2lat += deltaLat * deltaLat2
|
|
|
|
deltaLon := float64(l.LongitudeE7) - centerLon
|
|
centerLon += deltaLon / float64(i+1)
|
|
deltaLon2 := float64(l.LongitudeE7) - centerLon
|
|
m2lon += deltaLon * deltaLon2
|
|
}
|
|
|
|
meanUncertainty := uncertaintySum / clusterSize
|
|
latVariance := m2lat / clusterSize
|
|
lonVariance := m2lon / clusterSize
|
|
|
|
first, last := cluster[0], cluster[len(cluster)-1]
|
|
|
|
clusterPoint := &Location{
|
|
Original: first.Original, // not sure if this makes sense, but anyway...
|
|
LatitudeE7: int64(centerLat),
|
|
LongitudeE7: int64(centerLon),
|
|
Timestamp: first.Timestamp,
|
|
Timespan: last.Timestamp,
|
|
Uncertainty: meanUncertainty,
|
|
Metadata: timeline.Metadata{
|
|
"Cluster size": len(cluster),
|
|
// convert to actual lat/lon units; I don't think we need to stay at 1e7 integers
|
|
"Standard deviation (latitude)": math.Sqrt(latVariance) / placesMult,
|
|
"Standard deviation (longitude)": math.Sqrt(lonVariance) / placesMult,
|
|
},
|
|
clusterPoints: cluster,
|
|
}
|
|
|
|
return clusterPoint
|
|
}
|
|
|
|
// denoiseNext runs the denoising phase of the algorithm.
|
|
// It returns the next point that is not considered noise.
|
|
func (lp *locationProcessor) denoiseNext(ctx context.Context) (*Location, error) {
|
|
// all returned points must have passed through the denoise window,
|
|
// which filters noise; so first, ensure there are points in the buffer
|
|
for len(lp.denoiseWindow) < denoiseWindowSize {
|
|
next, err := lp.source.NextLocation(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if next == nil {
|
|
lp.sourceDrained = true
|
|
break
|
|
}
|
|
if next.Significant {
|
|
lp.previous = next
|
|
return next, nil
|
|
}
|
|
|
|
if lp.sameAsPrevious(next) {
|
|
continue
|
|
}
|
|
|
|
lp.previous = next
|
|
|
|
// our method is simple: if the candidate point has a much lower
|
|
// accuracy (higher value; i.e. higher error) than the points on
|
|
// both sides, consider it noise (we could probably do more
|
|
// advanced statistics like what clustering does, across more
|
|
// points, but I haven't found it to be necessary yet)
|
|
|
|
// at least 3 points required to check; this is a loop because there
|
|
// maybe multiple noisy points consecutively, and each one removes an
|
|
// element from the window; the larger the window the more consecutive
|
|
// noisy points we can filter out (but in practice I think only a few
|
|
// extra slots are beneficial)
|
|
for len(lp.denoiseWindow) >= 3 {
|
|
before, candidate, after := lp.denoiseWindow[len(lp.denoiseWindow)-2], lp.denoiseWindow[len(lp.denoiseWindow)-1], next
|
|
|
|
// hard to differentiate noise with higher accuracy values; maybe statistics similar to
|
|
// our clustering phase can do better, but so far I haven't seen much need to denoise
|
|
// points that are within ~100m accurate, esp. with clustering
|
|
const thresholdMeters = 115
|
|
if candidate.Uncertainty > thresholdMeters {
|
|
avgSurroundingUncertainty := float64(before.Uncertainty+after.Uncertainty) / 2 //nolint:mnd
|
|
|
|
// TODO: maybe should consider temporal locality as well (are the points close enough in time? -- might need to be relative to time deltas)
|
|
if candidate.Uncertainty > avgSurroundingUncertainty*1.5 {
|
|
// pop the last element, which is the candidate in the middle, as it looks like noise
|
|
lp.denoiseWindow = lp.denoiseWindow[:len(lp.denoiseWindow)-1]
|
|
|
|
// must loop to converge: the removal of the middle point from
|
|
// our window means that two new points are now adjacent that
|
|
// were never adjacent before; every time we pop we need to
|
|
// "rewind" and bring back the last point that, at the time,
|
|
// didn't look like noise, and now see how it compares with
|
|
// its new neighbor; this is why the window size is generally
|
|
// larger than 3: the extra spaces are to catch all noise when
|
|
// multiple noisy points are consecutive
|
|
continue
|
|
}
|
|
}
|
|
|
|
// quick sanity check before we let it through: is it obviously wrong?
|
|
elapsedSincePrev := candidate.Timestamp.Sub(before.Timestamp).Seconds()
|
|
if elapsedSincePrev > 0 {
|
|
velocity := candidate.distanceFromPrev / elapsedSincePrev
|
|
const fasterThanCommercialJet = 350 // 350 m/s ~= 800 mph (commercial jets fly at < 600 mph)
|
|
if velocity > fasterThanCommercialJet {
|
|
// this person is likely NOT using their phone in the SR-71 Blackbird
|
|
lp.denoiseWindow = lp.denoiseWindow[:len(lp.denoiseWindow)-1]
|
|
continue
|
|
}
|
|
}
|
|
|
|
// point appears useful; no need to discard an element and re-check
|
|
break
|
|
}
|
|
|
|
lp.denoiseWindow = append(lp.denoiseWindow, next)
|
|
}
|
|
|
|
// pop the oldest point that has passed through the denoise filter as the newest point for the next phase
|
|
if len(lp.denoiseWindow) > 0 {
|
|
denoised := lp.denoiseWindow[0]
|
|
lp.denoiseWindow = lp.denoiseWindow[1:]
|
|
return denoised, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// TODO: after decoding, skip if sameAsPrevious()
|
|
|
|
// sameAsPrevious returns true if the location l is deemed to be identical to
|
|
// or very similar to the previously-decoded point. Should be called just after
|
|
// being read from the source input.
|
|
func (lp locationProcessor) sameAsPrevious(l *Location) bool {
|
|
if l.NewTrack {
|
|
lp.previous = nil
|
|
}
|
|
if lp.previous == nil {
|
|
return false
|
|
}
|
|
|
|
if l.Timestamp.Sub(lp.previous.Timestamp) < 5*time.Second {
|
|
return true
|
|
}
|
|
|
|
if l.LatitudeE7 == lp.previous.LatitudeE7 && l.LongitudeE7 == lp.previous.LongitudeE7 {
|
|
return true
|
|
}
|
|
|
|
distanceKM := haversineDistanceEarth(lp.previous.LatitudeE7, lp.previous.LongitudeE7, l.LatitudeE7, l.LongitudeE7)
|
|
distanceM := distanceKM * kmToMeters
|
|
l.distanceFromPrev = distanceM // memoize (keep temporarily) even though we may need to change this if previous point changes (during denoising)
|
|
const threshold = 5
|
|
return distanceM < threshold // TODO: subtract accuracy as well? TODO: consider time as well? (like 12 hours or something)
|
|
}
|
|
|
|
const (
|
|
denoiseWindowSize = 5
|
|
clusterWindowSize = 4
|
|
clusterBufferSize = 1000 // size of each batch of points when clustering
|
|
)
|
|
|
|
//nolint:dupword
|
|
/*
|
|
NOTES
|
|
|
|
Welford's online algorithm, from: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
|
|
|
|
# For a new value newValue, compute the new count, new mean, the new M2.
|
|
# mean accumulates the mean of the entire dataset.
|
|
# M2 aggregates the squared distance from the mean.
|
|
# count aggregates the number of samples seen so far.
|
|
def update(existingAggregate, newValue):
|
|
(count, mean, M2) = existingAggregate
|
|
count += 1
|
|
delta = newValue - mean
|
|
mean += delta / count
|
|
delta2 = newValue - mean
|
|
M2 += delta * delta2
|
|
return (count, mean, M2)
|
|
|
|
# Retrieve the mean, variance and sample variance from an aggregate
|
|
def finalize(existingAggregate):
|
|
(count, mean, M2) = existingAggregate
|
|
if count < 2:
|
|
return float("nan")
|
|
else:
|
|
(mean, variance, sampleVariance) = (mean, M2 / count, M2 / (count-1))
|
|
return (mean, variance, sampleVariance)
|
|
|
|
^ This assumes a calculation over the whole data set.
|
|
If computing only over a sliding window, replace count
|
|
with the windowSize, and delta is newValue-oldestValue
|
|
instead of newValue-mean. However, this is numerically
|
|
unstable. Negative variances will likely appear.
|
|
|
|
Here's a simple Go implementation, though slightly contrived
|
|
as the entire data set is passed in as a slice, but the idea
|
|
works for streams or arbitrarily-long inputs too:
|
|
|
|
func meanAndStandardDeviation(data []float64) (float64, float64) {
|
|
if len(data) == 0 {
|
|
return 0, 0
|
|
}
|
|
var mean, m2 float64
|
|
for i, x := range data {
|
|
delta := x - mean
|
|
mean += delta / float64(i+1)
|
|
delta2 := x - mean
|
|
m2 += delta * delta2
|
|
}
|
|
variance := m2 / float64(len(data))
|
|
stdDev := math.Sqrt(variance)
|
|
return mean, stdDev
|
|
}
|
|
|
|
*/
|
|
|
|
// Location represents a location data point that is about to be
|
|
// processed, is currently being processed, or has just been processed but
|
|
// is not yet an item. It is the input value to a LocationProcessor.
|
|
type Location struct {
|
|
// Attach the original location value if you'll be needing it after processing.
|
|
// (The location processor does not use this.) If the output ends up being a
|
|
// cluster, only this value from the cluster's first point will be included.
|
|
Original any
|
|
|
|
// Populate these values. Coordinates are integer precision to 7 decimal places
|
|
// to preserve arithmetic precision when doing calculations. Accuracy can be
|
|
// omitted if unknown.
|
|
LatitudeE7 int64 // degrees latitude times 1e7 (precision to ~1.1cm at equator)
|
|
LongitudeE7 int64 // degrees longitude times 1e7
|
|
Altitude float64 // meters
|
|
Uncertainty float64 // meters (must be > 0); higher values are less accurate
|
|
Timestamp time.Time // old exports used to call this timestampMs, in milliseconds
|
|
Significant bool // if true, this point will not be filtered out (may still be clustered, so if you need it, call ClusterPoints() to get it)
|
|
|
|
// These fields are not read by the processor (you do not need to set them in your
|
|
// implementation of NextLocation), but they will be set on the output if this
|
|
// location becomes a cluster. Copy these output values into the final resulting item.
|
|
Timespan time.Time
|
|
Metadata timeline.Metadata
|
|
|
|
// Typically, locations must be processed in chronological order in order to filter
|
|
// noise, duplicates, etc. And locations may be clustered into a single point if
|
|
// they look like a cluster. But if the stream of data has multiple time series or
|
|
// "starts over" one or more times, set this to true when the stream has gone back
|
|
// in time or when a new track has begun. As long as subsequent data points after
|
|
// this are still in ascending chronological order, it's as if starting a new stream.
|
|
// Points will not be clustered across different tracks.
|
|
NewTrack bool
|
|
|
|
distanceFromPrev float64 // distance from the location before this one in meters
|
|
changeToCentroid float64 // how much this point changed the mean
|
|
clusterPoints []*Location
|
|
}
|
|
|
|
// Location converts the coordinates into a item-processable Location value.
|
|
func (lc Location) Location() timeline.Location {
|
|
var l timeline.Location
|
|
if lc.LatitudeE7 != 0 {
|
|
lat := float64(lc.LatitudeE7) / placesMult
|
|
l.Latitude = &lat
|
|
}
|
|
if lc.LongitudeE7 != 0 {
|
|
lon := float64(lc.LongitudeE7) / placesMult
|
|
l.Longitude = &lon
|
|
}
|
|
if lc.Altitude != 0 {
|
|
alt := float64(lc.Altitude)
|
|
l.Altitude = &alt
|
|
}
|
|
if lc.Uncertainty > 0 {
|
|
unc := metersToApproxDegrees(lc.Uncertainty)
|
|
l.CoordinateUncertainty = &unc
|
|
}
|
|
return l
|
|
}
|
|
|
|
// ClusterPoints returns a slice of the original/input points that were
|
|
// aggregated to make the cluster, if this point is a cluster. A nil
|
|
// value is returned if this point is not a cluster. The slice should
|
|
// not be modified if you want to call this to get the same values again
|
|
// later.
|
|
func (lc Location) ClusterPoints() []*Location {
|
|
return lc.clusterPoints
|
|
}
|
|
|
|
const (
|
|
kmToMeters = 1000
|
|
places = 7
|
|
placesMult = 1e7
|
|
)
|