-
Notifications
You must be signed in to change notification settings - Fork 129
/
source.go
116 lines (100 loc) · 2.83 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package yomo
import (
"context"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/id"
)
// Source is the handle an application uses to send data to YoMo.
type Source interface {
	// Connect establishes the connection to YoMo-Zipper.
	Connect() error
	// Write sends data labelled with tag downstream.
	Write(tag uint32, data []byte) error
	// WriteWithTarget sends data labelled with tag to the sfn instance
	// identified by target.
	WriteWithTarget(tag uint32, data []byte, target string) error
	// SetErrorHandler registers fn as the function to call when a server
	// error occurs.
	SetErrorHandler(fn func(err error))
	// Close closes the connection to YoMo-Zipper.
	Close() error
}
// yomoSource is the Source implementation defined in this file; its methods
// delegate to an underlying core.Client.
type yomoSource struct {
	name       string       // source name as passed to NewSource
	zipperAddr string       // zipper address as passed to NewSource
	client     *core.Client // client used to connect, write frames and close
}

// Compile-time check that *yomoSource satisfies Source. A typed nil pointer is
// used so the check does not need to construct a value.
var _ Source = (*yomoSource)(nil)
// NewSource creates a Source called name that targets the zipper at
// zipperAddr. Every SourceOption is converted to a core.ClientOption and
// passed to the underlying core.Client of type core.ClientTypeSource.
func NewSource(name, zipperAddr string, opts ...SourceOption) Source {
	converted := make([]core.ClientOption, 0, len(opts))
	for _, opt := range opts {
		converted = append(converted, core.ClientOption(opt))
	}

	cli := core.NewClient(name, zipperAddr, core.ClientTypeSource, converted...)
	// Replace the client's logger with one derived from it that carries the
	// source's identifying key/value pairs.
	cli.Logger = cli.Logger.With(
		"service", "source",
		"source_id", cli.ClientID(),
		"source_name", cli.Name(),
		"zipper_addr", zipperAddr,
	)

	src := &yomoSource{name: name, zipperAddr: zipperAddr, client: cli}
	return src
}
// Close closes the underlying client's connection to YoMo-Zipper.
//
// It returns the error reported by the client's Close, if any, so callers can
// detect a failed shutdown. The "closed" debug message is logged only when the
// close succeeds.
func (s *yomoSource) Close() error {
	if err := s.client.Close(); err != nil {
		return err
	}
	s.client.Logger.Debug("the source is closed")
	return nil
}
// Connect asks the underlying client to connect to YoMo-Zipper, using a
// background context, and returns the client's result.
func (s *yomoSource) Connect() error {
	ctx := context.Background()
	return s.client.Connect(ctx)
}
// Write sends data under the given tag through the underlying client.
//
// It returns the error from frame.IsReservedTag when that check fails, the
// metadata encoding error if encoding fails, and otherwise the result of the
// client's WriteFrame.
func (s *yomoSource) Write(tag uint32, data []byte) error {
	if tagErr := frame.IsReservedTag(tag); tagErr != nil {
		return tagErr
	}

	// Each write gets metadata built from the client ID and a fresh id.New().
	meta := core.NewMetadata(s.client.ClientID(), id.New())
	encoded, err := meta.Encode()
	if err != nil {
		return err
	}

	s.client.Logger.Debug("source write", "tag", tag, "dataLen", len(data))
	return s.client.WriteFrame(&frame.DataFrame{
		Tag:      tag,
		Metadata: encoded,
		Payload:  data,
	})
}
// WriteWithTarget sends data under the given tag through the underlying
// client, recording target in the frame's metadata.
//
// When target is empty no target is set on the metadata. It returns the error
// from frame.IsReservedTag when that check fails, the metadata encoding error
// if encoding fails, and otherwise the result of the client's WriteFrame.
func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
	if err := frame.IsReservedTag(tag); err != nil {
		return err
	}

	// Each write gets metadata built from the client ID and a fresh id.New().
	md := core.NewMetadata(s.client.ClientID(), id.New())
	if target != "" {
		core.SetMetadataTarget(md, target)
	}

	mdBytes, err := md.Encode()
	if err != nil {
		return err
	}

	f := &frame.DataFrame{
		Tag:      tag,
		Metadata: mdBytes,
		Payload:  data,
	}

	// Log only the payload size, matching Write, rather than dumping the
	// whole payload into the debug log.
	s.client.Logger.Debug("source write with target", "tag", tag, "dataLen", len(data), "target", target)
	return s.client.WriteFrame(f)
}
// SetErrorHandler registers fn with the underlying client as the function to
// call when a server error occurs.
func (s *yomoSource) SetErrorHandler(fn func(err error)) {
	s.client.SetErrorHandler(fn)
}