164 lines
5.7 KiB
Go
164 lines
5.7 KiB
Go
/*
|
|
Timelinize
|
|
Copyright (c) 2013 Matthew Holt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package timeline
|
|
|
|
import (
|
|
"errors"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
// Log is the main process log. All named logs should be derivatives of
|
|
// this logger. All log emissions should be sent through this logger or
|
|
// one of its derivatives.
|
|
var Log = newLogger()
|
|
|
|
// newLogger returns a logger that writes to websocketLogOutputs
|
|
// and the console, with JSON and console encoders, respectively.
|
|
// It is intended for setting up the main process logger during
|
|
// the program's init phase.
|
|
func newLogger() *zap.Logger {
|
|
websocketsSync := zapcore.AddSync(websocketLogOutputs)
|
|
|
|
websocketsOut := zapcore.Lock(websocketsSync)
|
|
consoleOut := zapcore.Lock(os.Stderr)
|
|
|
|
encCfg := zap.NewProductionEncoderConfig()
|
|
encCfg.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
|
|
encoder.AppendString(ts.UTC().Format("2006/01/02 15:04:05.000"))
|
|
}
|
|
encCfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
|
consoleEncoder := zapcore.NewConsoleEncoder(encCfg)
|
|
jsonEncoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
|
|
|
|
core := zapcore.NewTee(
|
|
zapcore.NewCore(consoleEncoder, consoleOut, zap.DebugLevel), // TODO: keep at debug? make this optional?
|
|
zapcore.NewCore(jsonEncoder, websocketsOut, zap.InfoLevel), // sent to web frontend / UI
|
|
)
|
|
|
|
// the embedded core avoids a firehose of logs, but we still need an unsampled core for UI updates and such, where every message is critical
|
|
// (the critical messages are defined in the Check() method of our Core type)
|
|
const sampledLogInterval, sampledLiveJobProgressInterval, sampledLiveJobProgressCount = 250 * time.Millisecond, 100 * time.Millisecond, 2
|
|
return zap.New(&customCore{
|
|
Core: zapcore.NewSamplerWithOptions(core, sampledLogInterval, 1, 0),
|
|
nonSamplingCore: core,
|
|
liveJobProgressCore: zapcore.NewSamplerWithOptions(core, sampledLiveJobProgressInterval, sampledLiveJobProgressCount, 0),
|
|
})
|
|
}
|
|
|
|
// multiConnWriter is like io.multiWriter from the standard lib,
|
|
// except this supports dynamically adding and removing writers
|
|
// and is specifically for WebSocket connections and Wails
|
|
// application events.
|
|
//
|
|
// This is a "best-effort" multi-writer. If there is an error writing
|
|
// to one conn, it does not abort and will continue to write to the
|
|
// other conns. Write errors are discarded, but write errors that are
|
|
// specifically closed connections will result in that connection
|
|
// being removed from the pool.
|
|
type multiConnWriter struct {
|
|
conns []*websocket.Conn
|
|
connsMu sync.RWMutex
|
|
}
|
|
|
|
func (mw *multiConnWriter) Write(p []byte) (n int, err error) {
|
|
mw.connsMu.RLock()
|
|
for _, w := range mw.conns {
|
|
err = w.WriteMessage(websocket.TextMessage, p)
|
|
// the handler that added this connection to the pool should
|
|
// have removed it when it was closed, but just in case we
|
|
// find out first that it was closed, we can remove it now
|
|
if errors.Is(err, websocket.ErrCloseSent) {
|
|
defer mw.RemoveConn(w)
|
|
}
|
|
}
|
|
mw.connsMu.RUnlock()
|
|
return len(p), err
|
|
}
|
|
|
|
// AddConn subscribes conn to writes.
|
|
func (mw *multiConnWriter) AddConn(conn *websocket.Conn) {
|
|
mw.connsMu.Lock()
|
|
mw.conns = append(mw.conns, conn)
|
|
mw.connsMu.Unlock()
|
|
}
|
|
|
|
// RemoveConn unsubscribes conn from writes, if it is subscribed.
|
|
func (mw *multiConnWriter) RemoveConn(conn *websocket.Conn) {
|
|
mw.connsMu.Lock()
|
|
for i, mww := range mw.conns {
|
|
if mww == conn {
|
|
mw.conns = append(mw.conns[:i], mw.conns[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
mw.connsMu.Unlock()
|
|
}
|
|
|
|
// websocketLogOutputs mediates the list of active
|
|
// websocket connections that are receiving process
|
|
// logs.
|
|
var websocketLogOutputs = new(multiConnWriter)
|
|
|
|
// AddLogConn subscribes conn to the log output. When
|
|
// the conn is closed, it should be removed with
|
|
// RemoveLogConn().
|
|
func AddLogConn(conn *websocket.Conn) {
|
|
websocketLogOutputs.AddConn(conn)
|
|
}
|
|
|
|
// RemoveLogConn removes conn from receiving logs.
|
|
// It is idempotent.
|
|
func RemoveLogConn(conn *websocket.Conn) {
|
|
websocketLogOutputs.RemoveConn(conn)
|
|
}
|
|
|
|
// customCore wraps another zapcore.Core and prevents sampling based on logger name.
|
|
type customCore struct {
|
|
zapcore.Core
|
|
nonSamplingCore zapcore.Core
|
|
liveJobProgressCore zapcore.Core
|
|
}
|
|
|
|
func (c *customCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
|
|
if ent.LoggerName == "job.status" {
|
|
// always allow through, no sampling -- otherwise UI gets out of sync
|
|
return ce.AddCore(ent, c.nonSamplingCore)
|
|
}
|
|
if ent.LoggerName == "job.action" && (ent.Message == "finished graph" || ent.Message == "finished thumbnail") {
|
|
return c.liveJobProgressCore.Check(ent, ce)
|
|
}
|
|
return c.Core.Check(ent, ce)
|
|
}
|
|
|
|
// With is a promotion of the embedded Core.With() method so that we can ensure
|
|
// derivative loggers are our type, not the embedded type, to preserve our other
|
|
// promoted methods like Check()...
|
|
func (c *customCore) With(fields []zapcore.Field) zapcore.Core {
|
|
return &customCore{
|
|
Core: c.Core.With(fields),
|
|
nonSamplingCore: c.nonSamplingCore.With(fields),
|
|
liveJobProgressCore: c.liveJobProgressCore.With(fields),
|
|
}
|
|
}
|