Skip to content

Commit

Permalink
Merge branch 'uddugteam:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
andskur authored Feb 29, 2024
2 parents 69189bf + 3294a26 commit 31ed2a6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
15 changes: 8 additions & 7 deletions internal/service/coinAveragePriceSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ type coinAveragePriceSender struct {
flare flare.IFlare
wsClient wsClient.IWSClient

mu sync.Mutex

stopWriter chan struct{}
stopSender chan struct{}
stream chan *wsClient.CoinAveragePriceStream
Expand All @@ -28,7 +26,7 @@ type coinAveragePriceSender struct {
// tokens are the tokens for each submit-reveal flow
tokens []contracts.TokenID
// prices are the prices for next submit-reveal flow
prices map[contracts.TokenID]*big.Int
prices sync.Map
}

// newCoinAveragePriceSender is used to get new coinAveragePriceSender instance
Expand All @@ -42,13 +40,12 @@ func newCoinAveragePriceSender(
id: id,
flare: flare,
wsClient: ws,
mu: sync.Mutex{},
stopWriter: make(chan struct{}),
stopSender: make(chan struct{}),
stream: make(chan *wsClient.CoinAveragePriceStream),
resubscribe: make(chan struct{}),
tokens: tokens,
prices: make(map[contracts.TokenID]*big.Int),
prices: sync.Map{},
}
}

Expand All @@ -69,8 +66,12 @@ func (s *coinAveragePriceSender) currentPrices() []*big.Int {
for _, t := range s.tokens {
price := big.NewInt(0)

if s.prices[t] != nil {
price = s.prices[t]
p, ok := s.prices.Load(t)
if ok {
pb, ok := p.(*big.Int)
if ok {
price = pb
}
}

prices = append(prices, price)
Expand Down
6 changes: 2 additions & 4 deletions internal/service/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}) {
for {
select {
case <-stop:
logTrace("stop...", "sendARGPrice")
logInfo("stop...", "sendARGPrice")
return
default:
go s.send()
Expand All @@ -38,7 +38,7 @@ func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}) {
}

func (s *coinAveragePriceSender) send() {
logTrace("sending prices...", "send")
logInfo("sending prices...", "send")
epoch, err := s.flare.GetCurrentPriceEpochData()
if err != nil {
logErr(fmt.Sprintln("err get epoch:", err.Error()), "send")
Expand All @@ -49,9 +49,7 @@ func (s *coinAveragePriceSender) send() {
sleep, _ := time.ParseDuration(fmt.Sprintf("%vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60))
random := s.getRandom()

s.mu.Lock()
prices := s.currentPrices()
s.mu.Unlock()

logTrace(fmt.Sprintf("commiting prices"), "send")
if err := s.flare.CommitPrices(epoch.EpochID, s.tokens, prices, random); err != nil {
Expand Down
8 changes: 3 additions & 5 deletions internal/service/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq in
for {
select {
case <-stop:
logTrace("stop...", "listenAndWrite")
logInfo("stop...", "listenAndWrite")
return
case <-s.resubscribe:
logTrace("resubscribing...", "listenAndWrite")
logInfo("resubscribing...", "listenAndWrite")
if err := s.subscribeCoinAveragePrice(tokens, id, freq, stream); err != nil {
logErr(fmt.Sprintln("err resubscribe:", err.Error()), "listenAndWrite")
return
Expand All @@ -44,9 +44,7 @@ func (s *coinAveragePriceSender) listenAndWrite(tokens []string, id int, freq in
price = price.Mul(price, big.NewFloat(100000))
integer, _ := price.Int64()

s.mu.Lock()
s.prices[tokenID] = big.NewInt(integer)
s.mu.Unlock()
s.prices.Store(tokenID, big.NewInt(integer))
}
}
}
Expand Down

0 comments on commit 31ed2a6

Please sign in to comment.