diff --git a/main.go b/main.go index 5608e69..793c5b9 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,8 @@ package main import ( "context" "encoding/json" - "github.com/gofiber/fiber/v2" + "errors" + "fmt" "io" "log" "net/http" @@ -11,8 +12,11 @@ import ( "regexp" "strconv" "strings" + "sync" "time" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/etag" "github.com/google/go-github/v53/github" @@ -40,6 +44,157 @@ type FrontendConfig struct { S3Enabled bool `json:"s3Enabled"` } +var client = http.Client{ + Timeout: 10 * time.Second, +} + +func testUrl(url string) (*http.Response, error) { + resp, err := client.Get(url) + if err != nil { + return resp, err + } + if resp.StatusCode != 200 { + return resp, errors.New(fmt.Sprintf("Invalid response code at %s: %d", url, resp.StatusCode)) + } + return resp, err +} + +func testCaching(ApiUrl string) (bool, error) { + resp, err := testUrl(ApiUrl + "/trending?region=US") + if err != nil { + return false, err + } + oldTiming := resp.Header.Get("Server-Timing") + resp, err = testUrl(ApiUrl + "/trending?region=US") + if err != nil { + return false, err + } + newTiming := resp.Header.Get("Server-Timing") + cacheWorking := oldTiming == newTiming + return cacheWorking, nil +} + +func getConfig(ApiUrl string) (FrontendConfig, error) { + resp, err := testUrl(ApiUrl + "/config") + if err != nil { + return FrontendConfig{}, err + } + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return FrontendConfig{}, err + } + var config FrontendConfig + err = json.Unmarshal(bytes, &config) + if err != nil { + return FrontendConfig{}, err + } + return config, nil +} + +func getInstanceDetails(split []string, latest string) (Instance, error) { + ApiUrl := strings.TrimSpace(split[1]) + + wg := sync.WaitGroup{} + errorChannel := make(chan error) + // the amount of tests to do + wg.Add(6) + + var lastChecked int64 + var registered int64 + var config FrontendConfig + var hash string + var version string + var cacheWorking bool + + go func() { + wg.Wait() + close(errorChannel) + }() + + go func() { + defer wg.Done() + if _, err := testUrl(ApiUrl + "/healthcheck"); err != nil { + errorChannel <- err + return + } + lastChecked = time.Now().Unix() + }() + + go func() { + defer wg.Done() + resp, err := testUrl(ApiUrl + "/registered/badge") + if err != nil { + errorChannel <- err + return + } + registered, err = strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) + if err != nil { + errorChannel <- err + } + }() + + go func() { + defer wg.Done() + resp, err := testUrl(ApiUrl + "/version") + if err != nil { + errorChannel <- err + return + } + buf := new(strings.Builder) + _, err = io.Copy(buf, resp.Body) + if err != nil { + errorChannel <- err + return + } + version = strings.TrimSpace(buf.String()) + version_split := strings.Split(version, "-") + hash = version_split[len(version_split)-1] + }() + + go func() { + defer wg.Done() + var err error + config, err = getConfig(ApiUrl) + if err != nil { + errorChannel <- err + } + }() + + go func() { + defer wg.Done() + var err error + cacheWorking, err = testCaching(ApiUrl) + if err != nil { + errorChannel <- err + } + }() + + go func() { + defer wg.Done() + // check if instance can fetch videos + if _, err := testUrl(ApiUrl + "/streams/jNQXAC9IVRw"); err != nil { + errorChannel <- err + } + }() + + for err := range errorChannel { + return Instance{}, err + } + + return Instance{ + Name: strings.TrimSpace(split[0]), + ApiUrl: ApiUrl, + Locations: strings.TrimSpace(split[2]), + Cdn: strings.TrimSpace(split[3]) == "Yes", + Registered: int(registered), + LastChecked: lastChecked, + Version: version, + UpToDate: strings.Contains(latest, hash), + Cache: cacheWorking, + S3Enabled: config.S3Enabled, + }, nil +} + func monitorInstances() { ctx := context.Background() var tc *http.Client @@ -76,7 +231,6 @@ func monitorInstances() { } if resp.StatusCode == 200 { - // parse the response buf := new(strings.Builder) _, err := io.Copy(buf, resp.Body) @@ -88,124 +242,45 @@ func monitorInstances() { lines := strings.Split(buf.String(), "\n") - instances := []Instance{} + instancesMap := make(map[int]Instance) + + wg := sync.WaitGroup{} skipped := 0 + checking := 0 for _, line := range lines { split := strings.Split(line, "|") - if len(split) >= 5 { - if skipped < 2 { - skipped++ - continue - } - ApiUrl := strings.TrimSpace(split[1]) - resp, err := http.Get(ApiUrl + "/healthcheck") - if err != nil { + + if len(split) < 5 { + continue + } + + // skip first two table lines + if skipped < 2 { + skipped++ + continue + } + + wg.Add(1) + go func(i int, split []string) { + defer wg.Done() + instance, err := getInstanceDetails(split, latest) + if err == nil { + instancesMap[i] = instance + } else { log.Print(err) - continue } - LastChecked := time.Now().Unix() - if resp.StatusCode != 200 { - continue - } - resp, err = http.Get(ApiUrl + "/registered/badge") - if err != nil { - log.Print(err) - continue - } - registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) - if err != nil { - log.Print(err) - continue - } - resp, err = http.Get(ApiUrl + "/version") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - buf := new(strings.Builder) - _, err = io.Copy(buf, resp.Body) - if err != nil { - log.Print(err) - continue - } - version := strings.TrimSpace(buf.String()) - version_split := strings.Split(version, "-") - hash := version_split[len(version_split)-1] - - resp, err = http.Get(ApiUrl + "/config") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - - bytes, err := io.ReadAll(resp.Body) - - if err != nil { - log.Print(err) - continue - } - - var config FrontendConfig - - err = json.Unmarshal(bytes, &config) - - if err != nil { - log.Print(err) - continue - } - - cache_working := false - - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode == 200 { - old_timing := resp.Header.Get("Server-Timing") - resp, err = http.Get(ApiUrl + "/trending?region=US") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode == 200 { - new_timing := resp.Header.Get("Server-Timing") - if old_timing == new_timing { - cache_working = true - } - } - } - - // check if instance can fetch videos - resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw") - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - continue - } - - instances = append(instances, Instance{ - Name: strings.TrimSpace(split[0]), - ApiUrl: ApiUrl, - Locations: strings.TrimSpace(split[2]), - Cdn: strings.TrimSpace(split[3]) == "Yes", - Registered: int(registered), - LastChecked: LastChecked, - Version: version, - UpToDate: strings.Contains(latest, hash), - Cache: cache_working, - S3Enabled: config.S3Enabled, - }) + }(checking, split) + checking++ + } + wg.Wait() + // Map to ordered array + var instances []Instance + for i := 0; i < checking; i++ { + instance, ok := instancesMap[i] + if ok { + instances = append(instances, instance) } } @@ -228,5 +303,8 @@ func main() { return c.JSON(monitored_instances) }) - app.Listen(":3000") + err := app.Listen(":3000") + if err != nil { + panic(err) + } }