Skip to content

Commit

Permalink
projection trigger (#124)
Browse files Browse the repository at this point in the history
* trigger a running projection to run

* add more docs

* TriggerSync / TriggerAsync on the projection group

* TriggerAsync will return if other thread has filled the trigger channel

* make projection error chan on Start

* update readme

* update readme

* update readme

* update readme

* projection running state

* projection running state as atomic.Bool

* go version 1.19

* remove unused code

* update readme

* update README

* update readme

* update readme

* projection trigger async test

* return error if same projection is started multiple times

* projections test trigger multiple async calls
  • Loading branch information
hallgren authored Aug 9, 2024
1 parent d9582c1 commit 7019e3a
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.13'
go-version: '1.19'

- name: Build
run: go build -v ./...
Expand Down
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -454,15 +454,16 @@ type ProjectionResult struct {

Run will run forever until event consumer is returning an error or if it's canceled from the outside. When it hits the end of the event stream it will start a timer and sleep the time set in the projection property `Pace`.

```go
Run(ctx context.Context) error
```
```go
Run(ctx context.Context, pace time.Duration) error
```

A running projection can be triggered manually via `TriggerAsync()` or `TriggerSync()`.

### Projection properties

A projection have a set of properties that can affect it's behaivior.

* **Pace** - Is used in the Run method to set how often the projection will poll the event store for new events.
* **Strict** - Default true and it will trigger an error if a fetched event is not registered in the event `Register`. This force all events to be handled by the callbackFunc.
* **Name** - The name of the projection. Can be useful when debugging multiple running projection. The default name is the index it was created from the projection handler.

Expand Down Expand Up @@ -505,6 +506,14 @@ select {
}
```

The pace of the projection can be changed with the `Pace` property. Default is every 10 second.

If the pace is not fast enough for some senario it's possible to trigger manually.

`TriggerAsync()`: Triggers all projections in the group and return.

`TriggerSync()`: Triggers all projections in the group and wait for them running to the end of there event streams.

#### Race

Compared to a group the race is a one shot operation. Instead of fetching events continuously it's used to iterate and process all existing events and then return.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/hallgren/eventsourcing

go 1.13
go 1.19

require github.com/hallgren/eventsourcing/core v0.4.0

Expand Down
111 changes: 92 additions & 19 deletions projections.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/hallgren/eventsourcing/core"
Expand All @@ -19,6 +20,9 @@ type ProjectionHandler struct {
count int
}

// ErrProjectionAlreadyRunning is returned if Run is called on an already running projection
var ErrProjectionAlreadyRunning = errors.New("projection is already running")

func NewProjectionHandler(register *Register, encoder encoder) *ProjectionHandler {
return &ProjectionHandler{
register: register,
Expand All @@ -27,16 +31,18 @@ func NewProjectionHandler(register *Register, encoder encoder) *ProjectionHandle
}

type Projection struct {
running atomic.Bool
fetchF fetchFunc
callbackF callbackFunc
handler *ProjectionHandler
Pace time.Duration // Pace is used when a projection is running and it reaches the end of the event stream
Strict bool // Strict indicate if the projection should return error if the event it fetches is not found in the register
trigger chan func()
Strict bool // Strict indicate if the projection should return error if the event it fetches is not found in the register
Name string
}

// Group runs projections concurrently
type Group struct {
Pace time.Duration // Pace is used when a projection is running and it reaches the end of the event stream
handler *ProjectionHandler
projections []*Projection
cancelF context.CancelFunc
Expand All @@ -57,33 +63,74 @@ func (ph *ProjectionHandler) Projection(fetchF fetchFunc, callbackF callbackFunc
fetchF: fetchF,
callbackF: callbackF,
handler: ph,
Pace: time.Second * 10, // Default pace 10 seconds
trigger: make(chan func()),
Strict: true, // Default strict is active
Name: fmt.Sprintf("%d", ph.count), // Default the name to it's creation index
}
ph.count++
return &projection
}

// TriggerAsync force a running projection to run immediately independent on the pace
// It will return immediately after triggering the prjection to run.
// If the trigger channel is already filled it will return without inserting any value.
func (p *Projection) TriggerAsync() {
if !p.running.Load() {
return
}
select {
case p.trigger <- func() {}:
default:
}
}

// TriggerSync force a running projection to run immediately independent on the pace
// It will wait for the projection to finish running to its current end before returning.
func (p *Projection) TriggerSync() {
if !p.running.Load() {
return
}
wg := sync.WaitGroup{}
wg.Add(1)
f := func() {
wg.Done()
}
p.trigger <- f
wg.Wait()
}

// Run runs the projection forever until the context is cancelled. When there are no more events to consume it
// sleeps the set pace before it runs again.
func (p *Projection) Run(ctx context.Context) error {
var result ProjectionResult
timer := time.NewTimer(0)
// waits for a trigger or context cancel.
func (p *Projection) Run(ctx context.Context, pace time.Duration) error {
if p.running.Load() {
return ErrProjectionAlreadyRunning
}
p.running.Store(true)
defer func() {
p.running.Store(false)
}()

var noopFunc = func() {}
var f = noopFunc
triggerFunc := func() {
f()
// reset the f to the noop func
f = noopFunc
}
for {
result := p.RunToEnd(ctx)
// if triggered by a sync trigger the triggerFunc callback that it's finished
// if not triggered by a sync trigger the triggerFunc will call an no ops function
triggerFunc()
if result.Error != nil {
return result.Error
}
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return ctx.Err()
case <-timer.C:
result = p.RunToEnd(ctx)
if result.Error != nil {
return result.Error
}
case <-time.After(pace):
case f = <-p.trigger:
}
timer.Reset(p.Pace)
}
}

Expand Down Expand Up @@ -174,37 +221,63 @@ func (ph *ProjectionHandler) Group(projections ...*Projection) *Group {
handler: ph,
projections: projections,
cancelF: func() {},
ErrChan: make(chan error),
Pace: time.Second * 10, // Default pace 10 seconds
}
}

// Start starts all projectinos in the group, an error channel i created on the group to notify
// if a result containing an error is returned from a projection
func (g *Group) Start() {
g.ErrChan = make(chan error)
ctx, cancel := context.WithCancel(context.Background())
g.cancelF = cancel

g.wg.Add(len(g.projections))
for _, projection := range g.projections {
go func(p *Projection) {
defer g.wg.Done()
err := p.Run(ctx)
err := p.Run(ctx, g.Pace)
if !errors.Is(err, context.Canceled) {
g.ErrChan <- err
}
}(projection)
}
}

// Stop terminate all projections in the group
// TriggerAsync force all projections to run not waiting for them to finish
func (g *Group) TriggerAsync() {
for _, projection := range g.projections {
projection.TriggerAsync()
}
}

// TriggerSync force all projections to run and wait for them to finish
func (g *Group) TriggerSync() {
wg := sync.WaitGroup{}
for _, projection := range g.projections {
wg.Add(1)
go func(p *Projection) {
p.TriggerSync()
wg.Done()
}(projection)
}
wg.Wait()
}

// Stop halts all projections in the group
func (g *Group) Stop() {
if g.ErrChan == nil {
return
}
g.cancelF()

// return when all projections has stopped
g.wg.Wait()

// close the error channel
close(g.ErrChan)

g.ErrChan = nil
}

// Race runs the projections to the end of the events streams.
Expand Down
Loading

0 comments on commit 7019e3a

Please sign in to comment.