From 6528aa6a15fe3eeb58513436431ad2aa510d85d7 Mon Sep 17 00:00:00 2001 From: asolovov Date: Thu, 29 Feb 2024 20:02:57 +0200 Subject: [PATCH] update: sync map --- internal/service/coinAveragePriceSender.go | 15 ++++++++------- internal/service/sender.go | 6 ++---- internal/service/writer.go | 8 +++----- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/internal/service/coinAveragePriceSender.go b/internal/service/coinAveragePriceSender.go index 247bee4..b312ae4 100644 --- a/internal/service/coinAveragePriceSender.go +++ b/internal/service/coinAveragePriceSender.go @@ -18,8 +18,6 @@ type coinAveragePriceSender struct { flare flare.IFlare wsClient wsClient.IWSClient - mu sync.Mutex - stopWriter chan struct{} stopSender chan struct{} stream chan *wsClient.CoinAveragePriceStream @@ -28,7 +26,7 @@ type coinAveragePriceSender struct { // tokens are the tokens for each submit-reveal flow tokens []contracts.TokenID // prices are the prices for next submit-reveal flow - prices map[contracts.TokenID]*big.Int + prices sync.Map } // newCoinAveragePriceSender is used to get new coinAveragePriceSender instance @@ -42,13 +40,12 @@ func newCoinAveragePriceSender( id: id, flare: flare, wsClient: ws, - mu: sync.Mutex{}, stopWriter: make(chan struct{}), stopSender: make(chan struct{}), stream: make(chan *wsClient.CoinAveragePriceStream), resubscribe: make(chan struct{}), tokens: tokens, - prices: make(map[contracts.TokenID]*big.Int), + prices: sync.Map{}, } } @@ -69,8 +66,12 @@ func (s *coinAveragePriceSender) currentPrices() []*big.Int { for _, t := range s.tokens { price := big.NewInt(0) - if s.prices[t] != nil { - price = s.prices[t] + p, ok := s.prices.Load(t) + if ok { + pb, ok := p.(*big.Int) + if ok { + price = pb + } } prices = append(prices, price) diff --git a/internal/service/sender.go b/internal/service/sender.go index 66a4fe2..5216cb5 100644 --- a/internal/service/sender.go +++ b/internal/service/sender.go @@ -28,7 +28,7 @@ func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}) { for { select { case <-stop: - logTrace("stop...", "sendARGPrice") + logInfo("stop...", "sendARGPrice") return default: go s.send() @@ -38,7 +38,7 @@ func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}) { } func (s *coinAveragePriceSender) send() { - logTrace("sending prices...", "send") + logInfo("sending prices...", "send") epoch, err := s.flare.GetCurrentPriceEpochData() if err != nil { logErr(fmt.Sprintln("err get epoch:", err.Error()), "send") @@ -49,9 +49,7 @@ func (s *coinAveragePriceSender) send() { sleep, _ := time.ParseDuration(fmt.Sprintf("%vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60)) random := s.getRandom() - s.mu.Lock() prices := s.currentPrices() - s.mu.Unlock() logTrace(fmt.Sprintf("commiting prices"), "send") if err := s.flare.CommitPrices(epoch.EpochID, s.tokens, prices, random); err != nil { diff --git a/internal/service/writer.go b/internal/service/writer.go index bb19310..a460917 100644 --- a/internal/service/writer.go +++ b/internal/service/writer.go @@ -23,10 +23,10 @@ func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq in for { select { case <-stop: - logTrace("stop...", "listenAndWrite") + logInfo("stop...", "listenAndWrite") return case <-s.resubscribe: - logTrace("resubscribing...", "listenAndWrite") + logInfo("resubscribing...", "listenAndWrite") if err := s.subscribeCoinAveragePrice(tokens, id, freq, stream); err != nil { logErr(fmt.Sprintln("err resubscribe:", err.Error()), "listenAndWrite") return @@ -44,9 +44,7 @@ func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq in price = price.Mul(price, big.NewFloat(100000)) integer, _ := price.Int64() - s.mu.Lock() - s.prices[tokenID] = big.NewInt(integer) - s.mu.Unlock() + s.prices.Store(tokenID, big.NewInt(integer)) } } }