Skip to content

Commit

Permalink
Merge pull request uddugteam#20 from uddugteam/deliverence/fixTime
Browse files Browse the repository at this point in the history
  • Loading branch information
andskur authored Mar 4, 2024
2 parents bd7f2d4 + 10878ff commit 36c7f01
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 172 deletions.
2 changes: 1 addition & 1 deletion config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func init() {
// 114 - coston2 chain testnet
// 19 - songbird chain net

viper.SetDefault("tokens", []string{"ETH", "BTC"})
viper.SetDefault("tokens", []string{"BTC", "ETH"})

viper.SetDefault("flare.rpcurl", "https://flare-coston2.eu-north-2.gateway.fm/ext/bc/C/rpc")

Expand Down
2 changes: 1 addition & 1 deletion internal/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (app *App) WhiteListAddressAll(address string) error {

// Serve start serving Application service
func (app *App) Serve() error {
go app.srv.ListenAndSendAverageCoinPrice(app.config.Tokens)
go app.srv.SendCoinAveragePrice(app.config.Tokens)

// Gracefully shutdown the server
quit := make(chan os.Signal, 1)
Expand Down
105 changes: 65 additions & 40 deletions internal/service/coinAveragePriceSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,100 @@ package service

import (
"crypto/rand"
"fmt"
"math/big"
"sync"
"time"

"golang.org/x/sync/syncmap"

"oracle-flare/pkg/flare"
"oracle-flare/pkg/flare/contracts"
"oracle-flare/pkg/wsClient"
)

// coinAveragePriceSender is a struct of the coin average prices sender
type coinAveragePriceSender struct {
// SendCoinAveragePrice is used to subscribe on the avg price and send results to the flare smart contracts
func (s *service) SendCoinAveragePrice(tokens []string) {
parsedTokens := []contracts.TokenID{}

for _, t := range tokens {
parsedToken := contracts.GetTokenIDFromName(t)
if parsedToken == contracts.UnknownToken {
logWarn(fmt.Sprintln("received unknown token:", t), "SendCoinAveragePrice")
} else {
parsedTokens = append(parsedTokens, parsedToken)
}
}

if len(parsedTokens) == 0 {
logErr("all tokens are invalid", "SendCoinAveragePrice")
return
}

sender := newCoinAvgPriceSender(len(s.avgPriceSenders), s.flare, s.wsClient, parsedTokens)
s.avgPriceSenders = append(s.avgPriceSenders, sender)

go sender.runWriter()

sender.ticker = time.NewTicker(time.Minute * 3)
logInfo("set ticker for sender to 3m", "SendCoinAveragePrice")
go sender.runSender()
}

// coinAVGPriceSender is a struct of the coin average prices sender
type coinAVGPriceSender struct {
// id is a WS id
id int

flare flare.IFlare
wsClient wsClient.IWSClient

stream chan *wsClient.CoinAveragePriceStream
stopWriter chan struct{}
stopSender chan struct{}
stream chan *wsClient.CoinAveragePriceStream
resubscribe chan struct{}

ticker *time.Ticker

// tokens are the tokens for each submit-reveal flow
tokens []contracts.TokenID
// prices are the prices for next submit-reveal flow
prices sync.Map
prices *syncmap.Map
}

// newCoinAveragePriceSender is used to get new coinAveragePriceSender instance
func newCoinAveragePriceSender(
id int,
flare flare.IFlare,
ws wsClient.IWSClient,
tokens []contracts.TokenID,
) *coinAveragePriceSender {
return &coinAveragePriceSender{
// newCoinAvgPriceSender is used to get new coinAVGPriceSender instance
func newCoinAvgPriceSender(id int, flare flare.IFlare, ws wsClient.IWSClient, tokens []contracts.TokenID) *coinAVGPriceSender {
return &coinAVGPriceSender{
id: id,
flare: flare,
wsClient: ws,
stream: make(chan *wsClient.CoinAveragePriceStream),
stopWriter: make(chan struct{}),
stopSender: make(chan struct{}),
stream: make(chan *wsClient.CoinAveragePriceStream),
resubscribe: make(chan struct{}),
tokens: tokens,
prices: sync.Map{},
prices: &syncmap.Map{},
}
}

// tokenNames is used to get token names string from the tokens struct
func (s *coinAveragePriceSender) tokenNames() []string {
tokenNames := []string{}

// parsePrices is used to get big int prices from the prices map
func (s *coinAVGPriceSender) parsePrices() (prices []*big.Int) {
for _, t := range s.tokens {
tokenNames = append(tokenNames, t.Name())
}

return tokenNames
}

func (s *coinAveragePriceSender) currentPrices() []*big.Int {
prices := []*big.Int{}

for _, t := range s.tokens {
price := big.NewInt(0)

p, ok := s.prices.Load(t)
p := big.NewInt(0)
pp, ok := s.prices.Load(t)
if ok {
pb, ok := p.(*big.Int)
pb, ok := pp.(*big.Int)
if ok {
price = pb
p = pb
}
}

prices = append(prices, price)
prices = append(prices, p)
}

return prices
}

// getRandom is used to update random arg and return it
func (s *coinAveragePriceSender) getRandom() *big.Int {
func (s *coinAVGPriceSender) getRandom() *big.Int {
random, err := rand.Prime(rand.Reader, 130)
if err != nil {
return s.getRandom()
Expand All @@ -90,8 +104,19 @@ func (s *coinAveragePriceSender) getRandom() *big.Int {
return random
}

// close is used to close coin average price writer
func (s *coinAveragePriceSender) close() {
close(s.stopSender)
// tokenNames is used to get token names string from the tokens struct
func (s *coinAVGPriceSender) tokenNames() []string {
tokenNames := []string{}

for _, t := range s.tokens {
tokenNames = append(tokenNames, t.Name())
}

return tokenNames
}

// close is used to close coin average price sender
func (s *coinAVGPriceSender) close() {
close(s.stopWriter)
close(s.stopSender)
}
95 changes: 23 additions & 72 deletions internal/service/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,90 +8,41 @@ import (
"oracle-flare/pkg/flare/contracts"
)

// runSender is used to run reveal-submit flow
func (s *coinAveragePriceSender) runSender() {
epoch, err := s.flare.GetCurrentPriceEpochData()
if err != nil {
logErr(fmt.Sprintln("err get epoch:", err.Error()), "SendCoinAveragePrice")
}

dur := epoch.EndTimestamp.Uint64() - epoch.CurrentTimestamp.Uint64()
logInfo(fmt.Sprintf(
"epoch ID: %v, end epoch: %v, current timestamp: %v, %vs till end",
epoch.EpochID,
time.Unix(int64(epoch.EndTimestamp.Uint64()), 0),
time.Unix(int64(epoch.CurrentTimestamp.Uint64()), 0),
dur,
), "runSender")

if dur < 60 {
wait, _ := time.ParseDuration(fmt.Sprintf("%vs", dur+90))
logInfo(fmt.Sprintf("wait for %v seconds before start", wait), "SendCoinAveragePrice")
time.Sleep(wait)
}

ticker := time.NewTicker(time.Minute * 3)
logInfo("ticker set to 3 minutes", "SendCoinAveragePrice")

go s.sendARGPrice(s.stopSender, ticker)
}

// sendARGPrice is used to send data to the flare smart contracts.
// Sending flow is based on Flare documentation. Price data is sent each 3 minutes and reveal is send in the reveal timing
// received from the flare smart-contract
func (s *coinAveragePriceSender) sendARGPrice(stop chan struct{}, ticker *time.Ticker) {
logInfo("starting send loop...", "sendARGPrice")
func (s *coinAVGPriceSender) runSender() {
for {
select {
case <-stop:
logInfo("stop...", "sendARGPrice")
ticker.Stop()
case <-s.stopSender:
logInfo("stop...", "Sender")
return
case <-ticker.C:
logInfo("ticker signal", "sendARGPrice")
go s.send()
}
}
}

func (s *coinAveragePriceSender) send() {
epoch, err := s.flare.GetCurrentPriceEpochData()
if err != nil {
logErr(fmt.Sprintln("err get epoch:", err.Error()), "send")
return
}
case <-s.ticker.C:
logInfo(fmt.Sprintf("commiting price"), "Sender")

logInfo(fmt.Sprintf(
"epoch ID: %v, end epoch: %v, end reveal: %v, current timestamp: %v, %vs till end, %vs till reveal",
epoch.EpochID,
time.Unix(int64(epoch.EndTimestamp.Uint64()), 0),
time.Unix(int64(epoch.RevealEndTimestamp.Uint64()), 0),
time.Unix(int64(epoch.CurrentTimestamp.Uint64()), 0),
epoch.EndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64(),
epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64(),
), "runSender")
epoch, err := s.flare.GetCurrentPriceEpochData()
if err != nil {
logErr(fmt.Sprintln("err get epoch:", err.Error()), "Sender")
continue
}

logInfo(fmt.Sprintf("calculate timer duration to %vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60), "send")
sleep, _ := time.ParseDuration(fmt.Sprintf("%vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60))
random := s.getRandom()
sleep, _ := time.ParseDuration(fmt.Sprintf("%vs", epoch.RevealEndTimestamp.Uint64()-epoch.CurrentTimestamp.Uint64()-60))
random := s.getRandom()

prices := s.currentPrices()
if err := s.flare.CommitPrices(epoch.EpochID, s.tokens, s.parsePrices(), random); err != nil {
continue
}

if err := s.flare.CommitPrices(epoch.EpochID, s.tokens, prices, random); err != nil {
return
timer := time.NewTimer(sleep)
logInfo(fmt.Sprintf("time for reverl: %v", sleep), "Sender")
go s.reveal(timer, epoch.EpochID, s.tokens, s.parsePrices(), random)
}
}

timer := time.NewTimer(sleep)

go s.reveal(timer, epoch.EpochID, s.tokens, prices, random)
}

// reveal will wait the sleep time and then call the reveal smart-contract method
func (s *coinAveragePriceSender) reveal(timer *time.Timer, epochID *big.Int, indices []contracts.TokenID, prices []*big.Int, random *big.Int) {
logInfo(fmt.Sprintf("received for reveal: epochID %v, indices %v, prices %v, random %v", epochID, indices, prices, random), "reveal")
func (s *coinAVGPriceSender) reveal(timer *time.Timer, epochID *big.Int, indices []contracts.TokenID, prices []*big.Int, random *big.Int) {
logInfo(fmt.Sprintf("received for reveal: epochID %v, indices %v, prices %v, random %v", epochID, indices, prices, random), "Sender")
<-timer.C
logInfo(fmt.Sprintf("revealing price for the %v epoch", epochID.Int64()), "reveal")
logInfo(fmt.Sprintf("revealing price for the %v epoch", epochID.Int64()), "Sender")
if err := s.flare.RevealPrices(epochID, indices, prices, random); err != nil {
logErr("err reveal", "reveal")
logErr("err reveal", "Sender")
}
}
39 changes: 4 additions & 35 deletions internal/service/service.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package service

import (
"fmt"

"oracle-flare/pkg/flare"
"oracle-flare/pkg/flare/contracts"
"oracle-flare/pkg/wsClient"
)

// IService is a service layer interface
type IService interface {
// WhiteListAddress is used to add address to the smart-contract whitelist with given tokens
WhiteListAddress(addressS string, indicesS []string) ([]bool, error)
// ListenAndSendAverageCoinPrice is
ListenAndSendAverageCoinPrice(tokens []string)
// SendCoinAveragePrice is used to send coin average price from the ws service to the flare smart-contracts
SendCoinAveragePrice(tokens []string)
// Close is used to stop the service
Close()
}
Expand All @@ -23,16 +20,15 @@ type service struct {
flare flare.IFlare
wsClient wsClient.IWSClient

nextID int
avgPriceSenders []*coinAveragePriceSender
avgPriceSenders []*coinAVGPriceSender
resubscribe chan struct{}
}

// NewService is used to get new service instance
func NewService(ws wsClient.IWSClient, flare flare.IFlare) IService {
logInfo("creating new service...", "Init")
c := &service{
avgPriceSenders: make([]*coinAveragePriceSender, 0),
avgPriceSenders: make([]*coinAVGPriceSender, 0),
}

if ws != nil {
Expand All @@ -49,33 +45,6 @@ func NewService(ws wsClient.IWSClient, flare flare.IFlare) IService {
return c
}

func (s *service) ListenAndSendAverageCoinPrice(tokens []string) {
parsedTokens := []contracts.TokenID{}

for _, t := range tokens {
parsedToken := contracts.GetTokenIDFromName(t)
if parsedToken == contracts.UnknownToken {
logWarn(fmt.Sprintln("received unknown token:", t), "SendCoinAveragePrice")
} else {
parsedTokens = append(parsedTokens, parsedToken)
}
}

if len(parsedTokens) == 0 {
logErr("all tokens are invalid", "SendCoinAveragePrice")
return
}

sender := newCoinAveragePriceSender(s.nextID, s.flare, s.wsClient, parsedTokens)
s.avgPriceSenders = append(s.avgPriceSenders, sender)

s.nextID++

logInfo("starting write and send go-routines...", "ListenAndSendAverageCoinPrice")
go sender.runWriter()
go sender.runSender()
}

func (s *service) listenResubscribe() {
for {
select {
Expand Down
Loading

0 comments on commit 36c7f01

Please sign in to comment.