Skip to content

Commit

Permalink
Support ErrCh() for client
Browse files Browse the repository at this point in the history
  • Loading branch information
bynil committed Nov 14, 2022
1 parent f671bdb commit d38d110
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
5 changes: 5 additions & 0 deletions jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (c *Client) Close() error {
return c.transport.Close()
}

// ErrCh returns a chan to send errors that occurred in the client
func (c *Client) ErrCh() chan error {
return c.transport.ErrCh()
}

// Call makes a jsonrpc call
func (c *Client) Call(method string, out interface{}, params ...interface{}) error {
return c.transport.Call(method, out, params...)
Expand Down
5 changes: 5 additions & 0 deletions jsonrpc/transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (h *HTTP) Close() error {
return nil
}

// ErrCh implements the transport interface
func (h *HTTP) ErrCh() chan error {
return nil
}

// Call implements the transport interface
func (h *HTTP) Call(method string, out interface{}, params ...interface{}) error {
// Encode json-rpc request
Expand Down
2 changes: 2 additions & 0 deletions jsonrpc/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Transport interface {

// Close closes the transport connection if necessary
Close() error

ErrCh() chan error
}

// PubSubTransport is a transport that allows subscriptions
Expand Down
11 changes: 11 additions & 0 deletions jsonrpc/transport/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ type stream struct {
subsLock sync.Mutex
subs map[string]func(b []byte)

errCh chan error
closeCh chan struct{}
timer *time.Timer
}

func newStream(codec Codec) (*stream, error) {
w := &stream{
codec: codec,
errCh: make(chan error, 1),
closeCh: make(chan struct{}),
handler: map[uint64]callback{},
subs: map[string]func(b []byte){},
Expand All @@ -71,6 +73,11 @@ func (s *stream) Close() error {
return s.codec.Close()
}

// ErrCh implements the transport interface
func (s *stream) ErrCh() chan error {
return s.errCh
}

func (s *stream) incSeq() uint64 {
return atomic.AddUint64(&s.seq, 1)
}
Expand All @@ -94,6 +101,10 @@ func (s *stream) listen() {
if !s.isClosed() {
// log error
}
select {
case s.errCh <- err:
default:
}
return
}

Expand Down

0 comments on commit d38d110

Please sign in to comment.