From d2915ecbeb9777d2538d2203d7f9f1e166cdfaee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20SZKIBA?= Date: Mon, 16 Sep 2024 10:47:14 +0200 Subject: [PATCH 1/4] feat: adapt to protocol changes --- internal/stream/parse.go | 195 ++++++++++++++++++++++++++++++++------ internal/stream/stream.go | 4 +- 2 files changed, 171 insertions(+), 28 deletions(-) diff --git a/internal/stream/parse.go b/internal/stream/parse.go index f308553..38af131 100644 --- a/internal/stream/parse.go +++ b/internal/stream/parse.go @@ -2,12 +2,24 @@ package stream import ( "encoding/json" + "errors" + "fmt" + "sort" "github.com/r3labs/sse/v2" "github.com/szkiba/xk6-top/internal/digest" ) -func parse(msg *sse.Event) (*digest.Event, error) { +type parser struct { + metrics digest.Metrics + names []string +} + +func newParser() *parser { + return &parser{metrics: make(digest.Metrics)} +} + +func (p *parser) parse(msg *sse.Event) (*digest.Event, error) { var ( etype digest.EventType edata interface{} @@ -18,7 +30,7 @@ func parse(msg *sse.Event) (*digest.Event, error) { return nil, err } - edata, err = unmarshalData(etype, msg.Data) + edata, err = p.unmarshalData(etype, msg.Data) if err != nil { return nil, err } @@ -26,48 +38,177 @@ func parse(msg *sse.Event) (*digest.Event, error) { return &digest.Event{Type: etype, Data: edata}, nil } -func unmarshalData(etype digest.EventType, data []byte) (interface{}, error) { +func (p *parser) unmarshalData(etype digest.EventType, data []byte) (interface{}, error) { switch etype { case digest.EventTypeMetric: - target := make(digest.Metrics) + return p.parseMetric(data) - if err := json.Unmarshal(data, &target); err != nil { - return nil, err + case digest.EventTypeParam: + return p.parseParam(data) + + case digest.EventTypeConfig: + return p.parseConfig(data) + + case digest.EventTypeStart, + digest.EventTypeStop, + digest.EventTypeSnapshot, + digest.EventTypeCumulative: + if len(data) > 0 && data[0] == '{' { + return p.parseAggregatesLegacy(data) } - return target, nil + return p.parseAggregates(data) - case digest.EventTypeParam: - target := new(digest.ParamData) + default: + return nil, nil //nolint:nilnil + } +} - if err := json.Unmarshal(data, target); err != nil { - return nil, err - } +func (p *parser) parseMetric(data []byte) (interface{}, error) { + target := make(digest.Metrics) - return target, nil + if err := json.Unmarshal(data, &target); err != nil { + return nil, err + } - case digest.EventTypeConfig: - target := make(digest.ConfigData) + for k, v := range target { + v.Name = k - if err := json.Unmarshal(data, &target); err != nil { - return nil, err - } + p.metrics[k] = v + } - return target, nil + names := make([]string, 0, len(p.metrics)) - case digest.EventTypeStart, - digest.EventTypeStop, - digest.EventTypeSnapshot, - digest.EventTypeCumulative: - target := make(digest.Aggregates) + for name := range p.metrics { + names = append(names, name) + } + + sort.Strings(names) + + p.names = names - if err := json.Unmarshal(data, &target); err != nil { + return target, nil +} + +func (p *parser) parseParam(data []byte) (interface{}, error) { + target := new(digest.ParamData) + + if err := json.Unmarshal(data, target); err != nil { + return nil, err + } + + return target, nil +} + +func (p *parser) parseConfig(data []byte) (interface{}, error) { + target := make(digest.ConfigData) + + if err := json.Unmarshal(data, &target); err != nil { + return nil, err + } + + return target, nil +} + +func (p *parser) parseAggregatesLegacy(data []byte) (interface{}, error) { + target := make(digest.Aggregates) + + if err := json.Unmarshal(data, &target); err != nil { + return nil, err + } + + return target, nil +} + +func (p *parser) parseAggregates(data []byte) (interface{}, error) { + var samples [][]float64 + + if err := json.Unmarshal(data, &samples); err != nil { + return nil, err + } + + target := make(digest.Aggregates) + + for metricIdx := range samples { + metric, err := p.getMetric(metricIdx) + if err != nil { + return nil, err + } + + agg, err := p.parseAggregate(samples[metricIdx], metric.Type) + if err != nil { return nil, err } - return target, nil + target[metric.Name] = agg + } + + return target, nil +} + +func (p *parser) getMetric(idx int) (*digest.Metric, error) { + if idx >= len(p.names) { + return nil, fmt.Errorf("%w: metric index out of range %d", errData, idx) + } + + name := p.names[idx] + metric, found := p.metrics[name] + if !found { + return nil, fmt.Errorf("%w: unknown metric name %s", errData, name) + } + + return metric, nil +} + +func (p *parser) parseAggregate(data []float64, mt digest.MetricType) (digest.Aggregate, error) { + names := aggregateNames(mt) + if len(names) == 0 { + return nil, fmt.Errorf("%w: no metric names for type %s", errData, mt.String()) + } + + if len(data) != len(names) { + return nil, fmt.Errorf( + "%w: metric definition mismatch %d - %d - %s", + errData, + len(data), + len(names), + names, + ) + + // return nil, fmt.Errorf("%w: metric definition mismatch %s", errData, mt.String()) + } + + agg := make(digest.Aggregate, len(names)) + + for idx := range names { + agg[names[idx]] = data[idx] + } + + return agg, nil +} + +func aggregateNames(mtype digest.MetricType) []string { + switch mtype { + case digest.MetricTypeGauge: + return gaugeAggregateNames + case digest.MetricTypeRate: + return rateAggregateNames + case digest.MetricTypeCounter: + return counterAggregateNames + case digest.MetricTypeTrend: + return trendAggregateNames default: - return nil, nil //nolint:nilnil + return nil } } + +//nolint:gochecknoglobals +var ( + gaugeAggregateNames = []string{"value"} + rateAggregateNames = []string{"rate"} + counterAggregateNames = []string{"count", "rate"} + trendAggregateNames = []string{"avg", "max", "med", "min", "p(90)", "p(95)", "p(99)"} +) + +var errData = errors.New("invalid data") diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 2d3caf0..949da62 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -22,8 +22,10 @@ func Subscribe(ctx context.Context, url string, sub chan tea.Msg) tea.Cmd { sub <- &digest.Event{Type: digest.EventTypeDisconnect} }) + parser := newParser() + return client.SubscribeRawWithContext(ctx, func(msg *sse.Event) { - event, perr := parse(msg) + event, perr := parser.parse(msg) if perr != nil { sub <- perr From 3545c5a2287a4f178e741a438401306209137e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20SZKIBA?= Date: Mon, 16 Sep 2024 10:47:30 +0200 Subject: [PATCH 2/4] feat: add "threshold" event type --- internal/digest/event.go | 1 + internal/digest/eventtype_enumer.go | 28 ++++++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/internal/digest/event.go b/internal/digest/event.go index 28e916d..f5c5c35 100644 --- a/internal/digest/event.go +++ b/internal/digest/event.go @@ -11,6 +11,7 @@ const ( EventTypeCumulative // EventTypeCumulative mean "cumulative" SSE event. EventTypeStart // EventTypeStart mean "start" SSE event. EventTypeStop // EventTypeStop mean "stop" SSE event. + EventTypeThreshold // EventTypeThreshold mean "threshold" SSE event. EventTypeConnect // EventTypeConnect mean SSE channel connected. EventTypeDisconnect // EventTypeDisconnect mean SSE channel disconnected. ) diff --git a/internal/digest/eventtype_enumer.go b/internal/digest/eventtype_enumer.go index da45637..d4f1d16 100644 --- a/internal/digest/eventtype_enumer.go +++ b/internal/digest/eventtype_enumer.go @@ -8,11 +8,11 @@ import ( "strings" ) -const _EventTypeName = "configparammetricsnapshotcumulativestartstopconnectdisconnect" +const _EventTypeName = "configparammetricsnapshotcumulativestartstopthresholdconnectdisconnect" -var _EventTypeIndex = [...]uint8{0, 6, 11, 17, 25, 35, 40, 44, 51, 61} +var _EventTypeIndex = [...]uint8{0, 6, 11, 17, 25, 35, 40, 44, 53, 60, 70} -const _EventTypeLowerName = "configparammetricsnapshotcumulativestartstopconnectdisconnect" +const _EventTypeLowerName = "configparammetricsnapshotcumulativestartstopthresholdconnectdisconnect" func (i EventType) String() string { if i < 0 || i >= EventType(len(_EventTypeIndex)-1) { @@ -32,11 +32,12 @@ func _EventTypeNoOp() { _ = x[EventTypeCumulative-(4)] _ = x[EventTypeStart-(5)] _ = x[EventTypeStop-(6)] - _ = x[EventTypeConnect-(7)] - _ = x[EventTypeDisconnect-(8)] + _ = x[EventTypeThreshold-(7)] + _ = x[EventTypeConnect-(8)] + _ = x[EventTypeDisconnect-(9)] } -var _EventTypeValues = []EventType{EventTypeConfig, EventTypeParam, EventTypeMetric, EventTypeSnapshot, EventTypeCumulative, EventTypeStart, EventTypeStop, EventTypeConnect, EventTypeDisconnect} +var _EventTypeValues = []EventType{EventTypeConfig, EventTypeParam, EventTypeMetric, EventTypeSnapshot, EventTypeCumulative, EventTypeStart, EventTypeStop, EventTypeThreshold, EventTypeConnect, EventTypeDisconnect} var _EventTypeNameToValueMap = map[string]EventType{ _EventTypeName[0:6]: EventTypeConfig, @@ -53,10 +54,12 @@ var _EventTypeNameToValueMap = map[string]EventType{ _EventTypeLowerName[35:40]: EventTypeStart, _EventTypeName[40:44]: EventTypeStop, _EventTypeLowerName[40:44]: EventTypeStop, - _EventTypeName[44:51]: EventTypeConnect, - _EventTypeLowerName[44:51]: EventTypeConnect, - _EventTypeName[51:61]: EventTypeDisconnect, - _EventTypeLowerName[51:61]: EventTypeDisconnect, + _EventTypeName[44:53]: EventTypeThreshold, + _EventTypeLowerName[44:53]: EventTypeThreshold, + _EventTypeName[53:60]: EventTypeConnect, + _EventTypeLowerName[53:60]: EventTypeConnect, + _EventTypeName[60:70]: EventTypeDisconnect, + _EventTypeLowerName[60:70]: EventTypeDisconnect, } var _EventTypeNames = []string{ @@ -67,8 +70,9 @@ var _EventTypeNames = []string{ _EventTypeName[25:35], _EventTypeName[35:40], _EventTypeName[40:44], - _EventTypeName[44:51], - _EventTypeName[51:61], + _EventTypeName[44:53], + _EventTypeName[53:60], + _EventTypeName[60:70], } // EventTypeString retrieves an enum value from the enum constants string name. From a512053921c6891512a9bb4872e887bdfda6b408 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20SZKIBA?= Date: Mon, 16 Sep 2024 10:47:53 +0200 Subject: [PATCH 3/4] style: fix unused parameter linter issue --- internal/cmd/root.go | 2 +- internal/cmd/run.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 650d1c4..75e3442 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -40,7 +40,7 @@ func RootCmd() *cobra.Command { Long: rootHelp, Version: version, Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(_ *cobra.Command, _ []string) error { endpoint, err := url.JoinPath(baseURL, "/events") if err != nil { return err diff --git a/internal/cmd/run.go b/internal/cmd/run.go index c79e5ea..802924f 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -24,7 +24,7 @@ func runCmd() *cobra.Command { Short: "k6 test runner and terminal-based metrics dashboard viewer", Long: runHelp, Version: version, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(_ *cobra.Command, args []string) error { return runRun(args) }, From 97e2de2e0d394e2b840a2a846753c24ba3de32ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20SZKIBA?= Date: Mon, 16 Sep 2024 10:48:02 +0200 Subject: [PATCH 4/4] docs: release notes --- releases/v0.2.1.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 releases/v0.2.1.md diff --git a/releases/v0.2.1.md b/releases/v0.2.1.md new file mode 100644 index 0000000..473e6ce --- /dev/null +++ b/releases/v0.2.1.md @@ -0,0 +1,6 @@ +xk6-top `v0.2.1` is here 🎉! + +`v0.2.0` is a maintenance release, it does not contain new features. +The purpose of the release is to adapt to protocol changes. + +From xk6-dashboard v0.7.3, the SSE protocol aggregate events have been optimized. This is an incompatible change that is addressed in this release. Older xk6-top versions do not work with newer xk6-dashboard (and k6) versions.