forked from jen20/go-event-sourcing-sample
-
Notifications
You must be signed in to change notification settings - Fork 23
/
eventrepository.go
205 lines (176 loc) · 5.61 KB
/
eventrepository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package eventsourcing
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/hallgren/eventsourcing/core"
)
// aggregate is the contract the repository requires from every aggregate it
// saves or loads. It gives the repository access to the aggregate root
// specific methods.
type aggregate interface {
	// Root returns the AggregateRoot that holds the aggregate's events and version.
	Root() *AggregateRoot
	// Register is handed a RegisterFunc; presumably the aggregate uses it to
	// announce its event types (confirm against the Register implementation).
	Register(RegisterFunc)
	// Transition receives a single event; presumably it applies the event to
	// the aggregate's state (confirm against AggregateRoot).
	Transition(event Event)
}
// EventSubscribers is the subscription API returned by
// EventRepository.Subscribers. Every method takes a callback f and returns a
// *subscription. The remaining arguments presumably narrow down which events
// reach f; the filtering itself is implemented outside this file (confirm
// against EventStream).
type EventSubscribers interface {
	// All registers f without any filter arguments.
	All(f func(e Event)) *subscription
	// AggregateID registers f for the given aggregate instances.
	AggregateID(f func(e Event), aggregates ...aggregate) *subscription
	// Aggregate registers f for the given aggregates (presumably matched by
	// type rather than by ID; verify against the implementation).
	Aggregate(f func(e Event), aggregates ...aggregate) *subscription
	// Event registers f for the given event values.
	Event(f func(e Event), events ...interface{}) *subscription
	// Name registers f for an aggregate name and a list of event names.
	Name(f func(e Event), aggregate string, events ...string) *subscription
}
// encoder converts event data and metadata to and from their stored []byte
// form. The repository calls it when saving and when loading events.
type encoder interface {
	// Serialize turns v into bytes.
	Serialize(v interface{}) ([]byte, error)
	// Deserialize decodes data into v; the repository always passes v as a pointer.
	Deserialize(data []byte, v interface{}) error
}
// Sentinel errors returned by the EventRepository.
var (
	// ErrAggregateNotFound is returned by GetWithContext when the aggregate is
	// still at version 0 after the stored events have been applied, i.e. no
	// events were found and the aggregate was not pre-populated (for example
	// from a snapshot) by the caller.
	ErrAggregateNotFound = errors.New("aggregate not found")

	// ErrAggregateNotRegistered is returned by Save when the aggregate has not
	// been registered in the repository.
	ErrAggregateNotRegistered = errors.New("aggregate not registered")

	// ErrEventNotRegistered is returned by Save when one of the aggregate's
	// events is not registered in the repository.
	ErrEventNotRegistered = errors.New("event not registered")

	// ErrConcurrency is returned by Save when the event store reports a
	// concurrency conflict: the stored version of the aggregate differs from
	// the version the new events build on.
	ErrConcurrency = errors.New("concurrency error")
)
// EventRepository saves aggregates to, and rebuilds them from, an event store.
// Instances are created with NewEventRepository.
type EventRepository struct {
	// eventStream publishes saved events to subscribers.
	eventStream *EventStream
	// eventStore is the storage the events are written to and read from.
	eventStore core.EventStore
	// register knows the registered aggregates and events and supplies the
	// value that stored event data ([]byte) is decoded into.
	register *Register
	// encoder serializes and deserializes event data and metadata.
	encoder encoder
	// Projections is the projection handler sharing this repository's
	// register and encoder.
	Projections *ProjectionHandler
}
// NewEventRepository returns an EventRepository that stores its events in
// eventStore. The repository starts with a fresh register and event stream and
// uses the JSON encoder by default; the projection handler is created with the
// same register and encoder.
func NewEventRepository(eventStore core.EventStore) *EventRepository {
	reg := NewRegister()
	jsonEncoder := EncoderJSON{} // default encoder

	repo := &EventRepository{eventStore: eventStore}
	repo.eventStream = NewEventStream()
	repo.register = reg
	repo.encoder = jsonEncoder
	repo.Projections = NewProjectionHandler(reg, jsonEncoder)
	return repo
}
// Encoder replaces the default JSON encoder used to serialize and deserialize
// events. The new encoder is set on the repository and on its projection
// handler so both keep using the same format.
func (er *EventRepository) Encoder(e encoder) {
	// the repository itself
	er.encoder = e

	// the projection handler
	projections := er.Projections
	projections.Encoder = e
}
// Register adds the aggregate a to the repository's register. Save returns
// ErrAggregateNotRegistered for aggregates that were never registered.
func (er *EventRepository) Register(a aggregate) {
	reg := er.register
	reg.Register(a)
}
// Subscribers exposes the repository's event stream through the
// EventSubscribers interface, giving access to all event subscription methods.
func (er *EventRepository) Subscribers() EventSubscribers {
	var subscribers EventSubscribers = er.eventStream
	return subscribers
}
// Save persists the pending events of aggregate a in the event store and
// publishes them to the repository's subscribers.
//
// Nothing is stored and nil is returned when the aggregate has no pending
// events. After a successful save the GlobalVersion of each stored event is
// copied back onto the aggregate's events, the events are published and the
// aggregate root's internal state is updated.
//
// Errors:
//   - ErrAggregateNotRegistered when a is not registered in the repository.
//   - ErrEventNotRegistered when one of the events is not registered.
//   - ErrConcurrency when the event store reports core.ErrConcurrency.
//   - Serialization errors from the encoder, returned unchanged.
//   - Any other event store error, wrapped with "error from event store".
func (er *EventRepository) Save(a aggregate) error {
	if !er.register.AggregateRegistered(a) {
		return ErrAggregateNotRegistered
	}

	root := a.Root()

	// return as quick as possible when no events to process
	if len(root.aggregateEvents) == 0 {
		return nil
	}

	// The number of events is known, so allocate the slice once with the
	// right capacity, and only after the early returns above.
	esEvents := make([]core.Event, 0, len(root.aggregateEvents))

	for _, event := range root.aggregateEvents {
		data, err := er.encoder.Serialize(event.Data())
		if err != nil {
			return err
		}
		metadata, err := er.encoder.Serialize(event.Metadata())
		if err != nil {
			return err
		}

		esEvent := core.Event{
			AggregateID:   event.AggregateID(),
			Version:       core.Version(event.Version()),
			AggregateType: event.AggregateType(),
			Timestamp:     event.Timestamp(),
			Data:          data,
			Metadata:      metadata,
			Reason:        event.Reason(),
		}

		// refuse to store events the repository would not be able to load again
		if _, ok := er.register.EventRegistered(esEvent); !ok {
			return ErrEventNotRegistered
		}
		esEvents = append(esEvents, esEvent)
	}

	if err := er.eventStore.Save(esEvents); err != nil {
		// translate the store's concurrency error to this package's sentinel
		if errors.Is(err, core.ErrConcurrency) {
			return ErrConcurrency
		}
		return fmt.Errorf("error from event store: %w", err)
	}

	// update the global version on event bound to the aggregate;
	// esEvents and root.aggregateEvents have the same length and order
	for i, event := range esEvents {
		root.aggregateEvents[i].event.GlobalVersion = event.GlobalVersion
	}

	// publish the saved events to subscribers
	er.eventStream.Publish(*root, root.Events())

	// update the internal aggregate state
	root.update()
	return nil
}
// GetWithContext loads the events stored for the aggregate with the given id
// and applies them to a, building it up from its current version. Only events
// after the version a already has are requested, so an aggregate that was
// restored from a snapshot beforehand is continued rather than rebuilt.
//
// The fetching can be canceled from the outside through ctx; the context's
// error is returned in that case. a must be a pointer, otherwise
// ErrAggregateNeedsToBeAPointer is returned. Stored events that are not
// registered in the repository are skipped. ErrAggregateNotFound is returned
// when the aggregate is still at version 0 after all events were applied.
// Errors from the event store, the iterator and the encoder are returned
// unchanged.
func (er *EventRepository) GetWithContext(ctx context.Context, id string, a aggregate) error {
	if kind := reflect.ValueOf(a).Kind(); kind != reflect.Ptr {
		return ErrAggregateNeedsToBeAPointer
	}

	root := a.Root()
	typeName := aggregateType(a)

	// only ask for events newer than the version the aggregate already has
	// (which may have been set from the snapshot store)
	iter, err := er.eventStore.Get(ctx, id, typeName, core.Version(root.aggregateVersion))
	if err != nil {
		return err
	}
	defer iter.Close()

	for iter.Next() {
		// stop as soon as the caller has canceled the context
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		stored, err := iter.Value()
		if err != nil {
			return err
		}

		// events that are not registered are skipped
		newData, registered := er.register.EventRegistered(stored)
		if !registered {
			continue
		}

		data := newData()
		if err = er.encoder.Deserialize(stored.Data, &data); err != nil {
			return err
		}

		metadata := make(map[string]interface{})
		if err = er.encoder.Deserialize(stored.Metadata, &metadata); err != nil {
			return err
		}

		// apply the event to the aggregate
		root.BuildFromHistory(a, []Event{NewEvent(stored, data, metadata)})
	}

	if a.Root().Version() == 0 {
		return ErrAggregateNotFound
	}
	return nil
}
// Get fetches the events of the aggregate with the given id and builds up a
// from them. It is GetWithContext called with a background context, so the
// fetch cannot be canceled. If the aggregate is based on a snapshot, only the
// events after the aggregate's version are fetched.
func (er *EventRepository) Get(id string, a aggregate) error {
	ctx := context.Background()
	return er.GetWithContext(ctx, id, a)
}