From 55c7955fbd1047fccce194dd595774c71d87fa6c Mon Sep 17 00:00:00 2001 From: asolovov Date: Sat, 2 Mar 2024 16:21:45 +0200 Subject: [PATCH] update: timer and ticker --- internal/service/sender.go | 55 +++++++++++++++++++++++++++---------- internal/service/service.go | 1 + internal/service/writer.go | 3 +- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/internal/service/sender.go b/internal/service/sender.go index 5216cb5..0a759f7 100644 --- a/internal/service/sender.go +++ b/internal/service/sender.go @@ -16,56 +16,81 @@ func (s *coinAveragePriceSender) runSender() { } dur := epoch.EndTimestamp.Uint64() - epoch.CurrentTimestamp.Uint64() - logDebug(fmt.Sprintf("end epoch in %v seconds", dur), "SendCoinAveragePrice") + logInfo(fmt.Sprintf( + "epoch ID: %v, end epoch: %v, current timestamp: %v, %vs till end", + epoch.EpochID, + time.Unix(int64(epoch.EndTimestamp.Uint64()), 0), + time.Unix(int64(epoch.CurrentTimestamp.Uint64()), 0), + dur, + ), "runSender") - go s.sendARGPrice(s.stopSender) + if dur < 60 { + wait, _ := time.ParseDuration(fmt.Sprintf("%vs", dur+90)) + logInfo(fmt.Sprintf("wait for %v seconds before start", wait), "SendCoinAveragePrice") + time.Sleep(wait) + } + + ticker := time.NewTicker(time.Minute * 3) + logInfo("ticker set to 3 minutes", "SendCoinAveragePrice") + + go s.sendARGPrice(s.stopSender, ticker) } // sendARGPrice is used to send data to the flare smart contracts. // Sending flow is based on Flare documentation. Price data is sent each 3 minutes and reveal is send in the reveal timing // received from the flare smart-contract -func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}) { +func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}, ticker *time.Ticker) { + logInfo("starting send loop...", "sendARGPrice") for { select { case <-stop: logInfo("stop...", "sendARGPrice") + ticker.Stop() return - default: + case <-ticker.C: + logInfo("ticker signal", "sendARGPrice") go s.send() - time.Sleep(time.Minute * 3) } } } func (s *coinAveragePriceSender) send() { - logInfo("sending prices...", "send") epoch, err := s.flare.GetCurrentPriceEpochData() if err != nil { logErr(fmt.Sprintln("err get epoch:", err.Error()), "send") - time.Sleep(time.Second * 1) return } + logInfo(fmt.Sprintf( + "epoch ID: %v, end epoch: %v, end reveal: %v, current timestamp: %v, %vs till end, %vs till reveal", + epoch.EpochID, + time.Unix(int64(epoch.EndTimestamp.Uint64()), 0), + time.Unix(int64(epoch.RevealEndTimestamp.Uint64()), 0), + time.Unix(int64(epoch.CurrentTimestamp.Uint64()), 0), + epoch.EndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64(), + epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64(), + ), "runSender") + + logInfo(fmt.Sprintf("calculate timer duration to %vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60), "send") sleep, _ := time.ParseDuration(fmt.Sprintf("%vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60)) random := s.getRandom() prices := s.currentPrices() - logTrace(fmt.Sprintf("commiting prices"), "send") if err := s.flare.CommitPrices(epoch.EpochID, s.tokens, prices, random); err != nil { - time.Sleep(time.Second * 1) return } - go s.reveal(sleep, epoch.EpochID, s.tokens, prices, random) + timer := time.NewTimer(sleep) + + go s.reveal(timer, epoch.EpochID, s.tokens, prices, random) } // reveal will wait the sleep time and then call the reveal smart-contract method -func (s *coinAveragePriceSender) reveal(sleep time.Duration, epochID *big.Int, indices []contracts.TokenID, prices []*big.Int, random *big.Int) { - logDebug(fmt.Sprintf("received for reveal: epochID %v, indices %v, prices %v, random %v", epochID, indices, prices, random), "reveal") - logDebug(fmt.Sprintln("sleep for:", sleep), "reveal") - time.Sleep(sleep) - logTrace(fmt.Sprintf("revealing price for the %v epoch", epochID.Int64()), "reveal") +func (s *coinAveragePriceSender) reveal(timer *time.Timer, epochID *big.Int, indices []contracts.TokenID, prices []*big.Int, random *big.Int) { + logInfo(fmt.Sprintf("received for reveal: epochID %v, indices %v, prices %v, random %v", epochID, indices, prices, random), "reveal") + <-timer.C + logInfo(fmt.Sprintf("revealing price for the %v epoch", epochID.Int64()), "reveal") if err := s.flare.RevealPrices(epochID, indices, prices, random); err != nil { logErr("err reveal", "reveal") } diff --git a/internal/service/service.go b/internal/service/service.go index 7bde169..e7e763b 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -71,6 +71,7 @@ func (s *service) ListenAndSendAverageCoinPrice(tokens []string) { s.nextID++ + logInfo("starting write and send go-routines...", "ListenAndSendAverageCoinPrice") go sender.runWriter() go sender.runSender() } diff --git a/internal/service/writer.go b/internal/service/writer.go index a460917..aa27723 100644 --- a/internal/service/writer.go +++ b/internal/service/writer.go @@ -20,6 +20,7 @@ func (s *coinAveragePriceSender) runWriter() { // listenAndWrite is used to listen to the CoinAveragePriceStream chanel and write it to the coinAveragePriceSender struct func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq int, stream chan *wsClient.CoinAveragePriceStream, stop chan struct{}) { + logInfo("starting listed and write loop...", "listenAndWrite") for { select { case <-stop: @@ -32,7 +33,7 @@ func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq in return } case data := <-stream: - logTrace(fmt.Sprintf("received data on the %s coin", data.Coin), "listenAndWrite") + logInfo(fmt.Sprintf("received data on the %s coin", data.Coin), "listenAndWrite") tokenID := contracts.GetTokenIDFromName(data.Coin) if tokenID == contracts.UnknownToken && tokenID.Index().Int64() < 0 {