Merge pull request #44 from Bnyro/main

Parallel instance checks and improved error messages, code cleanup
This commit is contained in:
Kavin 2023-07-26 21:39:43 +01:00 committed by GitHub
commit bb978ea3a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 192 additions and 114 deletions

306
main.go
View File

@ -3,7 +3,8 @@ package main
import (
"context"
"encoding/json"
"github.com/gofiber/fiber/v2"
"errors"
"fmt"
"io"
"log"
"net/http"
@ -11,8 +12,11 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/etag"
"github.com/google/go-github/v53/github"
@ -40,6 +44,157 @@ type FrontendConfig struct {
S3Enabled bool `json:"s3Enabled"`
}
var client = http.Client{
Timeout: 10 * time.Second,
}
func testUrl(url string) (*http.Response, error) {
resp, err := client.Get(url)
if err != nil {
return resp, err
}
if resp.StatusCode != 200 {
return resp, errors.New(fmt.Sprintf("Invalid response code at %s: %d", url, resp.StatusCode))
}
return resp, err
}
func testCaching(ApiUrl string) (bool, error) {
resp, err := testUrl(ApiUrl + "/trending?region=US")
if err != nil {
return false, err
}
oldTiming := resp.Header.Get("Server-Timing")
resp, err = testUrl(ApiUrl + "/trending?region=US")
if err != nil {
return false, err
}
newTiming := resp.Header.Get("Server-Timing")
cacheWorking := oldTiming == newTiming
return cacheWorking, nil
}
func getConfig(ApiUrl string) (FrontendConfig, error) {
resp, err := testUrl(ApiUrl + "/config")
if err != nil {
return FrontendConfig{}, err
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return FrontendConfig{}, err
}
var config FrontendConfig
err = json.Unmarshal(bytes, &config)
if err != nil {
return FrontendConfig{}, err
}
return config, nil
}
func getInstanceDetails(split []string, latest string) (Instance, error) {
ApiUrl := strings.TrimSpace(split[1])
wg := sync.WaitGroup{}
errorChannel := make(chan error)
// the amount of tests to do
wg.Add(6)
var lastChecked int64
var registered int64
var config FrontendConfig
var hash string
var version string
var cacheWorking bool
go func() {
wg.Wait()
close(errorChannel)
}()
go func() {
defer wg.Done()
if _, err := testUrl(ApiUrl + "/healthcheck"); err != nil {
errorChannel <- err
return
}
lastChecked = time.Now().Unix()
}()
go func() {
defer wg.Done()
resp, err := testUrl(ApiUrl + "/registered/badge")
if err != nil {
errorChannel <- err
return
}
registered, err = strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32)
if err != nil {
errorChannel <- err
}
}()
go func() {
defer wg.Done()
resp, err := testUrl(ApiUrl + "/version")
if err != nil {
errorChannel <- err
return
}
buf := new(strings.Builder)
_, err = io.Copy(buf, resp.Body)
if err != nil {
errorChannel <- err
return
}
version = strings.TrimSpace(buf.String())
version_split := strings.Split(version, "-")
hash = version_split[len(version_split)-1]
}()
go func() {
defer wg.Done()
var err error
config, err = getConfig(ApiUrl)
if err != nil {
errorChannel <- err
}
}()
go func() {
defer wg.Done()
var err error
cacheWorking, err = testCaching(ApiUrl)
if err != nil {
errorChannel <- err
}
}()
go func() {
defer wg.Done()
// check if instance can fetch videos
if _, err := testUrl(ApiUrl + "/streams/jNQXAC9IVRw"); err != nil {
errorChannel <- err
}
}()
for err := range errorChannel {
return Instance{}, err
}
return Instance{
Name: strings.TrimSpace(split[0]),
ApiUrl: ApiUrl,
Locations: strings.TrimSpace(split[2]),
Cdn: strings.TrimSpace(split[3]) == "Yes",
Registered: int(registered),
LastChecked: lastChecked,
Version: version,
UpToDate: strings.Contains(latest, hash),
Cache: cacheWorking,
S3Enabled: config.S3Enabled,
}, nil
}
func monitorInstances() {
ctx := context.Background()
var tc *http.Client
@ -76,7 +231,6 @@ func monitorInstances() {
}
if resp.StatusCode == 200 {
// parse the response
buf := new(strings.Builder)
_, err := io.Copy(buf, resp.Body)
@ -88,124 +242,45 @@ func monitorInstances() {
lines := strings.Split(buf.String(), "\n")
instances := []Instance{}
instancesMap := make(map[int]Instance)
wg := sync.WaitGroup{}
skipped := 0
checking := 0
for _, line := range lines {
split := strings.Split(line, "|")
if len(split) >= 5 {
if skipped < 2 {
skipped++
continue
}
ApiUrl := strings.TrimSpace(split[1])
resp, err := http.Get(ApiUrl + "/healthcheck")
if err != nil {
if len(split) < 5 {
continue
}
// skip first two table lines
if skipped < 2 {
skipped++
continue
}
wg.Add(1)
go func(i int, split []string) {
defer wg.Done()
instance, err := getInstanceDetails(split, latest)
if err == nil {
instancesMap[i] = instance
} else {
log.Print(err)
continue
}
LastChecked := time.Now().Unix()
if resp.StatusCode != 200 {
continue
}
resp, err = http.Get(ApiUrl + "/registered/badge")
if err != nil {
log.Print(err)
continue
}
registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32)
if err != nil {
log.Print(err)
continue
}
resp, err = http.Get(ApiUrl + "/version")
if err != nil {
log.Print(err)
continue
}
if resp.StatusCode != 200 {
continue
}
buf := new(strings.Builder)
_, err = io.Copy(buf, resp.Body)
if err != nil {
log.Print(err)
continue
}
version := strings.TrimSpace(buf.String())
version_split := strings.Split(version, "-")
hash := version_split[len(version_split)-1]
resp, err = http.Get(ApiUrl + "/config")
if err != nil {
log.Print(err)
continue
}
if resp.StatusCode != 200 {
continue
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Print(err)
continue
}
var config FrontendConfig
err = json.Unmarshal(bytes, &config)
if err != nil {
log.Print(err)
continue
}
cache_working := false
resp, err = http.Get(ApiUrl + "/trending?region=US")
if err != nil {
log.Print(err)
continue
}
if resp.StatusCode == 200 {
old_timing := resp.Header.Get("Server-Timing")
resp, err = http.Get(ApiUrl + "/trending?region=US")
if err != nil {
log.Print(err)
continue
}
if resp.StatusCode == 200 {
new_timing := resp.Header.Get("Server-Timing")
if old_timing == new_timing {
cache_working = true
}
}
}
// check if instance can fetch videos
resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw")
if err != nil {
log.Print(err)
continue
}
if resp.StatusCode != 200 {
continue
}
instances = append(instances, Instance{
Name: strings.TrimSpace(split[0]),
ApiUrl: ApiUrl,
Locations: strings.TrimSpace(split[2]),
Cdn: strings.TrimSpace(split[3]) == "Yes",
Registered: int(registered),
LastChecked: LastChecked,
Version: version,
UpToDate: strings.Contains(latest, hash),
Cache: cache_working,
S3Enabled: config.S3Enabled,
})
}(checking, split)
checking++
}
wg.Wait()
// Map to ordered array
var instances []Instance
for i := 0; i < checking; i++ {
instance, ok := instancesMap[i]
if ok {
instances = append(instances, instance)
}
}
@ -228,5 +303,8 @@ func main() {
return c.JSON(monitored_instances)
})
app.Listen(":3000")
err := app.Listen(":3000")
if err != nil {
panic(err)
}
}