Skip to content

Commit

Permalink
if no payload is received, log which relays sent the corresponding bid (
Browse files Browse the repository at this point in the history
#229)

* bid cache to log relay on withholding

* remember multiple relays when delivering the same bid

* cleanup
  • Loading branch information
metachris authored Aug 3, 2022
1 parent d476fec commit 9b3ce62
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 35 deletions.
123 changes: 88 additions & 35 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -22,7 +23,6 @@ var (
errInvalidSlot = errors.New("invalid slot")
errInvalidHash = errors.New("invalid hash")
errInvalidPubkey = errors.New("invalid pubkey")
errInvalidSignature = errors.New("invalid signature")
errNoSuccessfulRelayResponse = errors.New("no successful relay response")

errServerAlreadyRunning = errors.New("server already running")
Expand All @@ -46,7 +46,7 @@ type BoostServiceOpts struct {
RelayCheck bool
}

// BoostService TODO
// BoostService - the mev-boost service
type BoostService struct {
listenAddr string
relays []RelayEntry
Expand All @@ -56,6 +56,9 @@ type BoostService struct {

builderSigningDomain types.Domain
httpClient http.Client

bidsLock sync.Mutex
bids map[bidRespKey]bidResp // keeping track of bids, to log the originating relay on withholding
}

// NewBoostService created a new BoostService
Expand All @@ -74,6 +77,7 @@ func NewBoostService(opts BoostServiceOpts) (*BoostService, error) {
relays: opts.Relays,
log: opts.Log.WithField("module", "service"),
relayCheck: opts.RelayCheck,
bids: make(map[bidRespKey]bidResp),

builderSigningDomain: builderSigningDomain,
httpClient: http.Client{
Expand Down Expand Up @@ -124,6 +128,8 @@ func (m *BoostService) StartHTTPServer() error {
return errServerAlreadyRunning
}

go m.startBidCacheCleanupTask()

m.srv = &http.Server{
Addr: m.listenAddr,
Handler: m.getRouter(),
Expand All @@ -143,6 +149,19 @@ func (m *BoostService) StartHTTPServer() error {
return err
}

func (m *BoostService) startBidCacheCleanupTask() {
for {
time.Sleep(1 * time.Minute)
m.bidsLock.Lock()
for k, bidResp := range m.bids {
if time.Since(bidResp.t) > 3*time.Minute {
delete(m.bids, k)
}
}
m.bidsLock.Unlock()
}
}

func (m *BoostService) handleRoot(w http.ResponseWriter, req *http.Request) {
m.respondOK(w, nilResponse)
}
Expand Down Expand Up @@ -193,7 +212,7 @@ func (m *BoostService) handleStatus(w http.ResponseWriter, req *http.Request) {
}
}

// RegisterValidatorV1 - returns 200 if at least one relay returns 200
// handleRegisterValidator - returns 200 if at least one relay returns 200, else 502
func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.Request) {
log := m.log.WithField("method", "registerValidator")
log.Info("registerValidator")
Expand Down Expand Up @@ -235,7 +254,7 @@ func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.
}
}

// GetHeaderV1 TODO
// handleGetHeader requests bids from the relays
func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
slot := vars["slot"]
Expand All @@ -247,9 +266,10 @@ func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request)
"parentHash": parentHashHex,
"pubkey": pubkey,
})
log.Info("getHeader")
log.Debug("getHeader")

if _, err := strconv.ParseUint(slot, 10, 64); err != nil {
_slot, err := strconv.ParseUint(slot, 10, 64)
if err != nil {
m.respondError(w, http.StatusBadRequest, errInvalidSlot.Error())
return
}
Expand All @@ -264,8 +284,10 @@ func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request)
return
}

result := new(types.GetHeaderResponse)
var mu sync.Mutex
relays := make(map[string][]string) // relays per blockHash
result := bidResp{}

ua := UserAgent(req.Header.Get("User-Agent"))

// Call the relays
Expand Down Expand Up @@ -294,9 +316,10 @@ func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request)
return
}

blockHash := responsePayload.Data.Message.Header.BlockHash.String()
log = log.WithFields(logrus.Fields{
"blockNumber": responsePayload.Data.Message.Header.BlockNumber,
"blockHash": responsePayload.Data.Message.Header.BlockHash,
"blockHash": blockHash,
"txRoot": responsePayload.Data.Message.Header.TransactionsRoot.String(),
"value": responsePayload.Data.Message.Value.String(),
})
Expand All @@ -308,7 +331,7 @@ func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request)
return
}
if !ok {
log.WithError(errInvalidSignature).Error("failed to verify relay signature")
log.Error("failed to verify relay signature")
return
}

Expand All @@ -325,46 +348,75 @@ func (m *BoostService) handleGetHeader(w http.ResponseWriter, req *http.Request)
mu.Lock()
defer mu.Unlock()

// Remember which relays delivered which bids (multiple relays might deliver the top bid)
if _, ok := relays[blockHash]; !ok {
relays[blockHash] = []string{relay.String()}
} else {
relays[blockHash] = append(relays[blockHash], relay.String())
}

// Skip if value (fee) is not greater than the current highest value
if result.Data != nil && responsePayload.Data.Message.Value.Cmp(&result.Data.Message.Value) < 1 {
if result.response.Data != nil && responsePayload.Data.Message.Value.Cmp(&result.response.Data.Message.Value) < 1 {
return
}

// Use this relay's response as mev-boost response because it's most profitable
*result = *responsePayload
log.Info("successfully got more valuable payload header")
log.Debug("received a good bid")
result.response = *responsePayload
result.blockHash = blockHash
result.t = time.Now()
}(relay)
}

// Wait for all requests to complete...
wg.Wait()

if result.Data == nil || result.Data.Message == nil || result.Data.Message.Header == nil || result.Data.Message.Header.BlockHash == nilHash {
log.Info("no bids received from relay")
if result.blockHash == "" {
log.Info("no bid received")
w.WriteHeader(http.StatusNoContent)
return
}

m.respondOK(w, result)
// Log result
result.relays = relays[result.blockHash]
log.WithFields(logrus.Fields{
"blockHash": result.blockHash,
"blockNumber": result.response.Data.Message.Header.BlockNumber,
"txRoot": result.response.Data.Message.Header.TransactionsRoot.String(),
"value": result.response.Data.Message.Value.String(),
"relays": strings.Join(result.relays, ", "),
}).Info("best bid")

// Remember the bid, for future logging in case of withholding
bidKey := bidRespKey{slot: _slot, blockHash: result.blockHash}
m.bidsLock.Lock()
m.bids[bidKey] = result
m.bidsLock.Unlock()

// Return the bid
m.respondOK(w, result.response)
}

func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request) {
log := m.log.WithField("method", "getPayload")
log.Info("getPayload")
log.Debug("getPayload")

payload := new(types.SignedBlindedBeaconBlock)
if err := DecodeJSON(req.Body, &payload); err != nil {
m.respondError(w, http.StatusBadRequest, err.Error())
return
}

result := new(types.GetPayloadResponse)
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
defer requestCtxCancel()
log = log.WithField("blockHash", payload.Message.Body.ExecutionPayloadHeader.BlockHash.String())
var wg sync.WaitGroup
var mu sync.Mutex
result := new(types.GetPayloadResponse)
ua := UserAgent(req.Header.Get("User-Agent"))

// Prepare the request context, which will be cancelled after the first successful response from a relay
requestCtx, requestCtxCancel := context.WithCancel(context.Background())
defer requestCtxCancel()

for _, relay := range m.relays {
wg.Add(1)
go func(relay RelayEntry) {
Expand All @@ -377,12 +429,20 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request
_, err := SendHTTPRequest(requestCtx, m.httpClient, http.MethodPost, url, ua, payload, responsePayload)

if err != nil {
log.WithError(err).Warn("error making request to relay")
log.WithError(err).Error("error making request to relay")
return
}

if responsePayload.Data == nil || responsePayload.Data.BlockHash == nilHash {
log.Warn("invalid response")
log.Error("response with empty data!")
return
}

// Ensure the response blockhash matches the request
if payload.Message.Body.ExecutionPayloadHeader.BlockHash != responsePayload.Data.BlockHash {
log.WithFields(logrus.Fields{
"responseBlockHash": responsePayload.Data.BlockHash.String(),
}).Error("requestBlockHash does not equal responseBlockHash")
return
}

Expand All @@ -394,30 +454,23 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request
return
}

// Ensure the response blockhash matches the request
if payload.Message.Body.ExecutionPayloadHeader.BlockHash != responsePayload.Data.BlockHash {
log.WithFields(logrus.Fields{
"payloadBlockHash": payload.Message.Body.ExecutionPayloadHeader.BlockHash,
"responseBlockHash": responsePayload.Data.BlockHash,
}).Warn("requestBlockHash does not equal responseBlockHash")
return
}

// Received successful response. Now cancel other requests and return immediately
requestCtxCancel()
*result = *responsePayload
log.WithFields(logrus.Fields{
"blockHash": responsePayload.Data.BlockHash,
"blockNumber": responsePayload.Data.BlockNumber,
}).Info("getPayload: received payload from relay")
log.Info("received payload from relay")
}(relay)
}

// Wait for all requests to complete...
wg.Wait()

// If no payload has been received from relay, log loudly about withholding!
if result.Data == nil || result.Data.BlockHash == nilHash {
log.Warn("getPayload: no valid response from relay")
bidKey := bidRespKey{slot: payload.Message.Slot, blockHash: payload.Message.Body.ExecutionPayloadHeader.BlockHash.String()}
m.bidsLock.Lock()
originalResp := m.bids[bidKey]
m.bidsLock.Unlock()
log.WithField("relays", strings.Join(originalResp.relays, ", ")).Errorf("no payload received from relay -- withholding or network error --")
m.respondError(w, http.StatusBadGateway, errNoSuccessfulRelayResponse.Error())
return
}
Expand Down
15 changes: 15 additions & 0 deletions server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -98,3 +99,17 @@ func DecodeJSON(r io.Reader, dst any) error {
}
return nil
}

// bidResp are entries in the bids cache
type bidResp struct {
t time.Time
response types.GetHeaderResponse
blockHash string
relays []string
}

// bidRespKey is used as key for the bids cache
type bidRespKey struct {
slot uint64
blockHash string
}

0 comments on commit 9b3ce62

Please sign in to comment.