Skip to content

Commit

Permalink
add trade-engine example (#82)
Browse files Browse the repository at this point in the history
* add trade-engine example

* fix race condition

* engine, err

* remove go routines

* remove active flag

* switch to subscribe

* stop using eventstream

* fix comment

* cleanup and readme

* switch from slice to map

* Replace Unix timestamp with time.Time in trade engine

The commit updates the trade engine by replacing Unix timestamp format with Go's native time.Time. This affects order expiry and price update time tracking in the system. It's a better practice for readability and consistency in Go, and will also handle different time zones effectively. A logging line deemed excessively verbose was also commented out for cleanliness.

* remove signal

* enchance main.go

* shut down the trade engine before exit.

* remove depreciated log handler

---------

Co-authored-by: Per Buer <[email protected]>
  • Loading branch information
AR1011 and perbu authored Dec 5, 2023
1 parent c0a4bb5 commit b6373b0
Show file tree
Hide file tree
Showing 7 changed files with 566 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ TODO.txt
TODO
_test
/.vscode
/.idea
/.idea
.DS_Store
137 changes: 137 additions & 0 deletions examples/trade-engine/actors/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package executor

import (
"log/slog"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/trade-engine/types"
)

// Options for creating a new executor
type ExecutorOptions struct {
PriceWatcherPID *actor.PID
TradeID string
Ticker string
Token0 string
Token1 string
Chain string
Wallet string
Pk string
Expires time.Time
}

type tradeExecutorActor struct {
id string
ActorEngine *actor.Engine
PID *actor.PID
priceWatcherPID *actor.PID
ticker string
token0 string
token1 string
chain string
wallet string
pk string
status string
lastPrice float64
expires time.Time
}

func (te *tradeExecutorActor) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
slog.Info("tradeExecutor.Started", "id", te.id, "wallet", te.wallet)

// set actorEngine and PID
te.ActorEngine = c.Engine()
te.PID = c.PID()

// subscribe to price updates
te.ActorEngine.Send(te.priceWatcherPID, types.Subscribe{Sendto: te.PID})

case actor.Stopped:
slog.Info("tradeExecutor.Stopped", "id", te.id, "wallet", te.wallet)

case types.PriceUpdate:
// update the price
te.processUpdate(msg)

case types.TradeInfoRequest:
slog.Info("tradeExecutor.TradeInfoRequest", "id", te.id, "wallet", te.wallet)

// handle the request
te.handleTradeInfoRequest(c)

case types.CancelOrderRequest:
slog.Info("tradeExecutor.CancelOrderRequest", "id", te.id, "wallet", te.wallet)

// update status
te.status = "cancelled"

// stop the executor
te.Finished()
}
}

func (te *tradeExecutorActor) processUpdate(update types.PriceUpdate) {

// if expires is set and is less than current time, cancel the order
if !te.expires.IsZero() && time.Now().After(te.expires) {
slog.Info("Trade Expired", "id", te.id, "wallet", te.wallet)
te.Finished()
return
}

// update the price
te.lastPrice = update.Price

// do something with the price
// eg update pnl, etc

// for example just print price
// XXX a bit too much noise.
// slog.Info("tradeExecutor.PriceUpdate", "ticker", update.Ticker, "price", update.Price)
}

func (te *tradeExecutorActor) handleTradeInfoRequest(c *actor.Context) {
c.Respond(types.TradeInfoResponse{
// for example
Foo: 100,
Bar: 100,
Price: te.lastPrice,
})
}

func (te *tradeExecutorActor) Finished() {
// make sure ActorEngine and PID are set
if te.ActorEngine == nil {
slog.Error("tradeExecutor.actorEngine is <nil>")
}

if te.PID == nil {
slog.Error("tradeExecutor.PID is <nil>")
}

// unsubscribe from price updates
te.ActorEngine.Send(te.priceWatcherPID, types.Unsubscribe{Sendto: te.PID})

// poision itself
te.ActorEngine.Poison(te.PID)
}

func NewExecutorActor(opts *ExecutorOptions) actor.Producer {
return func() actor.Receiver {
return &tradeExecutorActor{
id: opts.TradeID,
ticker: opts.Ticker,
token0: opts.Token0,
token1: opts.Token1,
chain: opts.Chain,
wallet: opts.Wallet,
pk: opts.Pk,
priceWatcherPID: opts.PriceWatcherPID,
expires: opts.Expires,
status: "active",
}
}
}
106 changes: 106 additions & 0 deletions examples/trade-engine/actors/price/price.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package price

import (
"log/slog"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/trade-engine/types"
)

type priceWatcherActor struct {
ActorEngine *actor.Engine
PID *actor.PID
repeater actor.SendRepeater
ticker string
token0 string
token1 string
chain string
lastPrice float64
updatedAt time.Time
subscribers map[*actor.PID]bool
}

func (pw *priceWatcherActor) Receive(c *actor.Context) {

switch msg := c.Message().(type) {
case actor.Started:
slog.Info("priceWatcher.Started", "ticker", pw.ticker)

// set actorEngine and PID
pw.ActorEngine = c.Engine()
pw.PID = c.PID()

// create a repeater to trigger price updates every 200ms
pw.repeater = pw.ActorEngine.SendRepeat(pw.PID, types.TriggerPriceUpdate{}, time.Millisecond*200)

case actor.Stopped:
slog.Info("priceWatcher.Stopped", "ticker", pw.ticker)

case types.Subscribe:
slog.Info("priceWatcher.Subscribe", "ticker", pw.ticker, "subscriber", msg.Sendto)

// add the subscriber to the map
pw.subscribers[msg.Sendto] = true

case types.Unsubscribe:
slog.Info("priceWatcher.Unsubscribe", "ticker", pw.ticker, "subscriber", msg.Sendto)

// remove the subscriber from the map
delete(pw.subscribers, msg.Sendto)

case types.TriggerPriceUpdate:
pw.refresh()
}
}

func (pw *priceWatcherActor) refresh() {

// check if there are any subscribers
if len(pw.subscribers) == 0 {
slog.Info("No Subscribers: Killing Price Watcher", "ticker", pw.ticker)

// if no subscribers, kill itself
pw.Kill()
}

// for example, just increment the price by 2
pw.lastPrice += 2
pw.updatedAt = time.Now()

// send the price update to all executors
for pid := range pw.subscribers {
pw.ActorEngine.Send(pid, types.PriceUpdate{
Ticker: pw.ticker,
UpdatedAt: pw.updatedAt,
Price: pw.lastPrice,
})
}
}

func (pw *priceWatcherActor) Kill() {
if pw.ActorEngine == nil {
slog.Error("priceWatcher.actorEngine is <nil>", "ticker", pw.ticker)
}
if pw.PID == nil {
slog.Error("priceWatcher.PID is <nil>", "ticker", pw.ticker)
}

// stop the repeater
pw.repeater.Stop()

// poision itself
pw.ActorEngine.Poison(pw.PID)
}

func NewPriceActor(opts types.PriceOptions) actor.Producer {
return func() actor.Receiver {
return &priceWatcherActor{
ticker: opts.Ticker,
token0: opts.Token0,
token1: opts.Token1,
chain: opts.Chain,
subscribers: make(map[*actor.PID]bool),
}
}
}
141 changes: 141 additions & 0 deletions examples/trade-engine/actors/tradeEngine/tradeEngine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package tradeEngine

import (
"fmt"
"log/slog"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/trade-engine/actors/executor"
"github.com/anthdm/hollywood/examples/trade-engine/actors/price"
"github.com/anthdm/hollywood/examples/trade-engine/types"
)

// tradeEngineActor is the main actor for the trade engine
type tradeEngineActor struct {
}

// TradeOrderRequest is the message sent to the trade engine to create a new trade order

func (t *tradeEngineActor) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
slog.Info("tradeEngine.Started")
case actor.Stopped:
slog.Info("tradeEngine.Stopped")
case types.TradeOrderRequest:
// got new trade order, create the executor
slog.Info("tradeEngine.TradeOrderRequest", "id", msg.TradeID, "wallet", msg.Wallet)
// spawn the executor
t.spawnExecutor(msg, c)
case types.CancelOrderRequest:
// cancel the order
slog.Info("tradeEngine.CancelOrderRequest", "id", msg.TradeID)

// cancel the order
t.cancelOrder(msg.TradeID, c)

case types.TradeInfoRequest:
// get trade info
slog.Info("tradeEngine.TradeInfoRequest", "id", msg.TradeID)

t.handleTradeInfoRequest(msg, c)
}
}

func (t *tradeEngineActor) spawnExecutor(msg types.TradeOrderRequest, c *actor.Context) {
// make sure there is a price Watcher for this token pair
pricePID := t.ensurePriceWatcher(msg, c)

// spawn the executor
options := &executor.ExecutorOptions{
PriceWatcherPID: pricePID,
TradeID: msg.TradeID,
Ticker: toTicker(msg.Token0, msg.Token1, msg.Chain),
Token0: msg.Token0,
Token1: msg.Token1,
Chain: msg.Chain,
Wallet: msg.Wallet,
Pk: msg.PrivateKey,
Expires: msg.Expires,
}

// spawn the actor
c.SpawnChild(executor.NewExecutorActor(options), msg.TradeID)
}

func (t *tradeEngineActor) ensurePriceWatcher(order types.TradeOrderRequest, c *actor.Context) *actor.PID {
// create the ticker string
ticker := toTicker(order.Token0, order.Token1, order.Chain)

// look for existing price watcher in trade-engine child actors
pid := c.Child("trade-engine/" + ticker)
if pid != nil {
// if we found a price watcher, return it
return pid
}

// no price watcher found, spawn a new one
options := types.PriceOptions{
Ticker: ticker,
Token0: order.Token0,
Token1: order.Token1,
Chain: order.Chain,
}

// spawn the actor
pid = c.SpawnChild(price.NewPriceActor(options), ticker)
return pid
}

func (t *tradeEngineActor) cancelOrder(id string, c *actor.Context) {
// get the executor
pid := c.Child("trade-engine/" + id)
if pid == nil {
// no executor found
slog.Error("Failed to cancel order", "err", "tradeExecutor PID not found", "id", id)
return
}

// send cancel message
c.Send(pid, types.CancelOrderRequest{})
}

func (t *tradeEngineActor) handleTradeInfoRequest(msg types.TradeInfoRequest, c *actor.Context) {
// get the executor
pid := c.Child("trade-engine/" + msg.TradeID)
if pid == nil {
// no executor found
slog.Error("Failed to get trade info", "err", "tradeExecutor PID not found", "id", msg.TradeID)
return
}

// send tradeInfo Request
resp := c.Request(pid, types.TradeInfoRequest{}, time.Second*5)
res, err := resp.Result()
if err != nil {
slog.Error("Failed to get trade info", "err", err)
return
}

switch msg := res.(type) {

case types.TradeInfoResponse:
c.Respond(msg)

default:
slog.Error("Failed to get trade info", "err", "unknown response type")
}

}

func NewTradeEngine() actor.Producer {
return func() actor.Receiver {
return &tradeEngineActor{}
}
}

// utility function to format the ticker
func toTicker(token0, token1, chain string) string {
return fmt.Sprintf("%s-%s-%s", token0, token1, chain)
}
Loading

0 comments on commit b6373b0

Please sign in to comment.