Skip to content

Commit

Permalink
add xray endpoint to the api
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed May 8, 2024
1 parent 06026b9 commit 2f87657
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
5 changes: 5 additions & 0 deletions internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
otel_service "github.com/grafana/alloy/internal/service/otel"
remotecfgservice "github.com/grafana/alloy/internal/service/remotecfg"
uiservice "github.com/grafana/alloy/internal/service/ui"
"github.com/grafana/alloy/internal/service/xray"
"github.com/grafana/alloy/internal/static/config/instrumentation"
"github.com/grafana/alloy/internal/usagestats"
"github.com/grafana/alloy/syntax/diag"
Expand Down Expand Up @@ -272,8 +273,11 @@ func (fr *alloyRun) Run(configPath string) error {
return fmt.Errorf("failed to create the remotecfg service: %w", err)
}

xrayService := xray.New()

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
Xray: xrayService.Data().(xray.DebugStreamHandler),
})

otelService := otel_service.New(l)
Expand All @@ -292,6 +296,7 @@ func (fr *alloyRun) Run(configPath string) error {
MinStability: fr.minStability,
Services: []service.Service{
httpService,
xrayService,
uiService,
clusterService,
otelService,
Expand Down
6 changes: 4 additions & 2 deletions internal/service/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/xray"
"github.com/grafana/alloy/internal/web/api"
"github.com/grafana/alloy/internal/web/ui"
)
Expand All @@ -22,6 +23,7 @@ const ServiceName = "ui"
// lifetime of the UI service.
type Options struct {
UIPrefix string // Path prefix to host the UI at.
Xray xray.DebugStreamHandler
}

// Service implements the UI service.
Expand All @@ -46,7 +48,7 @@ func (s *Service) Definition() service.Definition {
return service.Definition{
Name: ServiceName,
ConfigType: nil, // ui does not accept configuration
DependsOn: []string{http_service.ServiceName},
DependsOn: []string{http_service.ServiceName, xray.ServiceName},
Stability: featuregate.StabilityGenerallyAvailable,
}
}
Expand Down Expand Up @@ -75,7 +77,7 @@ func (s *Service) Data() any {
func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler) {
r := mux.NewRouter()

fa := api.NewAlloyAPI(host)
fa := api.NewAlloyAPI(host, s.opts.Xray)
fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r)
ui.RegisterRoutes(s.opts.UIPrefix, r)

Expand Down
77 changes: 75 additions & 2 deletions internal/web/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ package api

import (
"encoding/json"
"math/rand"
"net/http"
"path"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/alloy/internal/service/xray"
"github.com/prometheus/prometheus/util/httputil"
)

// AlloyAPI is a wrapper around the component API.
type AlloyAPI struct {
alloy service.Host
xray xray.DebugStreamHandler
}

// NewAlloyAPI instantiates a new Alloy API.
func NewAlloyAPI(alloy service.Host) *AlloyAPI {
return &AlloyAPI{alloy: alloy}
func NewAlloyAPI(alloy service.Host, xray xray.DebugStreamHandler) *AlloyAPI {
return &AlloyAPI{alloy: alloy, xray: xray}
}

// RegisterRoutes registers all the API's routes.
Expand All @@ -36,6 +41,7 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) {
r.Handle(path.Join(urlPrefix, "/components"), httputil.CompressionHandler{Handler: a.listComponentsHandler()})
r.Handle(path.Join(urlPrefix, "/components/{id:.+}"), httputil.CompressionHandler{Handler: a.getComponentHandler()})
r.Handle(path.Join(urlPrefix, "/peers"), httputil.CompressionHandler{Handler: a.getClusteringPeersHandler()})
r.Handle(path.Join(urlPrefix, "/debugStream/{id:.+}"), a.startDebugStream())
}

func (a *AlloyAPI) listComponentsHandler() http.HandlerFunc {
Expand Down Expand Up @@ -107,3 +113,70 @@ func (a *AlloyAPI) getClusteringPeersHandler() http.HandlerFunc {
_, _ = w.Write(bb)
}
}

func (a *AlloyAPI) startDebugStream() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
componentID := vars["id"]

// Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory.
dataCh := make(chan string, 1000)
ctx := r.Context()

sampleProb := setSampleProb(w, r.URL.Query().Get("sampleProb"))

a.xray.SetStream(componentID, func(data string) {
select {
case <-ctx.Done():
return
default:
if sampleProb < 1 && rand.Float64() > sampleProb {
return
}
// Avoid blocking the channel when the channel is full
select {
case dataCh <- data:
default:
}
}
})

stopStreaming := func() {
close(dataCh)
a.xray.DeleteStream(componentID)
}

for {
select {
case data := <-dataCh:
var builder strings.Builder
builder.WriteString(data)
// |xray| delimiter is added at the end of every chunk
builder.WriteString("|xray|")
_, writeErr := w.Write([]byte(builder.String()))
if writeErr != nil {
stopStreaming()
return
}
// TODO: flushing at a regular interval might be better performance wise
w.(http.Flusher).Flush()
case <-ctx.Done():
stopStreaming()
return
}
}
}
}

func setSampleProb(w http.ResponseWriter, sampleProbParam string) (sampleProb float64) {
sampleProb = 1.0
if sampleProbParam != "" {
var err error
sampleProb, err = strconv.ParseFloat(sampleProbParam, 64)
if err != nil || sampleProb < 0 || sampleProb > 1 {
http.Error(w, "Invalid sample probability", http.StatusBadRequest)
return 1.0
}
}
return sampleProb
}

0 comments on commit 2f87657

Please sign in to comment.