From b43abde86c37a0a47650305d2f370b5e8dc6b4fa Mon Sep 17 00:00:00 2001 From: Spencer Whaley Date: Fri, 6 Dec 2024 14:18:58 -0800 Subject: [PATCH] Add sourceHost and destHost to flow metrics. This change adds sourceHost and destHost to the flow metrics produced by the flow collector. It acomplishes this by adding those labels to the promethus metrics, and changing the aggregation key for metric aggreagation in the flow collector. This approach may change to one that does not require new labels on the prometheus metric. To be discussed. Signed-off-by: Spencer Whaley --- pkg/flow/collector.go | 14 +++++++------- pkg/flow/flow_mem_driver.go | 10 ++++++++++ pkg/flow/record.go | 2 ++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/flow/collector.go b/pkg/flow/collector.go index 6188816ac..8e575f1d9 100644 --- a/pkg/flow/collector.go +++ b/pkg/flow/collector.go @@ -75,37 +75,37 @@ func (fc *FlowCollector) NewMetrics(reg prometheus.Registerer) *collectorMetrics Name: "flows_total", Help: "Total Flows", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), octets: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "octets_total", Help: "Total Octets", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), httpReqsMethod: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "http_requests_method_total", Help: "How many HTTP requests processed, partitioned by method", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "method"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "method", "sourceHost", "destHost"}), httpReqsResult: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "http_requests_result_total", Help: "How many HTTP requests processed, partitioned by result code", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "code"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "code", "sourceHost", "destHost"}), activeFlows: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "active_flows", Help: "Number of flows that are currently active, partitioned by source and destination", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), lastAccessed: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "address_last_time_seconds", Help: "The last time the address was served", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), flowLatency: prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "flow_latency_microseconds", @@ -113,7 +113,7 @@ func (fc *FlowCollector) NewMetrics(reg prometheus.Registerer) *collectorMetrics // 1ms, 2 ms, 5ms, 10ms, 100ms, 1s, 10s Buckets: []float64{1000, 2000, 5000, 10000, 100000, 1000000, 10000000}, }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), activeReconcile: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "active_reconciles", diff --git a/pkg/flow/flow_mem_driver.go b/pkg/flow/flow_mem_driver.go index 383fec9a4..22e5ab29f 100644 --- a/pkg/flow/flow_mem_driver.go +++ b/pkg/flow/flow_mem_driver.go @@ -421,11 +421,15 @@ func (fc *FlowCollector) linkFlowPair(flow *FlowRecord) (*FlowPairRecord, bool) fwdLabels["destSite"] = destSiteName + "@_@" + destSiteId fwdLabels["sourceProcess"] = *sourceFlow.ProcessName fwdLabels["destProcess"] = *destFlow.ProcessName + fwdLabels["sourceHost"] = *sourceFlow.SourceHost + fwdLabels["destHost"] = *destFlow.SourceHost delete(fwdLabels, "process") revLabels["sourceSite"] = destSiteName + "@_@" + destSiteId revLabels["destSite"] = sourceSiteName + "@_@" + sourceSiteId revLabels["sourceProcess"] = *destFlow.ProcessName revLabels["destProcess"] = *sourceFlow.ProcessName + revLabels["sourceHost"] = *destFlow.SourceHost + fwdLabels["destHost"] = *sourceFlow.SourceHost delete(revLabels, "process") fp := &FlowPairRecord{ @@ -2262,6 +2266,12 @@ func (fc *FlowCollector) setupFlowMetrics(va *VanAddressRecord, flow *FlowRecord if key.sourceProcess, ok = metricLabel["sourceProcess"]; !ok { return fmt.Errorf("Metric label missing source process key") } + if key.sourceHost, ok = metricLabel["sourceHost"]; !ok { + return fmt.Errorf("Metric label missing source host key") + } + if key.sourceHost, ok = metricLabel["destHost"]; !ok { + return fmt.Errorf("Metric label missing dest host key") + } if key.destSite, ok = metricLabel["destSite"]; !ok { return fmt.Errorf("Metric label missing dest site key") } diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 4ccf883ab..c694caadf 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -324,6 +324,8 @@ type LogEventRecord struct { type metricKey struct { sourceSite string sourceProcess string + sourceHost string + destHost string destSite string destProcess string }