Skip to content

Commit

Permalink
Merge pull request #386 from askuy/feature/optimizecontextcancel
Browse files Browse the repository at this point in the history
增加timeoutcause
  • Loading branch information
askuy authored Jun 6, 2024
2 parents a77dba3 + 342129c commit 687ce24
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 3 deletions.
2 changes: 1 addition & 1 deletion client/egrpc/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen
if config.EnableBlock {
if config.DialTimeout > time.Duration(0) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.DialTimeout)
ctx, cancel = context.WithTimeoutCause(ctx, config.DialTimeout, fmt.Errorf("grpc client conn dial timeout"))
defer cancel()
}

Expand Down
2 changes: 1 addition & 1 deletion client/egrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (c *Container) timeoutUnaryClientInterceptor() grpc.UnaryClientInterceptor
_, ok := ctx.Deadline()
if !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.config.ReadTimeout)
ctx, cancel = context.WithTimeoutCause(ctx, c.config.ReadTimeout, fmt.Errorf("grpc client read timeout"))
defer cancel()
}
return invoker(ctx, method, req, reply, cc, opts...)
Expand Down
9 changes: 9 additions & 0 deletions core/emetric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
TypeMySQL = "mysql"
// DefaultNamespace ...
DefaultNamespace = "ego"
// Conn 连接信息
Conn = "conn"
)

var (
Expand Down Expand Up @@ -133,6 +135,13 @@ var (
Name: "build_info",
Labels: []string{"name", "mode", "region", "zone", "app_version", "ego_version", "start_time", "build_time", "go_version"},
}.Build()

// ConnGauge ...
ConnGauge = GaugeVecOpts{
Namespace: DefaultNamespace,
Name: "connection_states",
Labels: []string{"state"},
}.Build()
)

func init() {
Expand Down
189 changes: 189 additions & 0 deletions core/emetric/tcpstat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package emetric

import (
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
"time"

"github.com/gotomicro/ego/core/elog"
)

type tcpConnectionState int

const (
// TCP_ESTABLISHED
tcpEstablished tcpConnectionState = iota + 1
// TCP_SYN_SENT
tcpSynSent
// TCP_SYN_RECV
tcpSynRecv
// TCP_FIN_WAIT1
tcpFinWait1
// TCP_FIN_WAIT2
tcpFinWait2
// TCP_TIME_WAIT
tcpTimeWait
// TCP_CLOSE
tcpClose
// TCP_CLOSE_WAIT
tcpCloseWait
// TCP_LAST_ACK
tcpLastAck
// TCP_LISTEN
tcpListen
// TCP_CLOSING
tcpClosing
// TCP_RX_BUFFER
//tcpRxQueuedBytes
// TCP_TX_BUFFER
//tcpTxQueuedBytes
)

type TcpStatCollector struct {
}

// NewTCPStatCollector returns a new Collector exposing network stats.
func NewTCPStatCollector() (*TcpStatCollector, error) {
return &TcpStatCollector{}, nil
}

func (c *TcpStatCollector) Update() error {
go func() {
statsFile := path.Join("/proc", strconv.Itoa(os.Getpid()), "net", "tcp")
tcpStats, err := getTCPStats(statsFile)
if err != nil {
elog.EgoLogger.Error(fmt.Errorf("couldn't get tcpstats: %w", err).Error())
return
}
for {
for name, value := range tcpStats {
ConnGauge.WithLabelValues(
name.String(),
).Set(value)
time.Sleep(5 * time.Second)
}
}
}()

// if enabled ipv6 system
//tcp6File := procFilePath("net/tcp6")
//if _, hasIPv6 := os.Stat(tcp6File); hasIPv6 == nil {
// tcp6Stats, err := getTCPStats(tcp6File)
// if err != nil {
// return fmt.Errorf("couldn't get tcp6stats: %w", err)
// }
//
// for st, value := range tcp6Stats {
// tcpStats[st] += value
// }
//}

//for st, value := range tcpStats {
// ch <- c.desc.mustNewConstMetric(value, st.String())
//}
return nil
}

func getTCPStats(statsFile string) (map[tcpConnectionState]float64, error) {
file, err := os.Open(statsFile)
if err != nil {
return nil, err
}
defer file.Close()

return parseTCPStats(file)
}

func parseTCPStats(r io.Reader) (map[tcpConnectionState]float64, error) {
tcpStats := map[tcpConnectionState]float64{}
contents, err := io.ReadAll(r)
if err != nil {
return nil, err
}

for _, line := range strings.Split(string(contents), "\n")[1:] {
parts := strings.Fields(line)
if len(parts) == 0 {
continue
}
if len(parts) < 5 {
return nil, fmt.Errorf("invalid TCP stats line: %q", line)
}

//qu := strings.Split(parts[4], ":")
//if len(qu) < 2 {
// return nil, fmt.Errorf("cannot parse tx_queues and rx_queues: %q", line)
//}
//
//tx, err := strconv.ParseUint(qu[0], 16, 64)
//if err != nil {
// return nil, err
//}
//tcpStats[tcpConnectionState(tcpTxQueuedBytes)] += float64(tx)
//
//rx, err := strconv.ParseUint(qu[1], 16, 64)
//if err != nil {
// return nil, err
//}
//tcpStats[tcpConnectionState(tcpRxQueuedBytes)] += float64(rx)

st, err := strconv.ParseInt(parts[3], 16, 8)
if err != nil {
return nil, err
}

tcpStats[tcpConnectionState(st)]++

}

return tcpStats, nil
}

func (st tcpConnectionState) String() string {
switch st {
case tcpEstablished:
return "established"
case tcpSynSent:
return "syn_sent"
case tcpSynRecv:
return "syn_recv"
case tcpFinWait1:
return "fin_wait1"
case tcpFinWait2:
return "fin_wait2"
case tcpTimeWait:
return "time_wait"
case tcpClose:
return "close"
case tcpCloseWait:
return "close_wait"
case tcpLastAck:
return "last_ack"
case tcpListen:
return "listen"
case tcpClosing:
return "closing"
//case tcpRxQueuedBytes:
// return "rx_queued_bytes"
//case tcpTxQueuedBytes:
// return "tx_queued_bytes"
default:
return "unknown"
}
}
4 changes: 4 additions & 0 deletions ego_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gotomicro/ego/internal/retry"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand All @@ -45,6 +46,9 @@ func (e *Ego) waitSignals() {
signal.Stop(sig)
cancel()
}()

elog.Info("server stop", zap.Bool("graceful", grace))

_ = e.Stop(stopCtx, grace)
<-stopCtx.Done()
// 记录服务器关闭时候,由于关闭过慢,无法正常关闭,被强制cancel
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/gotomicro/ego

go 1.18
go 1.21

require (
github.com/BurntSushi/toml v1.1.0
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNI
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
Expand All @@ -175,6 +176,7 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE=
github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -221,6 +223,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down Expand Up @@ -309,9 +312,11 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
Expand Down Expand Up @@ -383,6 +388,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
Expand Down Expand Up @@ -422,6 +428,7 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
Expand Down Expand Up @@ -525,6 +532,7 @@ go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -862,6 +870,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
Expand Down
1 change: 1 addition & 0 deletions server/egovernor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Config struct {
Host string
Port int
EnableLocalMainIP bool
EnableConnTcp bool
Network string
}

Expand Down
9 changes: 9 additions & 0 deletions server/egovernor/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package egovernor
import (
"github.com/gotomicro/ego/core/econf"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/util/xnet"
)

Expand Down Expand Up @@ -53,5 +54,13 @@ func (c *Container) Build(options ...Option) *Component {
for _, option := range options {
option(c)
}
if c.config.EnableConnTcp {
obj, err := emetric.NewTCPStatCollector()
if err != nil {
c.logger.Panic("NewTCPStatCollector fail", elog.FieldErr(err))
}
obj.Update()

Check failure on line 62 in server/egovernor/container.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `obj.Update` is not checked (errcheck)
}

return newComponent(c.name, c.config, c.logger)
}

0 comments on commit 687ce24

Please sign in to comment.