Skip to content

Commit

Permalink
switch to polling, claim some messages now
Browse files Browse the repository at this point in the history
  • Loading branch information
kfastov committed Oct 1, 2024
1 parent 9a3ebff commit 679e3db
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 61 deletions.
3 changes: 3 additions & 0 deletions src/services/rootPropagator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ dotenv.config();
const config = {
propagationPeriod: parseInt(process.env.PROPAGATION_PERIOD) || 3600000, // Default to 1 hour
listenerInitDelay: parseInt(process.env.LISTENER_INIT_DELAY) || 5000, // Default to 5 seconds
l1BlocksToQuery: parseInt(process.env.L1_BLOCKS_TO_QUERY) || 100000, // Default to 100,000 blocks
l2BlocksToQuery: parseInt(process.env.L2_BLOCKS_TO_QUERY) || 100000, // Default to 100,000 blocks
privateKey: process.env.PRIVATE_KEY,
l1RpcUrl: process.env.L1_RPC_URL,
l2RpcUrl: process.env.L2_RPC_URL,
lineaStateBridgeAddress: process.env.LINEA_STATE_BRIDGE_ADDRESS,
l1MessageServiceAddress: process.env.L1_MESSAGE_SERVICE_ADDRESS,
l2MessageServiceAddress: process.env.L2_MESSAGE_SERVICE_ADDRESS,
l2PollingInterval: parseInt(process.env.L2_POLLING_INTERVAL) || 30000, // Default to 30 seconds
};

export function validateConfig() {
Expand Down
14 changes: 7 additions & 7 deletions src/services/rootPropagator/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ export async function saveMessage(messageData) {
});
}

export async function getUnconfirmedMessages() {
export async function getUnclaimedMessages() {
return new Promise((resolve, reject) => {
db.find({ status: 'pending' }, (err, docs) => {
db.find({ status: 'confirmed' }, (err, docs) => {
if (err) {
console.error('Failed to get unconfirmed messages:', err);
console.error('Failed to get unclaimed messages:', err);
reject(err);
} else {
resolve(docs);
Expand All @@ -55,14 +55,14 @@ export async function updateMessageStatus(messageHash, status) {
});
}

export async function deleteConfirmedMessages() {
export async function deleteClaimedMessages() {
return new Promise((resolve, reject) => {
db.remove({ status: 'confirmed' }, { multi: true }, (err, numRemoved) => {
db.remove({ status: 'claimed' }, { multi: true }, (err, numRemoved) => {
if (err) {
console.error('Failed to delete confirmed messages:', err);
console.error('Failed to delete claimed messages:', err);
reject(err);
} else {
console.log(`Deleted ${numRemoved} confirmed messages`);
console.log(`Deleted ${numRemoved} claimed messages`);
resolve(numRemoved);
}
});
Expand Down
62 changes: 41 additions & 21 deletions src/services/rootPropagator/modules/L1/L1MessageListener.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,48 @@ export async function listenForL1Messages() {
const l1MessageServiceContract = new ethers.Contract(config.l1MessageServiceAddress, abi, provider);

const filter = l1MessageServiceContract.filters.MessageSent(config.lineaStateBridgeAddress);

l1MessageServiceContract.on(filter, async (payload) => {
logger.info(`MessageSent Event Detected:`);
const [_from, _to, _fee, _value, nonce, calldata, messageHash] = payload.args;
logger.info(`Message details`, {
nonce,
messageHash,
calldata,
blockNumber: payload.log.blockNumber,
transactionHash: payload.log.transactionHash
});

try {
await saveMessage({
messageHash: messageHash,
nonce: nonce,
calldata: calldata,
});
} catch (error) {
logger.error('Error saving message to database:', { error: error.message });
}
// Process past events
const latestBlock = await provider.getBlockNumber();
const blocksToQuery = config.l1BlocksToQuery || 100000; // Default to 100,000 blocks if not specified
const fromBlock = Math.max(0, latestBlock - blocksToQuery);
logger.info(`Querying past events from block ${fromBlock} to ${latestBlock}`);

const pastEvents = await l1MessageServiceContract.queryFilter(filter, fromBlock, latestBlock);
logger.info(`Found ${pastEvents.length} past events`);

for (const event of pastEvents) {
await processEvent(event);
}

// Set up listener for new events
l1MessageServiceContract.on(filter, async (event) => {
await processEvent(event);
});

logger.info('L1 Message Listener started');
logger.info('L1 Message Listener started for new events');
}

async function processEvent(event) {
const [_from, to, fee, value, nonce, calldata, messageHash] = event.args;
logger.info('Processing MessageSent Event', {
nonce,
messageHash,
});

try {
await saveMessage({
destination: to,
messageHash: messageHash,
nonce: nonce.toString(),
fee: fee.toString(),
value: value.toString(),
calldata: calldata,
status: 'pending',
blockNumber: event.blockNumber,
transactionHash: event.transactionHash
});
} catch (error) {
logger.error('Error saving message to database:', { error: error.message, messageHash });
}
}
106 changes: 84 additions & 22 deletions src/services/rootPropagator/modules/L2/L2MessageHandler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ethers } from 'ethers';
import { LineaSDK } from '@consensys/linea-sdk';
import config from '../../config.js';
import { getUnconfirmedMessages, updateMessageStatus, deleteConfirmedMessages } from '../../database.js';
import { getUnclaimedMessages, updateMessageStatus, deleteClaimedMessages } from '../../database.js';
import { createModuleLogger } from '../../utils/logger.js';

const logger = createModuleLogger('L2MessageHandler');
Expand All @@ -10,21 +10,72 @@ const abi = [
"event L1L2MessageHashesAddedToInbox(bytes32[] messageHashes)"
];

export function listenForL2Messages() {
let latestProcessedBlock = 0;

export async function setupL2EventPolling() {
const provider = new ethers.JsonRpcProvider(config.l2RpcUrl);
const l2MessageServiceContract = new ethers.Contract(config.l2MessageServiceAddress, abi, provider);

l2MessageServiceContract.on("L1L2MessageHashesAddedToInbox", async (messageHashes) => {
logger.info('New message hashes received on L2', { messageHashes });
try {
// Update all message statuses in a batch
await updateMessageStatuses(messageHashes);
} catch (error) {
logger.error('Error updating message statuses', { error: error.message });
// Process past events
const latestBlock = await provider.getBlockNumber();
const blocksToQuery = config.l2BlocksToQuery || 100000; // Default to 100,000 blocks if not specified
const fromBlock = Math.max(0, latestBlock - blocksToQuery);
logger.info(`Querying past L2 events from block ${fromBlock} to ${latestBlock}`);

const filter = l2MessageServiceContract.filters.L1L2MessageHashesAddedToInbox();
const pastEvents = await l2MessageServiceContract.queryFilter(filter, fromBlock, latestBlock);
logger.info(`Found ${pastEvents.length} past L2 events`);

for (const event of pastEvents) {
await processEvent(event);
}

// Set the latest processed block after handling past events
latestProcessedBlock = latestBlock;
logger.info(`Processed past events up to block ${latestProcessedBlock}`);

// Set up interval for polling new events
setInterval(() => pollL2Events(provider, l2MessageServiceContract), config.l2PollingInterval);
}

async function pollL2Events(provider, l2MessageServiceContract) {
try {
const latestBlock = await provider.getBlockNumber();

if (latestBlock <= latestProcessedBlock) {
logger.debug('No new blocks to process');
return;
}

logger.info(`Polling L2 events from block ${latestProcessedBlock + 1} to ${latestBlock}`);

const filter = l2MessageServiceContract.filters.L1L2MessageHashesAddedToInbox();
const events = await l2MessageServiceContract.queryFilter(filter, latestProcessedBlock + 1, latestBlock);

for (const event of events) {
await processEvent(event);
}

latestProcessedBlock = latestBlock;
logger.info(`Processed events up to block ${latestProcessedBlock}`);
} catch (error) {
logger.error('Error polling L2 events', { error: error.message });
}
}

async function processEvent(event) {
const messageHashes = event.args.messageHashes;
logger.info('Processing L1L2MessageHashesAddedToInbox Event:', {
messageHashes,
blockNumber: event.blockNumber,
transactionHash: event.transactionHash
});

logger.info('L2 Message Listener started');
try {
await updateMessageStatuses(messageHashes);
} catch (error) {
logger.error('Error updating message statuses', { error: error.message });
}
}

async function updateMessageStatuses(messageHashes) {
Expand All @@ -37,38 +88,49 @@ async function updateMessageStatuses(messageHashes) {
await Promise.all(updatePromises);
}

export async function confirmL2Messages() {
export async function claimL2Messages() {
const lineaSDK = new LineaSDK({
l2RpcUrl: config.l2RpcUrl,
l2SignerPrivateKey: config.privateKey,
network: "linea-mainnet",
network: "linea-sepolia", // TODO: make this dynamic based on environment
mode: "read-write",
});

const l2Contract = lineaSDK.getL2Contract(config.l2MessageServiceAddress);

const unconfirmedMessages = await getUnconfirmedMessages();
const unclaimedMessages = await getUnclaimedMessages();

for (const message of unconfirmedMessages) {
for (const message of unclaimedMessages) {
try {
console.log('Claiming message', { messageHash: message.messageHash });
console.log('message contents', {
messageSender: config.lineaStateBridgeAddress,
destination: message.destination,
fee: message.fee,
value: message.value,
messageNonce: message.nonce,
calldata: message.calldata,
messageHash: message.messageHash
});
const tx = await l2Contract.claim({
messageSender: config.lineaStateBridgeAddress,
destination: config.l2MessageServiceAddress,
fee: "0", // Assuming no fee for now
value: "0", // Assuming no value transfer
destination: message.destination,
fee: message.fee,
value: message.value,
messageNonce: message.nonce,
calldata: message.calldata,
messageHash: message.messageHash
});

await tx.wait();
logger.info('Message confirmed successfully', { messageHash: message.messageHash });
await updateMessageStatus(message.messageHash, 'confirmed');
logger.info('Message claimed successfully', { messageHash: message.messageHash });
await updateMessageStatus(message.messageHash, 'claimed');
} catch (error) {
logger.error('Error confirming message', { messageHash: message.messageHash, error: error.message });
logger.error('Error claimed message', { messageHash: message.messageHash, error: error.message });
}
// break // process only one message for now
}

// Clean up confirmed messages
await deleteConfirmedMessages();
// Clean up claimed messages
await deleteClaimedMessages();
}
22 changes: 11 additions & 11 deletions src/services/rootPropagator/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import cron from 'node-cron';
import config from './config.js';
import { propagateRoot } from './modules/L1/LineaRootPropagator.js';
import { listenForL1Messages } from './modules/L1/L1MessageListener.js';
import { listenForL2Messages, confirmL2Messages } from './modules/L2/L2MessageHandler.js';
import { setupL2EventPolling, claimL2Messages } from './modules/L2/L2MessageHandler.js';
import { createModuleLogger } from './utils/logger.js';

const logger = createModuleLogger('Scheduler');
Expand All @@ -12,29 +12,29 @@ export async function startScheduler() {
logger.info('Starting L1 Message Listener...');
await listenForL1Messages();

// Start L2 message listener (runs continuously)
logger.info('Starting L2 Message Listener...');
listenForL2Messages();
// Start L2 event polling
logger.info('Setting up L2 Event Polling...');
await setupL2EventPolling();

// Delay the first root propagation to ensure listeners are ready
logger.info(`Waiting for listeners to initialize (${config.listenerInitDelay}ms)...`);
await new Promise(resolve => setTimeout(resolve, config.listenerInitDelay));

// Execute initial root propagation
logger.info('Executing initial root propagation...');
await propagateRoot();
// logger.info('Executing initial root propagation...');
// await propagateRoot();

// Schedule subsequent root propagations
const propagationPeriodMinutes = Math.floor(config.propagationPeriod / 60000);
cron.schedule(`*/${propagationPeriodMinutes} * * * *`, async () => {
logger.info('Executing scheduled root propagation...');
await propagateRoot();
// await propagateRoot();
});

// Schedule L2 message confirmation (every 5 minutes)
cron.schedule('*/5 * * * *', async () => {
logger.info('Confirming L2 messages...');
await confirmL2Messages();
// Schedule L2 message confirmation (every 1 minutes)
cron.schedule('*/1 * * * *', async () => {
logger.info('Claiming L2 messages...');
await claimL2Messages();
});

logger.info('Scheduler started successfully');
Expand Down

0 comments on commit 679e3db

Please sign in to comment.