From eb6e93cf439af967025d7e726e237d91f908ff07 Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Thu, 11 Jul 2024 09:08:48 -0500 Subject: [PATCH] update deps --- go.mod | 2 +- go.sum | 4 +- .../skycoin/dmsg/pkg/dmsg/client.go | 73 ++++++++++++++++++- .../skycoin/dmsg/pkg/dmsg/client_session.go | 44 +++++++++++ .../github.com/skycoin/dmsg/pkg/dmsg/const.go | 4 +- .../skycoin/dmsg/pkg/dmsg/server_session.go | 35 +++++++++ .../skycoin/dmsg/pkg/dmsg/stream.go | 38 ++++++++++ .../github.com/skycoin/dmsg/pkg/dmsg/types.go | 3 + .../skycoin/dmsg/pkg/dmsgcurl/flags.go | 3 +- .../skycoin/dmsg/pkg/dmsgserver/config.go | 5 +- vendor/modules.txt | 2 +- 11 files changed, 204 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index c05f77d3..e81da500 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/rs/cors v1.8.2 github.com/sirupsen/logrus v1.9.3 - github.com/skycoin/dmsg v1.3.22-0.20240627165206-e8cd649ecc88 + github.com/skycoin/dmsg v1.3.22-0.20240710131937-c1a367e17db9 github.com/skycoin/skywire v1.3.24-0.20240627153953-6f467e52ed17 github.com/skycoin/skywire-utilities v1.3.18-0.20240624172427-aeaf6f14fbdc github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 diff --git a/go.sum b/go.sum index 6861bca4..4fa433e9 100644 --- a/go.sum +++ b/go.sum @@ -391,8 +391,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/skycoin/dmsg v1.3.22-0.20240627165206-e8cd649ecc88 h1:ENFkdPTq1Um9uoYXAh3h0WTUpJjDHhVZZevsbbP2fyw= -github.com/skycoin/dmsg v1.3.22-0.20240627165206-e8cd649ecc88/go.mod h1:v56TVWnlz18J4qrVFxjOyjzg153XSuay9Y76AUb35ac= +github.com/skycoin/dmsg v1.3.22-0.20240710131937-c1a367e17db9 h1:yIJXsn30pexJsufo80w/Ct0QaPDkte2Gnj0WHUtChTI= +github.com/skycoin/dmsg v1.3.22-0.20240710131937-c1a367e17db9/go.mod h1:72MC0HFDxKYqMLZ2RWGY/ZDNFq6965SP1PIrKlYqaiQ= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.27.1 h1:HatxsRwVSPaV4qxH6290xPBmkH/HgiuAoY2qC+e8C9I= diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go index 0973245a..9b3fd8c9 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go @@ -366,6 +366,78 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { return nil, ErrCannotConnectToDelegated } +// LookupIP dails to dmsg servers for public IP of the client. +func (ce *Client) LookupIP(ctx context.Context, servers []cipher.PubKey) (myIP net.IP, err error) { + + cancellabelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + if servers == nil { + entries, err := ce.discoverServers(cancellabelCtx, true) + if err != nil { + return nil, err + } + for _, entry := range entries { + servers = append(servers, entry.Static) + } + } + + // Range client's delegated servers. + // See if we are already connected to a delegated server. + for _, srvPK := range servers { + if dSes, ok := ce.clientSession(ce.porter, srvPK); ok { + ip, err := dSes.LookupIP(Addr{PK: dSes.RemotePK(), Port: 1}) + if err != nil { + ce.log.WithError(err).WithField("server_pk", srvPK).Warn("Failed to dial server for IP.") + continue + } + + // If the client is test client then ignore Public IP check + if ce.conf.ClientType == "test" { + return ip, nil + } + + // Check if the IP is public + if !netutil.IsPublicIP(ip) { + return nil, errors.New("received non-public IP address from dmsg server") + } + return ip, nil + } + } + + // Range client's delegated servers. + // Attempt to connect to a delegated server. + // And Close it after getting the IP. + for _, srvPK := range servers { + dSes, err := ce.EnsureAndObtainSession(ctx, srvPK) + if err != nil { + continue + } + ip, err := dSes.LookupIP(Addr{PK: dSes.RemotePK(), Port: 1}) + if err != nil { + ce.log.WithError(err).WithField("server_pk", srvPK).Warn("Failed to dial server for IP.") + continue + } + err = dSes.Close() + if err != nil { + ce.log.WithError(err).WithField("server_pk", srvPK).Warn("Failed to close session") + } + + // If the client is test client then ignore Public IP check + if ce.conf.ClientType == "test" { + return ip, nil + } + + // Check if the IP is public + if !netutil.IsPublicIP(ip) { + return nil, errors.New("received non-public IP address from dmsg server") + } + return ip, nil + } + + return nil, ErrCannotConnectToDelegated +} + // Session obtains an established session. func (ce *Client) Session(pk cipher.PubKey) (ClientSession, bool) { return ce.clientSession(ce.porter, pk) @@ -403,7 +475,6 @@ func (ce *Client) EnsureAndObtainSession(ctx context.Context, srvPK cipher.PubKe if err != nil { return ClientSession{}, err } - return ce.dialSession(ctx, srvEntry) } diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go index 7c766975..ba3cfff3 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go @@ -69,6 +69,50 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { return dStr, err } +// LookupIP attempts to dial a stream to the server for the IP address of the client. +func (cs *ClientSession) LookupIP(dst Addr) (myIP net.IP, err error) { + log := cs.log. + WithField("func", "ClientSession.LookupIP"). + WithField("dst_addr", cs.rPK) + + dStr, err := newInitiatingStream(cs) + if err != nil { + return nil, err + } + + // Close stream on failure. + defer func() { + if err != nil { + log.WithError(err). + WithField("close_error", dStr.Close()). + Debug("Stream closed on failure.") + } + }() + + // Prepare deadline. + if err = dStr.SetDeadline(time.Now().Add(HandshakeTimeout)); err != nil { + return nil, err + } + + // Do stream handshake. + req, err := dStr.writeIPRequest(dst) + if err != nil { + return nil, err + } + + myIP, err = dStr.readIPResponse(req) + if err != nil { + return nil, err + } + + err = dStr.Close() + if err != nil { + return nil, err + } + + return myIP, err +} + // serve accepts incoming streams from remote clients. func (cs *ClientSession) serve() error { defer func() { diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/const.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/const.go index 731c2bc8..2718c8c2 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/const.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/const.go @@ -3,11 +3,13 @@ package dmsg import ( "time" + + "github.com/skycoin/skywire-utilities/pkg/skyenv" ) // Constants. const ( - DefaultDiscAddr = "http://dmsgd.skywire.skycoin.com" + DefaultDiscAddr = skyenv.DmsgDiscAddr DefaultMinSessions = 1 diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/server_session.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/server_session.go index 57ce2b30..4b8376ee 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/server_session.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/server_session.go @@ -2,6 +2,7 @@ package dmsg import ( + "fmt" "io" "net" @@ -98,6 +99,29 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream) WithField("dst_addr", req.DstAddr) log.Debug("Read stream request from initiating side.") + if req.IPinfo && req.DstAddr.PK == ss.entity.LocalPK() { + log.Debug("Received IP stream request.") + + ip, err := addrToIP(yStr.RemoteAddr()) + if err != nil { + ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream + return err + } + + resp := StreamResponse{ + ReqHash: req.raw.Hash(), + Accepted: true, + IP: ip, + } + obj := MakeSignedStreamResponse(&resp, ss.entity.LocalSK()) + + if err := ss.writeObject(yStr, obj); err != nil { + ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream + return err + } + log.Debug("Wrote IP stream response.") + return nil + } // Obtain next session. ss2, ok := ss.entity.serverSession(req.DstAddr.PK) @@ -129,6 +153,17 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream) return netutil.CopyReadWriteCloser(yStr, yStr2) } +func addrToIP(addr net.Addr) (net.IP, error) { + switch a := addr.(type) { + case *net.TCPAddr: + return a.IP, nil + case *net.UDPAddr: + return a.IP, nil + default: + return nil, fmt.Errorf("unsupported address type %T", addr) + } +} + func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream, respObj SignedObject, err error) { defer func() { if err != nil && yStr != nil { diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/stream.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/stream.go index 72d1bc4e..6bc61f1a 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/stream.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/stream.go @@ -87,6 +87,29 @@ func (s *Stream) writeRequest(rAddr Addr) (req StreamRequest, err error) { return } +func (s *Stream) writeIPRequest(rAddr Addr) (req StreamRequest, err error) { + // Reserve stream in porter. + var lPort uint16 + if lPort, s.close, err = s.ses.porter.ReserveEphemeral(context.Background(), s); err != nil { + return + } + + // Prepare fields. + s.prepareFields(true, Addr{PK: s.ses.LocalPK(), Port: lPort}, rAddr) + + req = StreamRequest{ + Timestamp: time.Now().UnixNano(), + SrcAddr: s.lAddr, + DstAddr: s.rAddr, + IPinfo: true, + } + obj := MakeSignedStreamRequest(&req, s.ses.localSK()) + + // Write request. + err = s.ses.writeObject(s.yStr, obj) + return +} + func (s *Stream) readRequest() (req StreamRequest, err error) { var obj SignedObject if obj, err = s.ses.readObject(s.yStr); err != nil { @@ -158,6 +181,21 @@ func (s *Stream) readResponse(req StreamRequest) error { return s.ns.ProcessHandshakeMessage(resp.NoiseMsg) } +func (s *Stream) readIPResponse(req StreamRequest) (net.IP, error) { + obj, err := s.ses.readObject(s.yStr) + if err != nil { + return nil, err + } + resp, err := obj.ObtainStreamResponse() + if err != nil { + return nil, err + } + if err := resp.Verify(req); err != nil { + return nil, err + } + return resp.IP, nil +} + func (s *Stream) prepareFields(init bool, lAddr, rAddr Addr) { ns, err := noise.New(noise.HandshakeKK, noise.Config{ LocalPK: s.ses.LocalPK(), diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/types.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/types.go index 1e39ee14..9fbb2ca3 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/types.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/types.go @@ -4,6 +4,7 @@ package dmsg import ( "errors" "fmt" + "net" "strings" "time" @@ -167,6 +168,7 @@ type StreamRequest struct { Timestamp int64 SrcAddr Addr DstAddr Addr + IPinfo bool NoiseMsg []byte raw SignedObject `enc:"-"` // back reference. @@ -203,6 +205,7 @@ func (req StreamRequest) Verify(lastTimestamp int64) error { type StreamResponse struct { ReqHash cipher.SHA256 // Hash of associated dial request. Accepted bool // Whether the request is accepted. + IP net.IP // IP address of the node. ErrCode errorCode // Check if not accepted. NoiseMsg []byte diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsgcurl/flags.go b/vendor/github.com/skycoin/dmsg/pkg/dmsgcurl/flags.go index e763a059..4f07c228 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsgcurl/flags.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsgcurl/flags.go @@ -5,6 +5,7 @@ import ( "flag" "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/skyenv" ) // ExecName contains the execution name. @@ -38,7 +39,7 @@ type dmsgFlags struct { func (f *dmsgFlags) Name() string { return "Dmsg" } func (f *dmsgFlags) Init(fs *flag.FlagSet) { - fs.StringVar(&f.Disc, "dmsg-disc", "http://dmsgd.skywire.skycoin.com", "dmsg discovery `URL`") + fs.StringVar(&f.Disc, "dmsg-disc", skyenv.DmsgDiscAddr, "dmsg discovery `URL`") fs.IntVar(&f.Sessions, "dmsg-sessions", 1, "connect to `NUMBER` of dmsg servers") } diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsgserver/config.go b/vendor/github.com/skycoin/dmsg/pkg/dmsgserver/config.go index 98b3f989..27c3fb2e 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsgserver/config.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsgserver/config.go @@ -8,17 +8,18 @@ import ( "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/skyenv" ) const ( - defaultDiscoveryURL = "http://dmsgd.skywire.skycoin.com" + defaultDiscoveryURL = skyenv.DmsgDiscAddr defaultPublicAddress = "127.0.0.1:8081" defaultLocalAddress = ":8081" defaultHTTPAddress = ":8082" // DefaultConfigPath default path of config file DefaultConfigPath = "config.json" // DefaultDiscoverURLTest default URL for discovery in test env - DefaultDiscoverURLTest = "http://dmsgd.skywire.dev" + DefaultDiscoverURLTest = skyenv.TestDmsgDiscAddr ) // Config is structure of config file diff --git a/vendor/modules.txt b/vendor/modules.txt index 231a2026..847eac1e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -457,7 +457,7 @@ github.com/rs/cors ## explicit; go 1.13 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v1.3.22-0.20240627165206-e8cd649ecc88 +# github.com/skycoin/dmsg v1.3.22-0.20240710131937-c1a367e17db9 ## explicit; go 1.21 github.com/skycoin/dmsg/internal/servermetrics github.com/skycoin/dmsg/pkg/direct