Refactor Pusher Engine with updated interface #6780
Conversation
tim-barry commented Dec 4, 2024
- Remove pusher engine implementation of network.Engine
  - Replace with network.MessageProcessor
  - See: replace engine.Unit with ComponentManager in Pusher Engine #6747 (comment)
- Remove SubmitCollectionGuarantee message type
  - Was only used between the Finalizer and the Pusher engine
  - The new interface passes and stores collection guarantees directly, instead of wrapping and then unwrapping them
  - See: replace engine.Unit with ComponentManager in Pusher Engine #6747 (comment)
- Add GuaranteedCollectionPublisher interface, implemented by the pusher engine
  - Only used by the Finalizer (and intermediate constructors)
  - Mocks are generated for it and used in the Finalizer unit tests
  - See: replace engine.Unit with ComponentManager in Pusher Engine #6747 (comment)
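To make the shape of this change concrete, here is a minimal, self-contained sketch. The type and function definitions below are simplified stand-ins (they echo the real flow-go names like `GuaranteedCollectionPublisher` and `flow.CollectionGuarantee`, but are not the actual definitions): instead of wrapping guarantees in a message type that the engine later unwraps, the Finalizer calls a narrow interface that accepts the guarantee directly.

```go
package main

import "fmt"

// CollectionGuarantee is a stand-in for the real flow.CollectionGuarantee type.
type CollectionGuarantee struct {
	CollectionID string
}

// GuaranteedCollectionPublisher is the narrow interface the Finalizer depends on;
// the pusher engine implements it. (Illustrative signature, not the real one.)
type GuaranteedCollectionPublisher interface {
	SubmitCollectionGuarantee(guarantee *CollectionGuarantee)
}

// pusher stands in for the pusher engine: it stores guarantees directly,
// with no SubmitCollectionGuarantee message wrapper in between.
type pusher struct {
	queued []*CollectionGuarantee
}

func (p *pusher) SubmitCollectionGuarantee(guarantee *CollectionGuarantee) {
	p.queued = append(p.queued, guarantee)
}

// finalize stands in for the Finalizer: it only sees the interface,
// never the concrete engine type.
func finalize(pub GuaranteedCollectionPublisher, g *CollectionGuarantee) {
	pub.SubmitCollectionGuarantee(g)
}

func main() {
	p := &pusher{}
	finalize(p, &CollectionGuarantee{CollectionID: "abc"})
	fmt.Println(len(p.queued)) // 1: guarantee stored directly
}
```

Because the Finalizer only depends on the interface, a generated mock can be substituted in its unit tests.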
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

```
@@                  Coverage Diff                  @@
##  feature/pusher-engine-refactor    #6780  +/- ##
===============================================================
  Coverage      41.24%    41.25%
===============================================================
  Files           2061      2062      +1
  Lines         182737    182738      +1
===============================================================
+ Hits           75375     75382      +7
+ Misses        101045    101044      -1
+ Partials        6317      6312      -5
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Issue: #6765
```go
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
	CollectionID:     block.Payload.Collection.ID(),
	ReferenceBlockID: refBlock.ID(),
	ChainID:          block.Header.ChainID,
	SignerIndices:    block.Header.ParentVoterIndices,
	Signature:        nil,
})
```
We can similarly change this call to be part of the assertion at the beginning of the test. We would remove lines 183-189 and replace line 150 with:
```go
prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
	CollectionID:     block.Payload.Collection.ID(),
	ReferenceBlockID: refBlock.ID(),
	ChainID:          block.Header.ChainID,
	SignerIndices:    block.Header.ParentVoterIndices,
	Signature:        nil,
}).Once()
```
This is again a sign of older code using old patterns (not a problem with your PR: just pointing out something that is already present). When you write new tests, you should generally write specific assertions before execution, rather than afterward. It is less code, and having the assertions before execution avoids the potential pitfall of using the test output itself as the expected value.
I'm pointing this out mainly as a pattern to avoid directly replicating in your own code. But I do think it would be a good exercise to update the assertions in this test file as described above, to get a sense for how the library works and when/how assertions are validated.
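To illustrate what the expectations-before-execution pattern does under the hood, here is a hand-rolled, self-contained sketch (all names here are hypothetical; the real mocks are generated by mockery on top of testify): expectations are declared up front, calls are counted during execution, and unmet expectations surface when the test verifies at the end.

```go
package main

import "fmt"

// expectation records how many times a method is expected to be called.
type expectation struct {
	method string
	want   int
	got    int
}

// mockPublisher is a tiny hand-rolled stand-in for a testify-style mock.
type mockPublisher struct {
	exps map[string]*expectation
}

func newMockPublisher() *mockPublisher {
	return &mockPublisher{exps: map[string]*expectation{}}
}

// On declares an expected call, before the code under test runs.
func (m *mockPublisher) On(method string, times int) {
	m.exps[method] = &expectation{method: method, want: times}
}

// SubmitCollectionGuarantee is the method the code under test invokes.
func (m *mockPublisher) SubmitCollectionGuarantee() {
	if e, ok := m.exps["SubmitCollectionGuarantee"]; ok {
		e.got++
	}
}

// AssertExpectations reports whether every declared expectation was met,
// including a mismatch in the number of calls.
func (m *mockPublisher) AssertExpectations() error {
	for _, e := range m.exps {
		if e.got != e.want {
			return fmt.Errorf("%s: expected %d calls, got %d", e.method, e.want, e.got)
		}
	}
	return nil
}

func main() {
	m := newMockPublisher()
	m.On("SubmitCollectionGuarantee", 1) // expectation set before execution
	m.SubmitCollectionGuarantee()        // code under test runs
	fmt.Println(m.AssertExpectations())  // <nil>: expectation met
}
```

Because the expected value is written down before execution, the test cannot accidentally use its own output as the oracle.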
I did this in d157f48, and also made sure to see the error messages when the test fails because those assertions aren't met; definitely useful that it tells you when the number of expected calls doesn't match.
I did run into what I think is a VSCode issue: when the tests are run from VSCode and fail, the file that the error occurred in is given the wrong path (it's assumed to be in the "module/finalizer/collection/" directory, the same one the test is in); when the tests are instead run on the command line via `go test`, it only reports the filename where the error occurred (not a full path).
- Construct the mock objects with their `.New___()` method instead of using golang's built-in `new` function, which enables automatic cleanup.
- Replace explicit AssertCalled checks at the end of tests with `.On()` and `.Once()`, which will automatically be checked at the end of the test.
Co-authored-by: Jordan Schalm <[email protected]>
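As a rough sketch of how constructor-based mocks achieve that automatic cleanup (hypothetical names throughout; the real mechanism is the generated `New___(t)` constructor registering the mock's final verification via the testing package's `Cleanup`), the idea is that the constructor itself schedules the check, so a test cannot forget to assert:

```go
package main

import "fmt"

// fakeT mimics the small slice of testing.T we need: deferred cleanup
// functions and recorded failures.
type fakeT struct {
	cleanups []func()
	failures []string
}

func (t *fakeT) Cleanup(f func()) { t.cleanups = append(t.cleanups, f) }
func (t *fakeT) Errorf(format string, args ...any) {
	t.failures = append(t.failures, fmt.Sprintf(format, args...))
}

// finish runs cleanups in LIFO order, like testing.T does at test end.
func (t *fakeT) finish() {
	for i := len(t.cleanups) - 1; i >= 0; i-- {
		t.cleanups[i]()
	}
}

// mock counts calls to a single method and knows how many were expected.
type mock struct {
	calls int
	want  int
}

// newMock mirrors the generated New___(t) pattern: the constructor itself
// registers the final verification, so it runs automatically at cleanup.
func newMock(t *fakeT) *mock {
	m := &mock{}
	t.Cleanup(func() {
		if m.calls != m.want {
			t.Errorf("expected %d calls, got %d", m.want, m.calls)
		}
	})
	return m
}

func main() {
	t := &fakeT{}
	m := newMock(t)
	m.want = 1
	m.calls++ // the code under test calls the mocked method once
	t.finish()
	fmt.Println(len(t.failures)) // 0: expectation met, verified automatically
}
```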
Very nice work 👏
engine/collection/pusher/engine.go
Outdated
```diff
 // Process processes the given event from the node with the given origin ID in
-// a blocking manner. It returns the potential processing error when done.
+// a non-blocking manner. It returns the potential processing error when done.
 // Because the pusher engine does not accept inputs from the network,
 // always drop any messages and return an error.
 func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
-	return fmt.Errorf("pusher engine should only receive local messages on the same node")
+	return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID)
 }
```
This suggestion is largely about mildly adjusting the framing of `Process` to align with the protocol. Jordan had a helpful explanation in his comment on the prior PR, which might be a useful reference here.
Here’s how I’d frame the context:
- The Flow protocol requires that honest Collector nodes broadcast Collection guarantees to Consensus Nodes (and only those). "Broadcasting" here refers to epidemic gossip or similar algorithms used to disseminate messages efficiently while minimizing bandwidth usage. You don’t need to dive into the details for this PR, but I thought a quick note on terminology might be helpful. The relevant broadcasting operation is this:
flow-go/engine/collection/pusher/engine.go, line 173 in 42b2331:

```go
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
```

- Now, consider the case where an honest Collector node receives a message from another node, let's call it "Byzantine Bob". Bob, being byzantine, might deviate from protocol rules when broadcasting messages. While the protocol specifies that Collection Guarantees should be sent to the consensus committee, Bob might specify a different set of recipients for its message.
- From the perspective of the `pusher.Engine` (which runs only within honest Collector nodes), we would receive such messages via the `channels.PushGuarantees` networking channel. It’s important to note that the networking layer operates as a low-level tool and doesn’t understand the higher-level protocol logic. Its role is simply to relay messages, even if they’re outside the protocol rules (there are only very basic checks in the networking layer, as briefly explained in this notion doc).
However, there’s an opportunity to refine this behavior. Returning an error here informs the networking layer of an issue, but since the networking layer doesn’t understand the meaning of those errors, it’s essentially acting as a "glorified logger." Instead, I think it would be cleaner and more maintainable for the `pusher.Engine`, which has the detailed protocol knowledge, to handle this edge case directly. For example:
- We could use the dedicated logging keys for flagging suspected protocol violations:
flow-go/utils/logging/consts.go, lines 4 to 6 in 42b2331:

```go
// KeySuspicious is a logging label that is used to flag the log event as suspicious behavior
// This is used to add an easily searchable label to the log event
KeySuspicious = "suspicious"
```

- Additionally, we can add an explanatory log entry here to provide context for the behavior without needing to involve the networking layer.
```diff
-// Process processes the given event from the node with the given origin ID in
-// a non-blocking manner. It returns the potential processing error when done.
-// Because the pusher engine does not accept inputs from the network,
-// always drop any messages and return an error.
-func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
-	return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID)
-}
+// Process is called by the networking layer, when peers broadcast messages with this node
+// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection
+// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on
+// Collectors) receives a message, this message is evidence of byzantine behavior.
+// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in
+// error returns. No errors expected during normal operation (including byzantine inputs).
+func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
+	// Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense.
+	// Though, for generating cryptographic evidence, we need Message Forensics - see reference [1].
+	// Much further into the future, when we are implementing slashing challenges, we'll probably implement a
+	// dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now,
+	// we just log this with the `KeySuspicious` to alert the node operator.
+	// [1] Message Forensics FLIP https://github.com/onflow/flips/pull/195
+	errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel)
+	e.log.Warn().
+		Err(errs).
+		Bool(logging.KeySuspicious, true).
+		Str("peer_id", originID.String()).
+		Msg("potentially byzantine networking traffic detected")
+	return nil
+}
```
```diff
 // send from a non-allowed role
 sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]

-msg := &messages.SubmitCollectionGuarantee{
-	Guarantee: *guarantee,
-}
-err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg)
+err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
```
Unfortunately, with the change I suggested above, we would hide the case where `pusher.Engine` rejected the input from the external-facing `MessageProcessor` interface:
Lines 52 to 58 in 42b2331:

```go
// TODO: This function should not return an error.
// The networking layer's responsibility is fulfilled once it delivers a message to an engine.
// It does not possess the context required to handle errors that may arise during an engine's processing
// of the message, as error handling for message processing falls outside the domain of the networking layer.
// Consequently, it is reasonable to remove the error from the Process function's signature,
// since returning an error to the networking layer would not be useful in this context.
Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
```
Trying to adjust the test so we verify that `pusher.Engine` handles any input and does not broadcast when `Process` is called (?) ... maybe something like 👇 ?
```diff
-// send from a non-allowed role
-sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
-err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
+// verify that pusher.Engine handles any (potentially byzantine) input:
+// A byzantine peer could target the collector node's pusher engine with messages.
+// The pusher should discard those and explicitly not get tricked into broadcasting
+// collection guarantees which a byzantine peer might try to inject into the system.
+sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
+err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
+suite.Require().NoError(err)
```
Forgot to compliment you on this earlier: your PR description is great - very precise, thorough and concise.
Improve documentation for the Process method of the pusher engine, and log an error instead of returning an error. See: #6780 (comment)

Co-authored-by: Alexander Hentschel <[email protected]>