Skip to content

Commit

Permalink
Merge pull request #1043 from threefoldtech/development_proxy_healthi…
Browse files Browse the repository at this point in the history
…ness

add healthiness endpoint
  • Loading branch information
Omarabdul3ziz authored Jun 9, 2024
2 parents 97e91fc + 8f85c7b commit 169c823
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 21 deletions.
6 changes: 6 additions & 0 deletions grid-proxy/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Check `/version` on any instance to know the version.

## Releases

### v0.15.8

---

- `feat` add `/health` endpoint

### v0.15.7

---
Expand Down
2 changes: 1 addition & 1 deletion grid-proxy/charts/gridproxy/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ version: 1.0.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: 0.15.7
appVersion: 0.15.8

# make sure to update the changelog with the changes in this release
17 changes: 12 additions & 5 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,20 @@ func main() {
log.Fatal().Err(err).Msg("failed to create relay client")
}

indexerIntervals := make(map[string]uint)
if !f.noIndexer {
startIndexers(ctx, f, &db, rpcRmbClient)
indexerIntervals["gpu"] = f.gpuIndexerIntervalMins
indexerIntervals["health"] = f.healthIndexerIntervalMins
indexerIntervals["dmi"] = f.dmiIndexerIntervalMins
indexerIntervals["workloads"] = f.workloadsIndexerIntervalMins
indexerIntervals["ipv6"] = f.ipv6IndexerIntervalMins
indexerIntervals["speed"] = f.speedIndexerIntervalMins
} else {
log.Info().Msg("Indexers did not start")
}

s, err := createServer(f, dbClient, GitCommit, rpcRmbClient)
s, err := createServer(f, dbClient, GitCommit, rpcRmbClient, indexerIntervals)
if err != nil {
log.Fatal().Err(err).Msg("failed to create mux server")
}
Expand Down Expand Up @@ -210,11 +217,11 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
ipv6Idx.Start(ctx)

wlNumIdx := indexer.NewIndexer[types.NodesWorkloads](
indexer.NewWorkloadWork(f.ipv6IndexerIntervalMins),
indexer.NewWorkloadWork(f.workloadsIndexerIntervalMins),
"workloads",
db,
rpcRmbClient,
f.ipv6IndexerNumWorkers,
f.workloadsIndexerNumWorkers,
)
wlNumIdx.Start(ctx)
}
Expand Down Expand Up @@ -273,13 +280,13 @@ func createRPCRMBClient(ctx context.Context, relayURL, mnemonics string, subMana
return client, nil
}

func createServer(f flags, dbClient explorer.DBClient, gitCommit string, relayClient rmb.Client) (*http.Server, error) {
func createServer(f flags, dbClient explorer.DBClient, gitCommit string, relayClient rmb.Client, idxIntervals map[string]uint) (*http.Server, error) {
log.Info().Msg("Creating server")

router := mux.NewRouter().StrictSlash(true)

// setup explorer
if err := explorer.Setup(router, gitCommit, dbClient, relayClient); err != nil {
if err := explorer.Setup(router, gitCommit, dbClient, relayClient, idxIntervals); err != nil {
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,39 @@ func (p *PostgresDatabase) UpsertNodesGPU(ctx context.Context, gpus []types.Node
func (p *PostgresDatabase) UpsertNodeHealth(ctx context.Context, healthReports []types.HealthReport) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"healthy"}),
DoUpdates: clause.AssignmentColumns([]string{"healthy", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("health_report").Clauses(conflictClause).Create(&healthReports).Error
}

func (p *PostgresDatabase) UpsertNodeDmi(ctx context.Context, dmis []types.Dmi) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"bios", "baseboard", "processor", "memory"}),
DoUpdates: clause.AssignmentColumns([]string{"bios", "baseboard", "processor", "memory", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("dmi").Clauses(conflictClause).Create(&dmis).Error
}

func (p *PostgresDatabase) UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"download", "upload"}),
DoUpdates: clause.AssignmentColumns([]string{"download", "upload", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("speed").Clauses(conflictClause).Create(&speeds).Error
}

func (p *PostgresDatabase) UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error {
onConflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"has_ipv6"}),
DoUpdates: clause.AssignmentColumns([]string{"has_ipv6", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_ipv6").Clauses(onConflictClause).Create(&ips).Error
}

func (p *PostgresDatabase) UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"workloads_number"}),
DoUpdates: clause.AssignmentColumns([]string{"workloads_number", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_workloads").Clauses(conflictClause).Create(&workloads).Error
}
67 changes: 67 additions & 0 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,65 @@ func (d *PostgresDatabase) Close() error {
return db.Close()
}

func (d *PostgresDatabase) Ping() error {
db, err := d.gormDB.DB()
if err != nil {
return fmt.Errorf("failed to get db connection")
}

if err := db.Ping(); err != nil {
return fmt.Errorf("failed to ping db")
}

return nil
}

func (d *PostgresDatabase) Initialized() error {
db, err := d.gormDB.DB()
if err != nil {
return fmt.Errorf("failed to get db connection")
}

initTables := []string{"node_gpu", "resources_cache"}
for _, tableName := range initTables {
query := "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = $1);"
var exists bool

if err := db.QueryRow(query, tableName).Scan(&exists); err != nil {
return err
}

if !exists {
return fmt.Errorf("table %s does not exist", tableName)
}
}

return nil
}

func (d *PostgresDatabase) GetLastUpsertsTimestamp() (types.IndexersState, error) {
var report types.IndexersState
if res := d.gormDB.Table("node_gpu").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Gpu.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get node_gpu last updated_at")
}
if res := d.gormDB.Table("health_report").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Health.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get health_report last updated_at")
}
if res := d.gormDB.Table("node_ipv6").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Ipv6.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get node_ipv6 last updated_at")
}
if res := d.gormDB.Table("speed").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Speed.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get speed last updated_at")
}
if res := d.gormDB.Table("dmi").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Dmi.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get dmi last updated_at")
}
if res := d.gormDB.Table("node_workloads").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Workloads.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get workloads last updated_at")
}
return report, nil
}

func (d *PostgresDatabase) Initialize() error {
err := d.gormDB.AutoMigrate(
&types.NodeGPU{},
Expand Down Expand Up @@ -924,6 +983,14 @@ func (d *PostgresDatabase) GetContractBills(ctx context.Context, contractID uint
return bills, uint(count), nil
}

func (d *PostgresDatabase) GetRandomHealthyTwinIds(length int) ([]uint32, error) {
var ids []uint32
if err := d.gormDB.Table("health_report").Select("node_twin_id").Where("healthy = true").Order("random()").Limit(length).Scan(&ids).Error; err != nil {
return []uint32{}, err
}
return ids, nil
}

// GetContractsLatestBillReports return latest reports for some contracts
func (d *PostgresDatabase) GetContractsLatestBillReports(ctx context.Context, contractsIds []uint32, limit uint) ([]ContractBilling, error) {
// WITH: a CTE to create a tmp table
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
// Database interface for storing and fetching grid info
type Database interface {
GetConnectionString() string
Ping() error
Initialized() error
GetRandomHealthyTwinIds(length int) ([]uint32, error)
GetLastUpsertsTimestamp() (types.IndexersState, error)

// server getters
GetStats(ctx context.Context, filter types.StatsFilter) (types.Stats, error)
Expand Down
108 changes: 108 additions & 0 deletions grid-proxy/internal/explorer/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package explorer

import (
"context"
"fmt"
"sync"
"time"

"github.com/rs/zerolog/log"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
)

const (
OkState = "ok"
)

func createReport(db DBClient, peer rmb.Client, idxIntervals map[string]uint) types.Health {
var report types.Health

// db connection
report.DBConn = OkState
if err := db.DB.Ping(); err != nil {
report.DBConn = err.Error()
}
if err := db.DB.Initialized(); err != nil {
report.DBConn = err.Error()
}

// rmb connection
report.RMBConn = OkState
if err := pingRandomTwins(db, peer); err != nil {
report.RMBConn = err.Error()
}

// indexers
indexers, err := db.DB.GetLastUpsertsTimestamp()
if err != nil {
log.Error().Err(err).Msg("failed to get last upsert timestamp")
}
report.Indexers = indexers

// total
report.TotalStateOk = true
if report.DBConn != OkState ||
report.RMBConn != OkState {
report.TotalStateOk = false
}

if isIndexerStale(indexers.Dmi.UpdatedAt, idxIntervals["dmi"]) ||
isIndexerStale(indexers.Gpu.UpdatedAt, idxIntervals["gpu"]) ||
isIndexerStale(indexers.Health.UpdatedAt, idxIntervals["health"]) ||
isIndexerStale(indexers.Ipv6.UpdatedAt, idxIntervals["ipv6"]) ||
isIndexerStale(indexers.Speed.UpdatedAt, idxIntervals["speed"]) ||
isIndexerStale(indexers.Workloads.UpdatedAt, idxIntervals["workloads"]) {
report.TotalStateOk = false
}

return report
}

func isIndexerStale(updatedAt int64, interval uint) bool {
updatedAtInTime := time.Unix(updatedAt, 0)
return time.Since(updatedAtInTime) > time.Duration(interval)*time.Minute
}

func pingRandomTwins(db DBClient, peer rmb.Client) error {
twinIds, err := db.DB.GetRandomHealthyTwinIds(10)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var wg sync.WaitGroup
successCh := make(chan bool)

for _, twinId := range twinIds {
wg.Add(1)
go func(twinId uint32) {
defer wg.Done()

callCtx, callCancel := context.WithTimeout(ctx, 10*time.Second)
defer callCancel()

var res interface{}
if err := peer.Call(callCtx, twinId, "zos.system.version", nil, &res); err == nil {
select {
case successCh <- true:
case <-ctx.Done():
}
}
}(twinId)
}

go func() {
wg.Wait()
close(successCh)
}()

select {
case <-successCh:
return nil
case <-ctx.Done():
return fmt.Errorf("failed to call twins: %+v", twinIds)
}
}
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type App struct {
cl DBClient
releaseVersion string
relayClient rmb.Client
idxIntervals map[string]uint
}

type ErrorMessage struct {
Expand Down
13 changes: 12 additions & 1 deletion grid-proxy/internal/explorer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ func (a *App) version(r *http.Request) (interface{}, mw.Response) {
}, response
}

func (a *App) health(r *http.Request) (interface{}, mw.Response) {
response := mw.Ok()
return createReport(
a.cl,
a.relayClient,
a.idxIntervals,
), response
}

// getNodeStatistics godoc
// @Summary Show node statistics
// @Description Get node statistics for more information about each node through the RMB relay
Expand Down Expand Up @@ -568,12 +577,13 @@ func (a *App) getContractBills(r *http.Request) (interface{}, mw.Response) {
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// @BasePath /
func Setup(router *mux.Router, gitCommit string, cl DBClient, relayClient rmb.Client) error {
func Setup(router *mux.Router, gitCommit string, cl DBClient, relayClient rmb.Client, idxIntervals map[string]uint) error {

a := App{
cl: cl,
releaseVersion: gitCommit,
relayClient: relayClient,
idxIntervals: idxIntervals,
}

router.HandleFunc("/farms", mw.AsHandlerFunc(a.listFarms))
Expand All @@ -599,6 +609,7 @@ func Setup(router *mux.Router, gitCommit string, cl DBClient, relayClient rmb.Cl
router.HandleFunc("/", mw.AsHandlerFunc(a.indexPage(router)))
router.HandleFunc("/ping", mw.AsHandlerFunc(a.ping))
router.HandleFunc("/version", mw.AsHandlerFunc(a.version))
router.HandleFunc("/health", mw.AsHandlerFunc(a.health))
router.PathPrefix("/swagger/").Handler(httpSwagger.WrapHandler)

return nil
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/indexer/dmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@ func parseDmiResponse(dmiResponse zosDmiTypes.DMI, twinId uint32) types.Dmi {
}

info.NodeTwinId = twinId
info.UpdatedAt = time.Now().Unix()
return info
}
1 change: 1 addition & 0 deletions grid-proxy/internal/indexer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func getHealthReport(response interface{}, err error, twinId uint32) types.Healt
report := types.HealthReport{
NodeTwinId: twinId,
Healthy: false,
UpdatedAt: time.Now().Unix(),
}

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/indexer/ipv6.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]t
{
NodeTwinId: id,
HasIpv6: has_ipv6,
UpdatedAt: time.Now().Unix(),
},
}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions grid-proxy/internal/indexer/speed.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,7 @@ func parseSpeed(res zosPerfPkg.TaskResult, twinId uint32) (types.Speed, error) {
}
}

speed.UpdatedAt = time.Now().Unix()

return speed, nil
}
Loading

0 comments on commit 169c823

Please sign in to comment.