From 580053adbdc19ff2a59cdc62c3465d8a4c355300 Mon Sep 17 00:00:00 2001 From: Russ Magee Date: Sun, 5 Nov 2023 14:58:24 -0800 Subject: [PATCH] Connection keepalive/disconnect --- xs/xs.go | 7 +++-- xsd/xsd.go | 72 +++++++++++++++++++++++++++++++++++++++---- xsnet/consts.go | 4 ++- xsnet/net.go | 81 +++++++++++++++++++++++++++++++++++++------------ 4 files changed, 135 insertions(+), 29 deletions(-) diff --git a/xs/xs.go b/xs/xs.go index 7fbdbc3..9ff83f9 100755 --- a/xs/xs.go +++ b/xs/xs.go @@ -1059,13 +1059,16 @@ func main() { //nolint: funlen, gocyclo fmt.Fprintln(os.Stderr, rejectUserMsg()) rec.SetStatus(GeneralProtocolErr) } else { + // === Set up connection keepalive to server + conn.StartupKeepAlive() // goroutine, returns immediately + defer conn.ShutdownKeepAlive() + // === Set up chaffing to server conn.SetupChaff(chaffFreqMin, chaffFreqMax, chaffBytesMax) // enable client->server chaffing if chaffEnabled { // #gv:s/label=\"main\$2\"/label=\"deferCloseChaff\"/ // TODO:.gv:main:2:deferCloseChaff - conn.EnableChaff() // goroutine, returns immediately - defer conn.DisableChaff() + conn.StartupChaff() // goroutine, returns immediately defer conn.ShutdownChaff() } diff --git a/xsd/xsd.go b/xsd/xsd.go index da8df8b..1c1db30 100755 --- a/xsd/xsd.go +++ b/xsd/xsd.go @@ -17,11 +17,14 @@ import ( "fmt" "io" "log" + "net/http" "os" "os/exec" "os/signal" "os/user" "path" + "runtime" + "runtime/pprof" "strings" "sync" "syscall" @@ -44,6 +47,9 @@ var ( // Log - syslog output (with no -d) Log *logger.Writer + + cpuprofile string + memprofile string ) const ( @@ -115,10 +121,13 @@ func runClientToServerCopyAs(who, ttype string, conn *xsnet.Conn, fpath string, c.Stdout = os.Stdout c.Stderr = os.Stderr + // === Set up connection keepalive to client + conn.StartupKeepAlive() // goroutine, returns immediately + defer conn.ShutdownKeepAlive() + if chaffing { - conn.EnableChaff() + conn.StartupChaff() } - defer conn.DisableChaff() defer conn.ShutdownChaff() // Start the command (no pty) @@ -212,11 +221,14 @@ func runServerToClientCopyAs(who, ttype string, conn *xsnet.Conn, srcPath string c.Stderr = stdErrBuffer //c.Stderr = nil + // === Set up connection keepalive to client + conn.StartupKeepAlive() // goroutine, returns immediately + defer conn.ShutdownKeepAlive() + if chaffing { - conn.EnableChaff() + conn.StartupChaff() } //defer conn.Close() - defer conn.DisableChaff() defer conn.ShutdownChaff() // Start the command (no pty) @@ -352,12 +364,15 @@ func runShellAs(who, hname, ttype, cmd string, interactive bool, //nolint:funlen } }() + // === Set up connection keepalive to client + conn.StartupKeepAlive() // goroutine, returns immediately + defer conn.ShutdownKeepAlive() + if chaffing { - conn.EnableChaff() + conn.StartupChaff() } // #gv:s/label=\"runShellAs\$4\"/label=\"deferChaffShutdown\"/ defer func() { - conn.DisableChaff() conn.ShutdownChaff() }() @@ -545,6 +560,9 @@ func main() { //nolint:funlen,gocyclo H_SHA256 H_SHA512`) + flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to <`file`>") + flag.StringVar(&memprofile, "memprofile", "", "write memory profile to <`file`>") + flag.Parse() if vopt { @@ -559,6 +577,24 @@ func main() { //nolint:funlen,gocyclo } } + // === Profiling instrumentation + + if cpuprofile != "" { + f, err := os.Create(cpuprofile) + if err != nil { + log.Fatal("could not create CPU profile: ", err) + } + defer f.Close() + fmt.Println("StartCPUProfile()") + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal("could not start CPU profile: ", err) //nolint:gocritic + } else { + defer pprof.StopCPUProfile() + } + + go func() { http.ListenAndServe("localhost:6060", nil) }() //nolint:errcheck,gosec + } + // Enforce some sane min/max vals on chaff flags if chaffFreqMin < 2 { //nolint:gomnd chaffFreqMin = 2 @@ -611,6 +647,9 @@ func main() { //nolint:funlen,gocyclo syscall.Kill(0, syscall.SIGINT) //nolint:errcheck case "hangup": logger.LogNotice(fmt.Sprintf("[Got signal: %s - nop]", sig)) //nolint:errcheck + if cpuprofile != "" || memprofile != "" { + dumpProf() + } default: logger.LogNotice(fmt.Sprintf("[Got signal: %s - ignored]", sig)) //nolint:errcheck } @@ -632,6 +671,7 @@ func main() { //nolint:funlen,gocyclo // Wait for a connection. // Then check if client-proposed algs are allowed conn, err := l.Accept() + //logger.LogDebug(fmt.Sprintf("l.Accept()\n")) if err != nil { log.Printf("Accept() got error(%v), hanging up.\n", err) } else { @@ -864,3 +904,23 @@ func main() { //nolint:funlen,gocyclo } //endfor //logger.LogNotice(fmt.Sprintln("[Exiting]")) //nolint:errcheck } + +func dumpProf() { + if cpuprofile != "" { + pprof.StopCPUProfile() + } + + if memprofile != "" { + f, err := os.Create(memprofile) + if err != nil { + log.Fatal("could not create memory profile: ", err) + } + defer f.Close() + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal("could not write memory profile: ", err) //nolint:gocritic + } + } + + //os.Exit(status) +} diff --git a/xsnet/consts.go b/xsnet/consts.go index 30833a2..a95691f 100644 --- a/xsnet/consts.go +++ b/xsnet/consts.go @@ -52,6 +52,7 @@ const ( CSEKEXAlgDenied // server rejected proposed KEX alg CSECipherAlgDenied // server rejected proposed Cipher alg CSEHMACAlgDenied // server rejected proposed HMAC alg + CSEConnDead // connection keepalives expired ) // Extended (>255 UNIX exit status) codes @@ -78,6 +79,7 @@ const ( CSOTunKeepAlive // client tunnel heartbeat CSOTunDisconn // server -> client: tunnel rport disconnected CSOTunHangup // client -> server: tunnel lport hung up + CSOKeepAlive // bidir keepalive packet to monitor main connection ) // TunEndpoint.tunCtl control values - used to control workers for client @@ -97,7 +99,7 @@ const ( // Channel status Op byte type (see CSONone, ... and CSENone, ...) type CSOType uint32 -//TODO: this should be small (max unfragmented packet size?) +// TODO: this should be small (max unfragmented packet size?) const MAX_PAYLOAD_LEN = 2*1024*1024*1024 - 1 // Session symmetric crypto algs diff --git a/xsnet/net.go b/xsnet/net.go index 3dde211..d59640b 100644 --- a/xsnet/net.go +++ b/xsnet/net.go @@ -64,7 +64,6 @@ type ( // see: https://en.wikipedia.org/wiki/chaff_(countermeasure) ChaffConfig struct { shutdown bool //set to inform chaffHelper to shut down - enabled bool msecsMin uint //msecs min interval msecsMax uint //msecs max interval szMax uint // max size in bytes @@ -87,8 +86,9 @@ type ( Rows uint16 Cols uint16 - chaff ChaffConfig - tuns *map[uint16](*TunEndpoint) + keepalive uint // if this reaches zero, conn is considered dead + chaff ChaffConfig + tuns *map[uint16](*TunEndpoint) closeStat *CSOType // close status (CSOExitStatus) r cipher.Stream //read cipherStream @@ -971,7 +971,7 @@ func Dial(protocol string, ipport string, extensions ...string) (hc Conn, err er // Close a hkex.Conn func (hc *Conn) Close() (err error) { - hc.DisableChaff() + hc.ShutdownChaff() s := make([]byte, 4) binary.BigEndian.PutUint32(s, uint32(*hc.closeStat)) log.Printf("** Writing closeStat %d at Close()\n", *hc.closeStat) @@ -1337,6 +1337,9 @@ func (hc Conn) Read(b []byte) (n int, err error) { case CSOChaff: // Throw away pkt if it's chaff (ie., caller to Read() won't see this data) log.Printf("[Chaff pkt, discarded (len %d)]\n", decryptN) + case CSOKeepAlive: + //log.Printf("[KeepAlive pkt, discarded (len %d)]\n", decryptN) + hc.ResetKeepAlive() case CSOTermSize: fmt.Sscanf(string(payloadBytes), "%d %d", &hc.Rows, &hc.Cols) log.Printf("[TermSize pkt: rows %v cols %v]\n", hc.Rows, hc.Cols) @@ -1568,18 +1571,12 @@ func (hc *Conn) WritePacket(b []byte, ctrlStatOp byte) (n int, err error) { return retN, err } -func (hc *Conn) EnableChaff() { +func (hc *Conn) StartupChaff() { hc.chaff.shutdown = false - hc.chaff.enabled = true log.Println("Chaffing ENABLED") hc.chaffHelper() } -func (hc *Conn) DisableChaff() { - hc.chaff.enabled = false - log.Println("Chaffing DISABLED") -} - func (hc *Conn) ShutdownChaff() { hc.chaff.shutdown = true log.Println("Chaffing SHUTDOWN") @@ -1594,9 +1591,10 @@ func (hc *Conn) SetupChaff(msecsMin uint, msecsMax uint, szMax uint) { // Helper routine to spawn a chaffing goroutine for each Conn func (hc *Conn) chaffHelper() { go func() { + var nextDuration int for { - var nextDuration int - if hc.chaff.enabled { + //logger.LogDebug(fmt.Sprintf("[chaffHelper Loop]\n")) + if !hc.chaff.shutdown { var bufTmp []byte bufTmp = make([]byte, rand.Intn(int(hc.chaff.szMax))) min := int(hc.chaff.msecsMin) @@ -1604,17 +1602,60 @@ func (hc *Conn) chaffHelper() { _, _ = rand.Read(bufTmp) _, err := hc.WritePacket(bufTmp, CSOChaff) if err != nil { - log.Println("[ *** error - chaffHelper quitting *** ]") - hc.chaff.enabled = false + log.Println("[ *** error - chaffHelper shutting down *** ]") + hc.chaff.shutdown = true break } - } - time.Sleep(time.Duration(nextDuration) * time.Millisecond) - if hc.chaff.shutdown { - log.Println("*** chaffHelper shutting down") + } else { + log.Println("[ *** chaffHelper shutting down *** ]") + break + } + time.Sleep(time.Duration(nextDuration) * time.Millisecond) + } + }() +} + +func (hc *Conn) StartupKeepAlive() { + hc.ResetKeepAlive() + log.Println("KeepAlive ENABLED") + hc.keepaliveHelper() +} + +func (hc *Conn) ShutdownKeepAlive() { + log.Println("Conn SHUTDOWN") + hc.SetStatus(CSEConnDead) + hc.Close() +} + +func (hc *Conn) ResetKeepAlive() { + hc.keepalive = 3 + log.Println("KeepAlive RESET") +} + +// Helper routine to spawn a keepalive goroutine for each Conn +func (hc *Conn) keepaliveHelper() { + go func() { + for { + nextDuration := 10000 + bufTmp := []byte{0x55, 0xaa} + _, err := hc.WritePacket(bufTmp, CSOKeepAlive) + //logger.LogDebug(fmt.Sprintf("[keepalive]\n")) + if err != nil { + logger.LogDebug(fmt.Sprintf("[ *** error - keepaliveHelper quitting *** ]\n")) + break + } + time.Sleep(time.Duration(nextDuration) * time.Millisecond) + hc.keepalive -= 1 + + if rand.Intn(32) == 0 { + hc.keepalive = 0 + } + + if hc.keepalive == 0 { + logger.LogDebug(fmt.Sprintf("*** keepaliveHelper shutting down\n")) + hc.ShutdownKeepAlive() break } - } }() }