From 2e081e3641a56bdb4ae7313e06380543ffba90df Mon Sep 17 00:00:00 2001 From: Bnyro Date: Fri, 21 Jul 2023 18:03:29 +0200 Subject: [PATCH 1/6] fetch instance details asynchronously --- main.go | 243 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 126 insertions(+), 117 deletions(-) diff --git a/main.go b/main.go index 5608e69..43c08a2 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,8 @@ package main import ( "context" "encoding/json" - "github.com/gofiber/fiber/v2" + "errors" + "fmt" "io" "log" "net/http" @@ -11,8 +12,11 @@ import ( "regexp" "strconv" "strings" + "sync" "time" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/etag" "github.com/google/go-github/v53/github" @@ -40,6 +44,110 @@ type FrontendConfig struct { S3Enabled bool `json:"s3Enabled"` } +func getInstanceDetails(line string, latest string) (Instance, error) { + split := strings.Split(line, "|") + if len(split) >= 5 { + ApiUrl := strings.TrimSpace(split[1]) + + resp, err := http.Get(ApiUrl + "/healthcheck") + if err != nil { + return Instance{}, err + } + LastChecked := time.Now().Unix() + if resp.StatusCode != 200 { + return Instance{}, errors.New("Invalid response code") + } + resp, err = http.Get(ApiUrl + "/registered/badge") + if err != nil { + return Instance{}, err + } + registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) + if err != nil { + return Instance{}, err + } + resp, err = http.Get(ApiUrl + "/version") + if err != nil { + return Instance{}, err + } + if resp.StatusCode != 200 { + return Instance{}, errors.New("Invalid response code") + } + buf := new(strings.Builder) + _, err = io.Copy(buf, resp.Body) + if err != nil { + return Instance{}, err + } + version := strings.TrimSpace(buf.String()) + version_split := strings.Split(version, "-") + hash := version_split[len(version_split)-1] + + resp, err = http.Get(ApiUrl + "/config") + if err != nil { + return Instance{}, err + } + if resp.StatusCode != 200 { + return Instance{}, errors.New("Invalid response code") + } + + bytes, err := io.ReadAll(resp.Body) + + if err != nil { + return Instance{}, err + } + + var config FrontendConfig + + err = json.Unmarshal(bytes, &config) + + if err != nil { + return Instance{}, err + } + + cache_working := false + + resp, err = http.Get(ApiUrl + "/trending?region=US") + if err != nil { + return Instance{}, err + } + if resp.StatusCode == 200 { + old_timing := resp.Header.Get("Server-Timing") + resp, err = http.Get(ApiUrl + "/trending?region=US") + if err != nil { + return Instance{}, err + } + if resp.StatusCode == 200 { + new_timing := resp.Header.Get("Server-Timing") + if old_timing == new_timing { + cache_working = true + } + } + } + + // check if instance can fetch videos + resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw") + if err != nil { + return Instance{}, err + } + if resp.StatusCode != 200 { + return Instance{}, errors.New("Invalid status code") + } + + return Instance{ + Name: strings.TrimSpace(split[0]), + ApiUrl: ApiUrl, + Locations: strings.TrimSpace(split[2]), + Cdn: strings.TrimSpace(split[3]) == "Yes", + Registered: int(registered), + LastChecked: LastChecked, + Version: version, + UpToDate: strings.Contains(latest, hash), + Cache: cache_working, + S3Enabled: config.S3Enabled, + }, nil + } + return Instance{}, errors.New("Invalid line") +} + func monitorInstances() { ctx := context.Background() var tc *http.Client @@ -90,124 +198,25 @@ func monitorInstances() { instances := []Instance{} - skipped := 0 - for _, line := range lines { - split := strings.Split(line, "|") - if len(split) >= 5 { - if skipped < 2 { - skipped++ - continue - } - ApiUrl := strings.TrimSpace(split[1]) - resp, err := http.Get(ApiUrl + "/healthcheck") - if err != nil { - log.Print(err) - continue - } - LastChecked := time.Now().Unix() - if resp.StatusCode != 200 { - continue - } - resp, err = http.Get(ApiUrl + "/registered/badge") - if err != nil { - log.Print(err) - continue - } - registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) - if err != nil { - log.Print(err) - continue - } - resp, err = http.Get(ApiUrl + "/version") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - buf := new(strings.Builder) - _, err = io.Copy(buf, resp.Body) - if err != nil { - log.Print(err) - continue - } - version := strings.TrimSpace(buf.String()) - version_split := strings.Split(version, "-") - hash := version_split[len(version_split)-1] - - resp, err = http.Get(ApiUrl + "/config") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - - bytes, err := io.ReadAll(resp.Body) - - if err != nil { - log.Print(err) - continue - } - - var config FrontendConfig - - err = json.Unmarshal(bytes, &config) - - if err != nil { - log.Print(err) - continue - } - - cache_working := false - - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode == 200 { - old_timing := resp.Header.Get("Server-Timing") - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode == 200 { - new_timing := resp.Header.Get("Server-Timing") - if old_timing == new_timing { - cache_working = true - } - } - } - - // check if instance can fetch videos - resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - - instances = append(instances, Instance{ - Name: strings.TrimSpace(split[0]), - ApiUrl: ApiUrl, - Locations: strings.TrimSpace(split[2]), - Cdn: strings.TrimSpace(split[3]) == "Yes", - Registered: int(registered), - LastChecked: LastChecked, - Version: version, - UpToDate: strings.Contains(latest, hash), - Cache: cache_working, - S3Enabled: config.S3Enabled, - }) - + wg := sync.WaitGroup{} + for index, line := range lines { + if index < 3 { + fmt.Println(line) + continue } + + wg.Add(1) + go func(line string) { + instance, err := getInstanceDetails(line, latest) + if err == nil { + instances = append(instances, instance) + } else { + log.Print(err) + } + wg.Done() + }(line) } + wg.Wait() // update the global instances variable monitored_instances = instances From c369f2b7bcff8a3cdfbb32ddff407f2cee76ad8c Mon Sep 17 00:00:00 2001 From: Bnyro Date: Fri, 21 Jul 2023 23:49:00 +0200 Subject: [PATCH 2/6] parallize checks per instance using go routines --- main.go | 209 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 128 insertions(+), 81 deletions(-) diff --git a/main.go b/main.go index 43c08a2..cafbd2c 100644 --- a/main.go +++ b/main.go @@ -44,108 +44,155 @@ type FrontendConfig struct { S3Enabled bool `json:"s3Enabled"` } +func testUrl(url string) (*http.Response, error) { + resp, err := http.Get(url) + if err != nil { + return resp, err + } + if resp.StatusCode != 200 { + return resp, errors.New(fmt.Sprintf("Invalid response code at %s: %d", url, resp.StatusCode)) + } + return resp, err +} + +func testCaching(ApiUrl string) (bool, error) { + resp, err := testUrl(ApiUrl + "/trending?region=US") + if err != nil { + return false, err + } + oldTiming := resp.Header.Get("Server-Timing") + resp, err = testUrl(ApiUrl + "/trending?region=US") + if err != nil { + return false, err + } + newTiming := resp.Header.Get("Server-Timing") + cacheWorking := oldTiming == newTiming + return cacheWorking, nil +} + +func getConfig(ApiUrl string) (FrontendConfig, error) { + resp, err := testUrl(ApiUrl + "/config") + if err != nil { + return FrontendConfig{}, err + } + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return FrontendConfig{}, err + } + var config FrontendConfig + err = json.Unmarshal(bytes, &config) + if err != nil { + return FrontendConfig{}, err + } + return config, nil +} + func getInstanceDetails(line string, latest string) (Instance, error) { split := strings.Split(line, "|") - if len(split) >= 5 { - ApiUrl := strings.TrimSpace(split[1]) + if len(split) < 5 { + return Instance{}, errors.New(fmt.Sprintf("Invalid line: %s", line)) + } + ApiUrl := strings.TrimSpace(split[1]) - resp, err := http.Get(ApiUrl + "/healthcheck") + wg := sync.WaitGroup{} + errorChannel := make(chan error) + // the amount of tests to do + wg.Add(6) + + var lastChecked int64 + var registered int64 + var config FrontendConfig + var hash string + var version string + var cacheWorking bool + + go func() { + wg.Wait() + close(errorChannel) + }() + + go func() { + defer wg.Done() + if _, err := testUrl(ApiUrl + "/healthcheck"); err != nil { + errorChannel <- err + return + } + lastChecked = time.Now().Unix() + }() + + go func() { + defer wg.Done() + resp, err := testUrl(ApiUrl + "/registered/badge") if err != nil { - return Instance{}, err + errorChannel <- err + return } - LastChecked := time.Now().Unix() - if resp.StatusCode != 200 { - return Instance{}, errors.New("Invalid response code") - } - resp, err = http.Get(ApiUrl + "/registered/badge") + registered, err = strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) if err != nil { - return Instance{}, err + errorChannel <- err } - registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) + }() + + go func() { + defer wg.Done() + resp, err := testUrl(ApiUrl + "/version") if err != nil { - return Instance{}, err - } - resp, err = http.Get(ApiUrl + "/version") - if err != nil { - return Instance{}, err - } - if resp.StatusCode != 200 { - return Instance{}, errors.New("Invalid response code") + errorChannel <- err + return } buf := new(strings.Builder) _, err = io.Copy(buf, resp.Body) if err != nil { - return Instance{}, err + errorChannel <- err + return } - version := strings.TrimSpace(buf.String()) + version = strings.TrimSpace(buf.String()) version_split := strings.Split(version, "-") - hash := version_split[len(version_split)-1] + hash = version_split[len(version_split)-1] + }() - resp, err = http.Get(ApiUrl + "/config") + go func() { + defer wg.Done() + var err error + config, err = getConfig(ApiUrl) if err != nil { - return Instance{}, err - } - if resp.StatusCode != 200 { - return Instance{}, errors.New("Invalid response code") + errorChannel <- err } + }() - bytes, err := io.ReadAll(resp.Body) - + go func() { + defer wg.Done() + var err error + cacheWorking, err = testCaching(ApiUrl) if err != nil { - return Instance{}, err - } - - var config FrontendConfig - - err = json.Unmarshal(bytes, &config) - - if err != nil { - return Instance{}, err - } - - cache_working := false - - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - return Instance{}, err - } - if resp.StatusCode == 200 { - old_timing := resp.Header.Get("Server-Timing") - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - return Instance{}, err - } - if resp.StatusCode == 200 { - new_timing := resp.Header.Get("Server-Timing") - if old_timing == new_timing { - cache_working = true - } - } + errorChannel <- err } + }() + go func() { + defer wg.Done() // check if instance can fetch videos - resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw") - if err != nil { - return Instance{}, err - } - if resp.StatusCode != 200 { - return Instance{}, errors.New("Invalid status code") + if _, err := testUrl(ApiUrl + "/streams/jNQXAC9IVRw"); err != nil { + errorChannel <- err } + }() - return Instance{ - Name: strings.TrimSpace(split[0]), - ApiUrl: ApiUrl, - Locations: strings.TrimSpace(split[2]), - Cdn: strings.TrimSpace(split[3]) == "Yes", - Registered: int(registered), - LastChecked: LastChecked, - Version: version, - UpToDate: strings.Contains(latest, hash), - Cache: cache_working, - S3Enabled: config.S3Enabled, - }, nil + for err := range errorChannel { + return Instance{}, err } - return Instance{}, errors.New("Invalid line") + + return Instance{ + Name: strings.TrimSpace(split[0]), + ApiUrl: ApiUrl, + Locations: strings.TrimSpace(split[2]), + Cdn: strings.TrimSpace(split[3]) == "Yes", + Registered: int(registered), + LastChecked: lastChecked, + Version: version, + UpToDate: strings.Contains(latest, hash), + Cache: cacheWorking, + S3Enabled: config.S3Enabled, + }, nil } func monitorInstances() { @@ -184,7 +231,6 @@ func monitorInstances() { } if resp.StatusCode == 200 { - // parse the response buf := new(strings.Builder) _, err := io.Copy(buf, resp.Body) @@ -200,20 +246,20 @@ func monitorInstances() { wg := sync.WaitGroup{} for index, line := range lines { - if index < 3 { - fmt.Println(line) + // skip first two and last line + if index < 2 || index == len(lines)-1 { continue } wg.Add(1) go func(line string) { + defer wg.Done() instance, err := getInstanceDetails(line, latest) if err == nil { instances = append(instances, instance) } else { log.Print(err) } - wg.Done() }(line) } wg.Wait() @@ -237,5 +283,6 @@ func main() { return c.JSON(monitored_instances) }) + fmt.Println("Listening on http://localhost:3000") app.Listen(":3000") } From 4ba1d2dfb423ca6a03b58eca373346910627da5c Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:17:18 +0100 Subject: [PATCH 3/6] Skip table lines properly. --- main.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index cafbd2c..7e9f85f 100644 --- a/main.go +++ b/main.go @@ -87,11 +87,7 @@ func getConfig(ApiUrl string) (FrontendConfig, error) { return config, nil } -func getInstanceDetails(line string, latest string) (Instance, error) { - split := strings.Split(line, "|") - if len(split) < 5 { - return Instance{}, errors.New(fmt.Sprintf("Invalid line: %s", line)) - } +func getInstanceDetails(split []string, latest string) (Instance, error) { ApiUrl := strings.TrimSpace(split[1]) wg := sync.WaitGroup{} @@ -245,22 +241,31 @@ func monitorInstances() { instances := []Instance{} wg := sync.WaitGroup{} - for index, line := range lines { - // skip first two and last line - if index < 2 || index == len(lines)-1 { + + skipped := 0 + for _, line := range lines { + split := strings.Split(line, "|") + + if len(split) < 5 { + continue + } + + // skip first two table lines + if skipped < 2 { + skipped++ continue } wg.Add(1) - go func(line string) { + go func(split []string) { defer wg.Done() - instance, err := getInstanceDetails(line, latest) + instance, err := getInstanceDetails(split, latest) if err == nil { instances = append(instances, instance) } else { log.Print(err) } - }(line) + }(split) } wg.Wait() From e8d2b2b267a04c0fed9ec5be893e9f1c607d8beb Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:22:46 +0100 Subject: [PATCH 4/6] Remove listening on message as Gofiber has its own --- main.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 7e9f85f..7ff4140 100644 --- a/main.go +++ b/main.go @@ -238,7 +238,7 @@ func monitorInstances() { lines := strings.Split(buf.String(), "\n") - instances := []Instance{} + var instances []Instance wg := sync.WaitGroup{} @@ -288,6 +288,8 @@ func main() { return c.JSON(monitored_instances) }) - fmt.Println("Listening on http://localhost:3000") - app.Listen(":3000") + err := app.Listen(":3000") + if err != nil { + panic(err) + } } From 20e53507367f178059b77e14247a96f8219e08ec Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:46:28 +0100 Subject: [PATCH 5/6] Store checked instances in map to preserve order. --- main.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 7ff4140..d9bfe4c 100644 --- a/main.go +++ b/main.go @@ -238,11 +238,12 @@ func monitorInstances() { lines := strings.Split(buf.String(), "\n") - var instances []Instance + instancesMap := make(map[int]Instance) wg := sync.WaitGroup{} skipped := 0 + checking := 0 for _, line := range lines { split := strings.Split(line, "|") @@ -257,18 +258,28 @@ func monitorInstances() { } wg.Add(1) - go func(split []string) { + go func(i int, split []string) { defer wg.Done() instance, err := getInstanceDetails(split, latest) if err == nil { - instances = append(instances, instance) + instancesMap[i] = instance } else { log.Print(err) } - }(split) + }(checking, split) + checking++ } wg.Wait() + // Map to ordered array + var instances []Instance + for i := 0; i < checking; i++ { + instance, ok := instancesMap[i] + if ok { + instances = append(instances, instance) + } + } + // update the global instances variable monitored_instances = instances } From 9e4c9e9d8c85ac8fa76d7a5f054b8c41be5e01d8 Mon Sep 17 00:00:00 2001 From: Kavin <20838718+FireMasterK@users.noreply.github.com> Date: Wed, 26 Jul 2023 21:31:31 +0100 Subject: [PATCH 6/6] Set a timeout for requests. --- main.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index d9bfe4c..793c5b9 100644 --- a/main.go +++ b/main.go @@ -44,8 +44,12 @@ type FrontendConfig struct { S3Enabled bool `json:"s3Enabled"` } +var client = http.Client{ + Timeout: 10 * time.Second, +} + func testUrl(url string) (*http.Response, error) { - resp, err := http.Get(url) + resp, err := client.Get(url) if err != nil { return resp, err }