diff --git a/.gitignore b/.gitignore index 6703fea..1e5d8b0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ TODO.txt TODO _test /.vscode -/.idea \ No newline at end of file +/.idea +.DS_Store \ No newline at end of file diff --git a/examples/trade-engine/actors/executor/executor.go b/examples/trade-engine/actors/executor/executor.go new file mode 100644 index 0000000..5ef57d0 --- /dev/null +++ b/examples/trade-engine/actors/executor/executor.go @@ -0,0 +1,137 @@ +package executor + +import ( + "log/slog" + "time" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/trade-engine/types" +) + +// Options for creating a new executor +type ExecutorOptions struct { + PriceWatcherPID *actor.PID + TradeID string + Ticker string + Token0 string + Token1 string + Chain string + Wallet string + Pk string + Expires time.Time +} + +type tradeExecutorActor struct { + id string + ActorEngine *actor.Engine + PID *actor.PID + priceWatcherPID *actor.PID + ticker string + token0 string + token1 string + chain string + wallet string + pk string + status string + lastPrice float64 + expires time.Time +} + +func (te *tradeExecutorActor) Receive(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.Started: + slog.Info("tradeExecutor.Started", "id", te.id, "wallet", te.wallet) + + // set actorEngine and PID + te.ActorEngine = c.Engine() + te.PID = c.PID() + + // subscribe to price updates + te.ActorEngine.Send(te.priceWatcherPID, types.Subscribe{Sendto: te.PID}) + + case actor.Stopped: + slog.Info("tradeExecutor.Stopped", "id", te.id, "wallet", te.wallet) + + case types.PriceUpdate: + // update the price + te.processUpdate(msg) + + case types.TradeInfoRequest: + slog.Info("tradeExecutor.TradeInfoRequest", "id", te.id, "wallet", te.wallet) + + // handle the request + te.handleTradeInfoRequest(c) + + case types.CancelOrderRequest: + slog.Info("tradeExecutor.CancelOrderRequest", "id", te.id, "wallet", te.wallet) + + // update status + te.status = "cancelled" + + // stop the executor + te.Finished() + } +} + +func (te *tradeExecutorActor) processUpdate(update types.PriceUpdate) { + + // if expires is set and is less than current time, cancel the order + if !te.expires.IsZero() && time.Now().After(te.expires) { + slog.Info("Trade Expired", "id", te.id, "wallet", te.wallet) + te.Finished() + return + } + + // update the price + te.lastPrice = update.Price + + // do something with the price + // eg update pnl, etc + + // for example just print price + // XXX a bit too much noise. + // slog.Info("tradeExecutor.PriceUpdate", "ticker", update.Ticker, "price", update.Price) +} + +func (te *tradeExecutorActor) handleTradeInfoRequest(c *actor.Context) { + c.Respond(types.TradeInfoResponse{ + // for example + Foo: 100, + Bar: 100, + Price: te.lastPrice, + }) +} + +func (te *tradeExecutorActor) Finished() { + // make sure ActorEngine and PID are set + if te.ActorEngine == nil { + slog.Error("tradeExecutor.actorEngine is ") + } + + if te.PID == nil { + slog.Error("tradeExecutor.PID is ") + } + + // unsubscribe from price updates + te.ActorEngine.Send(te.priceWatcherPID, types.Unsubscribe{Sendto: te.PID}) + + // poision itself + te.ActorEngine.Poison(te.PID) +} + +func NewExecutorActor(opts *ExecutorOptions) actor.Producer { + return func() actor.Receiver { + return &tradeExecutorActor{ + id: opts.TradeID, + ticker: opts.Ticker, + token0: opts.Token0, + token1: opts.Token1, + chain: opts.Chain, + wallet: opts.Wallet, + pk: opts.Pk, + priceWatcherPID: opts.PriceWatcherPID, + expires: opts.Expires, + status: "active", + } + } +} diff --git a/examples/trade-engine/actors/price/price.go b/examples/trade-engine/actors/price/price.go new file mode 100644 index 0000000..60a3297 --- /dev/null +++ b/examples/trade-engine/actors/price/price.go @@ -0,0 +1,106 @@ +package price + +import ( + "log/slog" + "time" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/trade-engine/types" +) + +type priceWatcherActor struct { + ActorEngine *actor.Engine + PID *actor.PID + repeater actor.SendRepeater + ticker string + token0 string + token1 string + chain string + lastPrice float64 + updatedAt time.Time + subscribers map[*actor.PID]bool +} + +func (pw *priceWatcherActor) Receive(c *actor.Context) { + + switch msg := c.Message().(type) { + case actor.Started: + slog.Info("priceWatcher.Started", "ticker", pw.ticker) + + // set actorEngine and PID + pw.ActorEngine = c.Engine() + pw.PID = c.PID() + + // create a repeater to trigger price updates every 200ms + pw.repeater = pw.ActorEngine.SendRepeat(pw.PID, types.TriggerPriceUpdate{}, time.Millisecond*200) + + case actor.Stopped: + slog.Info("priceWatcher.Stopped", "ticker", pw.ticker) + + case types.Subscribe: + slog.Info("priceWatcher.Subscribe", "ticker", pw.ticker, "subscriber", msg.Sendto) + + // add the subscriber to the map + pw.subscribers[msg.Sendto] = true + + case types.Unsubscribe: + slog.Info("priceWatcher.Unsubscribe", "ticker", pw.ticker, "subscriber", msg.Sendto) + + // remove the subscriber from the map + delete(pw.subscribers, msg.Sendto) + + case types.TriggerPriceUpdate: + pw.refresh() + } +} + +func (pw *priceWatcherActor) refresh() { + + // check if there are any subscribers + if len(pw.subscribers) == 0 { + slog.Info("No Subscribers: Killing Price Watcher", "ticker", pw.ticker) + + // if no subscribers, kill itself + pw.Kill() + } + + // for example, just increment the price by 2 + pw.lastPrice += 2 + pw.updatedAt = time.Now() + + // send the price update to all executors + for pid := range pw.subscribers { + pw.ActorEngine.Send(pid, types.PriceUpdate{ + Ticker: pw.ticker, + UpdatedAt: pw.updatedAt, + Price: pw.lastPrice, + }) + } +} + +func (pw *priceWatcherActor) Kill() { + if pw.ActorEngine == nil { + slog.Error("priceWatcher.actorEngine is ", "ticker", pw.ticker) + } + if pw.PID == nil { + slog.Error("priceWatcher.PID is ", "ticker", pw.ticker) + } + + // stop the repeater + pw.repeater.Stop() + + // poision itself + pw.ActorEngine.Poison(pw.PID) +} + +func NewPriceActor(opts types.PriceOptions) actor.Producer { + return func() actor.Receiver { + return &priceWatcherActor{ + ticker: opts.Ticker, + token0: opts.Token0, + token1: opts.Token1, + chain: opts.Chain, + subscribers: make(map[*actor.PID]bool), + } + } +} diff --git a/examples/trade-engine/actors/tradeEngine/tradeEngine.go b/examples/trade-engine/actors/tradeEngine/tradeEngine.go new file mode 100644 index 0000000..f0ac456 --- /dev/null +++ b/examples/trade-engine/actors/tradeEngine/tradeEngine.go @@ -0,0 +1,141 @@ +package tradeEngine + +import ( + "fmt" + "log/slog" + "time" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/trade-engine/actors/executor" + "github.com/anthdm/hollywood/examples/trade-engine/actors/price" + "github.com/anthdm/hollywood/examples/trade-engine/types" +) + +// tradeEngineActor is the main actor for the trade engine +type tradeEngineActor struct { +} + +// TradeOrderRequest is the message sent to the trade engine to create a new trade order + +func (t *tradeEngineActor) Receive(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.Started: + slog.Info("tradeEngine.Started") + case actor.Stopped: + slog.Info("tradeEngine.Stopped") + case types.TradeOrderRequest: + // got new trade order, create the executor + slog.Info("tradeEngine.TradeOrderRequest", "id", msg.TradeID, "wallet", msg.Wallet) + // spawn the executor + t.spawnExecutor(msg, c) + case types.CancelOrderRequest: + // cancel the order + slog.Info("tradeEngine.CancelOrderRequest", "id", msg.TradeID) + + // cancel the order + t.cancelOrder(msg.TradeID, c) + + case types.TradeInfoRequest: + // get trade info + slog.Info("tradeEngine.TradeInfoRequest", "id", msg.TradeID) + + t.handleTradeInfoRequest(msg, c) + } +} + +func (t *tradeEngineActor) spawnExecutor(msg types.TradeOrderRequest, c *actor.Context) { + // make sure there is a price Watcher for this token pair + pricePID := t.ensurePriceWatcher(msg, c) + + // spawn the executor + options := &executor.ExecutorOptions{ + PriceWatcherPID: pricePID, + TradeID: msg.TradeID, + Ticker: toTicker(msg.Token0, msg.Token1, msg.Chain), + Token0: msg.Token0, + Token1: msg.Token1, + Chain: msg.Chain, + Wallet: msg.Wallet, + Pk: msg.PrivateKey, + Expires: msg.Expires, + } + + // spawn the actor + c.SpawnChild(executor.NewExecutorActor(options), msg.TradeID) +} + +func (t *tradeEngineActor) ensurePriceWatcher(order types.TradeOrderRequest, c *actor.Context) *actor.PID { + // create the ticker string + ticker := toTicker(order.Token0, order.Token1, order.Chain) + + // look for existing price watcher in trade-engine child actors + pid := c.Child("trade-engine/" + ticker) + if pid != nil { + // if we found a price watcher, return it + return pid + } + + // no price watcher found, spawn a new one + options := types.PriceOptions{ + Ticker: ticker, + Token0: order.Token0, + Token1: order.Token1, + Chain: order.Chain, + } + + // spawn the actor + pid = c.SpawnChild(price.NewPriceActor(options), ticker) + return pid +} + +func (t *tradeEngineActor) cancelOrder(id string, c *actor.Context) { + // get the executor + pid := c.Child("trade-engine/" + id) + if pid == nil { + // no executor found + slog.Error("Failed to cancel order", "err", "tradeExecutor PID not found", "id", id) + return + } + + // send cancel message + c.Send(pid, types.CancelOrderRequest{}) +} + +func (t *tradeEngineActor) handleTradeInfoRequest(msg types.TradeInfoRequest, c *actor.Context) { + // get the executor + pid := c.Child("trade-engine/" + msg.TradeID) + if pid == nil { + // no executor found + slog.Error("Failed to get trade info", "err", "tradeExecutor PID not found", "id", msg.TradeID) + return + } + + // send tradeInfo Request + resp := c.Request(pid, types.TradeInfoRequest{}, time.Second*5) + res, err := resp.Result() + if err != nil { + slog.Error("Failed to get trade info", "err", err) + return + } + + switch msg := res.(type) { + + case types.TradeInfoResponse: + c.Respond(msg) + + default: + slog.Error("Failed to get trade info", "err", "unknown response type") + } + +} + +func NewTradeEngine() actor.Producer { + return func() actor.Receiver { + return &tradeEngineActor{} + } +} + +// utility function to format the ticker +func toTicker(token0, token1, chain string) string { + return fmt.Sprintf("%s-%s-%s", token0, token1, chain) +} diff --git a/examples/trade-engine/main.go b/examples/trade-engine/main.go new file mode 100644 index 0000000..ffef615 --- /dev/null +++ b/examples/trade-engine/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "os" + "time" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/trade-engine/actors/tradeEngine" + "github.com/anthdm/hollywood/examples/trade-engine/types" +) + +func main() { + // example script showing the trade engine in action + // this script creates a trade order, gets the trade info and then cancels the order + + // create the actor engine + e, err := actor.NewEngine() + if err != nil { + fmt.Printf("failed to create actor engine: %v", err) + os.Exit(1) + } + + // spawn the trade engine + tradeEnginePID := e.Spawn(tradeEngine.NewTradeEngine(), "trade-engine") + + // create the trade order + fmt.Println("Creating 1 trade order to test getting trade info and cancelling") + tradeOrder := types.TradeOrderRequest{ + TradeID: GenID(), + Token0: "token0", + Token1: "token1", + Chain: "ETH", + Wallet: "wallet6", + PrivateKey: "privateKey", + // Expires: (the zero time indicate no expiry) + } + + // send the trade order + e.Send(tradeEnginePID, tradeOrder) + + // get trade info + fmt.Println("\nGetting trade info") + + // send a TradeInfoRequest to the tradeEngine + resp := e.Request(tradeEnginePID, types.TradeInfoRequest{TradeID: tradeOrder.TradeID}, 5*time.Second) + + // wait for response + res, err := resp.Result() + if err != nil { + fmt.Printf("Failed to get trade info: %v", err) + os.Exit(1) + } + + tradeInfo, ok := res.(types.TradeInfoResponse) + if !ok { + fmt.Printf("Failed to cast response to TradeInfoResponse") + os.Exit(1) + } + fmt.Printf("Trade info: %+v\n", tradeInfo) + + // small delay before cancelling + time.Sleep(2 * time.Second) + + fmt.Println("\nCancelling trade order") + e.Send(tradeEnginePID, types.CancelOrderRequest{TradeID: tradeOrder.TradeID}) + + time.Sleep(2 * time.Second) + e.Poison(tradeEnginePID).Wait() +} + +// The GenID function generates a random ID string of length 16 using a cryptographic random number +// generator. +func GenID() string { + id := make([]byte, 16) + _, _ = rand.Read(id) + return hex.EncodeToString(id) +} diff --git a/examples/trade-engine/readme.md b/examples/trade-engine/readme.md new file mode 100644 index 0000000..d6cdef4 --- /dev/null +++ b/examples/trade-engine/readme.md @@ -0,0 +1,36 @@ +# Trade-Engine Example + +**Note:** This is a conceptual example. It does not execute actual trades. + +## Overview + +This example outlines a simple trade engine system composed of three distinct types of actors. Each actor plays a specific role in the system to manage and execute trades efficiently. + +### Actor Types + +1. **Trade Engine Actor** + + - **Role:** This singular actor is responsible for creating and overseeing the Price Watcher and Trade Executor actors. + - **Functionality:** It acts as a central management hub for the system. + +2. **Price Watcher** + + - **Role:** Dedicated to monitoring the price of a specific ticker. The system creates one Price Watcher per ticker to optimize resource usage. + - **Key Functions:** + - **Periodic Updates:** Utilizes `(*actor.Engine).SendRepeat` to regularly send an `TriggerPriceUpdate` message to itself. + - **Price Update:** Upon receiving an `TriggerPriceUpdate` message, it updates the latest price and sends all subscribed Trade Executors a `PriceUpdate` message. + - **Subscriber Check:** During each `TriggerPriceUpdate`, it checks for active subscribers. If no subscribers exist, it uses `(actor.SendRepeater).Stop()` to stop the `SendRepeat` and then posions itself using `(*actor.Engine).Poison`. + +3. **Trade Executor** + - **Role:** Manages the execution of individual trades. + - **Key Functions:** + - **Subscription:** Subscribes to the Price Watcher with the ticker it is trading. + - **Trade Decision:** On receiving a `PriceUpdate`, it checks if the current price falls within its trade parameters.(not implemented in example - just prints the price) + - **Cancellation:** If a trade is canceled, it sends an `Unsubscribe` message to the Price Watcher and then poisons itself using `(*actor.Engine).Poison`. + +### Flow + +1. The **Trade Engine Actor** sets up the system by creating Price Watcher and Trade Executor actors. +2. **Price Watchers** continuously monitor prices, updating subscribed Trade Executors. +3. **Trade Executors** analyze these updates and execute trades based on the updates. +4. If a trade is canceled or completed, the corresponding Trade Executor unsubscribes from its Price Watcher and poisons itself. diff --git a/examples/trade-engine/types/types.go b/examples/trade-engine/types/types.go new file mode 100644 index 0000000..eb21a27 --- /dev/null +++ b/examples/trade-engine/types/types.go @@ -0,0 +1,64 @@ +package types + +import ( + "github.com/anthdm/hollywood/actor" + "time" +) + +// message to cancel a trade +type CancelOrderRequest struct { + TradeID string +} + +// message sent to get trade info +type TradeInfoRequest struct { + TradeID string +} + +// response message for trade info +type TradeInfoResponse struct { + // info regarding the current position + // eg price, pnl, etc + Foo int + Bar int + Price float64 // using float in example +} + +// message sent to create a new trade order +type TradeOrderRequest struct { + TradeID string + Token0 string + Token1 string + Chain string + Wallet string + PrivateKey string + Expires time.Time +} + +// options when creating new price watcher +type PriceOptions struct { + Ticker string + Token0 string + Token1 string + Chain string +} + +// price update from price watcher +type PriceUpdate struct { + Ticker string + UpdatedAt time.Time + Price float64 +} + +// subscribe to price watcher +type Subscribe struct { + Sendto *actor.PID +} + +// unsubscribe from price watcher +type Unsubscribe struct { + Sendto *actor.PID +} + +// used with SendRepeat to trigger price update +type TriggerPriceUpdate struct{}