diff --git a/client/egrpc/component.go b/client/egrpc/component.go index c177957d..33678dfc 100644 --- a/client/egrpc/component.go +++ b/client/egrpc/component.go @@ -37,7 +37,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen if config.EnableBlock { if config.DialTimeout > time.Duration(0) { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, config.DialTimeout) + ctx, cancel = context.WithTimeoutCause(ctx, config.DialTimeout, fmt.Errorf("grpc client conn dial timeout")) defer cancel() } diff --git a/client/egrpc/interceptor.go b/client/egrpc/interceptor.go index 3ef422a6..24312bdb 100644 --- a/client/egrpc/interceptor.go +++ b/client/egrpc/interceptor.go @@ -301,7 +301,7 @@ func (c *Container) timeoutUnaryClientInterceptor() grpc.UnaryClientInterceptor _, ok := ctx.Deadline() if !ok { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, c.config.ReadTimeout) + ctx, cancel = context.WithTimeoutCause(ctx, c.config.ReadTimeout, fmt.Errorf("grpc client read timeout")) defer cancel() } return invoker(ctx, method, req, reply, cc, opts...) diff --git a/core/emetric/metric.go b/core/emetric/metric.go index 30025a52..affbc6b9 100644 --- a/core/emetric/metric.go +++ b/core/emetric/metric.go @@ -23,6 +23,8 @@ var ( TypeMySQL = "mysql" // DefaultNamespace ... DefaultNamespace = "ego" + // Conn 连接信息 + Conn = "conn" ) var ( @@ -133,6 +135,13 @@ var ( Name: "build_info", Labels: []string{"name", "mode", "region", "zone", "app_version", "ego_version", "start_time", "build_time", "go_version"}, }.Build() + + // ConnGauge ... + ConnGauge = GaugeVecOpts{ + Namespace: DefaultNamespace, + Name: "connection_states", + Labels: []string{"state"}, + }.Build() ) func init() { diff --git a/core/emetric/tcpstat.go b/core/emetric/tcpstat.go new file mode 100644 index 00000000..accf4e38 --- /dev/null +++ b/core/emetric/tcpstat.go @@ -0,0 +1,189 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package emetric + +import ( + "fmt" + "io" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/gotomicro/ego/core/elog" +) + +type tcpConnectionState int + +const ( + // TCP_ESTABLISHED + tcpEstablished tcpConnectionState = iota + 1 + // TCP_SYN_SENT + tcpSynSent + // TCP_SYN_RECV + tcpSynRecv + // TCP_FIN_WAIT1 + tcpFinWait1 + // TCP_FIN_WAIT2 + tcpFinWait2 + // TCP_TIME_WAIT + tcpTimeWait + // TCP_CLOSE + tcpClose + // TCP_CLOSE_WAIT + tcpCloseWait + // TCP_LAST_ACK + tcpLastAck + // TCP_LISTEN + tcpListen + // TCP_CLOSING + tcpClosing + // TCP_RX_BUFFER + //tcpRxQueuedBytes + // TCP_TX_BUFFER + //tcpTxQueuedBytes +) + +type TcpStatCollector struct { +} + +// NewTCPStatCollector returns a new Collector exposing network stats. +func NewTCPStatCollector() (*TcpStatCollector, error) { + return &TcpStatCollector{}, nil +} + +func (c *TcpStatCollector) Update() error { + go func() { + statsFile := path.Join("/proc", strconv.Itoa(os.Getpid()), "net", "tcp") + tcpStats, err := getTCPStats(statsFile) + if err != nil { + elog.EgoLogger.Error(fmt.Errorf("couldn't get tcpstats: %w", err).Error()) + return + } + for { + for name, value := range tcpStats { + ConnGauge.WithLabelValues( + name.String(), + ).Set(value) + time.Sleep(5 * time.Second) + } + } + }() + + // if enabled ipv6 system + //tcp6File := procFilePath("net/tcp6") + //if _, hasIPv6 := os.Stat(tcp6File); hasIPv6 == nil { + // tcp6Stats, err := getTCPStats(tcp6File) + // if err != nil { + // return fmt.Errorf("couldn't get tcp6stats: %w", err) + // } + // + // for st, value := range tcp6Stats { + // tcpStats[st] += value + // } + //} + + //for st, value := range tcpStats { + // ch <- c.desc.mustNewConstMetric(value, st.String()) + //} + return nil +} + +func getTCPStats(statsFile string) (map[tcpConnectionState]float64, error) { + file, err := os.Open(statsFile) + if err != nil { + return nil, err + } + defer file.Close() + + return parseTCPStats(file) +} + +func parseTCPStats(r io.Reader) (map[tcpConnectionState]float64, error) { + tcpStats := map[tcpConnectionState]float64{} + contents, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + for _, line := range strings.Split(string(contents), "\n")[1:] { + parts := strings.Fields(line) + if len(parts) == 0 { + continue + } + if len(parts) < 5 { + return nil, fmt.Errorf("invalid TCP stats line: %q", line) + } + + //qu := strings.Split(parts[4], ":") + //if len(qu) < 2 { + // return nil, fmt.Errorf("cannot parse tx_queues and rx_queues: %q", line) + //} + // + //tx, err := strconv.ParseUint(qu[0], 16, 64) + //if err != nil { + // return nil, err + //} + //tcpStats[tcpConnectionState(tcpTxQueuedBytes)] += float64(tx) + // + //rx, err := strconv.ParseUint(qu[1], 16, 64) + //if err != nil { + // return nil, err + //} + //tcpStats[tcpConnectionState(tcpRxQueuedBytes)] += float64(rx) + + st, err := strconv.ParseInt(parts[3], 16, 8) + if err != nil { + return nil, err + } + + tcpStats[tcpConnectionState(st)]++ + + } + + return tcpStats, nil +} + +func (st tcpConnectionState) String() string { + switch st { + case tcpEstablished: + return "established" + case tcpSynSent: + return "syn_sent" + case tcpSynRecv: + return "syn_recv" + case tcpFinWait1: + return "fin_wait1" + case tcpFinWait2: + return "fin_wait2" + case tcpTimeWait: + return "time_wait" + case tcpClose: + return "close" + case tcpCloseWait: + return "close_wait" + case tcpLastAck: + return "last_ack" + case tcpListen: + return "listen" + case tcpClosing: + return "closing" + //case tcpRxQueuedBytes: + // return "rx_queued_bytes" + //case tcpTxQueuedBytes: + // return "tx_queued_bytes" + default: + return "unknown" + } +} diff --git a/ego_function.go b/ego_function.go index 68f8d22c..582c06eb 100644 --- a/ego_function.go +++ b/ego_function.go @@ -23,6 +23,7 @@ import ( "github.com/gotomicro/ego/internal/retry" "github.com/prometheus/client_golang/prometheus" "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -45,6 +46,9 @@ func (e *Ego) waitSignals() { signal.Stop(sig) cancel() }() + + elog.Info("server stop", zap.Bool("graceful", grace)) + _ = e.Stop(stopCtx, grace) <-stopCtx.Done() // 记录服务器关闭时候,由于关闭过慢,无法正常关闭,被强制cancel diff --git a/go.mod b/go.mod index 95dcd6b6..646b3a00 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/gotomicro/ego -go 1.18 +go 1.21 require ( github.com/BurntSushi/toml v1.1.0 diff --git a/go.sum b/go.sum index bc806e82..ad118bef 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,7 @@ github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNI github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -175,6 +176,7 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= +github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -221,6 +223,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -309,9 +312,11 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= @@ -383,6 +388,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -422,6 +428,7 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= @@ -525,6 +532,7 @@ go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -862,6 +870,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/server/egovernor/config.go b/server/egovernor/config.go index 15c5956b..d87713ec 100644 --- a/server/egovernor/config.go +++ b/server/egovernor/config.go @@ -11,6 +11,7 @@ type Config struct { Host string Port int EnableLocalMainIP bool + EnableConnTcp bool Network string } diff --git a/server/egovernor/container.go b/server/egovernor/container.go index 44b83b34..7bf406cc 100644 --- a/server/egovernor/container.go +++ b/server/egovernor/container.go @@ -3,6 +3,7 @@ package egovernor import ( "github.com/gotomicro/ego/core/econf" "github.com/gotomicro/ego/core/elog" + "github.com/gotomicro/ego/core/emetric" "github.com/gotomicro/ego/core/util/xnet" ) @@ -53,5 +54,13 @@ func (c *Container) Build(options ...Option) *Component { for _, option := range options { option(c) } + if c.config.EnableConnTcp { + obj, err := emetric.NewTCPStatCollector() + if err != nil { + c.logger.Panic("NewTCPStatCollector fail", elog.FieldErr(err)) + } + obj.Update() + } + return newComponent(c.name, c.config, c.logger) }