Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init location indexer #1260

Draft
wants to merge 2 commits into
base: development
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type flags struct {
workloadsIndexerIntervalMins uint
featuresIndexerNumWorkers uint
featuresIndexerIntervalMins uint
locationIndexerNumWorkers uint
locationIndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -111,6 +113,9 @@ func main() {
flag.UintVar(&f.workloadsIndexerNumWorkers, "workloads-indexer-workers", 10, "number of workers checking on node workloads number")
flag.UintVar(&f.featuresIndexerIntervalMins, "features-indexer-interval", 60*24, "node features check interval in min")
flag.UintVar(&f.featuresIndexerNumWorkers, "features-indexer-workers", 10, "number of workers checking on node supported features")
flag.UintVar(&f.locationIndexerIntervalMins, "location-indexer-interval", 60*6, "node location check interval in min")
flag.UintVar(&f.locationIndexerNumWorkers, "location-indexer-workers", 100, "number of workers checking on node location")

flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -153,13 +158,16 @@ func main() {
indexerIntervals := make(map[string]uint)
if !f.noIndexer {
startIndexers(ctx, f, &db, rpcRmbClient)

// for the health endpoint
indexerIntervals["gpu"] = f.gpuIndexerIntervalMins
indexerIntervals["health"] = f.healthIndexerIntervalMins
indexerIntervals["dmi"] = f.dmiIndexerIntervalMins
indexerIntervals["workloads"] = f.workloadsIndexerIntervalMins
indexerIntervals["ipv6"] = f.ipv6IndexerIntervalMins
indexerIntervals["speed"] = f.speedIndexerIntervalMins
indexerIntervals["features"] = f.featuresIndexerIntervalMins
indexerIntervals["location"] = f.locationIndexerIntervalMins
} else {
log.Info().Msg("Indexers did not start")
}
Expand Down Expand Up @@ -238,6 +246,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.featuresIndexerNumWorkers,
)
featIdx.Start(ctx)

locationIdx := indexer.NewIndexer[types.NodeLocation](
indexer.NewLocationWork(f.locationIndexerIntervalMins),
"location",
db,
rpcRmbClient,
f.locationIndexerNumWorkers,
)
locationIdx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ func (p *PostgresDatabase) UpsertNodeFeatures(ctx context.Context, features []ty
}
return p.gormDB.WithContext(ctx).Table("node_features").Clauses(conflictClause).Create(&features).Error
}

func (p *PostgresDatabase) UpsertNodeLocation(ctx context.Context, locations []types.NodeLocation) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "country"}},
DoUpdates: clause.AssignmentColumns([]string{"continent", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_location").Clauses(conflictClause).Create(&locations).Error
}
14 changes: 11 additions & 3 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (d *PostgresDatabase) GetLastUpsertsTimestamp() (types.IndexersState, error
if res := d.gormDB.Table("node_features").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
if res := d.gormDB.Table("node_location").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
return report, nil
}

Expand All @@ -148,6 +151,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.HasIpv6{},
&types.NodesWorkloads{},
&types.NodeFeatures{},
&types.NodeLocation{},
); err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
}
Expand Down Expand Up @@ -387,6 +391,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
LEFT JOIN node_features ON node.twin_id = node_features.node_twin_id
LEFT JOIN node_location ON node.country = node_location.country
`)

if filter.IsGpuFilterRequested() {
Expand Down Expand Up @@ -440,7 +445,10 @@ func (d *PostgresDatabase) farmTableQuery(ctx context.Context, filter types.Farm
func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter, limit types.Limit) ([]Farm, uint, error) {
nodeQuery := d.gormDB.Table("resources_cache").
Select("resources_cache.farm_id", "renter", "resources_cache.extra_fee").
Joins("LEFT JOIN node ON node.node_id = resources_cache.node_id").
Joins(`
LEFT JOIN node ON node.node_id = resources_cache.node_id
LEFT JOIN node_location ON node_location.country = resources_cache.country
`).
Group(`resources_cache.farm_id, renter, resources_cache.extra_fee`)

if filter.NodeFreeMRU != nil {
Expand Down Expand Up @@ -469,7 +477,7 @@ func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter
}

if filter.Region != nil {
nodeQuery = nodeQuery.Where("LOWER(resources_cache.region) = LOWER(?)", *filter.Region)
nodeQuery = nodeQuery.Where("LOWER(node_location.continent) = LOWER(?)", *filter.Region)
}

if len(filter.NodeStatus) != 0 {
Expand Down Expand Up @@ -673,7 +681,7 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
q = q.Where("node.city ILIKE '%' || ? || '%'", *filter.CityContains)
}
if filter.Region != nil {
q = q.Where("LOWER(resources_cache.region) = LOWER(?)", *filter.Region)
q = q.Where("LOWER(node_location.continent) = LOWER(?)", *filter.Region)
}
if filter.NodeID != nil {
q = q.Where("node.node_id = ?", *filter.NodeID)
Expand Down
27 changes: 3 additions & 24 deletions grid-proxy/internal/explorer/db/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ SELECT
rent_contract.contract_id as rent_contract_id,
count(node_contract.contract_id) as node_contracts_count,
node.country as country,
country.region as region,
COALESCE(dmi.bios, '{}') as bios,
COALESCE(dmi.baseboard, '{}') as baseboard,
COALESCE(dmi.processor, '[]') as processor,
Expand All @@ -136,7 +135,6 @@ FROM node
LEFT JOIN contract_resources ON node_contract.resources_used_id = contract_resources.id
LEFT JOIN node_resources_total AS node_resources_total ON node_resources_total.node_id = node.id
LEFT JOIN rent_contract on node.node_id = rent_contract.node_id AND rent_contract.state IN ('Created', 'GracePeriod')
LEFT JOIN country ON LOWER(node.country) = LOWER(country.name)
LEFT JOIN speed ON node.twin_id = speed.node_twin_id
LEFT JOIN dmi ON node.twin_id = dmi.node_twin_id
LEFT JOIN farm ON farm.farm_id = node.farm_id
Expand All @@ -163,7 +161,6 @@ GROUP BY
COALESCE(node_gpu_agg.gpus, '[]'),
COALESCE(node_gpu_agg.gpu_count, 0),
node.country,
country.region,
COALESCE(dmi.bios, '{}'),
COALESCE(dmi.baseboard, '{}'),
COALESCE(dmi.processor, '[]'),
Expand All @@ -172,8 +169,7 @@ GROUP BY
COALESCE(speed.download, 0),
node.certification,
node.extra_fee,
farm.pricing_policy_id,
country.region;
farm.pricing_policy_id;

DROP TABLE IF EXISTS resources_cache;
CREATE TABLE IF NOT EXISTS resources_cache(
Expand All @@ -194,7 +190,6 @@ CREATE TABLE IF NOT EXISTS resources_cache(
rent_contract_id INTEGER,
node_contracts_count INTEGER NOT NULL,
country TEXT,
region TEXT,
bios jsonb,
baseboard jsonb,
processor jsonb,
Expand Down Expand Up @@ -283,27 +278,11 @@ CREATE INDEX IF NOT EXISTS idx_public_config_node_id ON public_config USING gin(
/*
Node Trigger:
- Insert node record > Insert new resources_cache record
- Update node country > update resources_cache country/region
*/
CREATE OR REPLACE FUNCTION reflect_node_changes() RETURNS TRIGGER AS
$$
BEGIN
IF (TG_OP = 'UPDATE') THEN
BEGIN
UPDATE resources_cache
SET
country = NEW.country,
region = (
SELECT region FROM country WHERE LOWER(country.name) = LOWER(NEW.country)
)
WHERE
resources_cache.node_id = NEW.node_id;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'Error updating resources_cache: %', SQLERRM;
END;

ELSIF (TG_OP = 'INSERT') THEN
IF (TG_OP = 'INSERT') THEN
BEGIN
INSERT INTO resources_cache
SELECT *
Expand All @@ -328,7 +307,7 @@ END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER tg_node
AFTER INSERT OR DELETE OR UPDATE OF country
AFTER INSERT OR DELETE
ON node
FOR EACH ROW EXECUTE PROCEDURE reflect_node_changes();

Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Database interface {
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error
UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error
UpsertNodeLocation(ctx context.Context, locations []types.NodeLocation) error
}

type ContractBilling types.ContractBilling
Expand Down
19 changes: 4 additions & 15 deletions grid-proxy/internal/indexer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ func (w *HealthWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32

func (w *HealthWork) Upsert(ctx context.Context, db db.Database, batch []types.HealthReport) error {
// to prevent having multiple data for the same twin from different finders
batch = removeDuplicates(batch)
return db.UpsertNodeHealth(ctx, batch)
unique := removeDuplicates(batch, func(n types.HealthReport) uint32 {
return n.NodeTwinId
})
return db.UpsertNodeHealth(ctx, unique)
}

func getHealthReport(response diagnostics.Diagnostics, twinId uint32) types.HealthReport {
Expand All @@ -53,16 +55,3 @@ func getHealthReport(response diagnostics.Diagnostics, twinId uint32) types.Heal

return report
}

func removeDuplicates(reports []types.HealthReport) []types.HealthReport {
seen := make(map[uint32]bool)
result := []types.HealthReport{}
for _, report := range reports {
if _, ok := seen[report.NodeTwinId]; !ok {
seen[report.NodeTwinId] = true
result = append(result, report)
}
}

return result
}
54 changes: 54 additions & 0 deletions grid-proxy/internal/indexer/location.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package indexer

import (
"context"
"time"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
"github.com/threefoldtech/zos/pkg/geoip"
)

const locationCmd = "zos.location.get"

var _ Work[types.NodeLocation] = (*LocationWork)(nil)

type LocationWork struct {
finders map[string]time.Duration
}

func NewLocationWork(interval uint) *LocationWork {
return &LocationWork{
finders: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
},
}
}

func (w *LocationWork) Finders() map[string]time.Duration {
return w.finders
}

func (w *LocationWork) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.NodeLocation, error) {
var loc geoip.Location
if err := callNode(ctx, rmb, locationCmd, nil, id, &loc); err != nil {
return []types.NodeLocation{}, nil
}

return []types.NodeLocation{
{
Country: loc.Country,
Continent: loc.Continent,
UpdatedAt: time.Now().Unix(),
},
}, nil
}

func (w *LocationWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeLocation) error {
unique := removeDuplicates(batch, func(n types.NodeLocation) string {
return n.Country
})

return db.UpsertNodeLocation(ctx, unique)
}
12 changes: 12 additions & 0 deletions grid-proxy/internal/indexer/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,15 @@ func callNode(ctx context.Context, rmbClient *peer.RpcClient, cmd string, payloa

return rmbClient.Call(subCtx, twinId, cmd, payload, result)
}

func removeDuplicates[T any, K comparable](items []T, keyFunc func(T) K) (result []T) {
seen := make(map[K]bool)
for _, item := range items {
key := keyFunc(item)
if _, ok := seen[key]; !ok {
seen[key] = true
result = append(result, item)
}
}
return
}
52 changes: 52 additions & 0 deletions grid-proxy/internal/indexer/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package indexer

import (
"reflect"
"testing"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
)

func TestRemoveDuplicates(t *testing.T) {
t.Run("remove duplicate countries", func(t *testing.T) {
locations := []types.NodeLocation{
{Country: "Egypt", Continent: "Africa"},
{Country: "Egypt", Continent: "Africa"},
{Country: "Belgium", Continent: "Europe"},
}

uniqueLocations := []types.NodeLocation{
{Country: "Egypt", Continent: "Africa"},
{Country: "Belgium", Continent: "Europe"},
}

gotLocations := removeDuplicates(locations, func(n types.NodeLocation) string {
return n.Country
})

if !reflect.DeepEqual(uniqueLocations, gotLocations) {
t.Errorf("expected %v, but got %v", uniqueLocations, gotLocations)
}
})

t.Run("remove duplicate health reports", func(t *testing.T) {
healthReports := []types.HealthReport{
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 2, Healthy: true},
}

uniqueReports := []types.HealthReport{
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 2, Healthy: true},
}

gotReports := removeDuplicates(healthReports, func(h types.HealthReport) uint32 {
return h.NodeTwinId
})

if !reflect.DeepEqual(gotReports, uniqueReports) {
t.Errorf("expected %v, but got %v", uniqueReports, gotReports)
}
})
}
10 changes: 10 additions & 0 deletions grid-proxy/pkg/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,13 @@ type NodeFeatures struct {
func (NodeFeatures) TableName() string {
return "node_features"
}

type NodeLocation struct {
Country string `json:"country" gorm:"unique;not null"`
Continent string `json:"continent"`
UpdatedAt int64 `json:"updated_at"`
}

func (NodeLocation) TableName() string {
return "node_location"
}
2 changes: 1 addition & 1 deletion grid-proxy/tests/queries/mock_client/farms.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (f *Farm) satisfyFarmNodesFilter(data *DBData, filter types.FarmFilter) boo
continue
}

if filter.Region != nil && !strings.EqualFold(*filter.Region, data.Regions[strings.ToLower(node.Country)]) {
if filter.Region != nil && !strings.EqualFold(*filter.Region, data.Regions[node.Country]) {
continue
}

Expand Down
Loading
Loading