From 38e71518dcb96cb4523c6159a4280de59333bcf6 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Tue, 19 Nov 2024 13:42:45 +0200 Subject: [PATCH 01/26] Implemented pending transaction trigger --- .../backend/backend_stream_transactions.go | 51 +++++++++++++------ .../backend_stream_transactions_test.go | 2 +- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index a82b365240e..b7a8efb2762 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - + "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,6 +39,7 @@ type TransactionSubscriptionMetadata struct { blockWithTx *flow.Header txExecuted bool eventEncodingVersion entities.EventEncodingVersion + triggerFirstPending *atomic.Bool } // SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. @@ -57,11 +58,12 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( TransactionResult: &access.TransactionResult{ TransactionID: tx.ID(), BlockID: flow.ZeroID, - Status: flow.TransactionStatusUnknown, + Status: flow.TransactionStatusPending, }, txReferenceBlockID: tx.ReferenceBlockID, blockWithTx: nil, eventEncodingVersion: requiredEventEncodingVersion, + triggerFirstPending: atomic.NewBool(true), } return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) @@ -76,6 +78,13 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran return nil, err } + // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. + // This should occur only once for each subscription. + if txInfo.triggerFirstPending.Load() { + txInfo.triggerFirstPending.Toggle() + return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) + } + // If the transaction status already reported the final status, return with no data available if txInfo.Status == flow.TransactionStatusSealed || txInfo.Status == flow.TransactionStatusExpired { return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData) @@ -120,19 +129,8 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran } // If block with transaction was not found, get transaction status to check if it different from last status - if txInfo.blockWithTx == nil { - txInfo.Status, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) - } else if txInfo.Status == prevTxStatus { - // When a block with the transaction is available, it is possible to receive a new transaction status while - // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction - // statuses are the same, the current transaction status should be retrieved. - txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted) - } - if err != nil { - if !errors.Is(err, state.ErrUnknownSnapshotReference) { - irrecoverable.Throw(ctx, err) - } - return nil, rpc.ConvertStorageError(err) + if txInfo.Status, err = b.getTransactionStatus(ctx, txInfo, prevTxStatus); err != nil { + return nil, err } // If the old and new transaction statuses are still the same, the status change should not be reported, so @@ -145,6 +143,29 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran } } +func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *TransactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { + txStatus := prevTxStatus + var err error + + if txInfo.blockWithTx == nil { + txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) + } else if txInfo.Status == prevTxStatus { + // When a block with the transaction is available, it is possible to receive a new transaction status while + // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction + // statuses are the same, the current transaction status should be retrieved. + txStatus, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted) + } + + if err != nil { + if !errors.Is(err, state.ErrUnknownSnapshotReference) { + irrecoverable.Throw(ctx, err) + } + return flow.TransactionStatusUnknown, rpc.ConvertStorageError(err) + } + + return txStatus, nil +} + // generateResultsWithMissingStatuses checks if the current result differs from the previous result by more than one step. // If yes, it generates results for the missing transaction statuses. This is done because the subscription should send // responses for each of the statuses in the transaction lifecycle, and the message should be sent in the order of transaction statuses. diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 24cdf601f17..1d6faaa0526 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -310,7 +310,7 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { result := txResults[0] assert.Equal(s.T(), txId, result.TransactionID) assert.Equal(s.T(), expectedTxStatus, result.Status) - }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) + }, 1000*time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) } // 1. Subscribe to transaction status and receive the first message with pending status From 2ac2742a76ad8929421d1819138a1394d55d7c17 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Tue, 19 Nov 2024 13:56:21 +0200 Subject: [PATCH 02/26] fixed tests --- engine/access/rpc/backend/backend_stream_transactions.go | 4 ++-- .../access/rpc/backend/backend_stream_transactions_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index b7a8efb2762..3bd455dd66d 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -144,12 +144,12 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran } func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *TransactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { - txStatus := prevTxStatus + txStatus := txInfo.Status var err error if txInfo.blockWithTx == nil { txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) - } else if txInfo.Status == prevTxStatus { + } else if txStatus == prevTxStatus { // When a block with the transaction is available, it is possible to receive a new transaction status while // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction // statuses are the same, the current transaction status should be retrieved. diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 1d6faaa0526..6d6f98ae1d3 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -148,7 +148,7 @@ func (s *TransactionStatusSuite) SetupTest() { require.NoError(s.T(), err) s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)) - s.state.On("Final").Return(s.finalSnapshot, nil) + s.state.On("Final").Return(s.finalSnapshot, nil).Maybe() s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot { s.tempSnapshot.On("Head").Unset() s.tempSnapshot.On("Head").Return(func() *flow.Header { @@ -162,12 +162,12 @@ func (s *TransactionStatusSuite) SetupTest() { }, nil) return s.tempSnapshot - }, nil) + }, nil).Maybe() s.finalSnapshot.On("Head").Return(func() *flow.Header { finalizedHeader := s.finalizedBlock.Header return finalizedHeader - }, nil) + }, nil).Maybe() s.blockTracker.On("GetStartHeightFromBlockID", mock.Anything).Return(func(_ flow.Identifier) (uint64, error) { finalizedHeader := s.finalizedBlock.Header From 8eee89a1631a0968684c65f9a955098cba3392b9 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Tue, 19 Nov 2024 14:05:37 +0200 Subject: [PATCH 03/26] Added documentation --- .../rpc/backend/backend_stream_transactions.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index 3bd455dd66d..36163ea30a1 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,7 +40,7 @@ type TransactionSubscriptionMetadata struct { blockWithTx *flow.Header txExecuted bool eventEncodingVersion entities.EventEncodingVersion - triggerFirstPending *atomic.Bool + shouldTriggerPending *atomic.Bool } // SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. @@ -63,7 +64,7 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( txReferenceBlockID: tx.ReferenceBlockID, blockWithTx: nil, eventEncodingVersion: requiredEventEncodingVersion, - triggerFirstPending: atomic.NewBool(true), + shouldTriggerPending: atomic.NewBool(true), } return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) @@ -80,8 +81,8 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. // This should occur only once for each subscription. - if txInfo.triggerFirstPending.Load() { - txInfo.triggerFirstPending.Toggle() + if txInfo.shouldTriggerPending.Load() { + txInfo.shouldTriggerPending.Toggle() return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) } @@ -143,6 +144,11 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran } } +// getTransactionStatus determines the current status of a transaction based on its metadata +// and previous status. It derives the transaction status by analyzing the transaction's +// execution block, if available, or its reference block. +// +// No errors expected during normal operations. func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *TransactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { txStatus := txInfo.Status var err error From 6766a380daa0ea81719f20a7a5a5ed16f05e7abf Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Tue, 19 Nov 2024 14:06:39 +0200 Subject: [PATCH 04/26] revert unnecessary changes --- engine/access/rpc/backend/backend_stream_transactions_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 6d6f98ae1d3..4ae2af7161a 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -310,7 +310,7 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { result := txResults[0] assert.Equal(s.T(), txId, result.TransactionID) assert.Equal(s.T(), expectedTxStatus, result.Status) - }, 1000*time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) + }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) } // 1. Subscribe to transaction status and receive the first message with pending status From 8ced75a2d44fee9cd6edc813c4447b509c949ef5 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 9 Dec 2024 16:46:00 +0200 Subject: [PATCH 05/26] Implemented subsscribe tx statuses. Make refactoring. --- access/api.go | 7 +- access/handler.go | 7 +- access/mock/api.go | 30 ++++++-- engine/access/rpc/backend/backend.go | 1 + .../backend/backend_stream_transactions.go | 75 +++++++++++++++---- .../backend_stream_transactions_test.go | 13 +++- 6 files changed, 103 insertions(+), 30 deletions(-) diff --git a/access/api.go b/access/api.go index adeb7284c10..76126c38828 100644 --- a/access/api.go +++ b/access/api.go @@ -206,7 +206,12 @@ type API interface { // SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. - SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + + // SendAndSubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the + // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction + // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. + SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription } // TODO: Combine this with flow.TransactionResult? diff --git a/access/handler.go b/access/handler.go index b974e7034fc..bcf401a2884 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1425,12 +1425,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return status.Error(codes.InvalidArgument, err.Error()) } - err = h.api.SendTransaction(ctx, &tx) - if err != nil { - return err - } - - sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion()) + sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion()) messageIndex := counters.NewMonotonousCounter(0) return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error { diff --git a/access/mock/api.go b/access/mock/api.go index eaaf6c428f2..274cc688a5a 100644 --- a/access/mock/api.go +++ b/access/mock/api.go @@ -1145,6 +1145,26 @@ func (_m *API) Ping(ctx context.Context) error { return r0 } +// SendAndSubscribeTransactionStatuses provides a mock function with given fields: ctx, tx, requiredEventEncodingVersion +func (_m *API) SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, tx, requiredEventEncodingVersion) + + if len(ret) == 0 { + panic("no return value specified for SendAndSubscribeTransactionStatuses") + } + + var r0 subscription.Subscription + if rf, ok := ret.Get(0).(func(context.Context, *flow.TransactionBody, entities.EventEncodingVersion) subscription.Subscription); ok { + r0 = rf(ctx, tx, requiredEventEncodingVersion) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(subscription.Subscription) + } + } + + return r0 +} + // SendTransaction provides a mock function with given fields: ctx, tx func (_m *API) SendTransaction(ctx context.Context, tx *flow.TransactionBody) error { ret := _m.Called(ctx, tx) @@ -1343,17 +1363,17 @@ func (_m *API) SubscribeBlocksFromStartHeight(ctx context.Context, startHeight u return r0 } -// SubscribeTransactionStatuses provides a mock function with given fields: ctx, tx, requiredEventEncodingVersion -func (_m *API) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { - ret := _m.Called(ctx, tx, requiredEventEncodingVersion) +// SubscribeTransactionStatuses provides a mock function with given fields: ctx, txID, blockID, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, txID, blockID, requiredEventEncodingVersion) if len(ret) == 0 { panic("no return value specified for SubscribeTransactionStatuses") } var r0 subscription.Subscription - if rf, ok := ret.Get(0).(func(context.Context, *flow.TransactionBody, entities.EventEncodingVersion) subscription.Subscription); ok { - r0 = rf(ctx, tx, requiredEventEncodingVersion) + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, flow.Identifier, entities.EventEncodingVersion) subscription.Subscription); ok { + r0 = rf(ctx, txID, blockID, requiredEventEncodingVersion) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(subscription.Subscription) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index d4666af9529..5ed2232b87b 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -260,6 +260,7 @@ func New(params Params) (*Backend, error) { executionResults: params.ExecutionResults, subscriptionHandler: params.SubscriptionHandler, blockTracker: params.BlockTracker, + sendTransaction: b.SendTransaction, } retry.SetBackend(b) diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index 36163ea30a1..a1cebfba7c9 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,6 +21,8 @@ import ( "github.com/onflow/flow/protobuf/go/flow/entities" ) +type sendTransaction func(ctx context.Context, tx *flow.TransactionBody) error + // backendSubscribeTransactions handles transaction subscriptions. type backendSubscribeTransactions struct { txLocalDataProvider *TransactionsLocalDataProvider @@ -31,31 +32,37 @@ type backendSubscribeTransactions struct { subscriptionHandler *subscription.SubscriptionHandler blockTracker subscription.BlockTracker + sendTransaction sendTransaction } -// TransactionSubscriptionMetadata holds data representing the status state for each transaction subscription. -type TransactionSubscriptionMetadata struct { +// transactionSubscriptionMetadata holds data representing the status state for each transaction subscription. +type transactionSubscriptionMetadata struct { *access.TransactionResult txReferenceBlockID flow.Identifier blockWithTx *flow.Header txExecuted bool eventEncodingVersion entities.EventEncodingVersion - shouldTriggerPending *atomic.Bool + shouldTriggerPending bool } -// SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. -// If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription. -func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( +// SendAndSubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. +// If invalid tx parameters will be supplied SendAndSubscribeTransactionStatuses will return a failed subscription. +func (b *backendSubscribeTransactions) SendAndSubscribeTransactionStatuses( ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { + err := b.sendTransaction(ctx, tx) + if err != nil { + return subscription.NewFailedSubscription(err, "transaction sending failed") + } + nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(tx.ReferenceBlockID) if err != nil { return subscription.NewFailedSubscription(err, "could not get start height") } - txInfo := TransactionSubscriptionMetadata{ + txInfo := transactionSubscriptionMetadata{ TransactionResult: &access.TransactionResult{ TransactionID: tx.ID(), BlockID: flow.ZeroID, @@ -64,7 +71,43 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( txReferenceBlockID: tx.ReferenceBlockID, blockWithTx: nil, eventEncodingVersion: requiredEventEncodingVersion, - shouldTriggerPending: atomic.NewBool(true), + shouldTriggerPending: true, + } + + return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) +} + +// SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. +// If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription. +func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( + ctx context.Context, + txID flow.Identifier, + blockID flow.Identifier, + requiredEventEncodingVersion entities.EventEncodingVersion, +) subscription.Subscription { + if blockID == flow.ZeroID { + header, err := b.txLocalDataProvider.state.Sealed().Head() + if err != nil { + return subscription.NewFailedSubscription(err, "could not get latest block") + } + blockID = header.ID() + } + + nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(blockID) + if err != nil { + return subscription.NewFailedSubscription(err, "could not get start height") + } + + txInfo := transactionSubscriptionMetadata{ + TransactionResult: &access.TransactionResult{ + TransactionID: txID, + BlockID: flow.ZeroID, + Status: flow.TransactionStatusUnknown, + }, + txReferenceBlockID: blockID, + blockWithTx: nil, + eventEncodingVersion: requiredEventEncodingVersion, + shouldTriggerPending: false, } return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) @@ -72,7 +115,7 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( // getTransactionStatusResponse returns a callback function that produces transaction status // subscription responses based on new blocks. -func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *TransactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) { +func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *transactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) { return func(ctx context.Context, height uint64) (interface{}, error) { err := b.checkBlockReady(height) if err != nil { @@ -81,8 +124,8 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. // This should occur only once for each subscription. - if txInfo.shouldTriggerPending.Load() { - txInfo.shouldTriggerPending.Toggle() + if txInfo.shouldTriggerPending { + txInfo.shouldTriggerPending = false return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) } @@ -149,7 +192,7 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran // execution block, if available, or its reference block. // // No errors expected during normal operations. -func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *TransactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { +func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *transactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { txStatus := txInfo.Status var err error @@ -180,7 +223,7 @@ func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, // 2. pending(1) -> expired(5) // No errors expected during normal operations. func (b *backendSubscribeTransactions) generateResultsWithMissingStatuses( - txInfo *TransactionSubscriptionMetadata, + txInfo *transactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus, ) ([]*access.TransactionResult, error) { // If the previous status is pending and the new status is expired, which is the last status, return its result. @@ -255,7 +298,7 @@ func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error { // - codes.Internal when other errors occur during block or collection lookup func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( height uint64, - txInfo *TransactionSubscriptionMetadata, + txInfo *transactionSubscriptionMetadata, ) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) { block, err := b.txLocalDataProvider.blocks.ByHeight(height) if err != nil { @@ -279,7 +322,7 @@ func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( // - codes.Internal if an internal error occurs while retrieving execution result. func (b *backendSubscribeTransactions) searchForTransactionResult( ctx context.Context, - txInfo *TransactionSubscriptionMetadata, + txInfo *transactionSubscriptionMetadata, ) (*access.TransactionResult, error) { _, err := b.executionResults.ByBlockID(txInfo.BlockID) if err != nil { diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 4ae2af7161a..c4f7056f9b2 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -34,6 +34,7 @@ import ( storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" ) @@ -121,6 +122,14 @@ func (s *TransactionStatusSuite) SetupTest() { s.blockTracker = subscriptionmock.NewBlockTracker(s.T()) s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{} + s.colClient.On( + "SendTransaction", + mock.Anything, + mock.Anything, + ).Return(&accessproto.SendTransactionResponse{}, nil).Maybe() + + s.transactions.On("Store", mock.Anything).Return(nil).Maybe() + // generate blockCount consecutive blocks with associated seal, result and execution data s.rootBlock = unittest.BlockFixture() rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock)) @@ -314,7 +323,7 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { } // 1. Subscribe to transaction status and receive the first message with pending status - sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) + sub := s.backend.SendAndSubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) // 2. Make transaction reference block sealed, and add a new finalized block that includes the transaction @@ -380,7 +389,7 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { } // Subscribe to transaction status and receive the first message with pending status - sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) + sub := s.backend.SendAndSubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) // Generate 600 blocks without transaction included and check, that transaction still pending From ff1a95f4d02fc4aa91d54f61bc7f3cde7ba2bb6c Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 9 Dec 2024 17:01:06 +0200 Subject: [PATCH 06/26] linted --- cmd/util/cmd/run-script/cmd.go | 11 ++++++++++- .../rpc/backend/backend_stream_transactions_test.go | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cmd/util/cmd/run-script/cmd.go b/cmd/util/cmd/run-script/cmd.go index 171f97e76b7..d66aa341d93 100644 --- a/cmd/util/cmd/run-script/cmd.go +++ b/cmd/util/cmd/run-script/cmd.go @@ -532,7 +532,16 @@ func (*api) SubscribeBlockDigestsFromLatest( return nil } -func (*api) SubscribeTransactionStatuses( +func (a *api) SubscribeTransactionStatuses( + _ context.Context, + _ flow.Identifier, + _ flow.Identifier, + _ entities.EventEncodingVersion, +) subscription.Subscription { + return nil +} + +func (a *api) SendAndSubscribeTransactionStatuses( _ context.Context, _ *flow.TransactionBody, _ entities.EventEncodingVersion, diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index c4f7056f9b2..47b494e87b3 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + accessapi "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/access/index" @@ -34,7 +36,6 @@ import ( storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" ) From 815419b0a1f570202e9def6218d4877e60975888 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Tue, 10 Dec 2024 16:57:18 +0200 Subject: [PATCH 07/26] Clean up code. Added comments --- access/api.go | 28 ++++-- .../backend/backend_stream_transactions.go | 92 +++++++++++-------- 2 files changed, 76 insertions(+), 44 deletions(-) diff --git a/access/api.go b/access/api.go index 76126c38828..63db940414a 100644 --- a/access/api.go +++ b/access/api.go @@ -203,14 +203,28 @@ type API interface { // // If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription. SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription - // SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the - // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction - // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. + // SubscribeTransactionStatuses subscribes to transaction status updates for a given transaction ID. + // Monitoring begins from the specified block ID or the latest block if no block ID is provided. + // The subscription streams status updates until the transaction reaches a final state (TransactionStatusSealed or TransactionStatusExpired). + // When the transaction reaches one of these final statuses, the subscription will automatically terminate. + // + // Parameters: + // - ctx: The context to manage the subscription's lifecycle, including cancellation. + // - txID: The unique identifier of the transaction to monitor. + // - blockID: The block ID from which to start monitoring. If set to flow.ZeroID, monitoring starts from the latest block. + // - requiredEventEncodingVersion: The version of event encoding required for the subscription. SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription - - // SendAndSubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the - // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction - // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. + // SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates. + // Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction + // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription + // automatically terminates. + // + // Parameters: + // - ctx: The context to manage the transaction sending and subscription lifecycle, including cancellation. + // - tx: The transaction body to be sent and monitored. + // - requiredEventEncodingVersion: The version of event encoding required for the subscription. + // + // If the transaction cannot be sent, the subscription will fail and return a failed subscription. SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription } diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index a1cebfba7c9..718d41ec5de 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -21,6 +21,7 @@ import ( "github.com/onflow/flow/protobuf/go/flow/entities" ) +// sendTransaction defines a function type for sending a transaction. type sendTransaction func(ctx context.Context, tx *flow.TransactionBody) error // backendSubscribeTransactions handles transaction subscriptions. @@ -45,69 +46,79 @@ type transactionSubscriptionMetadata struct { shouldTriggerPending bool } -// SendAndSubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. -// If invalid tx parameters will be supplied SendAndSubscribeTransactionStatuses will return a failed subscription. +// SendAndSubscribeTransactionStatuses sends a transaction and subscribes to its status updates. +// It starts monitoring the status from the transaction's reference block ID. +// If the transaction cannot be sent or an error occurs during subscription creation, a failed subscription is returned. func (b *backendSubscribeTransactions) SendAndSubscribeTransactionStatuses( ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { - err := b.sendTransaction(ctx, tx) - if err != nil { - return subscription.NewFailedSubscription(err, "transaction sending failed") - } - - nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(tx.ReferenceBlockID) - if err != nil { - return subscription.NewFailedSubscription(err, "could not get start height") + if err := b.sendTransaction(ctx, tx); err != nil { + b.log.Error().Err(err).Str("tx_id", tx.ID().String()).Msg("failed to send transaction") + return subscription.NewFailedSubscription(err, "failed to send transaction") } - txInfo := transactionSubscriptionMetadata{ - TransactionResult: &access.TransactionResult{ - TransactionID: tx.ID(), - BlockID: flow.ZeroID, - Status: flow.TransactionStatusPending, - }, - txReferenceBlockID: tx.ReferenceBlockID, - blockWithTx: nil, - eventEncodingVersion: requiredEventEncodingVersion, - shouldTriggerPending: true, - } - - return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) + return b.createSubscription(ctx, tx.ID(), tx.ReferenceBlockID, tx.ReferenceBlockID, requiredEventEncodingVersion, true) } -// SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. -// If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription. +// SubscribeTransactionStatuses subscribes to the status updates of a transaction. +// Monitoring starts from the specified block ID or the latest block if no block ID is provided. +// If the block ID cannot be determined or an error occurs during subscription creation, a failed subscription is returned. func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { + // if no block ID provided, get latest block ID if blockID == flow.ZeroID { header, err := b.txLocalDataProvider.state.Sealed().Head() if err != nil { - return subscription.NewFailedSubscription(err, "could not get latest block") + b.log.Error().Err(err).Msg("failed to retrieve latest block") + return subscription.NewFailedSubscription(err, "failed to retrieve latest block") } blockID = header.ID() } + return b.createSubscription(ctx, txID, blockID, flow.ZeroID, requiredEventEncodingVersion, false) +} + +// createSubscription initializes a subscription for monitoring a transaction's status. +// If the start height cannot be determined, a failed subscription is returned. +func (b *backendSubscribeTransactions) createSubscription( + ctx context.Context, + txID flow.Identifier, + blockID flow.Identifier, + referenceBlockID flow.Identifier, + requiredEventEncodingVersion entities.EventEncodingVersion, + shouldTriggerPending bool, +) subscription.Subscription { + // Get height to start subscription from nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(blockID) if err != nil { - return subscription.NewFailedSubscription(err, "could not get start height") + b.log.Error().Err(err).Str("block_id", blockID.String()).Msg("failed to get start height") + return subscription.NewFailedSubscription(err, "failed to get start height") + } + + // choose initial transaction status + initialStatus := flow.TransactionStatusUnknown + if shouldTriggerPending { + // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. + // This should occur only once for each subscription. + initialStatus = flow.TransactionStatusPending } txInfo := transactionSubscriptionMetadata{ TransactionResult: &access.TransactionResult{ TransactionID: txID, BlockID: flow.ZeroID, - Status: flow.TransactionStatusUnknown, + Status: initialStatus, }, - txReferenceBlockID: blockID, + txReferenceBlockID: referenceBlockID, blockWithTx: nil, eventEncodingVersion: requiredEventEncodingVersion, - shouldTriggerPending: false, + shouldTriggerPending: shouldTriggerPending, } return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) @@ -122,16 +133,12 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *tran return nil, err } - // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. - // This should occur only once for each subscription. if txInfo.shouldTriggerPending { - txInfo.shouldTriggerPending = false - return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) + return b.handlePendingStatus(txInfo) } - // If the transaction status already reported the final status, return with no data available - if txInfo.Status == flow.TransactionStatusSealed || txInfo.Status == flow.TransactionStatusExpired { - return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData) + if b.isTransactionFinalStatus(txInfo) { + return nil, fmt.Errorf("transaction final status %s already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData) } // If on this step transaction block not available, search for it. @@ -187,6 +194,17 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *tran } } +// handlePendingStatus handles the initial pending status for a transaction. +func (b *backendSubscribeTransactions) handlePendingStatus(txInfo *transactionSubscriptionMetadata) (interface{}, error) { + txInfo.shouldTriggerPending = false + return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) +} + +// isTransactionFinalStatus checks if a transaction has reached a final state (Sealed or Expired). +func (b *backendSubscribeTransactions) isTransactionFinalStatus(txInfo *transactionSubscriptionMetadata) bool { + return txInfo.Status == flow.TransactionStatusSealed || txInfo.Status == flow.TransactionStatusExpired +} + // getTransactionStatus determines the current status of a transaction based on its metadata // and previous status. It derives the transaction status by analyzing the transaction's // execution block, if available, or its reference block. From 2bbd39916b8c346bd492e9717e59e550b659ec9e Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 11 Dec 2024 17:11:43 +0200 Subject: [PATCH 08/26] Added skeleton for tx account data provider --- .../transaction_statuses_provider.go | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 engine/access/rest/websockets/data_providers/transaction_statuses_provider.go diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go new file mode 100644 index 00000000000..e17f61b116b --- /dev/null +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -0,0 +1,93 @@ +package data_providers + +import ( + "context" + "fmt" + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/state_stream/backend" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" +) + +type TransactionStatusesArguments struct { + StartBlockID flow.Identifier // ID of the block to start subscription from + txID flow.Identifier // ID of the transaction to monitor. +} + +type TransactionStatusesDataProvider struct { + *baseDataProvider + + logger zerolog.Logger + stateStreamApi state_stream.API + api access.API +} + +var _ DataProvider = (*TransactionStatusesDataProvider)(nil) + +func NewTransactionStatusesDataProvider( + ctx context.Context, + logger zerolog.Logger, + stateStreamApi state_stream.API, + topic string, + arguments models.Arguments, + send chan<- interface{}, + chain flow.Chain, +) (*TransactionStatusesDataProvider, error) { + p := &TransactionStatusesDataProvider{ + logger: logger.With().Str("component", "transaction-statuses-data-provider").Logger(), + stateStreamApi: stateStreamApi, + } + + // Initialize arguments passed to the provider. + txStatusesArgs, err := parseTransactionStatusesArguments(arguments, chain) + if err != nil { + return nil, fmt.Errorf("invalid arguments for tx statuses data provider: %w", err) + } + + subCtx, cancel := context.WithCancel(ctx) + + p.baseDataProvider = newBaseDataProvider( + topic, + cancel, + send, + p.createSubscription(subCtx, txStatusesArgs), // Set up a subscription to tx statuses based on arguments. + ) + + return p, nil +} + +// Run starts processing the subscription for events and handles responses. +// +// No errors are expected during normal operations. +func (p *TransactionStatusesDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse()) +} + +// createSubscription creates a new subscription using the specified input arguments. +func (p *TransactionStatusesDataProvider) createSubscription( + ctx context.Context, + args TransactionStatusesArguments, +) subscription.Subscription { + return p.api.SubscribeTransactionStatuses(ctx, args.txID, args.StartBlockID, entities.EventEncodingVersion_JSON_CDC_V0) +} + +// handleResponse processes an account statuses and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *TransactionStatusesDataProvider) handleResponse() func(txStatusesResponse *backend.) { + +} + +// parseAccountStatusesArguments validates and initializes the account statuses arguments. +func parseTransactionStatusesArguments( + arguments models.Arguments, + chain flow.Chain, +) (TransactionStatusesArguments, error) { + +} From 683f0260513de07a4567e23383aa95c791b81f12 Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 11 Dec 2024 18:55:38 +0200 Subject: [PATCH 09/26] Implemented tx statuses provider --- .../account_statuses_provider.go | 1 - .../rest/websockets/data_providers/factory.go | 3 +- .../websockets/data_providers/factory_test.go | 11 +++ .../transaction_statuses_provider.go | 69 +++++++++++++++---- .../websockets/models/tx_statuses_model.go | 11 +++ 5 files changed, 79 insertions(+), 16 deletions(-) create mode 100644 engine/access/rest/websockets/models/tx_statuses_model.go diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 1a3aee203c9..bf3d77ce823 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -157,7 +157,6 @@ func parseAccountStatusesArguments( args.StartBlockID = startBlockID.Flow() } - // Parse 'start_block_height' if provided // Parse 'start_block_height' if provided if hasStartBlockHeight { result, ok := startBlockHeightIn.(string) diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 26aade4e090..b8897deb988 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -103,8 +103,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case AccountStatusesTopic: return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case TransactionStatusesTopic: - // TODO: Implemented handlers for each topic should be added in respective case - return nil, fmt.Errorf(`topic "%s" not implemented yet`, topic) + return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) } diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 9421cef37f6..674fc6998ea 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -132,6 +132,17 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.stateStreamApi.AssertExpectations(s.T()) }, }, + { + name: "transaction statuses topic", + topic: TransactionStatusesTopic, + arguments: models.Arguments{}, + setupSubscription: func() { + s.setupSubscription(s.accessApi.On("SubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything, mock.Anything)) + }, + assertExpectations: func() { + s.stateStreamApi.AssertExpectations(s.T()) + }, + }, } for _, test := range testCases { diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index e17f61b116b..6a446282d5e 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -3,16 +3,19 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow/protobuf/go/flow/entities" + "strconv" "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/websockets/models" - "github.com/onflow/flow-go/engine/access/state_stream" - "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" + "github.com/onflow/flow/protobuf/go/flow/entities" ) type TransactionStatusesArguments struct { @@ -23,9 +26,8 @@ type TransactionStatusesArguments struct { type TransactionStatusesDataProvider struct { *baseDataProvider - logger zerolog.Logger - stateStreamApi state_stream.API - api access.API + logger zerolog.Logger + api access.API } var _ DataProvider = (*TransactionStatusesDataProvider)(nil) @@ -33,19 +35,18 @@ var _ DataProvider = (*TransactionStatusesDataProvider)(nil) func NewTransactionStatusesDataProvider( ctx context.Context, logger zerolog.Logger, - stateStreamApi state_stream.API, + api access.API, topic string, arguments models.Arguments, send chan<- interface{}, - chain flow.Chain, ) (*TransactionStatusesDataProvider, error) { p := &TransactionStatusesDataProvider{ - logger: logger.With().Str("component", "transaction-statuses-data-provider").Logger(), - stateStreamApi: stateStreamApi, + logger: logger.With().Str("component", "transaction-statuses-data-provider").Logger(), + api: api, } // Initialize arguments passed to the provider. - txStatusesArgs, err := parseTransactionStatusesArguments(arguments, chain) + txStatusesArgs, err := parseTransactionStatusesArguments(arguments) if err != nil { return nil, fmt.Errorf("invalid arguments for tx statuses data provider: %w", err) } @@ -80,14 +81,56 @@ func (p *TransactionStatusesDataProvider) createSubscription( // handleResponse processes an account statuses and sends the formatted response. // // No errors are expected during normal operations. -func (p *TransactionStatusesDataProvider) handleResponse() func(txStatusesResponse *backend.) { +func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { + messageIndex := counters.NewMonotonousCounter(1) + + return func(txResults []*access.TransactionResult) error { + + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + p.send <- models.TransactionStatusesResponse{ + TransactionResults: txResults, + MessageIndex: strconv.FormatUint(index, 10), + } + return nil + } } // parseAccountStatusesArguments validates and initializes the account statuses arguments. func parseTransactionStatusesArguments( arguments models.Arguments, - chain flow.Chain, ) (TransactionStatusesArguments, error) { + var args TransactionStatusesArguments + + if txIDIn, ok := arguments["tx_id"]; ok && txIDIn != "" { + result, ok := txIDIn.(string) + if !ok { + return args, fmt.Errorf("'tx_id' must be a string") + } + var txID parser.ID + err := txID.Parse(result) + if err != nil { + return args, fmt.Errorf("invalid 'tx_id': %w", err) + } + args.txID = txID.Flow() + } + + if startBlockIDIn, ok := arguments["start_block_id"]; ok && startBlockIDIn != "" { + result, ok := startBlockIDIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_id' must be a string") + } + var startBlockID parser.ID + err := startBlockID.Parse(result) + if err != nil { + return args, fmt.Errorf("invalid 'start_block_id': %w", err) + } + args.StartBlockID = startBlockID.Flow() + } + return args, nil } diff --git a/engine/access/rest/websockets/models/tx_statuses_model.go b/engine/access/rest/websockets/models/tx_statuses_model.go new file mode 100644 index 00000000000..55b9d3bdfdd --- /dev/null +++ b/engine/access/rest/websockets/models/tx_statuses_model.go @@ -0,0 +1,11 @@ +package models + +import ( + "github.com/onflow/flow-go/access" +) + +// TransactionStatusesResponse is the response message for 'events' topic. +type TransactionStatusesResponse struct { + TransactionResults []*access.TransactionResult `json:"transaction_results"` + MessageIndex string `json:"message_index"` +} From 3b19cd07fb9aae0a9e7a1e6477263b00da0db778 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 12 Dec 2024 16:13:35 +0200 Subject: [PATCH 10/26] Separate subscribe tx statuses to 3 function --- access/api.go | 33 +++++++-- access/mock/api.go | 50 ++++++++++++-- cmd/util/cmd/run-script/cmd.go | 18 ++++- .../backend/backend_stream_transactions.go | 67 +++++++++++++------ .../backend_stream_transactions_test.go | 4 +- 5 files changed, 139 insertions(+), 33 deletions(-) diff --git a/access/api.go b/access/api.go index 63db940414a..acd4d6138b8 100644 --- a/access/api.go +++ b/access/api.go @@ -203,17 +203,38 @@ type API interface { // // If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription. SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription - // SubscribeTransactionStatuses subscribes to transaction status updates for a given transaction ID. - // Monitoring begins from the specified block ID or the latest block if no block ID is provided. - // The subscription streams status updates until the transaction reaches a final state (TransactionStatusSealed or TransactionStatusExpired). - // When the transaction reaches one of these final statuses, the subscription will automatically terminate. + // SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID. + // Monitoring begins from the specified block ID. The subscription streams status updates until the transaction + // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of + // these final statuses, the subscription will automatically terminate. + // + // Parameters: + // - ctx: The context to manage the subscription's lifecycle, including cancellation. + // - txID: The identifier of the transaction to monitor. + // - startBlockID: The block ID from which to start monitoring. + // - requiredEventEncodingVersion: The version of event encoding required for the subscription. + SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + // SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID. + // Monitoring begins from the specified block height. The subscription streams status updates until the transaction + // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of + // these final statuses, the subscription will automatically terminate. + // + // Parameters: + // - ctx: The context to manage the subscription's lifecycle, including cancellation. + // - txID: The unique identifier of the transaction to monitor. + // - startHeight: The block height from which to start monitoring. + // - requiredEventEncodingVersion: The version of event encoding required for the subscription. + SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + // SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID. + // Monitoring begins from the latest block. The subscription streams status updates until the transaction + // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of + // these final statuses, the subscription will automatically terminate. // // Parameters: // - ctx: The context to manage the subscription's lifecycle, including cancellation. // - txID: The unique identifier of the transaction to monitor. - // - blockID: The block ID from which to start monitoring. If set to flow.ZeroID, monitoring starts from the latest block. // - requiredEventEncodingVersion: The version of event encoding required for the subscription. - SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription // SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates. // Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription diff --git a/access/mock/api.go b/access/mock/api.go index 274cc688a5a..13c35b293d3 100644 --- a/access/mock/api.go +++ b/access/mock/api.go @@ -1363,17 +1363,57 @@ func (_m *API) SubscribeBlocksFromStartHeight(ctx context.Context, startHeight u return r0 } -// SubscribeTransactionStatuses provides a mock function with given fields: ctx, txID, blockID, requiredEventEncodingVersion -func (_m *API) SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, blockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { - ret := _m.Called(ctx, txID, blockID, requiredEventEncodingVersion) +// SubscribeTransactionStatusesFromLatest provides a mock function with given fields: ctx, txID, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, txID, requiredEventEncodingVersion) if len(ret) == 0 { - panic("no return value specified for SubscribeTransactionStatuses") + panic("no return value specified for SubscribeTransactionStatusesFromLatest") + } + + var r0 subscription.Subscription + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, entities.EventEncodingVersion) subscription.Subscription); ok { + r0 = rf(ctx, txID, requiredEventEncodingVersion) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(subscription.Subscription) + } + } + + return r0 +} + +// SubscribeTransactionStatusesFromStartBlockID provides a mock function with given fields: ctx, txID, startBlockID, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, txID, startBlockID, requiredEventEncodingVersion) + + if len(ret) == 0 { + panic("no return value specified for SubscribeTransactionStatusesFromStartBlockID") } var r0 subscription.Subscription if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, flow.Identifier, entities.EventEncodingVersion) subscription.Subscription); ok { - r0 = rf(ctx, txID, blockID, requiredEventEncodingVersion) + r0 = rf(ctx, txID, startBlockID, requiredEventEncodingVersion) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(subscription.Subscription) + } + } + + return r0 +} + +// SubscribeTransactionStatusesFromStartHeight provides a mock function with given fields: ctx, txID, startHeight, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, txID, startHeight, requiredEventEncodingVersion) + + if len(ret) == 0 { + panic("no return value specified for SubscribeTransactionStatusesFromStartHeight") + } + + var r0 subscription.Subscription + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, uint64, entities.EventEncodingVersion) subscription.Subscription); ok { + r0 = rf(ctx, txID, startHeight, requiredEventEncodingVersion) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(subscription.Subscription) diff --git a/cmd/util/cmd/run-script/cmd.go b/cmd/util/cmd/run-script/cmd.go index d66aa341d93..59646d0687a 100644 --- a/cmd/util/cmd/run-script/cmd.go +++ b/cmd/util/cmd/run-script/cmd.go @@ -532,7 +532,7 @@ func (*api) SubscribeBlockDigestsFromLatest( return nil } -func (a *api) SubscribeTransactionStatuses( +func (a *api) SubscribeTransactionStatusesFromStartBlockID( _ context.Context, _ flow.Identifier, _ flow.Identifier, @@ -541,6 +541,22 @@ func (a *api) SubscribeTransactionStatuses( return nil } +func (a *api) SubscribeTransactionStatusesFromStartHeight( + _ context.Context, + _ flow.Identifier, + _ uint64, + _ entities.EventEncodingVersion, +) subscription.Subscription { + return nil +} + +func (a *api) SubscribeTransactionStatusesFromLatest( + _ context.Context, + _ flow.Identifier, + _ entities.EventEncodingVersion, +) subscription.Subscription { + return nil +} func (a *api) SendAndSubscribeTransactionStatuses( _ context.Context, _ *flow.TransactionBody, diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index 718d41ec5de..ae6678e0a8b 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -59,29 +59,48 @@ func (b *backendSubscribeTransactions) SendAndSubscribeTransactionStatuses( return subscription.NewFailedSubscription(err, "failed to send transaction") } - return b.createSubscription(ctx, tx.ID(), tx.ReferenceBlockID, tx.ReferenceBlockID, requiredEventEncodingVersion, true) + return b.createSubscription(ctx, tx.ID(), tx.ReferenceBlockID, 0, tx.ReferenceBlockID, requiredEventEncodingVersion, true) } -// SubscribeTransactionStatuses subscribes to the status updates of a transaction. -// Monitoring starts from the specified block ID or the latest block if no block ID is provided. +// SubscribeTransactionStatusesFromStartHeight subscribes to the status updates of a transaction. +// Monitoring starts from the specified block height. +// If the block height cannot be determined or an error occurs during subscription creation, a failed subscription is returned. +func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromStartHeight( + ctx context.Context, + txID flow.Identifier, + startHeight uint64, + requiredEventEncodingVersion entities.EventEncodingVersion, +) subscription.Subscription { + return b.createSubscription(ctx, txID, flow.ZeroID, startHeight, flow.ZeroID, requiredEventEncodingVersion, false) +} + +// SubscribeTransactionStatusesFromStartBlockID subscribes to the status updates of a transaction. +// Monitoring starts from the specified block ID. // If the block ID cannot be determined or an error occurs during subscription creation, a failed subscription is returned. -func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( +func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromStartBlockID( ctx context.Context, txID flow.Identifier, - blockID flow.Identifier, + startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { - // if no block ID provided, get latest block ID - if blockID == flow.ZeroID { - header, err := b.txLocalDataProvider.state.Sealed().Head() - if err != nil { - b.log.Error().Err(err).Msg("failed to retrieve latest block") - return subscription.NewFailedSubscription(err, "failed to retrieve latest block") - } - blockID = header.ID() + return b.createSubscription(ctx, txID, startBlockID, 0, flow.ZeroID, requiredEventEncodingVersion, false) +} + +// SubscribeTransactionStatusesFromLatest subscribes to the status updates of a transaction. +// Monitoring starts from the latest block. +// If the block cannot be retrieved or an error occurs during subscription creation, a failed subscription is returned. +func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromLatest( + ctx context.Context, + txID flow.Identifier, + requiredEventEncodingVersion entities.EventEncodingVersion, +) subscription.Subscription { + header, err := b.txLocalDataProvider.state.Sealed().Head() + if err != nil { + b.log.Error().Err(err).Msg("failed to retrieve latest block") + return subscription.NewFailedSubscription(err, "failed to retrieve latest block") } - return b.createSubscription(ctx, txID, blockID, flow.ZeroID, requiredEventEncodingVersion, false) + return b.createSubscription(ctx, txID, header.ID(), 0, flow.ZeroID, requiredEventEncodingVersion, false) } // createSubscription initializes a subscription for monitoring a transaction's status. @@ -89,16 +108,26 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( func (b *backendSubscribeTransactions) createSubscription( ctx context.Context, txID flow.Identifier, - blockID flow.Identifier, + startBlockID flow.Identifier, + startBlockHeight uint64, referenceBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, shouldTriggerPending bool, ) subscription.Subscription { + var nextHeight uint64 + var err error + // Get height to start subscription from - nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(blockID) - if err != nil { - b.log.Error().Err(err).Str("block_id", blockID.String()).Msg("failed to get start height") - return subscription.NewFailedSubscription(err, "failed to get start height") + if startBlockID == flow.ZeroID { + if nextHeight, err = b.blockTracker.GetStartHeightFromHeight(startBlockHeight); err != nil { + b.log.Error().Err(err).Uint64("block_height", startBlockHeight).Msg("failed to get start height") + return subscription.NewFailedSubscription(err, "failed to get start height") + } + } else { + if nextHeight, err = b.blockTracker.GetStartHeightFromBlockID(startBlockID); err != nil { + b.log.Error().Err(err).Str("block_id", startBlockID.String()).Msg("failed to get start height") + return subscription.NewFailedSubscription(err, "failed to get start height") + } } // choose initial transaction status diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 47b494e87b3..52049a4b0ef 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -245,7 +245,7 @@ func (s *TransactionStatusSuite) addNewFinalizedBlock(parent *flow.Header, notif } } -// TestSubscribeTransactionStatusHappyCase tests the functionality of the SubscribeTransactionStatuses method in the Backend. +// TestSubscribeTransactionStatusHappyCase tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend. // It covers the emulation of transaction stages from pending to sealed, and receiving status updates. func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { ctx, cancel := context.WithCancel(context.Background()) @@ -359,7 +359,7 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") } -// TestSubscribeTransactionStatusExpired tests the functionality of the SubscribeTransactionStatuses method in the Backend +// TestSubscribeTransactionStatusExpired tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend // when transaction become expired func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { ctx, cancel := context.WithCancel(context.Background()) From 699b243af7417e427cecea51a3453bfc65df0a73 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 13 Dec 2024 11:09:32 +0200 Subject: [PATCH 11/26] Added invalid params test --- .../transaction_statuses_provider_test.go | 161 ++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go new file mode 100644 index 00000000000..7eac0ec850b --- /dev/null +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -0,0 +1,161 @@ +package data_providers + +import ( + "context" + "github.com/onflow/flow-go/access" + accessmock "github.com/onflow/flow-go/access/mock" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow/protobuf/go/flow/entities" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +type TransactionStatusesProviderSuite struct { + suite.Suite + + log zerolog.Logger + api access.API + + chain flow.Chain + rootBlock flow.Block + finalizedBlock *flow.Header + + factory *DataProviderFactoryImpl +} + +func TestNewTransactionStatusesDataProvider(t *testing.T) { + suite.Run(t, new(TransactionStatusesProviderSuite)) +} + +func (s *TransactionStatusesProviderSuite) SetupTest() { + s.log = unittest.Logger() + s.api = accessmock.NewAPI(s.T()) + + s.chain = flow.Testnet.Chain() + + s.rootBlock = unittest.BlockFixture() + s.rootBlock.Header.Height = 0 + + s.factory = NewDataProviderFactory( + s.log, + nil, + s.api, + flow.Testnet.Chain(), + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) + s.Require().NotNil(s.factory) +} + +func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_HappyPath() { + + testHappyPath( + s.T(), + AccountStatusesTopic, + s.factory, + s.subscribeTransactionStatusesDataProviderTestCases(), + func(dataChan chan interface{}) { + for i := 0; i < len(expectedAccountStatusesResponses); i++ { + dataChan <- &expectedAccountStatusesResponses[i] + } + }, + expectedAccountStatusesResponses, + s.requireAccountStatuses, + ) + +} + +func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProviderTestCases() []testType { + return []testType{ + { + name: "SubscribeTransactionStatuses happy path", + arguments: models.Arguments{ + "start_block_id": s.rootBlock.ID().String(), + "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeTransactionStatuses", + mock.Anything, + mock.Anything, + s.rootBlock.ID(), + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub).Once() + }, + }, + } +} + +// requireAccountStatuses ensures that the received account statuses information matches the expected data. +func (s *AccountStatusesProviderSuite) requireTransactionStatuses( + v interface{}, + expectedResponse interface{}, +) { + expectedTransactionStatusesResponse, ok := expectedResponse.([]access.TransactionResult) + require.True(s.T(), ok, "unexpected type: %T", expectedResponse) + +} + +// TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider +// when invalid arguments are provided. It verifies that appropriate errors are returned +// for missing or conflicting arguments. +// This test covers the test cases: +// 1. Invalid 'tx_id' argument. +// 2. Invalid 'start_block_id' argument. +func (s *TransactionStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() { + ctx := context.Background() + send := make(chan interface{}) + + topic := TransactionStatusesTopic + + for _, test := range invalidTransactionStatusesArgumentsTestCases() { + s.Run(test.name, func() { + provider, err := NewTransactionStatusesDataProvider( + ctx, + s.log, + s.api, + topic, + test.arguments, + send, + ) + s.Require().Nil(provider) + s.Require().Error(err) + s.Require().Contains(err.Error(), test.expectedErrorMsg) + }) + } +} + +// invalidTransactionStatusesArgumentsTestCases returns a list of test cases with invalid argument combinations +// for testing the behavior of transaction statuses data providers. Each test case includes a name, +// a set of input arguments, and the expected error message that should be returned. +// +// The test cases cover scenarios such as: +// 1. Providing invalid 'tx_id' value. +// 2. Providing invalid 'start_block_id' value. +func invalidTransactionStatusesArgumentsTestCases() []testErrType { + return []testErrType{ + { + name: "invalid 'tx_id' argument", + arguments: map[string]interface{}{ + "tx_id": "invalid_tx_id", + }, + expectedErrorMsg: "invalid ID format", + }, + { + name: "invalid 'start_block_id' argument", + arguments: map[string]interface{}{ + "start_block_id": "invalid_block_id", + }, + expectedErrorMsg: "invalid ID format", + }, + } +} From 9ed92071d5822c83879b244e7d9bba597e0f7c71 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 13 Dec 2024 13:26:11 +0200 Subject: [PATCH 12/26] Changed parse args function, added invalid args testcases --- .../account_statuses_provider.go | 1 - .../transaction_statuses_provider.go | 54 +++++++++++++++---- .../transaction_statuses_provider_test.go | 51 +++++++++++------- 3 files changed, 77 insertions(+), 29 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 94e984ae32c..09fca79bd70 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -135,7 +135,6 @@ func parseAccountStatusesArguments( eventFilterConfig state_stream.EventFilterConfig, ) (accountStatusesArguments, error) { var args accountStatusesArguments - //var err error // Check for mutual exclusivity of start_block_id and start_block_height early startBlockIDIn, hasStartBlockID := arguments["start_block_id"] diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 6a446282d5e..e2b151b64f0 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -3,6 +3,8 @@ package data_providers import ( "context" "fmt" + "github.com/onflow/flow-go/engine/access/rest/http/request" + "github.com/onflow/flow-go/engine/access/rest/util" "strconv" "github.com/rs/zerolog" @@ -18,9 +20,11 @@ import ( "github.com/onflow/flow/protobuf/go/flow/entities" ) -type TransactionStatusesArguments struct { - StartBlockID flow.Identifier // ID of the block to start subscription from - txID flow.Identifier // ID of the transaction to monitor. +// transactionStatusesArguments contains the arguments required for subscribing to transaction statuses +type transactionStatusesArguments struct { + TxID flow.Identifier // ID of the transaction to monitor. + StartBlockID flow.Identifier // ID of the block to start subscription from + StartBlockHeight uint64 // Height of the block to start subscription from } type TransactionStatusesDataProvider struct { @@ -73,9 +77,17 @@ func (p *TransactionStatusesDataProvider) Run() error { // createSubscription creates a new subscription using the specified input arguments. func (p *TransactionStatusesDataProvider) createSubscription( ctx context.Context, - args TransactionStatusesArguments, + args transactionStatusesArguments, ) subscription.Subscription { - return p.api.SubscribeTransactionStatuses(ctx, args.txID, args.StartBlockID, entities.EventEncodingVersion_JSON_CDC_V0) + if args.StartBlockID != flow.ZeroID { + return p.api.SubscribeTransactionStatusesFromStartBlockID(ctx, args.TxID, args.StartBlockID, entities.EventEncodingVersion_JSON_CDC_V0) + } + + if args.StartBlockHeight != request.EmptyHeight { + return p.api.SubscribeTransactionStatusesFromStartHeight(ctx, args.TxID, args.StartBlockHeight, entities.EventEncodingVersion_JSON_CDC_V0) + } + + return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0) } // handleResponse processes an account statuses and sends the formatted response. @@ -103,8 +115,16 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc // parseAccountStatusesArguments validates and initializes the account statuses arguments. func parseTransactionStatusesArguments( arguments models.Arguments, -) (TransactionStatusesArguments, error) { - var args TransactionStatusesArguments +) (transactionStatusesArguments, error) { + var args transactionStatusesArguments + + // Check for mutual exclusivity of start_block_id and start_block_height early + startBlockIDIn, hasStartBlockID := arguments["start_block_id"] + startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"] + + if hasStartBlockID && hasStartBlockHeight { + return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") + } if txIDIn, ok := arguments["tx_id"]; ok && txIDIn != "" { result, ok := txIDIn.(string) @@ -116,10 +136,11 @@ func parseTransactionStatusesArguments( if err != nil { return args, fmt.Errorf("invalid 'tx_id': %w", err) } - args.txID = txID.Flow() + args.TxID = txID.Flow() } - if startBlockIDIn, ok := arguments["start_block_id"]; ok && startBlockIDIn != "" { + // Parse 'start_block_id' if provided + if hasStartBlockID { result, ok := startBlockIDIn.(string) if !ok { return args, fmt.Errorf("'start_block_id' must be a string") @@ -132,5 +153,20 @@ func parseTransactionStatusesArguments( args.StartBlockID = startBlockID.Flow() } + // Parse 'start_block_height' if provided + var err error + if hasStartBlockHeight { + result, ok := startBlockHeightIn.(string) + if !ok { + return args, fmt.Errorf("'start_block_height' must be a string") + } + args.StartBlockHeight, err = util.ToUint64(result) + if err != nil { + return args, fmt.Errorf("invalid 'start_block_height': %w", err) + } + } else { + args.StartBlockHeight = request.EmptyHeight + } + return args, nil } diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 3b0b486db4f..153fb67da83 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -2,6 +2,7 @@ package data_providers import ( "context" + "fmt" "testing" "github.com/rs/zerolog" @@ -11,12 +12,9 @@ import ( accessmock "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" - ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/onflow/flow/protobuf/go/flow/entities" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -76,22 +74,22 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_H func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProviderTestCases() []testType { return []testType{ - { - name: "SubscribeTransactionStatuses happy path", - arguments: models.Arguments{ - "start_block_id": s.rootBlock.ID().String(), - "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, - }, - setupBackend: func(sub *ssmock.Subscription) { - s.api.On( - "SubscribeTransactionStatuses", - mock.Anything, - mock.Anything, - s.rootBlock.ID(), - entities.EventEncodingVersion_JSON_CDC_V0, - ).Return(sub).Once() - }, - }, + //{ + // name: "SubscribeTransactionStatuses happy path", + // arguments: models.Arguments{ + // "start_block_id": s.rootBlock.ID().String(), + // "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, + // }, + // setupBackend: func(sub *ssmock.Subscription) { + // s.api.On( + // "SubscribeTransactionStatuses", + // mock.Anything, + // mock.Anything, + // s.rootBlock.ID(), + // entities.EventEncodingVersion_JSON_CDC_V0, + // ).Return(sub).Once() + // }, + //}, } } @@ -143,6 +141,14 @@ func (s *TransactionStatusesProviderSuite) TestAccountStatusesDataProvider_Inval // 2. Providing invalid 'start_block_id' value. func invalidTransactionStatusesArgumentsTestCases() []testErrType { return []testErrType{ + { + name: "provide both 'start_block_id' and 'start_block_height' arguments", + arguments: models.Arguments{ + "start_block_id": unittest.BlockFixture().ID().String(), + "start_block_height": fmt.Sprintf("%d", unittest.BlockFixture().Header.Height), + }, + expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", + }, { name: "invalid 'tx_id' argument", arguments: map[string]interface{}{ @@ -157,5 +163,12 @@ func invalidTransactionStatusesArgumentsTestCases() []testErrType { }, expectedErrorMsg: "invalid ID format", }, + { + name: "invalid 'start_block_height' argument", + arguments: map[string]interface{}{ + "start_block_height": "-1", + }, + expectedErrorMsg: "value must be an unsigned 64 bit integer", + }, } } From d660c7fe3b0522d9d40c7ec6f01a7cb2a1ce5c6e Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 13 Dec 2024 13:33:22 +0200 Subject: [PATCH 13/26] Linted --- .../data_providers/transaction_statuses_provider.go | 5 +++-- .../data_providers/transaction_statuses_provider_test.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index e2b151b64f0..1562f738b80 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -3,8 +3,6 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow-go/engine/access/rest/http/request" - "github.com/onflow/flow-go/engine/access/rest/util" "strconv" "github.com/rs/zerolog" @@ -13,10 +11,13 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/rest/http/request" + "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/counters" + "github.com/onflow/flow/protobuf/go/flow/entities" ) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 153fb67da83..4e2c18e81f9 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/rs/zerolog" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/access" @@ -15,7 +16,6 @@ import ( "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/stretchr/testify/require" ) type TransactionStatusesProviderSuite struct { From 3126cd9c2e64eaeae44a86a2a179bbb25a0827f5 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 13 Dec 2024 13:40:28 +0200 Subject: [PATCH 14/26] Changed api in test to mock api --- .../websockets/data_providers/factory_test.go | 2 +- .../transaction_statuses_provider_test.go | 64 ++++++++++++++----- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 674fc6998ea..33389c3997e 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -137,7 +137,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { topic: TransactionStatusesTopic, arguments: models.Arguments{}, setupSubscription: func() { - s.setupSubscription(s.accessApi.On("SubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything, mock.Anything)) + s.setupSubscription(s.accessApi.On("SubscribeTransactionStatusesFromLatest", mock.Anything, mock.Anything, mock.Anything)) }, assertExpectations: func() { s.stateStreamApi.AssertExpectations(s.T()) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 4e2c18e81f9..c57a7dff685 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -3,9 +3,12 @@ package data_providers import ( "context" "fmt" + "github.com/onflow/flow/protobuf/go/flow/entities" + "strconv" "testing" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -13,6 +16,7 @@ import ( accessmock "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" + ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" @@ -22,7 +26,7 @@ type TransactionStatusesProviderSuite struct { suite.Suite log zerolog.Logger - api access.API + api *accessmock.API chain flow.Chain rootBlock flow.Block @@ -74,22 +78,48 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_H func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProviderTestCases() []testType { return []testType{ - //{ - // name: "SubscribeTransactionStatuses happy path", - // arguments: models.Arguments{ - // "start_block_id": s.rootBlock.ID().String(), - // "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, - // }, - // setupBackend: func(sub *ssmock.Subscription) { - // s.api.On( - // "SubscribeTransactionStatuses", - // mock.Anything, - // mock.Anything, - // s.rootBlock.ID(), - // entities.EventEncodingVersion_JSON_CDC_V0, - // ).Return(sub).Once() - // }, - //}, + { + name: "SubscribeAccountStatusesFromStartBlockID happy path", + arguments: models.Arguments{ + "start_block_id": s.rootBlock.ID().String(), + "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeTransactionStatusesFromStartBlockID", + mock.Anything, + s.rootBlock.ID(), + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub).Once() + }, + }, + { + name: "SubscribeAccountStatusesFromStartHeight happy path", + arguments: models.Arguments{ + "start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10), + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeTransactionStatusesFromStartHeight", + mock.Anything, + s.rootBlock.Header.Height, + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub).Once() + }, + }, + { + name: "SubscribeAccountStatusesFromLatestBlock happy path", + arguments: models.Arguments{}, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SubscribeTransactionStatusesFromLatest", + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub).Once() + }, + }, } } From 2ec38177d83b1aea6b51da95cac83a81e2cee3f2 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 17 Dec 2024 13:15:51 +0200 Subject: [PATCH 15/26] Linted --- .../data_providers/events_provider_test.go | 1 - .../transaction_statuses_provider.go | 2 +- .../transaction_statuses_provider_test.go | 72 +++++++++++++------ 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 214781f8885..ddfcd7d9fce 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -76,7 +76,6 @@ func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() { Events: expectedEvents, BlockTimestamp: s.rootBlock.Header.Timestamp, }) - } testHappyPath( diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 1562f738b80..2dfa62fbb88 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -104,7 +104,7 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } - p.send <- models.TransactionStatusesResponse{ + p.send <- &models.TransactionStatusesResponse{ TransactionResults: txResults, MessageIndex: strconv.FormatUint(index, 10), } diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index c57a7dff685..ba37340ef41 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -3,7 +3,6 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow/protobuf/go/flow/entities" "strconv" "testing" @@ -20,6 +19,8 @@ import ( "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) type TransactionStatusesProviderSuite struct { @@ -59,43 +60,63 @@ func (s *TransactionStatusesProviderSuite) SetupTest() { } func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_HappyPath() { + id := unittest.IdentifierFixture() + cid := unittest.IdentifierFixture() + txr := access.TransactionResult{ + Status: flow.TransactionStatusSealed, + StatusCode: 10, + Events: []flow.Event{ + unittest.EventFixture(flow.EventAccountCreated, 1, 0, id, 200), + }, + ErrorMessage: "", + BlockID: s.rootBlock.ID(), + CollectionID: cid, + BlockHeight: s.rootBlock.Header.Height, + } + + var expectedTxStatusesResponses [][]*access.TransactionResult + var expectedTxResultsResponses []*access.TransactionResult - //testHappyPath( - // s.T(), - // AccountStatusesTopic, - // s.factory, - // s.subscribeTransactionStatusesDataProviderTestCases(), - // func(dataChan chan interface{}) { - // for i := 0; i < len(expectedAccountStatusesResponses); i++ { - // dataChan <- &expectedAccountStatusesResponses[i] - // } - // }, - // expectedAccountStatusesResponses, - // s.requireAccountStatuses, - //) + for i := 0; i < 2; i++ { + expectedTxResultsResponses = append(expectedTxResultsResponses, &txr) + expectedTxStatusesResponses = append(expectedTxStatusesResponses, expectedTxResultsResponses) + } + + testHappyPath( + s.T(), + TransactionStatusesTopic, + s.factory, + s.subscribeTransactionStatusesDataProviderTestCases(), + func(dataChan chan interface{}) { + for i := 0; i < len(expectedTxStatusesResponses); i++ { + dataChan <- expectedTxStatusesResponses[i] + } + }, + expectedTxStatusesResponses, + s.requireTransactionStatuses, + ) } func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProviderTestCases() []testType { return []testType{ { - name: "SubscribeAccountStatusesFromStartBlockID happy path", + name: "SubscribeTransactionStatusesFromStartBlockID happy path", arguments: models.Arguments{ "start_block_id": s.rootBlock.ID().String(), - "event_types": []string{"flow.AccountCreated", "flow.AccountKeyAdded"}, }, setupBackend: func(sub *ssmock.Subscription) { s.api.On( "SubscribeTransactionStatusesFromStartBlockID", mock.Anything, - s.rootBlock.ID(), mock.Anything, + s.rootBlock.ID(), entities.EventEncodingVersion_JSON_CDC_V0, ).Return(sub).Once() }, }, { - name: "SubscribeAccountStatusesFromStartHeight happy path", + name: "SubscribeTransactionStatusesFromStartHeight happy path", arguments: models.Arguments{ "start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10), }, @@ -103,19 +124,20 @@ func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProvi s.api.On( "SubscribeTransactionStatusesFromStartHeight", mock.Anything, - s.rootBlock.Header.Height, mock.Anything, + s.rootBlock.Header.Height, entities.EventEncodingVersion_JSON_CDC_V0, ).Return(sub).Once() }, }, { - name: "SubscribeAccountStatusesFromLatestBlock happy path", + name: "SubscribeTransactionStatusesFromLatest happy path", arguments: models.Arguments{}, setupBackend: func(sub *ssmock.Subscription) { s.api.On( "SubscribeTransactionStatusesFromLatest", mock.Anything, + mock.Anything, entities.EventEncodingVersion_JSON_CDC_V0, ).Return(sub).Once() }, @@ -123,14 +145,18 @@ func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProvi } } -// requireAccountStatuses ensures that the received account statuses information matches the expected data. -func (s *AccountStatusesProviderSuite) requireTransactionStatuses( +// requireTransactionStatuses ensures that the received transaction statuses information matches the expected data. +func (s *TransactionStatusesProviderSuite) requireTransactionStatuses( v interface{}, expectedResponse interface{}, ) { - _, ok := expectedResponse.([]access.TransactionResult) + expectedAccountStatusesResponse, ok := expectedResponse.([]*access.TransactionResult) require.True(s.T(), ok, "unexpected type: %T", expectedResponse) + actualResponse, ok := v.(*models.TransactionStatusesResponse) + require.True(s.T(), ok, "Expected *models.AccountStatusesResponse, got %T", v) + + s.Require().ElementsMatch(expectedAccountStatusesResponse, actualResponse.TransactionResults) } // TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider From a7a6f76cae3adb9dccf1cc5f51ad42c2bc90c17c Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 17 Dec 2024 13:47:56 +0200 Subject: [PATCH 16/26] Fixed type in the log msg --- .../data_providers/transaction_statuses_provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index ba37340ef41..723e866ca74 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -154,7 +154,7 @@ func (s *TransactionStatusesProviderSuite) requireTransactionStatuses( require.True(s.T(), ok, "unexpected type: %T", expectedResponse) actualResponse, ok := v.(*models.TransactionStatusesResponse) - require.True(s.T(), ok, "Expected *models.AccountStatusesResponse, got %T", v) + require.True(s.T(), ok, "Expected *models.TransactionStatusesResponse, got %T", v) s.Require().ElementsMatch(expectedAccountStatusesResponse, actualResponse.TransactionResults) } From e34d7877d03762f6c5b9537e6e0d294f9d4a19ea Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 18 Dec 2024 13:49:42 +0200 Subject: [PATCH 17/26] Added new topic for send and subcribe tx statuses --- .../rest/websockets/data_providers/factory.go | 13 +- .../send_transaction_statuses_provider.go | 146 ++++++++++++++++++ .../rest/websockets/data_providers/util.go | 1 + 3 files changed, 154 insertions(+), 6 deletions(-) create mode 100644 engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index b8897deb988..aa9b6920f67 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -15,12 +15,13 @@ import ( // Constants defining various topic names used to specify different types of // data providers. const ( - EventsTopic = "events" - AccountStatusesTopic = "account_statuses" - BlocksTopic = "blocks" - BlockHeadersTopic = "block_headers" - BlockDigestsTopic = "block_digests" - TransactionStatusesTopic = "transaction_statuses" + EventsTopic = "events" + AccountStatusesTopic = "account_statuses" + BlocksTopic = "blocks" + BlockHeadersTopic = "block_headers" + BlockDigestsTopic = "block_digests" + TransactionStatusesTopic = "transaction_statuses" + SendTransactionStatusesTopic = "send_transaction_statuses" ) // DataProviderFactory defines an interface for creating data providers diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go new file mode 100644 index 00000000000..2ccbc3c7f2a --- /dev/null +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -0,0 +1,146 @@ +package data_providers + +import ( + "context" + "fmt" + "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" + "github.com/onflow/flow/protobuf/go/flow/entities" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "strconv" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" +) + +// sendTransactionStatusesArguments contains the arguments required for sending tx and subscribing to transaction statuses +type sendTransactionStatusesArguments struct { + Transaction flow.TransactionBody // The transaction body to be sent and monitored. +} + +type SendTransactionStatusesDataProvider struct { + *baseDataProvider + + logger zerolog.Logger + api access.API +} + +var _ DataProvider = (*SendTransactionStatusesDataProvider)(nil) + +func NewSendTransactionStatusesDataProvider( + ctx context.Context, + logger zerolog.Logger, + api access.API, + topic string, + arguments models.Arguments, + send chan<- interface{}, +) (*SendTransactionStatusesDataProvider, error) { + p := &SendTransactionStatusesDataProvider{ + logger: logger.With().Str("component", "send-transaction-statuses-data-provider").Logger(), + api: api, + } + + // Initialize arguments passed to the provider. + sendTxStatusesArgs, err := parseSendTransactionStatusesArguments(arguments) + if err != nil { + return nil, fmt.Errorf("invalid arguments for send tx statuses data provider: %w", err) + } + + subCtx, cancel := context.WithCancel(ctx) + + p.baseDataProvider = newBaseDataProvider( + topic, + cancel, + send, + p.createSubscription(subCtx, sendTxStatusesArgs), // Set up a subscription to tx statuses based on arguments. + ) + + return p, nil +} + +// Run starts processing the subscription for events and handles responses. +// +// No errors are expected during normal operations. +func (p *SendTransactionStatusesDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse()) +} + +// createSubscription creates a new subscription using the specified input arguments. +func (p *SendTransactionStatusesDataProvider) createSubscription( + ctx context.Context, + args sendTransactionStatusesArguments, +) subscription.Subscription { + return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0) +} + +// handleResponse processes an account statuses and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *SendTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { + + messageIndex := counters.NewMonotonousCounter(1) + + return func(txResults []*access.TransactionResult) error { + + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + p.send <- &models.TransactionStatusesResponse{ + TransactionResults: txResults, + MessageIndex: strconv.FormatUint(index, 10), + } + + return nil + } + +} + +// parseAccountStatusesArguments validates and initializes the account statuses arguments. +func parseSendTransactionStatusesArguments( + arguments models.Arguments, +) (sendTransactionStatusesArguments, error) { + var args sendTransactionStatusesArguments + var tx flow.TransactionBody + + if scriptIn, ok := arguments["script"]; ok && scriptIn != "" { + script, ok := scriptIn.([]byte) + if !ok { + return args, fmt.Errorf("'script' must be a byte array") + } + + tx.Script = script + } + + if argumentsIn, ok := arguments["arguments"]; ok && argumentsIn != "" { + argumentsData, ok := argumentsIn.([][]byte) + if !ok { + return args, fmt.Errorf("'arguments' must be a [][]byte type") + } + + tx.Arguments = argumentsData + } + + if referenceBlockIDIn, ok := arguments["reference_block_id"]; ok && referenceBlockIDIn != "" { + result, ok := referenceBlockIDIn.(string) + if !ok { + return args, fmt.Errorf("'reference_block_id' must be a string") + } + + var referenceBlockID parser.ID + err := referenceBlockID.Parse(result) + if err != nil { + return args, fmt.Errorf("invalid 'reference_block_id': %w", err) + } + + tx.ReferenceBlockID = referenceBlockID.Flow() + } + + return args, nil +} diff --git a/engine/access/rest/websockets/data_providers/util.go b/engine/access/rest/websockets/data_providers/util.go index 8ddcd54f840..8ade7c127fc 100644 --- a/engine/access/rest/websockets/data_providers/util.go +++ b/engine/access/rest/websockets/data_providers/util.go @@ -20,6 +20,7 @@ type testType struct { setupBackend func(sub *statestreamsmock.Subscription) } +// testErrType represents an error cases for subscribing type testErrType struct { name string arguments models.Arguments From 08f4b29f5c7c4b790b62e67b3c0cbe07c2690fc4 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 19 Dec 2024 13:08:58 +0200 Subject: [PATCH 18/26] Refactored tx statuses parse function --- .../transaction_statuses_provider.go | 42 +++---------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 2dfa62fbb88..c73a193da4b 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -12,7 +12,6 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/http/request" - "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" @@ -119,13 +118,13 @@ func parseTransactionStatusesArguments( ) (transactionStatusesArguments, error) { var args transactionStatusesArguments - // Check for mutual exclusivity of start_block_id and start_block_height early - startBlockIDIn, hasStartBlockID := arguments["start_block_id"] - startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"] - - if hasStartBlockID && hasStartBlockHeight { - return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'") + // Parse block arguments + startBlockID, startBlockHeight, err := ParseStartBlock(arguments) + if err != nil { + return args, err } + args.StartBlockID = startBlockID + args.StartBlockHeight = startBlockHeight if txIDIn, ok := arguments["tx_id"]; ok && txIDIn != "" { result, ok := txIDIn.(string) @@ -140,34 +139,5 @@ func parseTransactionStatusesArguments( args.TxID = txID.Flow() } - // Parse 'start_block_id' if provided - if hasStartBlockID { - result, ok := startBlockIDIn.(string) - if !ok { - return args, fmt.Errorf("'start_block_id' must be a string") - } - var startBlockID parser.ID - err := startBlockID.Parse(result) - if err != nil { - return args, fmt.Errorf("invalid 'start_block_id': %w", err) - } - args.StartBlockID = startBlockID.Flow() - } - - // Parse 'start_block_height' if provided - var err error - if hasStartBlockHeight { - result, ok := startBlockHeightIn.(string) - if !ok { - return args, fmt.Errorf("'start_block_height' must be a string") - } - args.StartBlockHeight, err = util.ToUint64(result) - if err != nil { - return args, fmt.Errorf("invalid 'start_block_height': %w", err) - } - } else { - args.StartBlockHeight = request.EmptyHeight - } - return args, nil } From 80c97a6864f02edbb63eca70ce2a8d2f55fb4510 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 19 Dec 2024 14:17:29 +0200 Subject: [PATCH 19/26] Added test for tx statuses data provider for msgIndex increment --- .../send_transaction_statuses_provider.go | 22 +++--- .../transaction_statuses_provider.go | 5 +- .../transaction_statuses_provider_test.go | 79 +++++++++++++++++++ .../websockets/models/tx_statuses_model.go | 2 +- 4 files changed, 93 insertions(+), 15 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go index 2ccbc3c7f2a..4a661874d0e 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -3,19 +3,19 @@ package data_providers import ( "context" "fmt" - "github.com/onflow/flow-go/engine/access/rest/common/parser" - "github.com/onflow/flow-go/engine/access/subscription" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/counters" - "github.com/onflow/flow/protobuf/go/flow/entities" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "strconv" "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/common/parser" "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) // sendTransactionStatusesArguments contains the arguments required for sending tx and subscribing to transaction statuses @@ -83,18 +83,18 @@ func (p *SendTransactionStatusesDataProvider) createSubscription( // No errors are expected during normal operations. func (p *SendTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(1) + messageIndex := counters.NewMonotonousCounter(0) return func(txResults []*access.TransactionResult) error { - index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } + index := messageIndex.Value() p.send <- &models.TransactionStatusesResponse{ TransactionResults: txResults, - MessageIndex: strconv.FormatUint(index, 10), + MessageIndex: index, } return nil diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index c73a193da4b..7e5e993dd23 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -3,7 +3,6 @@ package data_providers import ( "context" "fmt" - "strconv" "github.com/rs/zerolog" "google.golang.org/grpc/codes" @@ -94,7 +93,7 @@ func (p *TransactionStatusesDataProvider) createSubscription( // // No errors are expected during normal operations. func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(1) + messageIndex := counters.NewMonotonousCounter(0) return func(txResults []*access.TransactionResult) error { @@ -105,7 +104,7 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc p.send <- &models.TransactionStatusesResponse{ TransactionResults: txResults, - MessageIndex: strconv.FormatUint(index, 10), + MessageIndex: index, } return nil diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 723e866ca74..11b36d14cb1 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -228,3 +228,82 @@ func invalidTransactionStatusesArgumentsTestCases() []testErrType { }, } } + +// TestMessageIndexTransactionStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing. +func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesProviderResponse_HappyPath() { + ctx := context.Background() + send := make(chan interface{}, 10) + topic := TransactionStatusesTopic + txStatusesCount := 4 + + // Create a channel to simulate the subscription's account statuses channel + txStatusesChan := make(chan interface{}) + + // Create a mock subscription and mock the channel + sub := ssmock.NewSubscription(s.T()) + sub.On("Channel").Return((<-chan interface{})(txStatusesChan)) + sub.On("Err").Return(nil) + + s.api.On( + "SubscribeTransactionStatusesFromStartBlockID", + mock.Anything, + mock.Anything, + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub) + + arguments := + map[string]interface{}{ + "start_block_id": s.rootBlock.ID().String(), + } + + // Create the TransactionStatusesDataProvider instance + provider, err := NewTransactionStatusesDataProvider( + ctx, + s.log, + s.api, + topic, + arguments, + send, + ) + s.Require().NotNil(provider) + s.Require().NoError(err) + + // Run the provider in a separate goroutine to simulate subscription processing + go func() { + err = provider.Run() + s.Require().NoError(err) + }() + + // Simulate emitting data to the еч statuses channel + go func() { + defer close(txStatusesChan) // Close the channel when done + + for i := 0; i < txStatusesCount; i++ { + txStatusesChan <- []*access.TransactionResult{} + } + }() + + // Collect responses + var responses []*models.TransactionStatusesResponse + for i := 0; i < txStatusesCount; i++ { + res := <-send + txStatusesRes, ok := res.(*models.TransactionStatusesResponse) + s.Require().True(ok, "Expected *models.TransactionStatusesResponse, got %T", res) + responses = append(responses, txStatusesRes) + } + + // Verifying that indices are starting from 0 + s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0") + + // Verifying that indices are strictly increasing + for i := 1; i < len(responses); i++ { + prevIndex := responses[i-1].MessageIndex + currentIndex := responses[i].MessageIndex + s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1") + } + + // Ensure the provider is properly closed after the test + provider.Close() + +} diff --git a/engine/access/rest/websockets/models/tx_statuses_model.go b/engine/access/rest/websockets/models/tx_statuses_model.go index 55b9d3bdfdd..32754a06603 100644 --- a/engine/access/rest/websockets/models/tx_statuses_model.go +++ b/engine/access/rest/websockets/models/tx_statuses_model.go @@ -7,5 +7,5 @@ import ( // TransactionStatusesResponse is the response message for 'events' topic. type TransactionStatusesResponse struct { TransactionResults []*access.TransactionResult `json:"transaction_results"` - MessageIndex string `json:"message_index"` + MessageIndex uint64 `json:"message_index"` } From 317eb0e105109810c5b7117faaad38fc8c8cc9fd Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 19 Dec 2024 17:02:16 +0200 Subject: [PATCH 20/26] Implemented parse function for send and susubdcribe tx statuses provider --- .../rest/websockets/data_providers/factory.go | 2 + .../send_transaction_statuses_provider.go | 97 ++++++++++++++++++- 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index aa9b6920f67..ff23708d337 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -105,6 +105,8 @@ func (s *DataProviderFactoryImpl) NewDataProvider( return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case TransactionStatusesTopic: return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) + case SendTransactionStatusesTopic: + return NewSendTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) } diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go index 4a661874d0e..8dc2a5e8dbd 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" @@ -110,18 +111,33 @@ func parseSendTransactionStatusesArguments( var tx flow.TransactionBody if scriptIn, ok := arguments["script"]; ok && scriptIn != "" { - script, ok := scriptIn.([]byte) + result, ok := scriptIn.(string) if !ok { - return args, fmt.Errorf("'script' must be a byte array") + return args, fmt.Errorf("'script' must be a string") + } + + script, err := util.FromBase64(result) + if err != nil { + return args, fmt.Errorf("invalid 'script': %w", err) } tx.Script = script } if argumentsIn, ok := arguments["arguments"]; ok && argumentsIn != "" { - argumentsData, ok := argumentsIn.([][]byte) + result, ok := argumentsIn.([]string) if !ok { - return args, fmt.Errorf("'arguments' must be a [][]byte type") + return args, fmt.Errorf("'arguments' must be a []string type") + } + + var argumentsData [][]byte + for _, arg := range result { + argument, err := util.FromBase64(arg) + if err != nil { + return args, fmt.Errorf("invalid 'arguments': %w", err) + } + + argumentsData = append(argumentsData, argument) } tx.Arguments = argumentsData @@ -142,5 +158,78 @@ func parseSendTransactionStatusesArguments( tx.ReferenceBlockID = referenceBlockID.Flow() } + if gasLimitIn, ok := arguments["gas_limit"]; ok && gasLimitIn != "" { + result, ok := gasLimitIn.(string) + if !ok { + return args, fmt.Errorf("'gas_limit' must be a string") + } + + gasLimit, err := util.ToUint64(result) + if err != nil { + return args, fmt.Errorf("invalid 'gas_limit': %w", err) + } + tx.GasLimit = gasLimit + } + + if payerIn, ok := arguments["payer"]; ok && payerIn != "" { + result, ok := payerIn.(string) + if !ok { + return args, fmt.Errorf("'payerIn' must be a string") + } + + payerAddr, err := flow.StringToAddress(result) + if err != nil { + return args, fmt.Errorf("invalid 'payer': %w", err) + } + tx.Payer = payerAddr + } + + if proposalKeyIn, ok := arguments["proposal_key"]; ok && proposalKeyIn != "" { + proposalKey, ok := proposalKeyIn.(flow.ProposalKey) + if !ok { + return args, fmt.Errorf("'proposal_key' must be a object (ProposalKey)") + } + + tx.ProposalKey = proposalKey + } + + if authorizersIn, ok := arguments["authorizers"]; ok && authorizersIn != "" { + result, ok := authorizersIn.([]string) + if !ok { + return args, fmt.Errorf("'authorizers' must be a []string type") + } + + var authorizersData []flow.Address + for _, auth := range result { + authorizer, err := flow.StringToAddress(auth) + if err != nil { + return args, fmt.Errorf("invalid 'authorizers': %w", err) + } + + authorizersData = append(authorizersData, authorizer) + } + + tx.Authorizers = authorizersData + } + + if payloadSignaturesIn, ok := arguments["payload_signatures"]; ok && payloadSignaturesIn != "" { + payloadSignatures, ok := payloadSignaturesIn.([]flow.TransactionSignature) + if !ok { + return args, fmt.Errorf("'payload_signatures' must be an array of objects (TransactionSignature)") + } + + tx.PayloadSignatures = payloadSignatures + } + + if envelopeSignaturesIn, ok := arguments["envelope_signatures"]; ok && envelopeSignaturesIn != "" { + envelopeSignatures, ok := envelopeSignaturesIn.([]flow.TransactionSignature) + if !ok { + return args, fmt.Errorf("'payload_signatures' must be an array of objects (TransactionSignature)") + } + + tx.EnvelopeSignatures = envelopeSignatures + } + args.Transaction = tx + return args, nil } From 679ef2fb97bbb3be041d6c8d4515a41eeb778896 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Dec 2024 12:58:33 +0200 Subject: [PATCH 21/26] Fixed typos and added missing invalid cases to godoc --- .../send_transaction_statuses_provider_test.go | 1 + .../transaction_statuses_provider_test.go | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) create mode 100644 engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go new file mode 100644 index 00000000000..06387b46331 --- /dev/null +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go @@ -0,0 +1 @@ +package data_providers diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 11b36d14cb1..68e79e3b978 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -53,7 +53,7 @@ func (s *TransactionStatusesProviderSuite) SetupTest() { s.log, nil, s.api, - flow.Testnet.Chain(), + s.chain, state_stream.DefaultEventFilterConfig, subscription.DefaultHeartbeatInterval) s.Require().NotNil(s.factory) @@ -159,13 +159,13 @@ func (s *TransactionStatusesProviderSuite) requireTransactionStatuses( s.Require().ElementsMatch(expectedAccountStatusesResponse, actualResponse.TransactionResults) } -// TestAccountStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider +// TestTransactionStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider // when invalid arguments are provided. It verifies that appropriate errors are returned // for missing or conflicting arguments. // This test covers the test cases: // 1. Invalid 'tx_id' argument. // 2. Invalid 'start_block_id' argument. -func (s *TransactionStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() { +func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_InvalidArguments() { ctx := context.Background() send := make(chan interface{}) @@ -193,8 +193,10 @@ func (s *TransactionStatusesProviderSuite) TestAccountStatusesDataProvider_Inval // a set of input arguments, and the expected error message that should be returned. // // The test cases cover scenarios such as: -// 1. Providing invalid 'tx_id' value. -// 2. Providing invalid 'start_block_id' value. +// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously. +// 2. Providing invalid 'tx_id' value. +// 3. Providing invalid 'start_block_id' value. +// 4. Invalid 'start_block_id' argument. func invalidTransactionStatusesArgumentsTestCases() []testErrType { return []testErrType{ { From 3e828b6b87f9e5c856661c0d25ff2d6abda83aa8 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Dec 2024 13:01:09 +0200 Subject: [PATCH 22/26] fixed small remarks --- engine/access/rest/server.go | 3 ++- .../websockets/data_providers/transaction_statuses_provider.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index c45919725b2..98643f19638 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -57,7 +57,8 @@ func NewServer(serverAPI access.API, serverAPI, chain, stateStreamConfig.EventFilterConfig, - stateStreamConfig.HeartbeatInterval) + stateStreamConfig.HeartbeatInterval, + ) builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory) c := cors.New(cors.Options{ diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 7e5e993dd23..3e6fa0cc928 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -26,6 +26,7 @@ type transactionStatusesArguments struct { StartBlockHeight uint64 // Height of the block to start subscription from } +// TransactionStatusesDataProvider is responsible for providing tx statuses type TransactionStatusesDataProvider struct { *baseDataProvider From 71bac4979b8205926f3ea75539ffeb95370b0c96 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Dec 2024 16:31:12 +0200 Subject: [PATCH 23/26] Added tests for send and subscribe data provider --- .../websockets/data_providers/factory_test.go | 11 + .../send_transaction_statuses_provider.go | 2 +- ...send_transaction_statuses_provider_test.go | 232 ++++++++++++++++++ .../transaction_statuses_provider_test.go | 60 +++-- 4 files changed, 278 insertions(+), 27 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 33389c3997e..f18455b7edd 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -143,6 +143,17 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.stateStreamApi.AssertExpectations(s.T()) }, }, + { + name: "send transaction statuses topic", + topic: SendTransactionStatusesTopic, + arguments: models.Arguments{}, + setupSubscription: func() { + s.setupSubscription(s.accessApi.On("SendAndSubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything)) + }, + assertExpectations: func() { + s.stateStreamApi.AssertExpectations(s.T()) + }, + }, } for _, test := range testCases { diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go index 8dc2a5e8dbd..c2ad0ca5937 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -224,7 +224,7 @@ func parseSendTransactionStatusesArguments( if envelopeSignaturesIn, ok := arguments["envelope_signatures"]; ok && envelopeSignaturesIn != "" { envelopeSignatures, ok := envelopeSignaturesIn.([]flow.TransactionSignature) if !ok { - return args, fmt.Errorf("'payload_signatures' must be an array of objects (TransactionSignature)") + return args, fmt.Errorf("'envelope_signatures' must be an array of objects (TransactionSignature)") } tx.EnvelopeSignatures = envelopeSignatures diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go index 06387b46331..ea617265d8f 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go @@ -1 +1,233 @@ package data_providers + +import ( + "context" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + accessmock "github.com/onflow/flow-go/access/mock" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream" + ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" + + "github.com/onflow/flow/protobuf/go/flow/entities" +) + +type SendTransactionStatusesProviderSuite struct { + suite.Suite + + log zerolog.Logger + api *accessmock.API + + chain flow.Chain + rootBlock flow.Block + finalizedBlock *flow.Header + + factory *DataProviderFactoryImpl +} + +func TestNewSendTransactionStatusesDataProvider(t *testing.T) { + suite.Run(t, new(SendTransactionStatusesProviderSuite)) +} + +func (s *SendTransactionStatusesProviderSuite) SetupTest() { + s.log = unittest.Logger() + s.api = accessmock.NewAPI(s.T()) + + s.chain = flow.Testnet.Chain() + + s.rootBlock = unittest.BlockFixture() + s.rootBlock.Header.Height = 0 + + s.factory = NewDataProviderFactory( + s.log, + nil, + s.api, + s.chain, + state_stream.DefaultEventFilterConfig, + subscription.DefaultHeartbeatInterval) + s.Require().NotNil(s.factory) +} + +// TestSendTransactionStatusesDataProvider_HappyPath tests the behavior of the send transaction statuses data provider +// when it is configured correctly and operating under normal conditions. It +// validates that tx statuses are correctly streamed to the channel and ensures +// no unexpected errors occur. +func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvider_HappyPath() { + + sendTxStatutesTestCases := []testType{ + { + name: "SubscribeTransactionStatusesFromStartBlockID happy path", + arguments: models.Arguments{ + "start_block_id": s.rootBlock.ID().String(), + }, + setupBackend: func(sub *ssmock.Subscription) { + s.api.On( + "SendAndSubscribeTransactionStatuses", + mock.Anything, + mock.Anything, + entities.EventEncodingVersion_JSON_CDC_V0, + ).Return(sub).Once() + }, + }, + } + + expectedResponse := expectedTransactionStatusesResponse(s.rootBlock) + + testHappyPath( + s.T(), + SendTransactionStatusesTopic, + s.factory, + sendTxStatutesTestCases, + func(dataChan chan interface{}) { + for i := 0; i < len(expectedResponse); i++ { + dataChan <- expectedResponse[i] + } + }, + expectedResponse, + s.requireTransactionStatuses, + ) + +} + +// TestSendTransactionStatusesDataProvider_InvalidArguments tests the behavior of the send transaction statuses data provider +// when invalid arguments are provided. It verifies that appropriate errors are returned +// for missing or conflicting arguments. +// This test covers the test cases: +// 1. Invalid 'script' type. +// 2. Invalid 'script' value. +// 3. Invalid 'arguments' type. +// 4. Invalid 'arguments' value. +// 5. Invalid 'reference_block_id' value. +// 6. Invalid 'gas_limit' value. +// 7. Invalid 'payer' value. +// 8. Invalid 'proposal_key' value. +// 9. Invalid 'authorizers' value. +// 10. Invalid 'payload_signatures' value. +// 11. Invalid 'envelope_signatures' value. +func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvider_InvalidArguments() { + ctx := context.Background() + send := make(chan interface{}) + + topic := SendTransactionStatusesTopic + + for _, test := range invalidSendTransactionStatusesArgumentsTestCases() { + s.Run(test.name, func() { + provider, err := NewSendTransactionStatusesDataProvider( + ctx, + s.log, + s.api, + topic, + test.arguments, + send, + ) + s.Require().Nil(provider) + s.Require().Error(err) + s.Require().Contains(err.Error(), test.expectedErrorMsg) + }) + } +} + +// invalidSendTransactionStatusesArgumentsTestCases returns a list of test cases with invalid argument combinations +// for testing the behavior of send transaction statuses data providers. Each test case includes a name, +// a set of input arguments, and the expected error message that should be returned. +// +// The test cases cover scenarios such as: +// 1. Providing invalid 'script' type. +// 2. Providing invalid 'script' value. +// 3. Providing invalid 'arguments' type. +// 4. Providing invalid 'arguments' value. +// 5. Providing invalid 'reference_block_id' value. +// 6. Providing invalid 'gas_limit' value. +// 7. Providing invalid 'payer' value. +// 8. Providing invalid 'proposal_key' value. +// 9. Providing invalid 'authorizers' value. +// 10. Providing invalid 'payload_signatures' value. +// 11. Providing invalid 'envelope_signatures' value. +func invalidSendTransactionStatusesArgumentsTestCases() []testErrType { + return []testErrType{ + { + name: "invalid 'script' argument type", + arguments: map[string]interface{}{ + "script": 0, + }, + expectedErrorMsg: "'script' must be a string", + }, + { + name: "invalid 'script' argument", + arguments: map[string]interface{}{ + "script": "invalid_script", + }, + expectedErrorMsg: "invalid 'script': illegal base64 data ", + }, + { + name: "invalid 'arguments' type", + arguments: map[string]interface{}{ + "arguments": 0, + }, + expectedErrorMsg: "'arguments' must be a []string type", + }, + { + name: "invalid 'arguments' argument", + arguments: map[string]interface{}{ + "arguments": []string{"invalid_base64_1", "invalid_base64_2"}, + }, + expectedErrorMsg: "invalid 'arguments'", + }, + { + name: "invalid 'reference_block_id' argument", + arguments: map[string]interface{}{ + "reference_block_id": "invalid_reference_block_id", + }, + expectedErrorMsg: "invalid ID format", + }, + { + name: "invalid 'gas_limit' argument", + arguments: map[string]interface{}{ + "gas_limit": "-1", + }, + expectedErrorMsg: "value must be an unsigned 64 bit integer", + }, + { + name: "invalid 'payer' argument", + arguments: map[string]interface{}{ + "payer": "invalid_payer", + }, + expectedErrorMsg: "invalid 'payer': can not decode hex string", + }, + { + name: "invalid 'proposal_key' argument", + arguments: map[string]interface{}{ + "proposal_key": "invalid ProposalKey object", + }, + expectedErrorMsg: "'proposal_key' must be a object (ProposalKey)", + }, + { + name: "invalid 'authorizers' argument", + arguments: map[string]interface{}{ + "authorizers": []string{"invalid_base64_1", "invalid_base64_2"}, + }, + expectedErrorMsg: "invalid 'authorizers': can not decode hex string", + }, + { + name: "invalid 'payload_signatures' argument", + arguments: map[string]interface{}{ + "payload_signatures": "invalid TransactionSignature array", + }, + expectedErrorMsg: "'payload_signatures' must be an array of objects (TransactionSignature)", + }, + { + name: "invalid 'envelope_signatures' argument", + arguments: map[string]interface{}{ + "envelope_signatures": "invalid TransactionSignature array", + }, + expectedErrorMsg: "'envelope_signatures' must be an array of objects (TransactionSignature)", + }, + } +} diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 68e79e3b978..bfb81f82f81 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -59,28 +59,12 @@ func (s *TransactionStatusesProviderSuite) SetupTest() { s.Require().NotNil(s.factory) } +// TestTransactionStatusesDataProvider_HappyPath tests the behavior of the transaction statuses data provider +// when it is configured correctly and operating under normal conditions. It +// validates that tx statuses are correctly streamed to the channel and ensures +// no unexpected errors occur. func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_HappyPath() { - id := unittest.IdentifierFixture() - cid := unittest.IdentifierFixture() - txr := access.TransactionResult{ - Status: flow.TransactionStatusSealed, - StatusCode: 10, - Events: []flow.Event{ - unittest.EventFixture(flow.EventAccountCreated, 1, 0, id, 200), - }, - ErrorMessage: "", - BlockID: s.rootBlock.ID(), - CollectionID: cid, - BlockHeight: s.rootBlock.Header.Height, - } - - var expectedTxStatusesResponses [][]*access.TransactionResult - var expectedTxResultsResponses []*access.TransactionResult - - for i := 0; i < 2; i++ { - expectedTxResultsResponses = append(expectedTxResultsResponses, &txr) - expectedTxStatusesResponses = append(expectedTxStatusesResponses, expectedTxResultsResponses) - } + expectedResponse := expectedTransactionStatusesResponse(s.rootBlock) testHappyPath( s.T(), @@ -88,14 +72,13 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_H s.factory, s.subscribeTransactionStatusesDataProviderTestCases(), func(dataChan chan interface{}) { - for i := 0; i < len(expectedTxStatusesResponses); i++ { - dataChan <- expectedTxStatusesResponses[i] + for i := 0; i < len(expectedResponse); i++ { + dataChan <- expectedResponse[i] } }, - expectedTxStatusesResponses, + expectedResponse, s.requireTransactionStatuses, ) - } func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProviderTestCases() []testType { @@ -195,7 +178,7 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_I // The test cases cover scenarios such as: // 1. Providing both 'start_block_id' and 'start_block_height' simultaneously. // 2. Providing invalid 'tx_id' value. -// 3. Providing invalid 'start_block_id' value. +// 3. Providing invalid 'start_block_id' value. // 4. Invalid 'start_block_id' argument. func invalidTransactionStatusesArgumentsTestCases() []testErrType { return []testErrType{ @@ -307,5 +290,30 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr // Ensure the provider is properly closed after the test provider.Close() +} + +func expectedTransactionStatusesResponse(block flow.Block) [][]*access.TransactionResult { + id := unittest.IdentifierFixture() + cid := unittest.IdentifierFixture() + txr := access.TransactionResult{ + Status: flow.TransactionStatusSealed, + StatusCode: 10, + Events: []flow.Event{ + unittest.EventFixture(flow.EventAccountCreated, 1, 0, id, 200), + }, + ErrorMessage: "", + BlockID: block.ID(), + CollectionID: cid, + BlockHeight: block.Header.Height, + } + + var expectedTxStatusesResponses [][]*access.TransactionResult + var expectedTxResultsResponses []*access.TransactionResult + + for i := 0; i < 2; i++ { + expectedTxResultsResponses = append(expectedTxResultsResponses, &txr) + expectedTxStatusesResponses = append(expectedTxStatusesResponses, expectedTxResultsResponses) + } + return expectedTxStatusesResponses } From f71edd5a056df1e78e4e22d51f44a369afa495f2 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 24 Dec 2024 12:42:25 +0200 Subject: [PATCH 24/26] Removed handleResponse func and using generic HandleResponse instead --- .../send_transaction_statuses_provider.go | 38 +++++++------------ .../transaction_statuses_provider.go | 36 +++++++----------- 2 files changed, 26 insertions(+), 48 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go index c2ad0ca5937..f8096a0b036 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go @@ -68,39 +68,27 @@ func NewSendTransactionStatusesDataProvider( // // No errors are expected during normal operations. func (p *SendTransactionStatusesDataProvider) Run() error { - return subscription.HandleSubscription(p.subscription, p.handleResponse()) -} - -// createSubscription creates a new subscription using the specified input arguments. -func (p *SendTransactionStatusesDataProvider) createSubscription( - ctx context.Context, - args sendTransactionStatusesArguments, -) subscription.Subscription { - return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0) -} - -// handleResponse processes an account statuses and sends the formatted response. -// -// No errors are expected during normal operations. -func (p *SendTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(0) - return func(txResults []*access.TransactionResult) error { - + return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) { + index := messageIndex.Value() if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) } - index := messageIndex.Value() - p.send <- &models.TransactionStatusesResponse{ + return &models.TransactionStatusesResponse{ TransactionResults: txResults, MessageIndex: index, - } - - return nil - } + }, nil + })) +} +// createSubscription creates a new subscription using the specified input arguments. +func (p *SendTransactionStatusesDataProvider) createSubscription( + ctx context.Context, + args sendTransactionStatusesArguments, +) subscription.Subscription { + return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0) } // parseAccountStatusesArguments validates and initializes the account statuses arguments. diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 3e6fa0cc928..f98acf61d16 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -71,7 +71,19 @@ func NewTransactionStatusesDataProvider( // // No errors are expected during normal operations. func (p *TransactionStatusesDataProvider) Run() error { - return subscription.HandleSubscription(p.subscription, p.handleResponse()) + messageIndex := counters.NewMonotonousCounter(0) + + return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) { + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + return &models.TransactionStatusesResponse{ + TransactionResults: txResults, + MessageIndex: index, + }, nil + })) } // createSubscription creates a new subscription using the specified input arguments. @@ -90,28 +102,6 @@ func (p *TransactionStatusesDataProvider) createSubscription( return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0) } -// handleResponse processes an account statuses and sends the formatted response. -// -// No errors are expected during normal operations. -func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(0) - - return func(txResults []*access.TransactionResult) error { - - index := messageIndex.Value() - if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - - p.send <- &models.TransactionStatusesResponse{ - TransactionResults: txResults, - MessageIndex: index, - } - - return nil - } -} - // parseAccountStatusesArguments validates and initializes the account statuses arguments. func parseTransactionStatusesArguments( arguments models.Arguments, From 74c8c6da0a58c57e091cbba907a638c09d658910 Mon Sep 17 00:00:00 2001 From: Andrii Date: Thu, 26 Dec 2024 16:50:06 +0200 Subject: [PATCH 25/26] Renamed topic, deleted redundant godoc --- .../rest/websockets/data_providers/factory.go | 16 ++++++++-------- .../websockets/data_providers/factory_test.go | 2 +- .../send_transaction_statuses_provider_test.go | 16 ++-------------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index ff23708d337..43161694122 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -15,13 +15,13 @@ import ( // Constants defining various topic names used to specify different types of // data providers. const ( - EventsTopic = "events" - AccountStatusesTopic = "account_statuses" - BlocksTopic = "blocks" - BlockHeadersTopic = "block_headers" - BlockDigestsTopic = "block_digests" - TransactionStatusesTopic = "transaction_statuses" - SendTransactionStatusesTopic = "send_transaction_statuses" + EventsTopic = "events" + AccountStatusesTopic = "account_statuses" + BlocksTopic = "blocks" + BlockHeadersTopic = "block_headers" + BlockDigestsTopic = "block_digests" + TransactionStatusesTopic = "transaction_statuses" + SendAndGetTransactionStatusesTopic = "send_and_get_transaction_statuses" ) // DataProviderFactory defines an interface for creating data providers @@ -105,7 +105,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case TransactionStatusesTopic: return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) - case SendTransactionStatusesTopic: + case SendAndGetTransactionStatusesTopic: return NewSendTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index f18455b7edd..ca418e494c7 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -145,7 +145,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { }, { name: "send transaction statuses topic", - topic: SendTransactionStatusesTopic, + topic: SendAndGetTransactionStatusesTopic, arguments: models.Arguments{}, setupSubscription: func() { s.setupSubscription(s.accessApi.On("SendAndSubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything)) diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go index ea617265d8f..3636df122b3 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go @@ -82,7 +82,7 @@ func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvid testHappyPath( s.T(), - SendTransactionStatusesTopic, + SendAndGetTransactionStatusesTopic, s.factory, sendTxStatutesTestCases, func(dataChan chan interface{}) { @@ -99,23 +99,11 @@ func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvid // TestSendTransactionStatusesDataProvider_InvalidArguments tests the behavior of the send transaction statuses data provider // when invalid arguments are provided. It verifies that appropriate errors are returned // for missing or conflicting arguments. -// This test covers the test cases: -// 1. Invalid 'script' type. -// 2. Invalid 'script' value. -// 3. Invalid 'arguments' type. -// 4. Invalid 'arguments' value. -// 5. Invalid 'reference_block_id' value. -// 6. Invalid 'gas_limit' value. -// 7. Invalid 'payer' value. -// 8. Invalid 'proposal_key' value. -// 9. Invalid 'authorizers' value. -// 10. Invalid 'payload_signatures' value. -// 11. Invalid 'envelope_signatures' value. func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvider_InvalidArguments() { ctx := context.Background() send := make(chan interface{}) - topic := SendTransactionStatusesTopic + topic := SendAndGetTransactionStatusesTopic for _, test := range invalidSendTransactionStatusesArgumentsTestCases() { s.Run(test.name, func() { From c4b1d0aad22d24f67948f24f77e6e085b399b0fa Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 27 Dec 2024 14:34:37 +0200 Subject: [PATCH 26/26] Fixed worng impl for tx statuses data privoders, fixed tests< renamed data provider --- .../rest/websockets/data_providers/factory.go | 2 +- ..._and_get_transaction_statuses_provider.go} | 68 +++++++++++-------- ...get_transaction_statuses_provider_test.go} | 26 +++++-- .../transaction_statuses_provider.go | 38 +++++++---- .../transaction_statuses_provider_test.go | 30 ++++---- .../websockets/models/tx_statuses_model.go | 4 +- 6 files changed, 103 insertions(+), 65 deletions(-) rename engine/access/rest/websockets/data_providers/{send_transaction_statuses_provider.go => send_and_get_transaction_statuses_provider.go} (74%) rename engine/access/rest/websockets/data_providers/{send_transaction_statuses_provider_test.go => send_and_get_transaction_statuses_provider_test.go} (86%) diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 43161694122..6892c828123 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -106,7 +106,7 @@ func (s *DataProviderFactoryImpl) NewDataProvider( case TransactionStatusesTopic: return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case SendAndGetTransactionStatusesTopic: - return NewSendTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) + return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) } diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go similarity index 74% rename from engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go rename to engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go index f8096a0b036..f6db73ac4e0 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go @@ -19,35 +19,35 @@ import ( "github.com/onflow/flow/protobuf/go/flow/entities" ) -// sendTransactionStatusesArguments contains the arguments required for sending tx and subscribing to transaction statuses -type sendTransactionStatusesArguments struct { +// sendAndGetTransactionStatusesArguments contains the arguments required for sending tx and subscribing to transaction statuses +type sendAndGetTransactionStatusesArguments struct { Transaction flow.TransactionBody // The transaction body to be sent and monitored. } -type SendTransactionStatusesDataProvider struct { +type SendAndGetTransactionStatusesDataProvider struct { *baseDataProvider logger zerolog.Logger api access.API } -var _ DataProvider = (*SendTransactionStatusesDataProvider)(nil) +var _ DataProvider = (*SendAndGetTransactionStatusesDataProvider)(nil) -func NewSendTransactionStatusesDataProvider( +func NewSendAndGetTransactionStatusesDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, topic string, arguments models.Arguments, send chan<- interface{}, -) (*SendTransactionStatusesDataProvider, error) { - p := &SendTransactionStatusesDataProvider{ +) (*SendAndGetTransactionStatusesDataProvider, error) { + p := &SendAndGetTransactionStatusesDataProvider{ logger: logger.With().Str("component", "send-transaction-statuses-data-provider").Logger(), api: api, } // Initialize arguments passed to the provider. - sendTxStatusesArgs, err := parseSendTransactionStatusesArguments(arguments) + sendTxStatusesArgs, err := parseSendAndGetTransactionStatusesArguments(arguments) if err != nil { return nil, fmt.Errorf("invalid arguments for send tx statuses data provider: %w", err) } @@ -67,35 +67,47 @@ func NewSendTransactionStatusesDataProvider( // Run starts processing the subscription for events and handles responses. // // No errors are expected during normal operations. -func (p *SendTransactionStatusesDataProvider) Run() error { - messageIndex := counters.NewMonotonousCounter(0) - - return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) { - index := messageIndex.Value() - if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - - return &models.TransactionStatusesResponse{ - TransactionResults: txResults, - MessageIndex: index, - }, nil - })) +func (p *SendAndGetTransactionStatusesDataProvider) Run() error { + return subscription.HandleSubscription(p.subscription, p.handleResponse()) } // createSubscription creates a new subscription using the specified input arguments. -func (p *SendTransactionStatusesDataProvider) createSubscription( +func (p *SendAndGetTransactionStatusesDataProvider) createSubscription( ctx context.Context, - args sendTransactionStatusesArguments, + args sendAndGetTransactionStatusesArguments, ) subscription.Subscription { return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0) } -// parseAccountStatusesArguments validates and initializes the account statuses arguments. -func parseSendTransactionStatusesArguments( +// handleResponse processes a tx statuses and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *SendAndGetTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { + messageIndex := counters.NewMonotonousCounter(0) + + return func(txResults []*access.TransactionResult) error { + + for i := range txResults { + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + p.send <- &models.TransactionStatusesResponse{ + TransactionResult: txResults[i], + MessageIndex: index, + } + } + + return nil + } +} + +// parseSendAndGetTransactionStatusesArguments validates and initializes the account statuses arguments. +func parseSendAndGetTransactionStatusesArguments( arguments models.Arguments, -) (sendTransactionStatusesArguments, error) { - var args sendTransactionStatusesArguments +) (sendAndGetTransactionStatusesArguments, error) { + var args sendAndGetTransactionStatusesArguments var tx flow.TransactionBody if scriptIn, ok := arguments["script"]; ok && scriptIn != "" { diff --git a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go similarity index 86% rename from engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go rename to engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go index 3636df122b3..1776d7a873a 100644 --- a/engine/access/rest/websockets/data_providers/send_transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go @@ -6,8 +6,10 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/onflow/flow-go/access" accessmock "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/state_stream" @@ -59,7 +61,7 @@ func (s *SendTransactionStatusesProviderSuite) SetupTest() { // when it is configured correctly and operating under normal conditions. It // validates that tx statuses are correctly streamed to the channel and ensures // no unexpected errors occur. -func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvider_HappyPath() { +func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvider_HappyPath() { sendTxStatutesTestCases := []testType{ { @@ -86,9 +88,7 @@ func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvid s.factory, sendTxStatutesTestCases, func(dataChan chan interface{}) { - for i := 0; i < len(expectedResponse); i++ { - dataChan <- expectedResponse[i] - } + dataChan <- expectedResponse }, expectedResponse, s.requireTransactionStatuses, @@ -96,6 +96,22 @@ func (s *TransactionStatusesProviderSuite) TestSendTransactionStatusesDataProvid } +// requireTransactionStatuses ensures that the received transaction statuses information matches the expected data. +func (s *SendTransactionStatusesProviderSuite) requireTransactionStatuses( + v interface{}, + expectedResponse interface{}, +) { + expectedTxStatusesResponse, ok := expectedResponse.(*access.TransactionResult) + require.True(s.T(), ok, "unexpected type: %T", expectedResponse) + + actualResponse, ok := v.(*models.TransactionStatusesResponse) + require.True(s.T(), ok, "Expected *models.TransactionStatusesResponse, got %T", v) + + require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponse.TransactionResult.BlockID) + require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponse.TransactionResult.BlockHeight) + +} + // TestSendTransactionStatusesDataProvider_InvalidArguments tests the behavior of the send transaction statuses data provider // when invalid arguments are provided. It verifies that appropriate errors are returned // for missing or conflicting arguments. @@ -107,7 +123,7 @@ func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataPr for _, test := range invalidSendTransactionStatusesArgumentsTestCases() { s.Run(test.name, func() { - provider, err := NewSendTransactionStatusesDataProvider( + provider, err := NewSendAndGetTransactionStatusesDataProvider( ctx, s.log, s.api, diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index f98acf61d16..3b75bde006a 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -71,19 +71,7 @@ func NewTransactionStatusesDataProvider( // // No errors are expected during normal operations. func (p *TransactionStatusesDataProvider) Run() error { - messageIndex := counters.NewMonotonousCounter(0) - - return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) { - index := messageIndex.Value() - if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { - return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - - return &models.TransactionStatusesResponse{ - TransactionResults: txResults, - MessageIndex: index, - }, nil - })) + return subscription.HandleSubscription(p.subscription, p.handleResponse()) } // createSubscription creates a new subscription using the specified input arguments. @@ -102,6 +90,30 @@ func (p *TransactionStatusesDataProvider) createSubscription( return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0) } +// handleResponse processes a tx statuses and sends the formatted response. +// +// No errors are expected during normal operations. +func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { + messageIndex := counters.NewMonotonousCounter(0) + + return func(txResults []*access.TransactionResult) error { + + for i := range txResults { + index := messageIndex.Value() + if ok := messageIndex.Set(messageIndex.Value() + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } + + p.send <- &models.TransactionStatusesResponse{ + TransactionResult: txResults[i], + MessageIndex: index, + } + } + + return nil + } +} + // parseAccountStatusesArguments validates and initializes the account statuses arguments. func parseTransactionStatusesArguments( arguments models.Arguments, diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index bfb81f82f81..c045553f4bc 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -72,9 +72,7 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_H s.factory, s.subscribeTransactionStatusesDataProviderTestCases(), func(dataChan chan interface{}) { - for i := 0; i < len(expectedResponse); i++ { - dataChan <- expectedResponse[i] - } + dataChan <- expectedResponse }, expectedResponse, s.requireTransactionStatuses, @@ -133,21 +131,19 @@ func (s *TransactionStatusesProviderSuite) requireTransactionStatuses( v interface{}, expectedResponse interface{}, ) { - expectedAccountStatusesResponse, ok := expectedResponse.([]*access.TransactionResult) + expectedTxStatusesResponse, ok := expectedResponse.(*access.TransactionResult) require.True(s.T(), ok, "unexpected type: %T", expectedResponse) actualResponse, ok := v.(*models.TransactionStatusesResponse) require.True(s.T(), ok, "Expected *models.TransactionStatusesResponse, got %T", v) - s.Require().ElementsMatch(expectedAccountStatusesResponse, actualResponse.TransactionResults) + require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponse.TransactionResult.BlockID) + require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponse.TransactionResult.BlockHeight) } // TestTransactionStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider // when invalid arguments are provided. It verifies that appropriate errors are returned // for missing or conflicting arguments. -// This test covers the test cases: -// 1. Invalid 'tx_id' argument. -// 2. Invalid 'start_block_id' argument. func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_InvalidArguments() { ctx := context.Background() send := make(chan interface{}) @@ -260,13 +256,17 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr s.Require().NoError(err) }() - // Simulate emitting data to the еч statuses channel + // Simulate emitting data to the tx statuses channel + var txResults []*access.TransactionResult + for i := 0; i < txStatusesCount; i++ { + txResults = append(txResults, &access.TransactionResult{ + BlockHeight: s.rootBlock.Header.Height, + }) + } go func() { defer close(txStatusesChan) // Close the channel when done - for i := 0; i < txStatusesCount; i++ { - txStatusesChan <- []*access.TransactionResult{} - } + txStatusesChan <- txResults }() // Collect responses @@ -292,7 +292,7 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr provider.Close() } -func expectedTransactionStatusesResponse(block flow.Block) [][]*access.TransactionResult { +func expectedTransactionStatusesResponse(block flow.Block) []*access.TransactionResult { id := unittest.IdentifierFixture() cid := unittest.IdentifierFixture() txr := access.TransactionResult{ @@ -307,13 +307,11 @@ func expectedTransactionStatusesResponse(block flow.Block) [][]*access.Transacti BlockHeight: block.Header.Height, } - var expectedTxStatusesResponses [][]*access.TransactionResult var expectedTxResultsResponses []*access.TransactionResult for i := 0; i < 2; i++ { expectedTxResultsResponses = append(expectedTxResultsResponses, &txr) - expectedTxStatusesResponses = append(expectedTxStatusesResponses, expectedTxResultsResponses) } - return expectedTxStatusesResponses + return expectedTxResultsResponses } diff --git a/engine/access/rest/websockets/models/tx_statuses_model.go b/engine/access/rest/websockets/models/tx_statuses_model.go index 32754a06603..338ba4216d9 100644 --- a/engine/access/rest/websockets/models/tx_statuses_model.go +++ b/engine/access/rest/websockets/models/tx_statuses_model.go @@ -6,6 +6,6 @@ import ( // TransactionStatusesResponse is the response message for 'events' topic. type TransactionStatusesResponse struct { - TransactionResults []*access.TransactionResult `json:"transaction_results"` - MessageIndex uint64 `json:"message_index"` + TransactionResult *access.TransactionResult `json:"transaction_result"` + MessageIndex uint64 `json:"message_index"` }