Skip to content

Commit

Permalink
add xray endpoint to the api
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed May 8, 2024
1 parent 290b5a1 commit e864cc2
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
7 changes: 6 additions & 1 deletion internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/alloy/internal/service"
httpservice "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
otel_service "github.com/grafana/alloy/internal/service/otel"
remotecfgservice "github.com/grafana/alloy/internal/service/remotecfg"
uiservice "github.com/grafana/alloy/internal/service/ui"
Expand Down Expand Up @@ -272,8 +273,11 @@ func (fr *alloyRun) Run(configPath string) error {
return fmt.Errorf("failed to create the remotecfg service: %w", err)
}

liveDebuggingService := livedebugging.New()

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
UIPrefix: fr.uiPrefix,
DebuggingStreamHandler: liveDebuggingService.Data().(livedebugging.DebugStreamHandler),
})

otelService := otel_service.New(l)
Expand All @@ -292,6 +296,7 @@ func (fr *alloyRun) Run(configPath string) error {
MinStability: fr.minStability,
Services: []service.Service{
httpService,
liveDebuggingService,
uiService,
clusterService,
otelService,
Expand Down
8 changes: 5 additions & 3 deletions internal/service/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/web/api"
"github.com/grafana/alloy/internal/web/ui"
)
Expand All @@ -21,7 +22,8 @@ const ServiceName = "ui"
// Options are used to configure the UI service. Options are constant for the
// lifetime of the UI service.
type Options struct {
UIPrefix string // Path prefix to host the UI at.
UIPrefix string // Path prefix to host the UI at.
DebuggingStreamHandler livedebugging.DebugStreamHandler // Debugging stream handler used for live debugging in the UI.
}

// Service implements the UI service.
Expand All @@ -46,7 +48,7 @@ func (s *Service) Definition() service.Definition {
return service.Definition{
Name: ServiceName,
ConfigType: nil, // ui does not accept configuration
DependsOn: []string{http_service.ServiceName},
DependsOn: []string{http_service.ServiceName, livedebugging.ServiceName},
Stability: featuregate.StabilityGenerallyAvailable,
}
}
Expand Down Expand Up @@ -75,7 +77,7 @@ func (s *Service) Data() any {
func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler) {
r := mux.NewRouter()

fa := api.NewAlloyAPI(host)
fa := api.NewAlloyAPI(host, s.opts.DebuggingStreamHandler)
fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r)
ui.RegisterRoutes(s.opts.UIPrefix, r)

Expand Down
79 changes: 76 additions & 3 deletions internal/web/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ package api

import (
"encoding/json"
"math/rand"
"net/http"
"path"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/prometheus/prometheus/util/httputil"
)

// AlloyAPI is a wrapper around the component API.
type AlloyAPI struct {
alloy service.Host
alloy service.Host
debuggingStreamHandler livedebugging.DebugStreamHandler
}

// NewAlloyAPI instantiates a new Alloy API.
func NewAlloyAPI(alloy service.Host) *AlloyAPI {
return &AlloyAPI{alloy: alloy}
func NewAlloyAPI(alloy service.Host, debuggingStreamHandler livedebugging.DebugStreamHandler) *AlloyAPI {
return &AlloyAPI{alloy: alloy, debuggingStreamHandler: debuggingStreamHandler}
}

// RegisterRoutes registers all the API's routes.
Expand All @@ -36,6 +41,7 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) {
r.Handle(path.Join(urlPrefix, "/components"), httputil.CompressionHandler{Handler: a.listComponentsHandler()})
r.Handle(path.Join(urlPrefix, "/components/{id:.+}"), httputil.CompressionHandler{Handler: a.getComponentHandler()})
r.Handle(path.Join(urlPrefix, "/peers"), httputil.CompressionHandler{Handler: a.getClusteringPeersHandler()})
r.Handle(path.Join(urlPrefix, "/debug/{id:.+}"), a.startDebugStream())
}

func (a *AlloyAPI) listComponentsHandler() http.HandlerFunc {
Expand Down Expand Up @@ -107,3 +113,70 @@ func (a *AlloyAPI) getClusteringPeersHandler() http.HandlerFunc {
_, _ = w.Write(bb)
}
}

func (a *AlloyAPI) startDebugStream() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
componentID := vars["id"]

// Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory.
dataCh := make(chan string, 1000)
ctx := r.Context()

sampleProb := setSampleProb(w, r.URL.Query().Get("sampleProb"))

a.debuggingStreamHandler.SetStream(componentID, func(data string) {
select {
case <-ctx.Done():
return
default:
if sampleProb < 1 && rand.Float64() > sampleProb {
return
}
// Avoid blocking the channel when the channel is full
select {
case dataCh <- data:
default:
}
}
})

stopStreaming := func() {
close(dataCh)
a.debuggingStreamHandler.DeleteStream(componentID)
}

for {
select {
case data := <-dataCh:
var builder strings.Builder
builder.WriteString(data)
// |;| delimiter is added at the end of every chunk
builder.WriteString("|;|")
_, writeErr := w.Write([]byte(builder.String()))
if writeErr != nil {
stopStreaming()
return
}
// TODO: flushing at a regular interval might be better performance wise
w.(http.Flusher).Flush()
case <-ctx.Done():
stopStreaming()
return
}
}
}
}

func setSampleProb(w http.ResponseWriter, sampleProbParam string) (sampleProb float64) {
sampleProb = 1.0
if sampleProbParam != "" {
var err error
sampleProb, err = strconv.ParseFloat(sampleProbParam, 64)
if err != nil || sampleProb < 0 || sampleProb > 1 {
http.Error(w, "Invalid sample probability", http.StatusBadRequest)
return 1.0
}
}
return sampleProb
}

0 comments on commit e864cc2

Please sign in to comment.