diff --git a/core/emetric/tcpstat.go b/core/emetric/tcpstat.go index 4fe2414..aa90039 100644 --- a/core/emetric/tcpstat.go +++ b/core/emetric/tcpstat.go @@ -25,6 +25,7 @@ import ( "time" "github.com/gotomicro/ego/core/elog" + "github.com/samber/lo" ) type tcpConnectionState int @@ -59,18 +60,21 @@ const ( ) type TcpStatCollector struct { + ForeignPorts []uint64 } // NewTCPStatCollector returns a new Collector exposing network stats. -func NewTCPStatCollector() (*TcpStatCollector, error) { - return &TcpStatCollector{}, nil +func NewTCPStatCollector(foreignPorts []uint64) *TcpStatCollector { + return &TcpStatCollector{ + ForeignPorts: foreignPorts, + } } func (c *TcpStatCollector) Update() error { go func() { for { statsFile := path.Join("/proc", strconv.Itoa(os.Getpid()), "net", "tcp") - tcpStats, err := getTCPStats(statsFile) + tcpStats, err := c.getTCPStats(statsFile) if err != nil { elog.EgoLogger.Error(fmt.Errorf("couldn't get tcpstats: %w", err).Error()) return @@ -87,34 +91,17 @@ func (c *TcpStatCollector) Update() error { time.Sleep(5 * time.Second) } }() - - // if enabled ipv6 system - //tcp6File := procFilePath("net/tcp6") - //if _, hasIPv6 := os.Stat(tcp6File); hasIPv6 == nil { - // tcp6Stats, err := getTCPStats(tcp6File) - // if err != nil { - // return fmt.Errorf("couldn't get tcp6stats: %w", err) - // } - // - // for st, value := range tcp6Stats { - // tcpStats[st] += value - // } - //} - - //for st, value := range tcpStats { - // ch <- c.desc.mustNewConstMetric(value, st.String()) - //} return nil } -func getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, error) { +func (c *TcpStatCollector) getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, error) { file, err := os.Open(statsFile) if err != nil { return nil, err } defer file.Close() - return parseTCPStats(file) + return c.parseTCPStats(file) } /* @@ -127,8 +114,7 @@ func getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, e |----------------------------------> number of entry */ -func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, error) { - //tcpStats := map[tcpConnectionState]float64{} +func (c *TcpStatCollector) parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, error) { tcpStatsMap := make(map[tcpConnectionState]map[string]float64, 0) contents, err := io.ReadAll(r) if err != nil { @@ -144,24 +130,7 @@ func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, erro return nil, fmt.Errorf("invalid TCP stats line: %q", line) } - //qu := strings.Split(parts[4], ":") - //if len(qu) < 2 { - // return nil, fmt.Errorf("cannot parse tx_queues and rx_queues: %q", line) - //} - // - //tx, err := strconv.ParseUint(qu[0], 16, 64) - //if err != nil { - // return nil, err - //} - //tcpStats[tcpConnectionState(tcpTxQueuedBytes)] += float64(tx) - // - //rx, err := strconv.ParseUint(qu[1], 16, 64) - //if err != nil { - // return nil, err - //} - //tcpStats[tcpConnectionState(tcpRxQueuedBytes)] += float64(rx) - - ipv4, _ := parseIpV4(parts[2]) + ipv4, _ := c.parseIpV4(parts[2]) st, err := strconv.ParseInt(parts[3], 16, 8) if err != nil { return nil, err @@ -175,9 +144,6 @@ func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, erro info[ipv4]++ } tcpStatsMap[tcpConnectionState(st)] = info - - //tcpStats[tcpConnectionState(st)]++ - } return tcpStatsMap, nil @@ -218,12 +184,21 @@ func (st tcpConnectionState) String() string { // 只解析IPV4 // 34190A0A:3D2D -func parseIpV4(s string) (string, error) { +func (ts *TcpStatCollector) parseIpV4(s string) (string, error) { if len(s) != 13 { return "", fmt.Errorf("not ipv4") } hexIP := s[:len(s)-5] hexPort := s[len(s)-4:] + port, err := strconv.ParseUint(hexPort, 16, 16) + if err != nil { + return "", nil + } + // 防止端口过多,导致prometheus监控数据太大 + if !lo.Contains(ts.ForeignPorts, port) { + return "all", nil + } + bytesIP, err := hex.DecodeString(hexIP) if err != nil { return "", nil @@ -231,10 +206,5 @@ func parseIpV4(s string) (string, error) { uint32IP := binary.LittleEndian.Uint32(bytesIP) //转换为主机字节序 IP := make(net.IP, 4) binary.BigEndian.PutUint32(IP, uint32IP) - port, err := strconv.ParseUint(hexPort, 16, 16) return fmt.Sprintf("%s:%d", IP.String(), port), err } - -//func parsePort(portStr string) (int64, error) { -// return strconv.ParseInt(portStr, 16, 16) -//} diff --git a/go.mod b/go.mod index 646b3a0..61990b3 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/samber/lo v1.39.0 // indirect github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/shirou/gopsutil/v3 v3.21.6 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect @@ -93,6 +94,7 @@ require ( go.uber.org/multierr v1.6.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.11.0 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.12.0 // indirect diff --git a/go.sum b/go.sum index ad118be..71f2ed2 100644 --- a/go.sum +++ b/go.sum @@ -431,6 +431,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= +github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= @@ -567,6 +569,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/server/egovernor/config.go b/server/egovernor/config.go index 1afe844..e75840d 100644 --- a/server/egovernor/config.go +++ b/server/egovernor/config.go @@ -12,15 +12,17 @@ type Config struct { Port int EnableLocalMainIP bool EnableConnTcpMetric bool + ConnTcpMetricPorts []uint64 Network string } // DefaultConfig 默认配置 func DefaultConfig() *Config { return &Config{ - Host: eflag.String("host"), - Network: "tcp4", - Port: 9003, + Host: eflag.String("host"), + Network: "tcp4", + Port: 9003, + ConnTcpMetricPorts: []uint64{6379, 3306, 8635, 27017, 9092}, } } diff --git a/server/egovernor/container.go b/server/egovernor/container.go index 3957114..c155c2b 100644 --- a/server/egovernor/container.go +++ b/server/egovernor/container.go @@ -55,12 +55,8 @@ func (c *Container) Build(options ...Option) *Component { option(c) } if c.config.EnableConnTcpMetric { - obj, err := emetric.NewTCPStatCollector() - if err != nil { - c.logger.Panic("NewTCPStatCollector fail", elog.FieldErr(err)) - } + obj := emetric.NewTCPStatCollector(c.config.ConnTcpMetricPorts) obj.Update() } - return newComponent(c.name, c.config, c.logger) }