552 lines
16 KiB
Go
552 lines
16 KiB
Go
/*
|
|
Timelinize
|
|
Copyright (c) 2013 Matthew Holt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
// Package tlzapp provides the application functionality for timelines, including
|
|
// file utilities, the HTTP server and APIs, the registration of endpoints for the
|
|
// CLI, signals, etc.
|
|
package tlzapp
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"expvar"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"net"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/timelinize/timelinize/timeline"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type App struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc // shuts down the app
|
|
|
|
cfg *Config
|
|
log *zap.Logger
|
|
|
|
commands map[string]Endpoint
|
|
|
|
server server
|
|
|
|
pyServer *exec.Cmd
|
|
pyServerMu *sync.Mutex
|
|
|
|
// references to embedded assets... due to limitations
|
|
// in the go embed tool, the vars have to be in a parent
|
|
// directory of what is being embedded, so we pass in
|
|
// reference to it for the app, since this package isn't
|
|
// in the root of the project, and also to pass along to
|
|
// a new app if the config is changed and the app is
|
|
// restarted
|
|
embeddedWebsite fs.FS
|
|
}
|
|
|
|
func New(ctx context.Context, cfg *Config, embeddedWebsite fs.FS) (*App, error) {
|
|
cfg.fillDefaults()
|
|
|
|
var frontend fs.FS
|
|
if cfg.WebsiteDir == "" {
|
|
// embedded file systems have a top level folder that is annoying for us
|
|
// because it means all requests for these static resources need to be
|
|
// prefixed by the dir name, as if that's relevant!? anyway, strip it.
|
|
topLevelDir := "."
|
|
entries, err := fs.ReadDir(embeddedWebsite, ".")
|
|
if err == nil && len(entries) == 1 {
|
|
topLevelDir = entries[0].Name()
|
|
}
|
|
frontend, err = fs.Sub(embeddedWebsite, topLevelDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not strip top level folder from embedded website FS: %w", err)
|
|
}
|
|
} else {
|
|
frontend = os.DirFS(cfg.WebsiteDir)
|
|
}
|
|
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithCancel(ctx)
|
|
|
|
newApp := &App{
|
|
ctx: ctx,
|
|
cfg: cfg,
|
|
log: timeline.Log,
|
|
embeddedWebsite: embeddedWebsite,
|
|
pyServerMu: new(sync.Mutex),
|
|
}
|
|
newApp.server = server{
|
|
app: newApp,
|
|
log: newApp.log.Named("http"),
|
|
frontend: frontend,
|
|
}
|
|
newApp.cancel = func() {
|
|
// cancel the context, so anything relying on it knows to terminate
|
|
cancel()
|
|
|
|
// close all open timelines (TODO: Maybe they should be open on the app, not global in the timeline package?)
|
|
shutdownTimelines()
|
|
|
|
newApp.pyServerMu.Lock()
|
|
defer newApp.pyServerMu.Unlock()
|
|
|
|
// stop python server (will wait for it below)
|
|
if newApp.pyServer != nil && newApp.pyServer.Process != nil {
|
|
if err := newApp.pyServer.Process.Kill(); err != nil {
|
|
newApp.log.Error("could not terminate Python server", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// gracefully close the HTTP server (let existing requests finish within a timeout)
|
|
if newApp.server.httpServer != nil {
|
|
// use a different context since the one we have has been canceled
|
|
const shutdownTimeout = 10 * time.Second
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
|
defer shutdownCancel()
|
|
_ = newApp.server.httpServer.Shutdown(shutdownCtx)
|
|
}
|
|
|
|
// finish waiting for python server to exit
|
|
if newApp.pyServer != nil && newApp.pyServer.Process != nil {
|
|
if state, err := newApp.pyServer.Process.Wait(); err != nil {
|
|
newApp.log.Error("Python server", zap.Error(err), zap.String("state", state.String()))
|
|
}
|
|
}
|
|
|
|
newApp.pyServer = nil
|
|
}
|
|
newApp.registerCommands()
|
|
|
|
appMu.Lock()
|
|
app = newApp
|
|
appMu.Unlock()
|
|
|
|
return newApp, nil
|
|
}
|
|
|
|
func (app *App) Shutdown() { app.cancel() }
|
|
|
|
func (app *App) RunCommand(ctx context.Context, args []string) error {
|
|
if len(args) == 0 {
|
|
return errors.New("no command specified")
|
|
}
|
|
|
|
commandName := args[0]
|
|
|
|
endpoint, ok := app.commands[commandName]
|
|
if !ok {
|
|
return fmt.Errorf("unrecognized command: %s", commandName)
|
|
}
|
|
|
|
// make request body
|
|
var body io.Reader
|
|
switch endpoint.GetContentType() {
|
|
case Form:
|
|
bodyStr := makeForm(args[1:])
|
|
body = strings.NewReader(bodyStr)
|
|
case JSON:
|
|
bodyBytes, err := makeJSON(args[1:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(bodyBytes) > 0 {
|
|
body = bytes.NewReader(bodyBytes)
|
|
}
|
|
case None:
|
|
}
|
|
|
|
url := "http://" + app.cfg.listenAddr() + apiBasePath + commandName
|
|
|
|
req, err := http.NewRequestWithContext(ctx, endpoint.Method, url, body)
|
|
if err != nil {
|
|
return fmt.Errorf("building request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", string(endpoint.GetContentType()))
|
|
req.Header.Set("Origin", req.URL.Scheme+"://"+req.URL.Host)
|
|
|
|
// execute the command; if the server is running in another
|
|
// process already, send the request to it; otherwise send
|
|
// a virtual request directly to the HTTP handler function
|
|
var resp *http.Response
|
|
if app.serverRunning() {
|
|
httpClient := &http.Client{Timeout: 1 * time.Minute}
|
|
resp, err = httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("running command on server: %w", err)
|
|
}
|
|
} else {
|
|
if err := app.openRepos(); err != nil {
|
|
return fmt.Errorf("opening repos from last time: %w", err)
|
|
}
|
|
vrw := &virtualResponseWriter{body: new(bytes.Buffer), header: make(http.Header)}
|
|
err := endpoint.ServeHTTP(vrw, req)
|
|
if err != nil {
|
|
return fmt.Errorf("running command: %w", err)
|
|
}
|
|
resp = &http.Response{
|
|
StatusCode: vrw.status,
|
|
Header: vrw.header,
|
|
Body: io.NopCloser(vrw.body),
|
|
ContentLength: int64(vrw.body.Len()),
|
|
}
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// print out the response
|
|
if strings.Contains(resp.Header.Get("Content-Type"), "json") {
|
|
// to pretty-print the JSON, we just decode it
|
|
// and then re-encode it ¯\_(ツ)_/¯
|
|
var js interface{}
|
|
err := json.NewDecoder(resp.Body).Decode(&js)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if js == nil {
|
|
return nil
|
|
}
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", "\t")
|
|
err = enc.Encode(js)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
_, _ = io.Copy(os.Stdout, resp.Body)
|
|
}
|
|
|
|
if resp.StatusCode >= lowestErrorStatus {
|
|
return fmt.Errorf("server returned error: HTTP %d %s",
|
|
resp.StatusCode, http.StatusText(resp.StatusCode))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Serve serves the application server only if it is not already running
|
|
// (possibly in another process). It returns true if it started the
|
|
// application server, or false if it was already running.
|
|
func (app *App) Serve() (bool, error) {
|
|
if app.serverRunning() {
|
|
return false, nil
|
|
}
|
|
return true, app.serve()
|
|
}
|
|
|
|
func (app *App) MustServe() error {
|
|
return app.serve()
|
|
}
|
|
|
|
func (app *App) startPythonServer(host string, port int) error {
|
|
// only start the python server if it isn't already running
|
|
app.pyServerMu.Lock()
|
|
serverRunning := app.pyServer != nil
|
|
app.pyServerMu.Unlock()
|
|
if serverRunning {
|
|
app.log.Debug("skipping starting python server since it is already non-nil, so should be running")
|
|
return nil
|
|
}
|
|
|
|
// nothing to do if uv isn't installed
|
|
if _, err := exec.LookPath("uv"); err != nil {
|
|
app.log.Warn("uv is not installed in PATH; semantic features will be unavailable (to fix: install uv into PATH, then restart app)", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
// the subfolder in the embedded directory wherein the Python project lives;
|
|
// the contents of this folder are dumped to disk, and may replace what is
|
|
// already in that folder on disk (to accommodate changes/updates); the
|
|
// virtual env must be in a different folder (via UV_PROJECT_ENVIRONMENT)
|
|
const scriptSubfolder = "server"
|
|
|
|
// get the name of the embedded folder
|
|
entries, err := fs.ReadDir(embeddedPython, ".")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(entries) != 1 || !entries[0].IsDir() {
|
|
return errors.New("embedded python assets have unexpected structure; semantic features will be unavailable")
|
|
}
|
|
embedFolderName := entries[0].Name()
|
|
|
|
// embedded assets need to be written to disk so that the python environment
|
|
// can operate: it needs to know the uv project manifest, the dependencies,
|
|
// and a place to put the virtualenv (venv). By default, uv would put the
|
|
// .venv folder in the project directory, but that would make updating the
|
|
// scripts tricky, since we couldn't just delete the whole thing and re-write
|
|
// the files (we don't want to delete the venv, it's usually a big cache); so
|
|
// we need to put the files we embed into a separate subfolder from the venv;
|
|
// we can do this easily with an environment variable to relocate the venv,
|
|
// and have the scripts/project in their own subfolder, in the app data dir.
|
|
dataDir := AppDataDir()
|
|
projectPath := filepath.Join(dataDir, embedFolderName, scriptSubfolder)
|
|
venvPath := filepath.Join(dataDir, embedFolderName, "venv")
|
|
|
|
// clear out old project/script files (this ensures they stay current)
|
|
// then write the embedded files in their place
|
|
if err = os.RemoveAll(projectPath); err != nil {
|
|
return nil
|
|
}
|
|
if err := os.CopyFS(dataDir, embeddedPython); err != nil {
|
|
return err
|
|
}
|
|
|
|
app.log.Info("starting python server to enable semantic features", zap.String("dir", projectPath))
|
|
|
|
// run the python server such that the venv is relocated to its own
|
|
// folder outside the project folder (but still in the app data dir)
|
|
//nolint:gosec
|
|
cmd := exec.CommandContext(app.ctx,
|
|
"uv", "run", "server.py",
|
|
"--host", host,
|
|
"--port", strconv.Itoa(port))
|
|
cmd.Dir = projectPath
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
cmd.Env = append(os.Environ(), "UV_PROJECT_ENVIRONMENT="+venvPath)
|
|
app.pyServerMu.Lock()
|
|
app.pyServer = cmd
|
|
app.pyServerMu.Unlock()
|
|
return cmd.Start() // TODO: need to be sure this stops when our program stops
|
|
}
|
|
|
|
func (app *App) stopPythonServer() error {
|
|
app.pyServerMu.Lock()
|
|
defer app.pyServerMu.Unlock()
|
|
if app.pyServer == nil {
|
|
return nil
|
|
}
|
|
if app.pyServer != nil && app.pyServer.Process != nil {
|
|
return app.pyServer.Process.Kill()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (app *App) serve() error {
|
|
if err := app.openRepos(); err != nil {
|
|
return fmt.Errorf("opening previously-opened repositories: %w", err)
|
|
}
|
|
|
|
if app.server.adminLn != nil {
|
|
return fmt.Errorf("server already running on %s", app.server.adminLn.Addr())
|
|
}
|
|
|
|
adminAddr := app.cfg.listenAddr()
|
|
app.server.fillAllowedOrigins(app.cfg.AllowedOrigins, adminAddr) // for CORS enforcement
|
|
|
|
ln, err := new(net.ListenConfig).Listen(app.ctx, "tcp", adminAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("opening listener: %w", err)
|
|
}
|
|
app.server.adminLn = ln
|
|
|
|
app.server.mux = http.NewServeMux()
|
|
|
|
addRoute := func(uriPath string, endpoint Endpoint) {
|
|
handler := app.server.enforceHost(endpoint) // simple DNS rebinding mitigation
|
|
handler = app.server.enforceOriginAndMethod(endpoint.Method, handler) // simple cross-origin mitigation
|
|
app.server.mux.Handle(uriPath, wrapErrorHandler(handler))
|
|
}
|
|
|
|
// static file server
|
|
app.server.staticFiles = http.FileServer(http.FS(app.server.frontend))
|
|
addRoute("/", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: app.server.serveFrontend,
|
|
})
|
|
|
|
// API endpoints
|
|
for command, endpoint := range app.commands {
|
|
addRoute(apiBasePath+command, endpoint)
|
|
}
|
|
|
|
// debug endpoints
|
|
addRoute("/debug/pprof/", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(http.HandlerFunc(pprof.Index)),
|
|
})
|
|
addRoute("/debug/pprof/cmdline", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(http.HandlerFunc(pprof.Cmdline)),
|
|
})
|
|
addRoute("/debug/pprof/profile", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(http.HandlerFunc(pprof.Profile)),
|
|
})
|
|
addRoute("/debug/pprof/symbol", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(http.HandlerFunc(pprof.Symbol)),
|
|
})
|
|
addRoute("/debug/pprof/trace", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(http.HandlerFunc(pprof.Trace)),
|
|
})
|
|
addRoute("/debug/vars", Endpoint{
|
|
Method: http.MethodGet,
|
|
Handler: httpWrap(expvar.Handler()),
|
|
})
|
|
|
|
// TODO: remote server (with TLS mutual auth)
|
|
|
|
allowedOrigins := make([]string, 0, len(app.server.allowedOrigins))
|
|
for _, u := range app.server.allowedOrigins {
|
|
allowedOrigins = append(allowedOrigins, u.String())
|
|
}
|
|
|
|
app.log.Info("started admin server",
|
|
zap.String("listener", ln.Addr().String()),
|
|
zap.Strings("allowed_origins", allowedOrigins))
|
|
|
|
app.server.httpServer = &http.Server{
|
|
Handler: app.server,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
MaxHeaderBytes: 1024 * 128,
|
|
}
|
|
|
|
go func() {
|
|
err := app.server.httpServer.Serve(ln)
|
|
if errors.Is(err, net.ErrClosed) || errors.Is(err, http.ErrServerClosed) {
|
|
// normal; the listener or server was deliberately closed
|
|
app.log.Info("stopped server", zap.String("listener", ln.Addr().String()))
|
|
} else if err != nil {
|
|
app.log.Error("server failed", zap.String("listener", ln.Addr().String()), zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
// don't return until server is actually serving
|
|
|
|
// ensure we don't wait longer than a set amount of time
|
|
const maxWait = 30 * time.Second
|
|
var cancel context.CancelFunc
|
|
ctx, cancel := context.WithTimeout(app.ctx, maxWait)
|
|
defer cancel()
|
|
|
|
// set up HTTP client and request with short timeout and context cancellation
|
|
client := &http.Client{Timeout: 1 * time.Second}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:12002", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// since some operating systems sometimes do weird things with
|
|
// port reuse (*cough* Windows), poll until connection succeeds
|
|
for {
|
|
resp, err := client.Do(req)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
if resp.Header.Get("Server") == "Timelinize" {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
const interval = 500 * time.Millisecond
|
|
timer := time.NewTimer(interval)
|
|
select {
|
|
case <-timer.C:
|
|
case <-ctx.Done():
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (app *App) serverRunning() bool {
|
|
// TODO: get URL from config?
|
|
req, err := http.NewRequestWithContext(app.ctx, http.MethodGet, "http://localhost:12002", nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
client := &http.Client{Timeout: time.Second}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
resp.Body.Close()
|
|
return resp.Header.Get("Server") == "Timelinize"
|
|
}
|
|
|
|
// openRepos opens all timeline repositories in the current configuration.
|
|
func (app *App) openRepos() error {
|
|
// open designated timelines; copy pointer so we don't have to acquire lock on cfg and create deadlock with OpenRepository()
|
|
// TODO: use race detector to verify ^
|
|
lastOpenedRepos := app.cfg.Repositories
|
|
for i, repoDir := range lastOpenedRepos {
|
|
_, err := app.openRepository(app.ctx, repoDir, false)
|
|
if err != nil {
|
|
app.log.Error(fmt.Sprintf("failed to open timeline %d of %d", i+1, len(app.cfg.Repositories)),
|
|
zap.Error(err),
|
|
zap.String("dir", repoDir))
|
|
}
|
|
}
|
|
|
|
// persist config so it can be used on restart
|
|
if err := app.cfg.Save(); err != nil {
|
|
return fmt.Errorf("persisting config file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// virtualResponseWriter is used in virtualized HTTP requests
|
|
// where the handler is called directly rather than using a
|
|
// network.
|
|
type virtualResponseWriter struct {
|
|
status int
|
|
header http.Header
|
|
body *bytes.Buffer
|
|
}
|
|
|
|
func (vrw *virtualResponseWriter) Header() http.Header {
|
|
return vrw.header
|
|
}
|
|
|
|
func (vrw *virtualResponseWriter) WriteHeader(statusCode int) {
|
|
vrw.status = statusCode
|
|
}
|
|
|
|
func (vrw *virtualResponseWriter) Write(data []byte) (int, error) {
|
|
return vrw.body.Write(data)
|
|
}
|
|
|
|
// The app global instance is used mainly for properly
|
|
// shutting down after a signal is received.
|
|
var (
|
|
app *App
|
|
appMu sync.Mutex
|
|
)
|
|
|
|
const lowestErrorStatus = 400
|
|
|
|
const osWindows = "windows"
|
|
|
|
const defaultAdminAddr = "127.0.0.1:12002"
|
|
|
|
//go:embed python
|
|
var embeddedPython embed.FS
|