From d507af3a1adfa3c01f380b6b716677962d348ae1 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata <96521086+bukata-sa@users.noreply.github.com> Date: Wed, 23 Oct 2024 17:43:09 +0100 Subject: [PATCH] [CCIP-2611] Report new heads to atlas' OTI (#13647) (#1508) Cherry-pick OTI head report https://github.com/smartcontractkit/chainlink/commit/a41b353a20d73aa2d3fe3e8e979a0bcacc46fafe https://github.com/smartcontractkit/chainlink/commit/633eb41a4467f91506e05e7fda6873c7b34f4731 https://github.com/smartcontractkit/chainlink/commit/621e87538c931d5d3996974589dc27a0ab43f758 https://github.com/smartcontractkit/chainlink/commit/25d29611543c3d43484c168e7efc23a7bf83f035 --------- Co-authored-by: Jordan Krage Co-authored-by: Lukasz <120112546+lukaszcl@users.noreply.github.com> --- .changeset/eight-bees-speak.md | 5 + .changeset/hot-laws-deny.md | 5 + .changeset/proud-jokes-exercise.md | 5 + .changeset/swift-pumas-taste.md | 5 + .mockery.yaml | 22 +- common/types/mocks/monitoring_endpoint.go | 65 +++++ core/internal/mocks/prometheus_backend.go | 204 -------------- core/services/chainlink/application.go | 14 +- core/services/headreporter/head_reporter.go | 111 ++++++++ .../headreporter/head_reporter_mock.go | 130 +++++++++ .../headreporter/head_reporter_test.go | 51 ++++ core/services/headreporter/helper_test.go | 5 + .../headreporter/prometheus_backend_mock.go | 204 ++++++++++++++ .../prometheus_reporter.go} | 168 +++--------- .../prometheus_reporter_test.go} | 172 ++++++------ .../headreporter/telemetry_reporter.go | 69 +++++ .../headreporter/telemetry_reporter_test.go | 108 ++++++++ core/services/synchronization/common.go | 1 + .../telem/telem_head_report.pb.go | 256 ++++++++++++++++++ .../telem/telem_head_report.proto | 17 ++ .../monitoring_endpoint_generator_mock.go | 88 ++++++ core/web/testdata/body/health.html | 6 +- core/web/testdata/body/health.json | 18 +- core/web/testdata/body/health.txt | 2 +- .../testconfig/vrfv2plus/vrfv2plus.toml | 1 + testdata/scripts/health/default.txtar | 20 +- testdata/scripts/health/multi-chain.txtar | 20 +- 27 files changed, 1309 insertions(+), 463 deletions(-) create mode 100644 .changeset/eight-bees-speak.md create mode 100644 .changeset/hot-laws-deny.md create mode 100644 .changeset/proud-jokes-exercise.md create mode 100644 .changeset/swift-pumas-taste.md create mode 100644 common/types/mocks/monitoring_endpoint.go delete mode 100644 core/internal/mocks/prometheus_backend.go create mode 100644 core/services/headreporter/head_reporter.go create mode 100644 core/services/headreporter/head_reporter_mock.go create mode 100644 core/services/headreporter/head_reporter_test.go create mode 100644 core/services/headreporter/helper_test.go create mode 100644 core/services/headreporter/prometheus_backend_mock.go rename core/services/{promreporter/prom_reporter.go => headreporter/prometheus_reporter.go} (63%) rename core/services/{promreporter/prom_reporter_test.go => headreporter/prometheus_reporter_test.go} (64%) create mode 100644 core/services/headreporter/telemetry_reporter.go create mode 100644 core/services/headreporter/telemetry_reporter_test.go create mode 100644 core/services/synchronization/telem/telem_head_report.pb.go create mode 100644 core/services/synchronization/telem/telem_head_report.proto create mode 100644 core/services/telemetry/monitoring_endpoint_generator_mock.go diff --git a/.changeset/eight-bees-speak.md b/.changeset/eight-bees-speak.md new file mode 100644 index 0000000000..9c8ebe428d --- /dev/null +++ b/.changeset/eight-bees-speak.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix head reporter non-zero reporting period diff --git a/.changeset/hot-laws-deny.md b/.changeset/hot-laws-deny.md new file mode 100644 index 0000000000..d71783d1b7 --- /dev/null +++ b/.changeset/hot-laws-deny.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal log info on missed finalized head instead of returning an error diff --git a/.changeset/proud-jokes-exercise.md b/.changeset/proud-jokes-exercise.md new file mode 100644 index 0000000000..4e36d139de --- /dev/null +++ b/.changeset/proud-jokes-exercise.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added Report new heads as a telemetry to OTI diff --git a/.changeset/swift-pumas-taste.md b/.changeset/swift-pumas-taste.md new file mode 100644 index 0000000000..eb08662e20 --- /dev/null +++ b/.changeset/swift-pumas-taste.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal add head report chain_id diff --git a/.mockery.yaml b/.mockery.yaml index 471f931856..07d4fbed65 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -261,11 +261,20 @@ packages: ORM: Runner: PipelineParamUnmarshaler: - github.com/smartcontractkit/chainlink/v2/core/services/promreporter: + github.com/smartcontractkit/chainlink/v2/core/services/headreporter: config: - dir: core/internal/mocks + dir: "{{ .InterfaceDir }}" + filename: "{{ .InterfaceName | snakecase }}_mock.go" + inpackage: true + mockname: "Mock{{ .InterfaceName | camelcase }}" interfaces: + HeadReporter: PrometheusBackend: + github.com/smartcontractkit/libocr/commontypes: + config: + dir: "common/types/mocks" + interfaces: + MonitoringEndpoint: github.com/smartcontractkit/chainlink/v2/core/services/relay/evm: interfaces: BatchCaller: @@ -300,6 +309,15 @@ packages: interfaces: Config: FeeConfig: + github.com/smartcontractkit/chainlink/v2/core/services/telemetry: + config: + dir: "{{ .InterfaceDir }}" + filename: "{{ .InterfaceName | snakecase }}_mock.go" + inpackage: true + mockname: "Mock{{ .InterfaceName | camelcase }}" + interfaces: + MonitoringEndpointGenerator: + IngressAgent: github.com/smartcontractkit/chainlink/v2/core/services/webhook: interfaces: ExternalInitiatorManager: diff --git a/common/types/mocks/monitoring_endpoint.go b/common/types/mocks/monitoring_endpoint.go new file mode 100644 index 0000000000..5afc04c909 --- /dev/null +++ b/common/types/mocks/monitoring_endpoint.go @@ -0,0 +1,65 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// MonitoringEndpoint is an autogenerated mock type for the MonitoringEndpoint type +type MonitoringEndpoint struct { + mock.Mock +} + +type MonitoringEndpoint_Expecter struct { + mock *mock.Mock +} + +func (_m *MonitoringEndpoint) EXPECT() *MonitoringEndpoint_Expecter { + return &MonitoringEndpoint_Expecter{mock: &_m.Mock} +} + +// SendLog provides a mock function with given fields: log +func (_m *MonitoringEndpoint) SendLog(log []byte) { + _m.Called(log) +} + +// MonitoringEndpoint_SendLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendLog' +type MonitoringEndpoint_SendLog_Call struct { + *mock.Call +} + +// SendLog is a helper method to define mock.On call +// - log []byte +func (_e *MonitoringEndpoint_Expecter) SendLog(log interface{}) *MonitoringEndpoint_SendLog_Call { + return &MonitoringEndpoint_SendLog_Call{Call: _e.mock.On("SendLog", log)} +} + +func (_c *MonitoringEndpoint_SendLog_Call) Run(run func(log []byte)) *MonitoringEndpoint_SendLog_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]byte)) + }) + return _c +} + +func (_c *MonitoringEndpoint_SendLog_Call) Return() *MonitoringEndpoint_SendLog_Call { + _c.Call.Return() + return _c +} + +func (_c *MonitoringEndpoint_SendLog_Call) RunAndReturn(run func([]byte)) *MonitoringEndpoint_SendLog_Call { + _c.Call.Return(run) + return _c +} + +// NewMonitoringEndpoint creates a new instance of MonitoringEndpoint. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMonitoringEndpoint(t interface { + mock.TestingT + Cleanup(func()) +}) *MonitoringEndpoint { + mock := &MonitoringEndpoint{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/internal/mocks/prometheus_backend.go b/core/internal/mocks/prometheus_backend.go deleted file mode 100644 index d02f7062cb..0000000000 --- a/core/internal/mocks/prometheus_backend.go +++ /dev/null @@ -1,204 +0,0 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. - -package mocks - -import ( - big "math/big" - - mock "github.com/stretchr/testify/mock" -) - -// PrometheusBackend is an autogenerated mock type for the PrometheusBackend type -type PrometheusBackend struct { - mock.Mock -} - -type PrometheusBackend_Expecter struct { - mock *mock.Mock -} - -func (_m *PrometheusBackend) EXPECT() *PrometheusBackend_Expecter { - return &PrometheusBackend_Expecter{mock: &_m.Mock} -} - -// SetMaxUnconfirmedAge provides a mock function with given fields: _a0, _a1 -func (_m *PrometheusBackend) SetMaxUnconfirmedAge(_a0 *big.Int, _a1 float64) { - _m.Called(_a0, _a1) -} - -// PrometheusBackend_SetMaxUnconfirmedAge_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedAge' -type PrometheusBackend_SetMaxUnconfirmedAge_Call struct { - *mock.Call -} - -// SetMaxUnconfirmedAge is a helper method to define mock.On call -// - _a0 *big.Int -// - _a1 float64 -func (_e *PrometheusBackend_Expecter) SetMaxUnconfirmedAge(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetMaxUnconfirmedAge_Call { - return &PrometheusBackend_SetMaxUnconfirmedAge_Call{Call: _e.mock.On("SetMaxUnconfirmedAge", _a0, _a1)} -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) Run(run func(_a0 *big.Int, _a1 float64)) *PrometheusBackend_SetMaxUnconfirmedAge_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*big.Int), args[1].(float64)) - }) - return _c -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) Return() *PrometheusBackend_SetMaxUnconfirmedAge_Call { - _c.Call.Return() - return _c -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedAge_Call) RunAndReturn(run func(*big.Int, float64)) *PrometheusBackend_SetMaxUnconfirmedAge_Call { - _c.Call.Return(run) - return _c -} - -// SetMaxUnconfirmedBlocks provides a mock function with given fields: _a0, _a1 -func (_m *PrometheusBackend) SetMaxUnconfirmedBlocks(_a0 *big.Int, _a1 int64) { - _m.Called(_a0, _a1) -} - -// PrometheusBackend_SetMaxUnconfirmedBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedBlocks' -type PrometheusBackend_SetMaxUnconfirmedBlocks_Call struct { - *mock.Call -} - -// SetMaxUnconfirmedBlocks is a helper method to define mock.On call -// - _a0 *big.Int -// - _a1 int64 -func (_e *PrometheusBackend_Expecter) SetMaxUnconfirmedBlocks(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call { - return &PrometheusBackend_SetMaxUnconfirmedBlocks_Call{Call: _e.mock.On("SetMaxUnconfirmedBlocks", _a0, _a1)} -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) Run(run func(_a0 *big.Int, _a1 int64)) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*big.Int), args[1].(int64)) - }) - return _c -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) Return() *PrometheusBackend_SetMaxUnconfirmedBlocks_Call { - _c.Call.Return() - return _c -} - -func (_c *PrometheusBackend_SetMaxUnconfirmedBlocks_Call) RunAndReturn(run func(*big.Int, int64)) *PrometheusBackend_SetMaxUnconfirmedBlocks_Call { - _c.Call.Return(run) - return _c -} - -// SetPipelineRunsQueued provides a mock function with given fields: n -func (_m *PrometheusBackend) SetPipelineRunsQueued(n int) { - _m.Called(n) -} - -// PrometheusBackend_SetPipelineRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineRunsQueued' -type PrometheusBackend_SetPipelineRunsQueued_Call struct { - *mock.Call -} - -// SetPipelineRunsQueued is a helper method to define mock.On call -// - n int -func (_e *PrometheusBackend_Expecter) SetPipelineRunsQueued(n interface{}) *PrometheusBackend_SetPipelineRunsQueued_Call { - return &PrometheusBackend_SetPipelineRunsQueued_Call{Call: _e.mock.On("SetPipelineRunsQueued", n)} -} - -func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) Run(run func(n int)) *PrometheusBackend_SetPipelineRunsQueued_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int)) - }) - return _c -} - -func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) Return() *PrometheusBackend_SetPipelineRunsQueued_Call { - _c.Call.Return() - return _c -} - -func (_c *PrometheusBackend_SetPipelineRunsQueued_Call) RunAndReturn(run func(int)) *PrometheusBackend_SetPipelineRunsQueued_Call { - _c.Call.Return(run) - return _c -} - -// SetPipelineTaskRunsQueued provides a mock function with given fields: n -func (_m *PrometheusBackend) SetPipelineTaskRunsQueued(n int) { - _m.Called(n) -} - -// PrometheusBackend_SetPipelineTaskRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineTaskRunsQueued' -type PrometheusBackend_SetPipelineTaskRunsQueued_Call struct { - *mock.Call -} - -// SetPipelineTaskRunsQueued is a helper method to define mock.On call -// - n int -func (_e *PrometheusBackend_Expecter) SetPipelineTaskRunsQueued(n interface{}) *PrometheusBackend_SetPipelineTaskRunsQueued_Call { - return &PrometheusBackend_SetPipelineTaskRunsQueued_Call{Call: _e.mock.On("SetPipelineTaskRunsQueued", n)} -} - -func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) Run(run func(n int)) *PrometheusBackend_SetPipelineTaskRunsQueued_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int)) - }) - return _c -} - -func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) Return() *PrometheusBackend_SetPipelineTaskRunsQueued_Call { - _c.Call.Return() - return _c -} - -func (_c *PrometheusBackend_SetPipelineTaskRunsQueued_Call) RunAndReturn(run func(int)) *PrometheusBackend_SetPipelineTaskRunsQueued_Call { - _c.Call.Return(run) - return _c -} - -// SetUnconfirmedTransactions provides a mock function with given fields: _a0, _a1 -func (_m *PrometheusBackend) SetUnconfirmedTransactions(_a0 *big.Int, _a1 int64) { - _m.Called(_a0, _a1) -} - -// PrometheusBackend_SetUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetUnconfirmedTransactions' -type PrometheusBackend_SetUnconfirmedTransactions_Call struct { - *mock.Call -} - -// SetUnconfirmedTransactions is a helper method to define mock.On call -// - _a0 *big.Int -// - _a1 int64 -func (_e *PrometheusBackend_Expecter) SetUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *PrometheusBackend_SetUnconfirmedTransactions_Call { - return &PrometheusBackend_SetUnconfirmedTransactions_Call{Call: _e.mock.On("SetUnconfirmedTransactions", _a0, _a1)} -} - -func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) Run(run func(_a0 *big.Int, _a1 int64)) *PrometheusBackend_SetUnconfirmedTransactions_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*big.Int), args[1].(int64)) - }) - return _c -} - -func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) Return() *PrometheusBackend_SetUnconfirmedTransactions_Call { - _c.Call.Return() - return _c -} - -func (_c *PrometheusBackend_SetUnconfirmedTransactions_Call) RunAndReturn(run func(*big.Int, int64)) *PrometheusBackend_SetUnconfirmedTransactions_Call { - _c.Call.Return(run) - return _c -} - -// NewPrometheusBackend creates a new instance of PrometheusBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewPrometheusBackend(t interface { - mock.TestingT - Cleanup(func()) -}) *PrometheusBackend { - mock := &PrometheusBackend{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index de3e4c6280..a759f0ee11 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -49,6 +49,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/feeds" "github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2" "github.com/smartcontractkit/chainlink/v2/core/services/gateway" + "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keeper" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -60,7 +61,6 @@ import ( externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" @@ -324,8 +324,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { srvcs = append(srvcs, mailMon) srvcs = append(srvcs, relayerChainInterops.Services()...) - promReporter := promreporter.NewPromReporter(opts.DS, legacyEVMChains, globalLogger) - srvcs = append(srvcs, promReporter) // Initialize Local Users ORM and Authentication Provider specified in config // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider @@ -365,8 +363,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) { workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()) ) + promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains) + chainIDs := make([]*big.Int, legacyEVMChains.Len()) + for i, chain := range legacyEVMChains.Slice() { + chainIDs[i] = chain.ID() + } + telemReporter := headreporter.NewTelemetryReporter(telemetryManager, globalLogger, chainIDs...) + headReporter := headreporter.NewHeadReporterService(opts.DS, globalLogger, promReporter, telemReporter) + srvcs = append(srvcs, headReporter) for _, chain := range legacyEVMChains.Slice() { - chain.HeadBroadcaster().Subscribe(promReporter) + chain.HeadBroadcaster().Subscribe(headReporter) chain.TxManager().RegisterResumeCallback(pipelineRunner.ResumeRun) } diff --git a/core/services/headreporter/head_reporter.go b/core/services/headreporter/head_reporter.go new file mode 100644 index 0000000000..94de8ae2be --- /dev/null +++ b/core/services/headreporter/head_reporter.go @@ -0,0 +1,111 @@ +package headreporter + +import ( + "context" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type ( + HeadReporter interface { + ReportNewHead(ctx context.Context, head *evmtypes.Head) error + ReportPeriodic(ctx context.Context) error + } + + HeadReporterService struct { + services.StateMachine + ds sqlutil.DataSource + lggr logger.Logger + newHeads *mailbox.Mailbox[*evmtypes.Head] + chStop services.StopChan + wgDone sync.WaitGroup + reportPeriod time.Duration + reporters []HeadReporter + unsubscribeFns []func() + } +) + +func NewHeadReporterService(ds sqlutil.DataSource, lggr logger.Logger, reporters ...HeadReporter) *HeadReporterService { + return &HeadReporterService{ + ds: ds, + lggr: lggr.Named("HeadReporter"), + newHeads: mailbox.NewSingle[*evmtypes.Head](), + chStop: make(chan struct{}), + reporters: reporters, + reportPeriod: 15 * time.Second, + } +} + +func (hrd *HeadReporterService) Subscribe(subFn func(types.HeadTrackable) (evmtypes.Head, func())) { + _, unsubscribe := subFn(hrd) + hrd.unsubscribeFns = append(hrd.unsubscribeFns, unsubscribe) +} + +func (hrd *HeadReporterService) Start(context.Context) error { + return hrd.StartOnce(hrd.Name(), func() error { + hrd.wgDone.Add(1) + go hrd.eventLoop() + return nil + }) +} + +func (hrd *HeadReporterService) Close() error { + return hrd.StopOnce(hrd.Name(), func() error { + close(hrd.chStop) + hrd.wgDone.Wait() + return nil + }) +} + +func (hrd *HeadReporterService) Name() string { + return hrd.lggr.Name() +} + +func (hrd *HeadReporterService) HealthReport() map[string]error { + return map[string]error{hrd.Name(): hrd.Healthy()} +} + +func (hrd *HeadReporterService) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) { + hrd.newHeads.Deliver(head) +} + +func (hrd *HeadReporterService) eventLoop() { + hrd.lggr.Debug("Starting event loop") + defer hrd.wgDone.Done() + ctx, cancel := hrd.chStop.NewCtx() + defer cancel() + after := time.After(hrd.reportPeriod) + for { + select { + case <-hrd.newHeads.Notify(): + head, exists := hrd.newHeads.Retrieve() + if !exists { + continue + } + for _, reporter := range hrd.reporters { + err := reporter.ReportNewHead(ctx, head) + if err != nil && ctx.Err() == nil { + hrd.lggr.Errorw("Error reporting new head", "err", err) + } + } + case <-after: + for _, reporter := range hrd.reporters { + err := reporter.ReportPeriodic(ctx) + if err != nil && ctx.Err() == nil { + hrd.lggr.Errorw("Error in periodic report", "err", err) + } + } + after = time.After(hrd.reportPeriod) + case <-hrd.chStop: + return + } + } +} diff --git a/core/services/headreporter/head_reporter_mock.go b/core/services/headreporter/head_reporter_mock.go new file mode 100644 index 0000000000..21978abb86 --- /dev/null +++ b/core/services/headreporter/head_reporter_mock.go @@ -0,0 +1,130 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package headreporter + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + mock "github.com/stretchr/testify/mock" +) + +// MockHeadReporter is an autogenerated mock type for the HeadReporter type +type MockHeadReporter struct { + mock.Mock +} + +type MockHeadReporter_Expecter struct { + mock *mock.Mock +} + +func (_m *MockHeadReporter) EXPECT() *MockHeadReporter_Expecter { + return &MockHeadReporter_Expecter{mock: &_m.Mock} +} + +// ReportNewHead provides a mock function with given fields: ctx, head +func (_m *MockHeadReporter) ReportNewHead(ctx context.Context, head *types.Head) error { + ret := _m.Called(ctx, head) + + if len(ret) == 0 { + panic("no return value specified for ReportNewHead") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *types.Head) error); ok { + r0 = rf(ctx, head) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockHeadReporter_ReportNewHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportNewHead' +type MockHeadReporter_ReportNewHead_Call struct { + *mock.Call +} + +// ReportNewHead is a helper method to define mock.On call +// - ctx context.Context +// - head *types.Head +func (_e *MockHeadReporter_Expecter) ReportNewHead(ctx interface{}, head interface{}) *MockHeadReporter_ReportNewHead_Call { + return &MockHeadReporter_ReportNewHead_Call{Call: _e.mock.On("ReportNewHead", ctx, head)} +} + +func (_c *MockHeadReporter_ReportNewHead_Call) Run(run func(ctx context.Context, head *types.Head)) *MockHeadReporter_ReportNewHead_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*types.Head)) + }) + return _c +} + +func (_c *MockHeadReporter_ReportNewHead_Call) Return(_a0 error) *MockHeadReporter_ReportNewHead_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockHeadReporter_ReportNewHead_Call) RunAndReturn(run func(context.Context, *types.Head) error) *MockHeadReporter_ReportNewHead_Call { + _c.Call.Return(run) + return _c +} + +// ReportPeriodic provides a mock function with given fields: ctx +func (_m *MockHeadReporter) ReportPeriodic(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ReportPeriodic") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockHeadReporter_ReportPeriodic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReportPeriodic' +type MockHeadReporter_ReportPeriodic_Call struct { + *mock.Call +} + +// ReportPeriodic is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockHeadReporter_Expecter) ReportPeriodic(ctx interface{}) *MockHeadReporter_ReportPeriodic_Call { + return &MockHeadReporter_ReportPeriodic_Call{Call: _e.mock.On("ReportPeriodic", ctx)} +} + +func (_c *MockHeadReporter_ReportPeriodic_Call) Run(run func(ctx context.Context)) *MockHeadReporter_ReportPeriodic_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockHeadReporter_ReportPeriodic_Call) Return(_a0 error) *MockHeadReporter_ReportPeriodic_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockHeadReporter_ReportPeriodic_Call) RunAndReturn(run func(context.Context) error) *MockHeadReporter_ReportPeriodic_Call { + _c.Call.Return(run) + return _c +} + +// NewMockHeadReporter creates a new instance of MockHeadReporter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockHeadReporter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockHeadReporter { + mock := &MockHeadReporter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/headreporter/head_reporter_test.go b/core/services/headreporter/head_reporter_test.go new file mode 100644 index 0000000000..304dd59a47 --- /dev/null +++ b/core/services/headreporter/head_reporter_test.go @@ -0,0 +1,51 @@ +package headreporter + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func NewHead() evmtypes.Head { + return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)} +} + +func Test_HeadReporterService(t *testing.T) { + t.Run("report everything", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + + headReporter := NewMockHeadReporter(t) + service := NewHeadReporterService(db, logger.TestLogger(t), headReporter) + service.reportPeriod = time.Second + err := service.Start(testutils.Context(t)) + require.NoError(t, err) + + var reportCalls atomic.Int32 + head := NewHead() + headReporter.On("ReportNewHead", mock.Anything, &head).Run(func(args mock.Arguments) { + reportCalls.Add(1) + }).Return(nil) + headReporter.On("ReportPeriodic", mock.Anything).Run(func(args mock.Arguments) { + reportCalls.Add(1) + }).Return(nil) + service.OnNewLongestChain(testutils.Context(t), &head) + + require.Eventually(t, func() bool { return reportCalls.Load() == 2 }, 5*time.Second, 100*time.Millisecond) + }) + + t.Run("has default report period", func(t *testing.T) { + service := NewHeadReporterService(pgtest.NewSqlxDB(t), logger.TestLogger(t), NewMockHeadReporter(t)) + assert.Equal(t, service.reportPeriod, 15*time.Second) + }) +} diff --git a/core/services/headreporter/helper_test.go b/core/services/headreporter/helper_test.go new file mode 100644 index 0000000000..fa05182a85 --- /dev/null +++ b/core/services/headreporter/helper_test.go @@ -0,0 +1,5 @@ +package headreporter + +func (p *prometheusReporter) SetBackend(b PrometheusBackend) { + p.backend = b +} diff --git a/core/services/headreporter/prometheus_backend_mock.go b/core/services/headreporter/prometheus_backend_mock.go new file mode 100644 index 0000000000..ca83f6c4fb --- /dev/null +++ b/core/services/headreporter/prometheus_backend_mock.go @@ -0,0 +1,204 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package headreporter + +import ( + big "math/big" + + mock "github.com/stretchr/testify/mock" +) + +// MockPrometheusBackend is an autogenerated mock type for the PrometheusBackend type +type MockPrometheusBackend struct { + mock.Mock +} + +type MockPrometheusBackend_Expecter struct { + mock *mock.Mock +} + +func (_m *MockPrometheusBackend) EXPECT() *MockPrometheusBackend_Expecter { + return &MockPrometheusBackend_Expecter{mock: &_m.Mock} +} + +// SetMaxUnconfirmedAge provides a mock function with given fields: _a0, _a1 +func (_m *MockPrometheusBackend) SetMaxUnconfirmedAge(_a0 *big.Int, _a1 float64) { + _m.Called(_a0, _a1) +} + +// MockPrometheusBackend_SetMaxUnconfirmedAge_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedAge' +type MockPrometheusBackend_SetMaxUnconfirmedAge_Call struct { + *mock.Call +} + +// SetMaxUnconfirmedAge is a helper method to define mock.On call +// - _a0 *big.Int +// - _a1 float64 +func (_e *MockPrometheusBackend_Expecter) SetMaxUnconfirmedAge(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call { + return &MockPrometheusBackend_SetMaxUnconfirmedAge_Call{Call: _e.mock.On("SetMaxUnconfirmedAge", _a0, _a1)} +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) Run(run func(_a0 *big.Int, _a1 float64)) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(float64)) + }) + return _c +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) Return() *MockPrometheusBackend_SetMaxUnconfirmedAge_Call { + _c.Call.Return() + return _c +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedAge_Call) RunAndReturn(run func(*big.Int, float64)) *MockPrometheusBackend_SetMaxUnconfirmedAge_Call { + _c.Call.Return(run) + return _c +} + +// SetMaxUnconfirmedBlocks provides a mock function with given fields: _a0, _a1 +func (_m *MockPrometheusBackend) SetMaxUnconfirmedBlocks(_a0 *big.Int, _a1 int64) { + _m.Called(_a0, _a1) +} + +// MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxUnconfirmedBlocks' +type MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call struct { + *mock.Call +} + +// SetMaxUnconfirmedBlocks is a helper method to define mock.On call +// - _a0 *big.Int +// - _a1 int64 +func (_e *MockPrometheusBackend_Expecter) SetMaxUnconfirmedBlocks(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call { + return &MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call{Call: _e.mock.On("SetMaxUnconfirmedBlocks", _a0, _a1)} +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) Run(run func(_a0 *big.Int, _a1 int64)) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(int64)) + }) + return _c +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) Return() *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call { + _c.Call.Return() + return _c +} + +func (_c *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call) RunAndReturn(run func(*big.Int, int64)) *MockPrometheusBackend_SetMaxUnconfirmedBlocks_Call { + _c.Call.Return(run) + return _c +} + +// SetPipelineRunsQueued provides a mock function with given fields: n +func (_m *MockPrometheusBackend) SetPipelineRunsQueued(n int) { + _m.Called(n) +} + +// MockPrometheusBackend_SetPipelineRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineRunsQueued' +type MockPrometheusBackend_SetPipelineRunsQueued_Call struct { + *mock.Call +} + +// SetPipelineRunsQueued is a helper method to define mock.On call +// - n int +func (_e *MockPrometheusBackend_Expecter) SetPipelineRunsQueued(n interface{}) *MockPrometheusBackend_SetPipelineRunsQueued_Call { + return &MockPrometheusBackend_SetPipelineRunsQueued_Call{Call: _e.mock.On("SetPipelineRunsQueued", n)} +} + +func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) Run(run func(n int)) *MockPrometheusBackend_SetPipelineRunsQueued_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int)) + }) + return _c +} + +func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) Return() *MockPrometheusBackend_SetPipelineRunsQueued_Call { + _c.Call.Return() + return _c +} + +func (_c *MockPrometheusBackend_SetPipelineRunsQueued_Call) RunAndReturn(run func(int)) *MockPrometheusBackend_SetPipelineRunsQueued_Call { + _c.Call.Return(run) + return _c +} + +// SetPipelineTaskRunsQueued provides a mock function with given fields: n +func (_m *MockPrometheusBackend) SetPipelineTaskRunsQueued(n int) { + _m.Called(n) +} + +// MockPrometheusBackend_SetPipelineTaskRunsQueued_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPipelineTaskRunsQueued' +type MockPrometheusBackend_SetPipelineTaskRunsQueued_Call struct { + *mock.Call +} + +// SetPipelineTaskRunsQueued is a helper method to define mock.On call +// - n int +func (_e *MockPrometheusBackend_Expecter) SetPipelineTaskRunsQueued(n interface{}) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call { + return &MockPrometheusBackend_SetPipelineTaskRunsQueued_Call{Call: _e.mock.On("SetPipelineTaskRunsQueued", n)} +} + +func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) Run(run func(n int)) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int)) + }) + return _c +} + +func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) Return() *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call { + _c.Call.Return() + return _c +} + +func (_c *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call) RunAndReturn(run func(int)) *MockPrometheusBackend_SetPipelineTaskRunsQueued_Call { + _c.Call.Return(run) + return _c +} + +// SetUnconfirmedTransactions provides a mock function with given fields: _a0, _a1 +func (_m *MockPrometheusBackend) SetUnconfirmedTransactions(_a0 *big.Int, _a1 int64) { + _m.Called(_a0, _a1) +} + +// MockPrometheusBackend_SetUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetUnconfirmedTransactions' +type MockPrometheusBackend_SetUnconfirmedTransactions_Call struct { + *mock.Call +} + +// SetUnconfirmedTransactions is a helper method to define mock.On call +// - _a0 *big.Int +// - _a1 int64 +func (_e *MockPrometheusBackend_Expecter) SetUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *MockPrometheusBackend_SetUnconfirmedTransactions_Call { + return &MockPrometheusBackend_SetUnconfirmedTransactions_Call{Call: _e.mock.On("SetUnconfirmedTransactions", _a0, _a1)} +} + +func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) Run(run func(_a0 *big.Int, _a1 int64)) *MockPrometheusBackend_SetUnconfirmedTransactions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(int64)) + }) + return _c +} + +func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) Return() *MockPrometheusBackend_SetUnconfirmedTransactions_Call { + _c.Call.Return() + return _c +} + +func (_c *MockPrometheusBackend_SetUnconfirmedTransactions_Call) RunAndReturn(run func(*big.Int, int64)) *MockPrometheusBackend_SetUnconfirmedTransactions_Call { + _c.Call.Return(run) + return _c +} + +// NewMockPrometheusBackend creates a new instance of MockPrometheusBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockPrometheusBackend(t interface { + mock.TestingT + Cleanup(func()) +}) *MockPrometheusBackend { + mock := &MockPrometheusBackend{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/promreporter/prom_reporter.go b/core/services/headreporter/prometheus_reporter.go similarity index 63% rename from core/services/promreporter/prom_reporter.go rename to core/services/headreporter/prometheus_reporter.go index 31d5f1129e..3e39c7aca4 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/headreporter/prometheus_reporter.go @@ -1,40 +1,28 @@ -package promreporter +package headreporter import ( "context" "fmt" "math/big" - "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" - "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" - + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" - "github.com/smartcontractkit/chainlink/v2/core/logger" ) type ( - promReporter struct { - services.StateMachine - ds sqlutil.DataSource - chains legacyevm.LegacyChainContainer - lggr logger.Logger - backend PrometheusBackend - newHeads *mailbox.Mailbox[*evmtypes.Head] - chStop services.StopChan - wgDone sync.WaitGroup - reportPeriod time.Duration + prometheusReporter struct { + ds sqlutil.DataSource + chains legacyevm.LegacyChainContainer + backend PrometheusBackend } PrometheusBackend interface { @@ -71,103 +59,15 @@ var ( }) ) -func (defaultBackend) SetUnconfirmedTransactions(evmChainID *big.Int, n int64) { - promUnconfirmedTransactions.WithLabelValues(evmChainID.String()).Set(float64(n)) -} - -func (defaultBackend) SetMaxUnconfirmedAge(evmChainID *big.Int, s float64) { - promMaxUnconfirmedAge.WithLabelValues(evmChainID.String()).Set(s) -} - -func (defaultBackend) SetMaxUnconfirmedBlocks(evmChainID *big.Int, n int64) { - promMaxUnconfirmedBlocks.WithLabelValues(evmChainID.String()).Set(float64(n)) -} - -func (defaultBackend) SetPipelineRunsQueued(n int) { - promPipelineTaskRunsQueued.Set(float64(n)) -} - -func (defaultBackend) SetPipelineTaskRunsQueued(n int) { - promPipelineRunsQueued.Set(float64(n)) -} - -func NewPromReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter { - var backend PrometheusBackend = defaultBackend{} - period := 15 * time.Second - for _, opt := range opts { - switch v := opt.(type) { - case time.Duration: - period = v - case PrometheusBackend: - backend = v - } - } - - chStop := make(chan struct{}) - return &promReporter{ - ds: ds, - chains: chainContainer, - lggr: lggr.Named("PromReporter"), - backend: backend, - newHeads: mailbox.NewSingle[*evmtypes.Head](), - chStop: chStop, - reportPeriod: period, +func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer) *prometheusReporter { + return &prometheusReporter{ + ds: ds, + chains: chainContainer, + backend: defaultBackend{}, } } -// Start starts PromReporter. -func (pr *promReporter) Start(context.Context) error { - return pr.StartOnce("PromReporter", func() error { - pr.wgDone.Add(1) - go pr.eventLoop() - return nil - }) -} - -func (pr *promReporter) Close() error { - return pr.StopOnce("PromReporter", func() error { - close(pr.chStop) - pr.wgDone.Wait() - return nil - }) -} -func (pr *promReporter) Name() string { - return pr.lggr.Name() -} - -func (pr *promReporter) HealthReport() map[string]error { - return map[string]error{pr.Name(): pr.Healthy()} -} - -func (pr *promReporter) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) { - pr.newHeads.Deliver(head) -} - -func (pr *promReporter) eventLoop() { - pr.lggr.Debug("Starting event loop") - defer pr.wgDone.Done() - ctx, cancel := pr.chStop.NewCtx() - defer cancel() - for { - select { - case <-pr.newHeads.Notify(): - head, exists := pr.newHeads.Retrieve() - if !exists { - continue - } - pr.reportHeadMetrics(ctx, head) - case <-time.After(pr.reportPeriod): - if err := errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed"); err != nil { - pr.lggr.Errorw("Error reporting prometheus metrics", "err", err) - } - - case <-pr.chStop: - return - } - } -} - -func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) { +func (pr *prometheusReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) { chain, err := pr.chains.Get(evmChainID.String()) if err != nil { return nil, fmt.Errorf("failed to get chain: %w", err) @@ -175,20 +75,16 @@ func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) { return chain.TxManager(), nil } -func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.Head) { +func (pr *prometheusReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error { evmChainID := head.EVMChainID.ToInt() - err := multierr.Combine( + return multierr.Combine( errors.Wrap(pr.reportPendingEthTxes(ctx, evmChainID), "reportPendingEthTxes failed"), errors.Wrap(pr.reportMaxUnconfirmedAge(ctx, evmChainID), "reportMaxUnconfirmedAge failed"), errors.Wrap(pr.reportMaxUnconfirmedBlocks(ctx, head), "reportMaxUnconfirmedBlocks failed"), ) - - if err != nil && ctx.Err() == nil { - pr.lggr.Errorw("Error reporting prometheus metrics", "err", err) - } } -func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { +func (pr *prometheusReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { txm, err := pr.getTxm(evmChainID) if err != nil { return fmt.Errorf("failed to get txm: %w", err) @@ -202,7 +98,7 @@ func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *bi return nil } -func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) { +func (pr *prometheusReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) { txm, err := pr.getTxm(evmChainID) if err != nil { return fmt.Errorf("failed to get txm: %w", err) @@ -221,7 +117,7 @@ func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID return nil } -func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) { +func (pr *prometheusReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) { txm, err := pr.getTxm(head.EVMChainID.ToInt()) if err != nil { return fmt.Errorf("failed to get txm: %w", err) @@ -240,7 +136,11 @@ func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *ev return nil } -func (pr *promReporter) reportPipelineRunStats(ctx context.Context) (err error) { +func (pr *prometheusReporter) ReportPeriodic(ctx context.Context) error { + return errors.Wrap(pr.reportPipelineRunStats(ctx), "reportPipelineRunStats failed") +} + +func (pr *prometheusReporter) reportPipelineRunStats(ctx context.Context) (err error) { rows, err := pr.ds.QueryContext(ctx, ` SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL `) @@ -271,3 +171,23 @@ SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL return nil } + +func (defaultBackend) SetUnconfirmedTransactions(evmChainID *big.Int, n int64) { + promUnconfirmedTransactions.WithLabelValues(evmChainID.String()).Set(float64(n)) +} + +func (defaultBackend) SetMaxUnconfirmedAge(evmChainID *big.Int, s float64) { + promMaxUnconfirmedAge.WithLabelValues(evmChainID.String()).Set(s) +} + +func (defaultBackend) SetMaxUnconfirmedBlocks(evmChainID *big.Int, n int64) { + promMaxUnconfirmedBlocks.WithLabelValues(evmChainID.String()).Set(float64(n)) +} + +func (defaultBackend) SetPipelineRunsQueued(n int) { + promPipelineTaskRunsQueued.Set(float64(n)) +} + +func (defaultBackend) SetPipelineTaskRunsQueued(n int) { + promPipelineRunsQueued.Set(float64(n)) +} diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/headreporter/prometheus_reporter_test.go similarity index 64% rename from core/services/promreporter/prom_reporter_test.go rename to core/services/headreporter/prometheus_reporter_test.go index b61fa25bdc..32d2c09d0e 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/headreporter/prometheus_reporter_test.go @@ -1,8 +1,7 @@ -package promreporter_test +package headreporter_test import ( "math/big" - "sync/atomic" "testing" "time" @@ -10,90 +9,40 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" - "github.com/smartcontractkit/chainlink/v2/core/internal/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" + "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" ) -func newHead() evmtypes.Head { - return evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(0)} -} - -func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { - config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) - keyStore := cltest.NewKeyStore(t, db).Eth() - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - estimator, err := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) - require.NoError(t, err) - lggr := logger.TestLogger(t) - lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, - KeepFinalizedBlocksDepth: 1000, - } - ht := headtracker.NewSimulatedHeadTracker(ethClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, ht, lpOpts) - - txm, err := txmgr.NewTxm( - db, - evmConfig, - evmConfig.GasEstimator(), - evmConfig.Transactions(), - nil, - dbConfig, - dbConfig.Listener(), - ethClient, - lggr, - lp, - keyStore, - estimator) - require.NoError(t, err) - - cfg := configtest.NewGeneralConfig(t, nil) - return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) -} - -func Test_PromReporter_OnNewLongestChain(t *testing.T) { +func Test_PrometheusReporter(t *testing.T) { t.Run("with nothing in the database", func(t *testing.T) { db := pgtest.NewSqlxDB(t) - backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) - - var subscribeCalls atomic.Int32 - + backend := headreporter.NewMockPrometheusBackend(t) backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return() backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return() backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return() - backend.On("SetPipelineTaskRunsQueued", 0).Return() - backend.On("SetPipelineRunsQueued", 0). - Run(func(args mock.Arguments) { - subscribeCalls.Add(1) - }). - Return() - servicetest.Run(t, reporter) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db)) + reporter.SetBackend(backend) - head := newHead() - reporter.OnNewLongestChain(testutils.Context(t), &head) + head := headreporter.NewHead() + err := reporter.ReportNewHead(testutils.Context(t), &head) + require.NoError(t, err) - require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond) + backend.On("SetPipelineTaskRunsQueued", 0).Return() + backend.On("SetPipelineRunsQueued", 0).Return() + err = reporter.ReportPeriodic(testutils.Context(t)) + require.NoError(t, err) }) t.Run("with unconfirmed evm.txes", func(t *testing.T) { @@ -102,61 +51,92 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db).Eth() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) - var subscribeCalls atomic.Int32 + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress) + require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7)) - backend := mocks.NewPrometheusBackend(t) + backend := headreporter.NewMockPrometheusBackend(t) backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(3)).Return() backend.On("SetMaxUnconfirmedAge", big.NewInt(0), mock.MatchedBy(func(s float64) bool { return s > 0 })).Return() backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(35)).Return() - backend.On("SetPipelineTaskRunsQueued", 0).Return() - backend.On("SetPipelineRunsQueued", 0). - Run(func(args mock.Arguments) { - subscribeCalls.Add(1) - }). - Return() - reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) - servicetest.Run(t, reporter) - etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) - cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress) - cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress) - require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7)) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db)) + reporter.SetBackend(backend) + + head := headreporter.NewHead() + err := reporter.ReportNewHead(testutils.Context(t), &head) + require.NoError(t, err) - head := newHead() - reporter.OnNewLongestChain(testutils.Context(t), &head) + backend.On("SetPipelineTaskRunsQueued", 0).Return() + backend.On("SetPipelineRunsQueued", 0).Return() - require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond) + err = reporter.ReportPeriodic(testutils.Context(t)) + require.NoError(t, err) }) t.Run("with unfinished pipeline task runs", func(t *testing.T) { db := pgtest.NewSqlxDB(t) pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) - backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(db, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) - cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 2) - var subscribeCalls atomic.Int32 - + backend := headreporter.NewMockPrometheusBackend(t) backend.On("SetUnconfirmedTransactions", big.NewInt(0), int64(0)).Return() backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return() backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return() - backend.On("SetPipelineTaskRunsQueued", 3).Return() - backend.On("SetPipelineRunsQueued", 2). - Run(func(args mock.Arguments) { - subscribeCalls.Add(1) - }). - Return() - servicetest.Run(t, reporter) - head := newHead() - reporter.OnNewLongestChain(testutils.Context(t), &head) + reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db)) + reporter.SetBackend(backend) + + head := headreporter.NewHead() + err := reporter.ReportNewHead(testutils.Context(t), &head) + require.NoError(t, err) + + backend.On("SetPipelineTaskRunsQueued", 3).Return() + backend.On("SetPipelineRunsQueued", 2).Return() - require.Eventually(t, func() bool { return subscribeCalls.Load() >= 1 }, 12*time.Second, 100*time.Millisecond) + err = reporter.ReportPeriodic(testutils.Context(t)) + require.NoError(t, err) }) } + +func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { + config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) + keyStore := cltest.NewKeyStore(t, db).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + estimator, err := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) + require.NoError(t, err) + lggr := logger.TestLogger(t) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + ht := headtracker.NewSimulatedHeadTracker(ethClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth) + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), ethClient, lggr, ht, lpOpts) + + txm, err := txmgr.NewTxm( + db, + evmConfig, + evmConfig.GasEstimator(), + evmConfig.Transactions(), + nil, + dbConfig, + dbConfig.Listener(), + ethClient, + lggr, + lp, + keyStore, + estimator) + require.NoError(t, err) + + cfg := configtest.NewGeneralConfig(t, nil) + return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) +} diff --git a/core/services/headreporter/telemetry_reporter.go b/core/services/headreporter/telemetry_reporter.go new file mode 100644 index 0000000000..0d93ca59a4 --- /dev/null +++ b/core/services/headreporter/telemetry_reporter.go @@ -0,0 +1,69 @@ +package headreporter + +import ( + "context" + "math/big" + + "github.com/pkg/errors" + + "github.com/smartcontractkit/libocr/commontypes" + "google.golang.org/protobuf/proto" + + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" +) + +type telemetryReporter struct { + lggr logger.Logger + endpoints map[uint64]commontypes.MonitoringEndpoint +} + +func NewTelemetryReporter(monitoringEndpointGen telemetry.MonitoringEndpointGenerator, lggr logger.Logger, chainIDs ...*big.Int) HeadReporter { + endpoints := make(map[uint64]commontypes.MonitoringEndpoint) + for _, chainID := range chainIDs { + endpoints[chainID.Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chainID.String(), "", synchronization.HeadReport) + } + return &telemetryReporter{lggr: lggr.Named("TelemetryReporter"), endpoints: endpoints} +} + +func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error { + monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()] + if monitoringEndpoint == nil { + return errors.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64()) + } + var finalized *telem.Block + latestFinalizedHead := head.LatestFinalizedHead() + if latestFinalizedHead != nil { + finalized = &telem.Block{ + Timestamp: uint64(latestFinalizedHead.GetTimestamp().UTC().Unix()), + Number: uint64(latestFinalizedHead.BlockNumber()), + Hash: latestFinalizedHead.BlockHash().Hex(), + } + } + request := &telem.HeadReportRequest{ + ChainID: head.EVMChainID.String(), + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: uint64(head.Number), + Hash: head.Hash.Hex(), + }, + Finalized: finalized, + } + bytes, err := proto.Marshal(request) + if err != nil { + return errors.WithMessage(err, "telem.HeadReportRequest marshal error") + } + monitoringEndpoint.SendLog(bytes) + if finalized == nil { + t.lggr.Infow("No finalized block was found", "chainID", head.EVMChainID.Int64(), + "head.number", head.Number, "chainLength", head.ChainLength()) + } + return nil +} + +func (t *telemetryReporter) ReportPeriodic(ctx context.Context) error { + return nil +} diff --git a/core/services/headreporter/telemetry_reporter_test.go b/core/services/headreporter/telemetry_reporter_test.go new file mode 100644 index 0000000000..85bfea5866 --- /dev/null +++ b/core/services/headreporter/telemetry_reporter_test.go @@ -0,0 +1,108 @@ +package headreporter_test + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + + mocks2 "github.com/smartcontractkit/chainlink/v2/common/types/mocks" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" +) + +func Test_TelemetryReporter_NewHead(t *testing.T) { + head := evmtypes.Head{ + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), + IsFinalized: false, + Parent: &evmtypes.Head{ + Number: 41, + Hash: common.HexToHash("0x1009"), + Timestamp: time.UnixMilli(999), + IsFinalized: true, + }, + } + requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ + ChainID: "100", + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: 42, + Hash: head.Hash.Hex(), + }, + Finalized: &telem.Block{ + Timestamp: uint64(head.Parent.Timestamp.UTC().Unix()), + Number: 41, + Hash: head.Parent.Hash.Hex(), + }, + }) + assert.NoError(t, err) + + monitoringEndpoint := mocks2.NewMonitoringEndpoint(t) + monitoringEndpoint.On("SendLog", requestBytes).Return() + + monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(monitoringEndpoint) + reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, logger.TestLogger(t), big.NewInt(100)) + + err = reporter.ReportNewHead(testutils.Context(t), &head) + assert.NoError(t, err) +} + +func Test_TelemetryReporter_NewHeadMissingFinalized(t *testing.T) { + head := evmtypes.Head{ + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), + IsFinalized: false, + } + requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ + ChainID: "100", + Latest: &telem.Block{ + Timestamp: uint64(head.Timestamp.UTC().Unix()), + Number: 42, + Hash: head.Hash.Hex(), + }, + }) + assert.NoError(t, err) + + monitoringEndpoint := mocks2.NewMonitoringEndpoint(t) + monitoringEndpoint.On("SendLog", requestBytes).Return() + + monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(monitoringEndpoint) + reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, logger.TestLogger(t), big.NewInt(100)) + + err = reporter.ReportNewHead(testutils.Context(t), &head) + assert.NoError(t, err) +} + +func Test_TelemetryReporter_NewHead_MissingEndpoint(t *testing.T) { + monitoringEndpointGen := telemetry.NewMockMonitoringEndpointGenerator(t) + monitoringEndpointGen. + On("GenMonitoringEndpoint", "EVM", "100", "", synchronization.HeadReport). + Return(nil) + + reporter := headreporter.NewTelemetryReporter(monitoringEndpointGen, logger.TestLogger(t), big.NewInt(100)) + + head := evmtypes.Head{Number: 42, EVMChainID: ubig.NewI(100)} + + err := reporter.ReportNewHead(testutils.Context(t), &head) + assert.Errorf(t, err, "No monitoring endpoint provided chain_id=100") +} diff --git a/core/services/synchronization/common.go b/core/services/synchronization/common.go index 394830a76a..a6c0191e3a 100644 --- a/core/services/synchronization/common.go +++ b/core/services/synchronization/common.go @@ -28,6 +28,7 @@ const ( OCR3CCIPCommit TelemetryType = "ocr3-ccip-commit" OCR3CCIPExec TelemetryType = "ocr3-ccip-exec" OCR3CCIPBootstrap TelemetryType = "ocr3-bootstrap" + HeadReport TelemetryType = "head-report" ) type TelemPayload struct { diff --git a/core/services/synchronization/telem/telem_head_report.pb.go b/core/services/synchronization/telem/telem_head_report.pb.go new file mode 100644 index 0000000000..12801314a7 --- /dev/null +++ b/core/services/synchronization/telem/telem_head_report.pb.go @@ -0,0 +1,256 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v4.25.1 +// source: core/services/synchronization/telem/telem_head_report.proto + +package telem + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type HeadReportRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChainID string `protobuf:"bytes,1,opt,name=chainID,proto3" json:"chainID,omitempty"` + Latest *Block `protobuf:"bytes,2,opt,name=latest,proto3" json:"latest,omitempty"` + Finalized *Block `protobuf:"bytes,3,opt,name=finalized,proto3,oneof" json:"finalized,omitempty"` +} + +func (x *HeadReportRequest) Reset() { + *x = HeadReportRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeadReportRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeadReportRequest) ProtoMessage() {} + +func (x *HeadReportRequest) ProtoReflect() protoreflect.Message { + mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeadReportRequest.ProtoReflect.Descriptor instead. +func (*HeadReportRequest) Descriptor() ([]byte, []int) { + return file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP(), []int{0} +} + +func (x *HeadReportRequest) GetChainID() string { + if x != nil { + return x.ChainID + } + return "" +} + +func (x *HeadReportRequest) GetLatest() *Block { + if x != nil { + return x.Latest + } + return nil +} + +func (x *HeadReportRequest) GetFinalized() *Block { + if x != nil { + return x.Finalized + } + return nil +} + +type Block struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Number uint64 `protobuf:"varint,2,opt,name=number,proto3" json:"number,omitempty"` + Hash string `protobuf:"bytes,3,opt,name=hash,proto3" json:"hash,omitempty"` +} + +func (x *Block) Reset() { + *x = Block{} + if protoimpl.UnsafeEnabled { + mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Block) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Block) ProtoMessage() {} + +func (x *Block) ProtoReflect() protoreflect.Message { + mi := &file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Block.ProtoReflect.Descriptor instead. +func (*Block) Descriptor() ([]byte, []int) { + return file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP(), []int{1} +} + +func (x *Block) GetTimestamp() uint64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *Block) GetNumber() uint64 { + if x != nil { + return x.Number + } + return 0 +} + +func (x *Block) GetHash() string { + if x != nil { + return x.Hash + } + return "" +} + +var File_core_services_synchronization_telem_telem_head_report_proto protoreflect.FileDescriptor + +var file_core_services_synchronization_telem_telem_head_report_proto_rawDesc = []byte{ + 0x0a, 0x3b, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, + 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x68, 0x65, 0x61, 0x64, + 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, + 0x65, 0x6c, 0x65, 0x6d, 0x22, 0x92, 0x01, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x64, 0x52, 0x65, 0x70, + 0x6f, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, + 0x61, 0x69, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x06, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x52, 0x06, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x09, 0x66, 0x69, + 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, + 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x00, 0x52, 0x09, 0x66, + 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, + 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x22, 0x51, 0x0a, 0x05, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x42, 0x4e, 0x5a, 0x4c, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, + 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_core_services_synchronization_telem_telem_head_report_proto_rawDescOnce sync.Once + file_core_services_synchronization_telem_telem_head_report_proto_rawDescData = file_core_services_synchronization_telem_telem_head_report_proto_rawDesc +) + +func file_core_services_synchronization_telem_telem_head_report_proto_rawDescGZIP() []byte { + file_core_services_synchronization_telem_telem_head_report_proto_rawDescOnce.Do(func() { + file_core_services_synchronization_telem_telem_head_report_proto_rawDescData = protoimpl.X.CompressGZIP(file_core_services_synchronization_telem_telem_head_report_proto_rawDescData) + }) + return file_core_services_synchronization_telem_telem_head_report_proto_rawDescData +} + +var file_core_services_synchronization_telem_telem_head_report_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_core_services_synchronization_telem_telem_head_report_proto_goTypes = []interface{}{ + (*HeadReportRequest)(nil), // 0: telem.HeadReportRequest + (*Block)(nil), // 1: telem.Block +} +var file_core_services_synchronization_telem_telem_head_report_proto_depIdxs = []int32{ + 1, // 0: telem.HeadReportRequest.latest:type_name -> telem.Block + 1, // 1: telem.HeadReportRequest.finalized:type_name -> telem.Block + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_core_services_synchronization_telem_telem_head_report_proto_init() } +func file_core_services_synchronization_telem_telem_head_report_proto_init() { + if File_core_services_synchronization_telem_telem_head_report_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HeadReportRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Block); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_core_services_synchronization_telem_telem_head_report_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_core_services_synchronization_telem_telem_head_report_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_core_services_synchronization_telem_telem_head_report_proto_goTypes, + DependencyIndexes: file_core_services_synchronization_telem_telem_head_report_proto_depIdxs, + MessageInfos: file_core_services_synchronization_telem_telem_head_report_proto_msgTypes, + }.Build() + File_core_services_synchronization_telem_telem_head_report_proto = out.File + file_core_services_synchronization_telem_telem_head_report_proto_rawDesc = nil + file_core_services_synchronization_telem_telem_head_report_proto_goTypes = nil + file_core_services_synchronization_telem_telem_head_report_proto_depIdxs = nil +} diff --git a/core/services/synchronization/telem/telem_head_report.proto b/core/services/synchronization/telem/telem_head_report.proto new file mode 100644 index 0000000000..6f4cf2ddae --- /dev/null +++ b/core/services/synchronization/telem/telem_head_report.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"; + +package telem; + +message HeadReportRequest { + string chainID = 1; + Block latest = 2; + optional Block finalized = 3; +} + +message Block { + uint64 timestamp = 1; + uint64 number = 2; + string hash = 3; +} diff --git a/core/services/telemetry/monitoring_endpoint_generator_mock.go b/core/services/telemetry/monitoring_endpoint_generator_mock.go new file mode 100644 index 0000000000..a0fc503ecc --- /dev/null +++ b/core/services/telemetry/monitoring_endpoint_generator_mock.go @@ -0,0 +1,88 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package telemetry + +import ( + commontypes "github.com/smartcontractkit/libocr/commontypes" + mock "github.com/stretchr/testify/mock" + + synchronization "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" +) + +// MockMonitoringEndpointGenerator is an autogenerated mock type for the MonitoringEndpointGenerator type +type MockMonitoringEndpointGenerator struct { + mock.Mock +} + +type MockMonitoringEndpointGenerator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMonitoringEndpointGenerator) EXPECT() *MockMonitoringEndpointGenerator_Expecter { + return &MockMonitoringEndpointGenerator_Expecter{mock: &_m.Mock} +} + +// GenMonitoringEndpoint provides a mock function with given fields: network, chainID, contractID, telemType +func (_m *MockMonitoringEndpointGenerator) GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) commontypes.MonitoringEndpoint { + ret := _m.Called(network, chainID, contractID, telemType) + + if len(ret) == 0 { + panic("no return value specified for GenMonitoringEndpoint") + } + + var r0 commontypes.MonitoringEndpoint + if rf, ok := ret.Get(0).(func(string, string, string, synchronization.TelemetryType) commontypes.MonitoringEndpoint); ok { + r0 = rf(network, chainID, contractID, telemType) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(commontypes.MonitoringEndpoint) + } + } + + return r0 +} + +// MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GenMonitoringEndpoint' +type MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call struct { + *mock.Call +} + +// GenMonitoringEndpoint is a helper method to define mock.On call +// - network string +// - chainID string +// - contractID string +// - telemType synchronization.TelemetryType +func (_e *MockMonitoringEndpointGenerator_Expecter) GenMonitoringEndpoint(network interface{}, chainID interface{}, contractID interface{}, telemType interface{}) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call { + return &MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call{Call: _e.mock.On("GenMonitoringEndpoint", network, chainID, contractID, telemType)} +} + +func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) Run(run func(network string, chainID string, contractID string, telemType synchronization.TelemetryType)) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].(string), args[3].(synchronization.TelemetryType)) + }) + return _c +} + +func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) Return(_a0 commontypes.MonitoringEndpoint) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call) RunAndReturn(run func(string, string, string, synchronization.TelemetryType) commontypes.MonitoringEndpoint) *MockMonitoringEndpointGenerator_GenMonitoringEndpoint_Call { + _c.Call.Return(run) + return _c +} + +// NewMockMonitoringEndpointGenerator creates a new instance of MockMonitoringEndpointGenerator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockMonitoringEndpointGenerator(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMonitoringEndpointGenerator { + mock := &MockMonitoringEndpointGenerator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/web/testdata/body/health.html b/core/web/testdata/body/health.html index 2a1b222753..4e244a9fe7 100644 --- a/core/web/testdata/body/health.html +++ b/core/web/testdata/body/health.html @@ -69,6 +69,9 @@ +
+ HeadReporter +
JobSpawner
@@ -96,9 +99,6 @@ BridgeCache -
- PromReporter -
TelemetryManager
diff --git a/core/web/testdata/body/health.json b/core/web/testdata/body/health.json index 10415c0abd..224a3534c9 100644 --- a/core/web/testdata/body/health.json +++ b/core/web/testdata/body/health.json @@ -99,6 +99,15 @@ "output": "" } }, + { + "type": "checks", + "id": "HeadReporter", + "attributes": { + "name": "HeadReporter", + "status": "passing", + "output": "" + } + }, { "type": "checks", "id": "JobSpawner", @@ -162,15 +171,6 @@ "output": "" } }, - { - "type": "checks", - "id": "PromReporter", - "attributes": { - "name": "PromReporter", - "status": "passing", - "output": "" - } - }, { "type": "checks", "id": "TelemetryManager", diff --git a/core/web/testdata/body/health.txt b/core/web/testdata/body/health.txt index 09c8cff6c2..0fbac846a6 100644 --- a/core/web/testdata/body/health.txt +++ b/core/web/testdata/body/health.txt @@ -10,6 +10,7 @@ ok EVM.0.Txm.BlockHistoryEstimator ok EVM.0.Txm.Broadcaster ok EVM.0.Txm.Confirmer ok EVM.0.Txm.WrappedEvmEstimator +ok HeadReporter ok JobSpawner ok Mailbox.Monitor ok Mercury.WSRPCPool @@ -17,5 +18,4 @@ ok Mercury.WSRPCPool.CacheSet ok PipelineORM ok PipelineRunner ok PipelineRunner.BridgeCache -ok PromReporter ok TelemetryManager diff --git a/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml b/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml index 8f8aa9530e..cd089013db 100644 --- a/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml +++ b/integration-tests/testconfig/vrfv2plus/vrfv2plus.toml @@ -19,6 +19,7 @@ MaxSize = '0b' [WebServer] AllowOrigins = '*' HTTPPort = 6688 +HTTPWriteTimeout = '1m0s' SecureCookies = false [WebServer.RateLimit] diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar index 1dbf6b8eb9..777d3e5e12 100644 --- a/testdata/scripts/health/default.txtar +++ b/testdata/scripts/health/default.txtar @@ -31,6 +31,7 @@ fj293fbBnlQ!f9vNs HTTPPort = $PORT -- out.txt -- +ok HeadReporter ok JobSpawner ok Mailbox.Monitor ok Mercury.WSRPCPool @@ -38,12 +39,20 @@ ok Mercury.WSRPCPool.CacheSet ok PipelineORM ok PipelineRunner ok PipelineRunner.BridgeCache -ok PromReporter ok TelemetryManager -- out.json -- { "data": [ + { + "type": "checks", + "id": "HeadReporter", + "attributes": { + "name": "HeadReporter", + "status": "passing", + "output": "" + } + }, { "type": "checks", "id": "JobSpawner", @@ -107,15 +116,6 @@ ok TelemetryManager "output": "" } }, - { - "type": "checks", - "id": "PromReporter", - "attributes": { - "name": "PromReporter", - "status": "passing", - "output": "" - } - }, { "type": "checks", "id": "TelemetryManager", diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar index 8178f8e821..3bd15850af 100644 --- a/testdata/scripts/health/multi-chain.txtar +++ b/testdata/scripts/health/multi-chain.txtar @@ -75,6 +75,7 @@ ok EVM.1.Txm.BlockHistoryEstimator ok EVM.1.Txm.Broadcaster ok EVM.1.Txm.Confirmer ok EVM.1.Txm.WrappedEvmEstimator +ok HeadReporter ok JobSpawner ok Mailbox.Monitor ok Mercury.WSRPCPool @@ -82,7 +83,6 @@ ok Mercury.WSRPCPool.CacheSet ok PipelineORM ok PipelineRunner ok PipelineRunner.BridgeCache -ok PromReporter ok Solana.Bar ok StarkNet.Baz ok TelemetryManager @@ -216,6 +216,15 @@ ok TelemetryManager "output": "" } }, + { + "type": "checks", + "id": "HeadReporter", + "attributes": { + "name": "HeadReporter", + "status": "passing", + "output": "" + } + }, { "type": "checks", "id": "JobSpawner", @@ -279,15 +288,6 @@ ok TelemetryManager "output": "" } }, - { - "type": "checks", - "id": "PromReporter", - "attributes": { - "name": "PromReporter", - "status": "passing", - "output": "" - } - }, { "type": "checks", "id": "Solana.Bar",