diff --git a/packages/sdk/src/StreamrClient.ts b/packages/sdk/src/StreamrClient.ts index 6e2b3902f6..ed8e352236 100644 --- a/packages/sdk/src/StreamrClient.ts +++ b/packages/sdk/src/StreamrClient.ts @@ -28,6 +28,7 @@ import { Stream } from './Stream' import { StreamIDBuilder } from './StreamIDBuilder' import { StreamMetadata, getPartitionCount } from './StreamMetadata' import { StreamrClientError } from './StreamrClientError' +import { ChainEventPoller } from './contracts/ChainEventPoller' import { ContractFactory } from './contracts/ContractFactory' import { Operator } from './contracts/Operator' import { OperatorRegistry } from './contracts/OperatorRegistry' @@ -48,11 +49,11 @@ import { Subscription, SubscriptionEvents } from './subscribe/Subscription' import { initResendSubscription } from './subscribe/resendSubscription' import { waitForStorage } from './subscribe/waitForStorage' import { StreamDefinition } from './types' +import { map } from './utils/GeneratorUtils' import { LoggerFactory } from './utils/LoggerFactory' +import { addStreamToStorageNode } from './utils/addStreamToStorageNode' import { pOnce } from './utils/promises' import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils' -import { addStreamToStorageNode } from './utils/addStreamToStorageNode' -import { map } from './utils/GeneratorUtils' // TODO: this type only exists to enable tsdoc to generate proper documentation export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions @@ -96,6 +97,7 @@ export class StreamrClient { private readonly operatorRegistry: OperatorRegistry private readonly contractFactory: ContractFactory private readonly localGroupKeyStore: LocalGroupKeyStore + private readonly chainEventPoller: ChainEventPoller private readonly theGraphClient: TheGraphClient private readonly streamIdBuilder: StreamIDBuilder private readonly config: StrictStreamrClientConfig @@ -132,6 +134,7 @@ export class StreamrClient { this.operatorRegistry = container.resolve(OperatorRegistry) this.contractFactory = container.resolve(ContractFactory) this.localGroupKeyStore = container.resolve(LocalGroupKeyStore) + this.chainEventPoller = container.resolve(ChainEventPoller) this.streamIdBuilder = container.resolve(StreamIDBuilder) this.eventEmitter = container.resolve(StreamrClientEventEmitter) this.destroySignal = container.resolve(DestroySignal) @@ -770,12 +773,12 @@ export class StreamrClient { operatorContractAddress, this.contractFactory, this.rpcProviderSource, + this.chainEventPoller, this.theGraphClient, this.authentication, this.destroySignal, this.loggerFactory, - () => this.getEthersOverrides(), - this.config.contracts.pollInterval + () => this.getEthersOverrides() ) } diff --git a/packages/sdk/src/contracts/ChainEventPoller.ts b/packages/sdk/src/contracts/ChainEventPoller.ts index 5d87d01c8c..f335d196ab 100644 --- a/packages/sdk/src/contracts/ChainEventPoller.ts +++ b/packages/sdk/src/contracts/ChainEventPoller.ts @@ -1,39 +1,51 @@ -import { Logger, Multimap, randomString, scheduleAtInterval, wait } from '@streamr/utils' -import { Contract, EventLog, Provider } from 'ethers' -import { sample } from 'lodash' - -type EventName = string -type Listener = (...args: any[]) => void +import { EthereumAddress, Logger, randomString, scheduleAtInterval, toEthereumAddress, wait } from '@streamr/utils' +import { AbstractProvider, EventFragment, Interface } from 'ethers' +import { remove, sample, uniq } from 'lodash' +import { inject, Lifecycle, scoped } from 'tsyringe' +import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config' +import { RpcProviderSource } from '../RpcProviderSource' + +export interface EventListenerDefinition { + onEvent: (...args: any[]) => void + contractInterfaceFragment: EventFragment + contractAddress: EthereumAddress +} const BLOCK_NUMBER_QUERY_RETRY_DELAY = 1000 export const POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD = 30 +@scoped(Lifecycle.ContainerScoped) export class ChainEventPoller { - private listeners: Multimap = new Multimap() - private abortController?: AbortController - private contracts: Contract[] + private listeners: EventListenerDefinition[] = [] + private providers: AbstractProvider[] private pollInterval: number + private abortController?: AbortController - // all these contracts are actually the same chain contract (i.e. StreamRegistry), but have different providers - // connected to them - constructor(contracts: Contract[], pollInterval: number) { - this.contracts = contracts - this.pollInterval = pollInterval + constructor( + rpcProviderSource: RpcProviderSource, + @inject(ConfigInjectionToken) config: Pick + ) { + this.providers = rpcProviderSource.getSubProviders() + this.pollInterval = config.contracts.pollInterval } - on(eventName: string, listener: Listener): void { - const started = !this.listeners.isEmpty() - this.listeners.add(eventName, listener) + on(definition: EventListenerDefinition): void { + const started = this.listeners.length > 0 + this.listeners.push(definition) if (!started) { this.start() } } - off(eventName: string, listener: Listener): void { - const started = !this.listeners.isEmpty() - this.listeners.remove(eventName, listener) - if (started && this.listeners.isEmpty()) { + off(definition: EventListenerDefinition): void { + const started = this.listeners.length > 0 + remove(this.listeners, (l) => { + return (l.contractAddress === definition.contractAddress) + && (l.contractInterfaceFragment.topicHash === definition.contractInterfaceFragment.topicHash) + && (l.onEvent == definition.onEvent) + }) + if (started && this.listeners.length === 0) { this.abortController!.abort() } } @@ -48,7 +60,7 @@ export class ChainEventPoller { let fromBlock: number | undefined = undefined do { try { - fromBlock = await sample(this.getProviders())!.getBlockNumber() + fromBlock = await sample(this.providers)!.getBlockNumber() } catch (err) { logger.debug('Failed to query block number', { err }) await wait(BLOCK_NUMBER_QUERY_RETRY_DELAY) // TODO: pass signal? @@ -57,25 +69,46 @@ export class ChainEventPoller { let pollsSinceFromBlockUpdate = 0 await scheduleAtInterval(async () => { - const contract = sample(this.contracts)! - const eventNames = [...this.listeners.keys()] + const provider = sample(this.providers)! + const eventNames = this.listeners.map((l) => l.contractInterfaceFragment.name) let newFromBlock = 0 - let events: EventLog[] | undefined = undefined + let events: { contractAddress: EthereumAddress, name: string, args: any[], blockNumber: number }[] | undefined = undefined try { // If we haven't updated `fromBlock` for a while, fetch the latest block number explicitly. If // `fromBlock` falls too much behind the current block number, the RPCs may start rejecting our // eth_getLogs requests (presumably for performance reasons). if (pollsSinceFromBlockUpdate >= POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD) { - newFromBlock = await contract.runner!.provider!.getBlockNumber() + 1 + newFromBlock = await provider.getBlockNumber() + 1 logger.debug('Fetch next block number explicitly', { newFromBlock } ) if (abortController.signal.aborted) { return } } - logger.debug('Polling', { fromBlock, eventNames }) - events = await contract.queryFilter([eventNames], fromBlock) as EventLog[] + const filter = { + address: uniq(this.listeners.map((l) => l.contractAddress)), + topics: [uniq(this.listeners.map((l) => l.contractInterfaceFragment.topicHash))], + fromBlock + } + const logItems = await provider.getLogs(filter) + events = [] + for (const logItem of logItems) { + const definition = this.listeners.find((l) => { + return (l.contractAddress === toEthereumAddress(logItem.address)) + && (l.contractInterfaceFragment.topicHash === logItem.topics[0]) + }) + if (definition !== undefined) { + const contractInterface = new Interface([definition.contractInterfaceFragment.format('minimal')]) + const args = contractInterface.decodeEventLog(definition.contractInterfaceFragment.name, logItem.data, logItem.topics) + events.push({ + contractAddress: definition.contractAddress, + name: definition.contractInterfaceFragment.name, + args, + blockNumber: logItem.blockNumber + }) + } + } logger.debug('Polled', { fromBlock, events: events.length }) } catch (err) { logger.debug('Failed to poll', { reason: err?.reason, eventNames, fromBlock }) @@ -87,9 +120,11 @@ export class ChainEventPoller { if (events !== undefined && events.length > 0) { for (const event of events) { - const listeners = this.listeners.get(event.fragment.name) + const listeners = this.listeners.filter( + (l) => (l.contractAddress === event.contractAddress) && (l.contractInterfaceFragment.name === event.name) + ) for (const listener of listeners) { - listener(...event.args, event.blockNumber) + listener.onEvent(...event.args, event.blockNumber) } } newFromBlock = Math.max(...events.map((e) => e.blockNumber)) + 1 @@ -108,8 +143,4 @@ export class ChainEventPoller { }, this.pollInterval, true, abortController.signal) }) } - - private getProviders(): Provider[] { - return this.contracts.map((c) => c.runner!.provider!) - } } diff --git a/packages/sdk/src/contracts/Operator.ts b/packages/sdk/src/contracts/Operator.ts index 9ac45a5490..7b71628b24 100644 --- a/packages/sdk/src/contracts/Operator.ts +++ b/packages/sdk/src/contracts/Operator.ts @@ -4,7 +4,7 @@ import { ObservableEventEmitter, StreamID, TheGraphClient, collect, ensureValidStreamPartitionIndex, toEthereumAddress, toStreamID } from '@streamr/utils' -import { Overrides } from 'ethers' +import { Interface, Overrides } from 'ethers' import { z } from 'zod' import { Authentication } from '../Authentication' import { DestroySignal } from '../DestroySignal' @@ -150,12 +150,12 @@ export class Operator { contractAddress: EthereumAddress, contractFactory: ContractFactory, rpcProviderSource: RpcProviderSource, + chainEventPoller: ChainEventPoller, theGraphClient: TheGraphClient, authentication: Authentication, destroySignal: DestroySignal, loggerFactory: LoggerFactory, getEthersOverrides: () => Promise, - eventPollInterval: number ) { this.contractAddress = contractAddress this.contractFactory = contractFactory @@ -169,7 +169,7 @@ export class Operator { this.theGraphClient = theGraphClient this.authentication = authentication this.getEthersOverrides = getEthersOverrides - this.initEventGateways(contractAddress, loggerFactory, eventPollInterval) + this.initEventGateways(contractAddress, chainEventPoller, loggerFactory) destroySignal.onDestroy.listen(() => { this.eventEmitter.removeAllListeners() }) @@ -177,17 +177,18 @@ export class Operator { private initEventGateways( contractAddress: EthereumAddress, - loggerFactory: LoggerFactory, - eventPollInterval: number + chainEventPoller: ChainEventPoller, + loggerFactory: LoggerFactory ): void { - const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => { - return this.contractFactory.createEventContract(contractAddress, OperatorArtifact, p) - }), eventPollInterval) + const contractInterface = new Interface(OperatorArtifact) const stakeEventTransformation = (sponsorship: string) => ({ sponsorship: toEthereumAddress(sponsorship) }) initContractEventGateway({ - sourceName: 'Staked', + sourceDefinition: { + contractInterfaceFragment: contractInterface.getEvent('Staked')!, + contractAddress + }, sourceEmitter: chainEventPoller, targetName: 'staked', targetEmitter: this.eventEmitter, @@ -195,7 +196,10 @@ export class Operator { loggerFactory }) initContractEventGateway({ - sourceName: 'Unstaked', + sourceDefinition: { + contractInterfaceFragment: contractInterface.getEvent('Unstaked')!, + contractAddress + }, sourceEmitter: chainEventPoller, targetName: 'unstaked', targetEmitter: this.eventEmitter, @@ -219,7 +223,10 @@ export class Operator { } } initContractEventGateway({ - sourceName: 'ReviewRequest', + sourceDefinition: { + contractInterfaceFragment: contractInterface.getEvent('ReviewRequest')!, + contractAddress + }, sourceEmitter: chainEventPoller, targetName: 'reviewRequested', targetEmitter: this.eventEmitter, diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 2f7b4b0ef8..7a8c6b4a28 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -14,7 +14,7 @@ import { toUserId, until } from '@streamr/utils' -import { ContractTransactionResponse } from 'ethers' +import { ContractTransactionResponse, Interface } from 'ethers' import { intersection } from 'lodash' import { Lifecycle, inject, scoped } from 'tsyringe' import { Authentication, AuthenticationInjectionToken } from '../Authentication' @@ -134,6 +134,7 @@ export class StreamRegistry { constructor( contractFactory: ContractFactory, rpcProviderSource: RpcProviderSource, + chainEventPoller: ChainEventPoller, theGraphClient: TheGraphClient, streamIdBuilder: StreamIDBuilder, @inject(ConfigInjectionToken) config: Pick, @@ -154,11 +155,11 @@ export class StreamRegistry { this.rpcProviderSource.getProvider(), 'streamRegistry' ) - const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => { - return contractFactory.createEventContract(toEthereumAddress(this.config.contracts.streamRegistryChainAddress), StreamRegistryArtifact, p) - }), config.contracts.pollInterval) initContractEventGateway({ - sourceName: 'StreamCreated', + sourceDefinition: { + contractInterfaceFragment: new Interface(StreamRegistryArtifact).getEvent('StreamCreated')!, + contractAddress: toEthereumAddress(this.config.contracts.streamRegistryChainAddress) + }, sourceEmitter: chainEventPoller, targetName: 'streamCreated', targetEmitter: eventEmitter, diff --git a/packages/sdk/src/contracts/StreamStorageRegistry.ts b/packages/sdk/src/contracts/StreamStorageRegistry.ts index 9bb3e9aa99..909b6f13a5 100644 --- a/packages/sdk/src/contracts/StreamStorageRegistry.ts +++ b/packages/sdk/src/contracts/StreamStorageRegistry.ts @@ -1,4 +1,5 @@ import { EthereumAddress, Logger, StreamID, TheGraphClient, collect, toEthereumAddress, toStreamID } from '@streamr/utils' +import { Interface } from 'ethers' import min from 'lodash/min' import { Lifecycle, inject, scoped } from 'tsyringe' import { Authentication, AuthenticationInjectionToken } from '../Authentication' @@ -11,10 +12,10 @@ import StreamStorageRegistryArtifact from '../ethereumArtifacts/StreamStorageReg import { getEthersOverrides } from '../ethereumUtils' import { StreamrClientEventEmitter } from '../events' import { LoggerFactory } from '../utils/LoggerFactory' +import { Mapping, createCacheMap } from '../utils/Mapping' import { ChainEventPoller } from './ChainEventPoller' import { ContractFactory } from './ContractFactory' import { initContractEventGateway, waitForTx } from './contract' -import { createCacheMap, Mapping } from '../utils/Mapping' export interface StorageNodeAssignmentEvent { readonly streamId: StreamID @@ -51,6 +52,7 @@ export class StreamStorageRegistry { streamIdBuilder: StreamIDBuilder, contractFactory: ContractFactory, rpcProviderSource: RpcProviderSource, + chainEventPoller: ChainEventPoller, theGraphClient: TheGraphClient, @inject(ConfigInjectionToken) config: Pick, @inject(AuthenticationInjectionToken) authentication: Authentication, @@ -70,13 +72,6 @@ export class StreamStorageRegistry { rpcProviderSource.getProvider(), 'streamStorageRegistry' ) as StreamStorageRegistryContract - const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => { - return contractFactory.createEventContract( - toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress), - StreamStorageRegistryArtifact, - p - ) - }), config.contracts.pollInterval) this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory) this.storageNodesCache = createCacheMap({ valueFactory: (query) => { @@ -86,7 +81,6 @@ export class StreamStorageRegistry { }) } - // eslint-disable-next-line class-methods-use-this private initStreamAssignmentEventListeners( eventEmitter: StreamrClientEventEmitter, chainEventPoller: ChainEventPoller, @@ -97,8 +91,13 @@ export class StreamStorageRegistry { nodeAddress: toEthereumAddress(nodeAddress), blockNumber }) + const contractAddress = toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress) + const contractInterface = new Interface(StreamStorageRegistryArtifact) initContractEventGateway({ - sourceName: 'Added', + sourceDefinition: { + contractInterfaceFragment: contractInterface.getEvent('Added')!, + contractAddress + }, sourceEmitter: chainEventPoller, targetName: 'streamAddedToStorageNode', targetEmitter: eventEmitter, @@ -106,7 +105,10 @@ export class StreamStorageRegistry { loggerFactory }) initContractEventGateway({ - sourceName: 'Removed', + sourceDefinition: { + contractInterfaceFragment: contractInterface.getEvent('Removed')!, + contractAddress + }, sourceEmitter: chainEventPoller, targetName: 'streamRemovedFromFromStorageNode', targetEmitter: eventEmitter, diff --git a/packages/sdk/src/contracts/contract.ts b/packages/sdk/src/contracts/contract.ts index acbe52cf98..6278996c7b 100644 --- a/packages/sdk/src/contracts/contract.ts +++ b/packages/sdk/src/contracts/contract.ts @@ -11,6 +11,7 @@ import EventEmitter from 'eventemitter3' import without from 'lodash/without' import pLimit from 'p-limit' import { LoggerFactory } from '../utils/LoggerFactory' +import { ChainEventPoller, EventListenerDefinition } from './ChainEventPoller' export interface ContractEvent { onMethodExecute: (methodName: string) => void @@ -166,16 +167,12 @@ export const createDecoratedContract = ( export const initContractEventGateway = < TSourcePayloads extends any[], - TSourceName extends string, TTarget extends Events, TTargetName extends keyof TTarget >(opts: { - sourceName: TSourceName + sourceDefinition: Omit targetName: TTargetName - sourceEmitter: { - on: (name: TSourceName, listener: (...args: TSourcePayloads) => void) => void - off: (name: TSourceName, listener: (...args: TSourcePayloads) => void) => void - } + sourceEmitter: ChainEventPoller targetEmitter: ObservableEventEmitter transformation: (...args: TSourcePayloads) => Parameters[0] loggerFactory: LoggerFactory @@ -198,11 +195,17 @@ export const initContractEventGateway = < } emit(targetEvent) } - opts.sourceEmitter.on(opts.sourceName, listener) + opts.sourceEmitter.on({ + onEvent: listener, + ...opts.sourceDefinition + }) return listener }, (listener: Listener) => { - opts.sourceEmitter.off(opts.sourceName, listener) + opts.sourceEmitter.off({ + onEvent: listener, + ...opts.sourceDefinition + }) }, opts.targetEmitter ) diff --git a/packages/sdk/test/test-utils/FakeJsonRpcServer.ts b/packages/sdk/test/test-utils/FakeJsonRpcServer.ts index 1f452fbf41..afb2a60ca0 100644 --- a/packages/sdk/test/test-utils/FakeJsonRpcServer.ts +++ b/packages/sdk/test/test-utils/FakeJsonRpcServer.ts @@ -2,7 +2,7 @@ import { AbiCoder, id } from 'ethers' import { once } from 'events' import express, { Request, Response } from 'express' import { Server } from 'http' -import { intersection, isArray } from 'lodash' +import { intersection, isArray, isEqual } from 'lodash' import { AddressInfo } from 'net' import { promisify } from 'util' import { formEthereumFunctionSelector, parseEthereumFunctionSelectorFromCallData } from './utils' @@ -93,10 +93,10 @@ export class FakeJsonRpcServer { } else if (request.method === 'eth_getLogs') { const topics = request.params[0].topics const topicId = id('StreamCreated(string,string)') - if ((topics.length !== 1) || (topics[0] !== topicId)) { + if (!isEqual(topics, [[topicId]])) { throw new Error('Not implemented') } - if (request.params[0].toBlock !== 'latest') { + if (request.params[0].toBlock !== undefined) { throw new Error('Not implemented') } const fromBlock = parseInt(request.params[0].fromBlock, 16) diff --git a/packages/sdk/test/unit/ChainEventPoller.test.ts b/packages/sdk/test/unit/ChainEventPoller.test.ts index a9b85615f9..a65487d53f 100644 --- a/packages/sdk/test/unit/ChainEventPoller.test.ts +++ b/packages/sdk/test/unit/ChainEventPoller.test.ts @@ -1,195 +1,287 @@ -import { wait, until } from '@streamr/utils' -import { Contract, EventLog, Provider } from 'ethers' +import 'reflect-metadata' + +import { randomEthereumAddress } from '@streamr/test-utils' +import { until, wait } from '@streamr/utils' +import { AbstractProvider, Interface, Log } from 'ethers' +import { range } from 'lodash' import { ChainEventPoller, POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD } from './../../src/contracts/ChainEventPoller' -import range from 'lodash/range' -const POLL_INTERVAL = 100 +const INITIAL_BLOCK_NUMBER = 123 +const CONTRACT_ADDRESS = randomEthereumAddress() + +const createAbi = (eventName: string) => { + return [{ + type: 'event', + name: eventName, + anonymous: false, + inputs: [{ + indexed: false, + internalType: 'string', + name: 'param1', + type: 'string' + }, + { + indexed: false, + internalType: 'string', + name: 'param2', + type: 'string' + }] + }] +} + +const createEventLogItem = ( + eventName: string, + eventArgs: any[], + blockNumber: number +): Partial => { + const contractInterface = new Interface(createAbi(eventName)) + return { + blockNumber, + address: CONTRACT_ADDRESS, + ...contractInterface.encodeEventLog(eventName, eventArgs) + } +} + +const createChainEventPoller = (provider: AbstractProvider, pollInterval: number) => { + return new ChainEventPoller( + { getSubProviders: () => [provider] } as any, + { contracts: { pollInterval } } as any + ) +} describe('ChainEventPoller', () => { it('happy path', async () => { - const INITIAL_BLOCK_NUMBER = 123 - const EVENT_NAME = 'foo' + const EVENT_NAME = 'TestEventName' + const CONTRACT_INTERFACE_FRAGMENT = new Interface(createAbi(EVENT_NAME)).getEvent(EVENT_NAME)! const EVENT_ARGS = [ 'mock-arg1', 'mock-arg2' ] + const POLL_INTERVAL = 100 let blockNumber = INITIAL_BLOCK_NUMBER - const contract = { - queryFilter: jest.fn().mockImplementation(() => { - const result = [{ - fragment: { - name: EVENT_NAME - }, - args: EVENT_ARGS, - blockNumber - }] + const provider: Partial = { + getLogs: jest.fn().mockImplementation(async () => { + const result = [createEventLogItem(EVENT_NAME, EVENT_ARGS, blockNumber)] blockNumber++ return result }), - runner: { - provider: { - getBlockNumber: jest.fn().mockImplementation(async () => blockNumber) - } - } - } as unknown as Contract - const poller = new ChainEventPoller([contract], POLL_INTERVAL) + getBlockNumber: jest.fn().mockImplementation(async () => { + return blockNumber + }) + } + const poller = createChainEventPoller(provider as any, POLL_INTERVAL) const listener1 = jest.fn() - poller.on(EVENT_NAME, listener1) + poller.on({ + onEvent: listener1, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS + }) // poller starts await until(() => listener1.mock.calls.length === 1) - expect(contract.runner!.provider!.getBlockNumber).toHaveBeenCalledTimes(1) - expect(contract.queryFilter).toHaveBeenCalledTimes(1) - expect(contract.queryFilter).toHaveBeenCalledWith([[EVENT_NAME]], INITIAL_BLOCK_NUMBER) + expect(provider.getBlockNumber).toHaveBeenCalledTimes(1) + expect(provider.getLogs).toHaveBeenCalledTimes(1) + expect(provider.getLogs).toHaveBeenCalledWith({ + address: [CONTRACT_ADDRESS], + topics: [[CONTRACT_INTERFACE_FRAGMENT.topicHash]], + fromBlock: INITIAL_BLOCK_NUMBER + }) expect(listener1).toHaveBeenCalledTimes(1) expect(listener1).toHaveBeenCalledWith(...EVENT_ARGS, INITIAL_BLOCK_NUMBER) await until(() => listener1.mock.calls.length === 2) - expect(contract.runner!.provider!.getBlockNumber).toHaveBeenCalledTimes(1) - expect(contract.queryFilter).toHaveBeenCalledTimes(2) - expect(contract.queryFilter).toHaveBeenNthCalledWith(2, [[EVENT_NAME]], INITIAL_BLOCK_NUMBER + 1) + expect(provider.getBlockNumber).toHaveBeenCalledTimes(1) + expect(provider.getLogs).toHaveBeenCalledTimes(2) + expect(provider.getLogs).toHaveBeenNthCalledWith( + 2, + { + address: [CONTRACT_ADDRESS], + topics: [[CONTRACT_INTERFACE_FRAGMENT.topicHash]], + fromBlock: INITIAL_BLOCK_NUMBER + 1 + } + ) expect(listener1).toHaveBeenCalledTimes(2) expect(listener1).toHaveBeenNthCalledWith(2, ...EVENT_ARGS, INITIAL_BLOCK_NUMBER + 1) - poller.off(EVENT_NAME, listener1) - + poller.off({ + onEvent: listener1, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS + }) + // poller stops await wait(1.5 * POLL_INTERVAL) - expect(contract.runner!.provider!.getBlockNumber).toHaveBeenCalledTimes(1) - expect(contract.queryFilter).toHaveBeenCalledTimes(2) + expect(provider.getBlockNumber).toHaveBeenCalledTimes(1) + expect(provider.getLogs).toHaveBeenCalledTimes(2) expect(listener1).toHaveBeenCalledTimes(2) const listener2 = jest.fn() - poller.on(EVENT_NAME, listener2) + poller.on({ + onEvent: listener2, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS + }) // poller restarts await until(() => listener2.mock.calls.length === 1) - expect(contract.runner!.provider!.getBlockNumber).toHaveBeenCalledTimes(2) - expect(contract.queryFilter).toHaveBeenCalledTimes(3) + expect(provider.getBlockNumber).toHaveBeenCalledTimes(2) + expect(provider.getLogs).toHaveBeenCalledTimes(3) expect(listener2).toHaveBeenCalledTimes(1) expect(listener2).toHaveBeenCalledWith(...EVENT_ARGS, INITIAL_BLOCK_NUMBER + 2) - poller.off(EVENT_NAME, listener2) - + poller.off({ + onEvent: listener2, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS + }) + // poller stops await wait(1.5 * POLL_INTERVAL) - expect(contract.runner!.provider!.getBlockNumber).toHaveBeenCalledTimes(2) - expect(contract.queryFilter).toHaveBeenCalledTimes(3) + expect(provider.getBlockNumber).toHaveBeenCalledTimes(2) + expect(provider.getLogs).toHaveBeenCalledTimes(3) expect(listener2).toHaveBeenCalledTimes(1) }) it('multiple events and listeners', async () => { - const EVENT_NAME_1 = 'event-name-1' - const EVENT_NAME_2 = 'event-name-2' - const contract = { - queryFilter: jest.fn().mockImplementation(() => { - const result = [{ - fragment: { - name: EVENT_NAME_1 - }, - args: ['arg-foo1'], - blockNumber: 150 - }, { - fragment: { - name: EVENT_NAME_1 - }, - args: ['arg-foo2'], - blockNumber: 155 - }, { - fragment: { - name: EVENT_NAME_2 - }, - args: ['arg-bar'], - blockNumber: 152 - }] - return result + const EVENT_NAME_1 = 'TestEventName1' + const EVENT_NAME_2 = 'TestEventName2' + const CONTRACT_INTERFACE_FRAGMENT_1 = new Interface(createAbi(EVENT_NAME_1)).getEvent(EVENT_NAME_1)! + const CONTRACT_INTERFACE_FRAGMENT_2 = new Interface(createAbi(EVENT_NAME_2)).getEvent(EVENT_NAME_2)! + const POLL_INTERVAL = 100 + const provider: Partial = { + getLogs: jest.fn().mockImplementation(async () => { + return [ + createEventLogItem(EVENT_NAME_1, ['arg-foo1', ''], 150), + createEventLogItem(EVENT_NAME_1, ['arg-foo2', ''], 155), + createEventLogItem(EVENT_NAME_2, ['arg-bar', ''], 152) + ] }), - runner: { - provider: { - getBlockNumber: jest.fn().mockImplementation(async () => 123) - } - } - } as unknown as Contract - const poller = new ChainEventPoller([contract], POLL_INTERVAL) + getBlockNumber: jest.fn().mockImplementation(async () => { + return 123 + }) + } + const poller = createChainEventPoller(provider as any, POLL_INTERVAL) const listener1 = jest.fn() const listener2 = jest.fn() const listener3 = jest.fn() - poller.on(EVENT_NAME_1, listener1) - poller.on(EVENT_NAME_2, listener2) - poller.on(EVENT_NAME_2, listener3) + poller.on({ + onEvent: listener1, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_1, + contractAddress: CONTRACT_ADDRESS, + }) + poller.on({ + onEvent: listener2, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_2, + contractAddress: CONTRACT_ADDRESS + }) + poller.on({ + onEvent: listener3, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_2, + contractAddress: CONTRACT_ADDRESS + }) - await until( - () => { - return (listener1.mock.calls.length > 0) && (listener2.mock.calls.length > 0) && (listener3.mock.calls.length > 0) + await until(() => { + return (listener1.mock.calls.length > 0) && (listener2.mock.calls.length > 0) && (listener3.mock.calls.length > 0) + }) + expect(provider.getLogs).toHaveBeenNthCalledWith( + 1, + { + address: [CONTRACT_ADDRESS], + topics: [[CONTRACT_INTERFACE_FRAGMENT_1.topicHash, CONTRACT_INTERFACE_FRAGMENT_2.topicHash]], + fromBlock: INITIAL_BLOCK_NUMBER } ) - expect(contract.queryFilter).toHaveBeenNthCalledWith(1, [[EVENT_NAME_1, EVENT_NAME_2]], 123) expect(listener1).toHaveBeenCalledTimes(2) - expect(listener1).toHaveBeenCalledWith('arg-foo1', 150) - expect(listener1).toHaveBeenCalledWith('arg-foo2', 155) + expect(listener1).toHaveBeenCalledWith('arg-foo1', '', 150) + expect(listener1).toHaveBeenCalledWith('arg-foo2', '', 155) expect(listener2).toHaveBeenCalledTimes(1) - expect(listener2).toHaveBeenCalledWith('arg-bar', 152) + expect(listener2).toHaveBeenCalledWith('arg-bar', '', 152) expect(listener3).toHaveBeenCalledTimes(1) - expect(listener3).toHaveBeenCalledWith('arg-bar', 152) + expect(listener3).toHaveBeenCalledWith('arg-bar', '', 152) await wait(1.5 * POLL_INTERVAL) - expect(contract.queryFilter).toHaveBeenNthCalledWith(2, [[EVENT_NAME_1, EVENT_NAME_2]], 155 + 1) + expect(provider.getLogs).toHaveBeenNthCalledWith( + 2, + { + address: [CONTRACT_ADDRESS], + topics: [[CONTRACT_INTERFACE_FRAGMENT_1.topicHash, CONTRACT_INTERFACE_FRAGMENT_2.topicHash]], + fromBlock: 155 + 1 + } + ) - poller.off(EVENT_NAME_1, listener1) - poller.off(EVENT_NAME_2, listener2) - poller.off(EVENT_NAME_2, listener3) + poller.off({ + onEvent: listener1, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_1, + contractAddress: CONTRACT_ADDRESS, + }) + poller.off({ + onEvent: listener2, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_2, + contractAddress: CONTRACT_ADDRESS + }) + poller.off({ + onEvent: listener3, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT_2, + contractAddress: CONTRACT_ADDRESS + }) }) describe('explicit block number fetching', () => { + + const EVENT_NAME = 'TestEventName' + const CONTRACT_INTERFACE_FRAGMENT = new Interface(createAbi(EVENT_NAME)).getEvent(EVENT_NAME)! + const POLL_INTERVAL = 10 let invocationHistory: string[] let onGetBlockNumber: (nthCall: number) => number - let onQueryFilter: (nthCall: number) => EventLog[] + let onGetLogs: (nthCall: number) => Log[] let poller: ChainEventPoller beforeEach(() => { invocationHistory = [] let getBlockNumberCallCount = 0 - const provider = { - getBlockNumber: async () => { - invocationHistory.push('getBlockNumber') - return onGetBlockNumber(getBlockNumberCallCount++) - } - } as Pick let queryFilterCallCount = 0 - const contract = { - runner: { - provider - }, - queryFilter: async (eventName, blockNumber) => { - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - invocationHistory.push(`queryFilter(${eventName}, ${blockNumber})`) - return onQueryFilter(queryFilterCallCount++) - } - } as Pick - poller = new ChainEventPoller([contract as Contract, contract as Contract], 10) + const provider: Partial = { + getLogs: jest.fn().mockImplementation(async (filter: { fromBlock: number }) => { + invocationHistory.push(`getLogs(fromBlock=${filter.fromBlock})`) + return onGetLogs(queryFilterCallCount++) + }), + getBlockNumber: jest.fn().mockImplementation(async () => { + invocationHistory.push('getBlockNumber()') + return onGetBlockNumber(getBlockNumberCallCount++) + }) + } + poller = createChainEventPoller(provider as any, POLL_INTERVAL) }) it('when no events, fetches block number explicitly after every POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD', async () => { - let currentRpcBlockNumber = 10 + let currentRpcBlockNumber = INITIAL_BLOCK_NUMBER onGetBlockNumber = () => { return currentRpcBlockNumber++ } - onQueryFilter = () => [] - const eventCb = () => {} - poller.on('event', eventCb) + onGetLogs = () => [] + const listener = () => {} + poller.on({ + onEvent: listener, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS, + }) const expectedLength = 3 * POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD + 6 await until(() => invocationHistory.length >= expectedLength) expect(invocationHistory.slice(0, expectedLength)).toEqual([ - 'getBlockNumber', - ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => 'queryFilter(event, 10)'), - 'getBlockNumber', - 'queryFilter(event, 10)', - ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => 'queryFilter(event, 12)'), - 'getBlockNumber', - 'queryFilter(event, 12)', - ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => 'queryFilter(event, 13)'), - 'getBlockNumber', + 'getBlockNumber()', + ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => `getLogs(fromBlock=${INITIAL_BLOCK_NUMBER})`), + 'getBlockNumber()', + `getLogs(fromBlock=${INITIAL_BLOCK_NUMBER})`, + ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => `getLogs(fromBlock=${INITIAL_BLOCK_NUMBER + 2})`), + 'getBlockNumber()', + `getLogs(fromBlock=${INITIAL_BLOCK_NUMBER + 2})`, + ...range(POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD).map(() => `getLogs(fromBlock=${INITIAL_BLOCK_NUMBER + 3})`), + 'getBlockNumber()', ]) - poller.off('event', eventCb) + poller.off({ + onEvent: listener, + contractInterfaceFragment: CONTRACT_INTERFACE_FRAGMENT, + contractAddress: CONTRACT_ADDRESS, + }) }) // TODO: test other cases diff --git a/packages/sdk/test/unit/Operator.test.ts b/packages/sdk/test/unit/Operator.test.ts index 0374b15637..d9e17aefcb 100644 --- a/packages/sdk/test/unit/Operator.test.ts +++ b/packages/sdk/test/unit/Operator.test.ts @@ -2,8 +2,9 @@ import 'reflect-metadata' import { randomEthereumAddress } from '@streamr/test-utils' import { wait } from '@streamr/utils' -import { capitalize } from 'lodash' +import { capitalize, random } from 'lodash' import { DestroySignal } from '../../src/DestroySignal' +import { EventListenerDefinition } from '../../src/contracts/ChainEventPoller' import { Operator, OperatorEvents, @@ -38,56 +39,43 @@ describe(parsePartitionFromReviewRequestMetadata, () => { }) }) -const POLL_INTERVAL = 100 +const POLL_INTERVAL = 50 const OPERATOR_CONTRACT_ADDRESS = randomEthereumAddress() const SPONSORSHIP_ADDRESS = randomEthereumAddress() -const INITIAL_BLOCK_NUMBER = 111 -const EVENT_BLOCK_NUMBER = 222 -const createOperator = (eventName: string, args: any[]) => { - const fakeContract = { - queryFilter: (eventNames: string[][], fromBlock: number) => { - if ((eventNames[0][0] === eventName) && (fromBlock <= EVENT_BLOCK_NUMBER)) { - return [{ - fragment: { - name: eventName - }, - args, - blockNumber: EVENT_BLOCK_NUMBER - }] - } else { - return [] - } - }, - runner: { - provider: { - getBlockNumber: async () => INITIAL_BLOCK_NUMBER +const createOperator = (eventName: string, eventArgs: any[]): Operator => { + const chainEventPoller = { + on: (definition: EventListenerDefinition) => { + if (definition.contractInterfaceFragment.name === eventName) { + setTimeout(() => { + definition.onEvent(...eventArgs) + }, random(POLL_INTERVAL)) } } } + const contractFactory = { + createReadContract: () => ({}) + } + const rpcProviderSource = { + getProvider: () => ({}) + } return new Operator( OPERATOR_CONTRACT_ADDRESS, - { - createReadContract: () => fakeContract, - createEventContract: () => fakeContract - } as any, - { - getProvider: () => undefined, - getSubProviders: () => ['dummy'] - } as any, + contractFactory as any, + rpcProviderSource as any, + chainEventPoller as any, undefined as any, undefined as any, new DestroySignal(), mockLoggerFactory(), - undefined as any, - POLL_INTERVAL + undefined as any ) } describe('Operator', () => { describe('reviewRequest listener', () => { - + it('emitting ReviewRequest with valid metadata causes listener to be invoked', async () => { const operator = createOperator( 'ReviewRequest', @@ -96,16 +84,15 @@ describe('Operator', () => { const listener = jest.fn() operator.on('reviewRequested', listener) await wait(1.5 * POLL_INTERVAL) - expect(listener).toHaveBeenLastCalledWith({ - sponsorship: SPONSORSHIP_ADDRESS, + expect(listener).toHaveBeenLastCalledWith({ + sponsorship: SPONSORSHIP_ADDRESS, targetOperator: OPERATOR_CONTRACT_ADDRESS, partition: 7, votingPeriodStartTimestamp: 1000 * 1000, votingPeriodEndTimestamp: 1050 * 1000 }) - operator.off('reviewRequested', listener) }) - + it('emitting ReviewRequest with invalid metadata causes listener to not be invoked', async () => { const operator = createOperator( 'ReviewRequest', @@ -115,7 +102,6 @@ describe('Operator', () => { operator.on('reviewRequested', listener) await wait(1.5 * POLL_INTERVAL) expect(listener).not.toHaveBeenCalled() - operator.off('reviewRequested', listener) }) }) @@ -129,10 +115,9 @@ describe('Operator', () => { const listener = jest.fn() operator.on(eventName as keyof OperatorEvents, listener) await wait(1.5 * POLL_INTERVAL) - expect(listener).toHaveBeenLastCalledWith({ + expect(listener).toHaveBeenLastCalledWith({ sponsorship: SPONSORSHIP_ADDRESS }) - operator.off(eventName as keyof OperatorEvents, listener) }) }) })