Skip to content

Commit

Permalink
Add cache for unsupported transactions when parsing raw mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
Ouziel committed Dec 16, 2024
1 parent 341b2f0 commit bd4530b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
40 changes: 39 additions & 1 deletion counterparty-core/counterpartycore/lib/follow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import struct
import threading
import time
Expand Down Expand Up @@ -258,9 +259,10 @@ async def handle(self):
logger.trace(
f"Processing {len(mempool_block)} transaction(s) from the raw mempool..."
)
mempool.parse_mempool_transactions(
not_supported_tx_hashes = mempool.parse_mempool_transactions(
self.db, mempool_block, timestamps=self.mempool_parser.timestamps
)
NotSupportedTransactionsCache().add(not_supported_tx_hashes)
else:
# sequence topic
await self.receive_multipart(self.zmq_sub_socket_sequence, "sequence")
Expand Down Expand Up @@ -319,6 +321,8 @@ def get_raw_mempool(db):
cursor = db.cursor()
txhash_list = []
for txid, tx_info in raw_mempool.items():
if NotSupportedTransactionsCache().is_not_supported(txid):
continue
existing_tx_in_mempool = cursor.execute(
"SELECT * FROM mempool WHERE tx_hash = ? LIMIT 1", (txid,)
).fetchone()
Expand Down Expand Up @@ -362,3 +366,37 @@ def stop(self):
logger.debug("Stopping RawMempoolParser...")
self.stop_event.set()
self.join()


class NotSupportedTransactionsCache(metaclass=util.SingletonMeta):
def __init__(self):
self.not_suppported_txs = []
self.cache_path = os.path.join(config.CACHE_DIR, "not_supported_tx_cache.txt")
self.restore()

def restore(self):
if os.path.exists(self.cache_path):
with open(self.cache_path, "r") as f:
self.not_suppported_txs = [line.strip() for line in f]
logger.debug(
f"Restored {len(self.not_suppported_txs)} not supported transactions from cache"
)

def backup(self):
with open(self.cache_path, "w") as f:
f.write("\n".join(self.not_suppported_txs[:200000])) # limit to 200k txs
logger.trace(
f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache"
)

def clear(self):
self.not_suppported_txs = []
if os.path.exists(self.cache_path):
os.remove(self.cache_path)

def add(self, more_not_supported_txs):
self.not_suppported_txs += more_not_supported_txs
self.backup()

def is_not_supported(self, tx_hash):
return tx_hash in self.not_suppported_txs
6 changes: 6 additions & 0 deletions counterparty-core/counterpartycore/lib/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
now = time.time()
transaction_events = []
cursor = db.cursor()
not_supported_txs = []
try:
with db:
# insert fake block
Expand Down Expand Up @@ -46,6 +47,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
for raw_tx in raw_tx_list:
decoded_tx = deserialize.deserialize_tx(raw_tx, use_txid=True)
existing_tx = ledger.get_transaction(db, decoded_tx["tx_hash"])
not_supported_txs.append(decoded_tx["tx_hash"])
if existing_tx:
logger.trace(f"Transaction {decoded_tx['tx_hash']} already in the database")
continue
Expand Down Expand Up @@ -82,6 +84,9 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
except exceptions.MempoolError:
# save events in the mempool table
for event in transaction_events:
if event["tx_hash"] in not_supported_txs:
not_supported_txs.remove(event["tx_hash"])

if timestamps:
event["timestamp"] = timestamps.get(event["tx_hash"], now)
else:
Expand All @@ -105,6 +110,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
)
logger.trace("Mempool transaction parsed successfully.")
util.PARSING_MEMPOOL = False
return not_supported_txs


def clean_transaction_events(db, tx_hash):
Expand Down
5 changes: 5 additions & 0 deletions counterparty-core/counterpartycore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ def initialise_config(
os.makedirs(data_dir, mode=0o755)
config.DATA_DIR = data_dir

config.CACHE_DIR = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME)
if not os.path.isdir(config.CACHE_DIR):
os.makedirs(config.CACHE_DIR, mode=0o755)

# testnet
if testnet:
config.TESTNET = testnet
Expand Down Expand Up @@ -888,6 +892,7 @@ def rollback(block_index=None):
try:
blocks.rollback(ledger_db, block_index=block_index)
dbbuilder.rollback_state_db(state_db, block_index)
follow.NotSupportedTransactionsCache().clear()
finally:
database.optimize(ledger_db)
database.optimize(state_db)
Expand Down
1 change: 1 addition & 0 deletions release-notes/release-notes-v10.9.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

- Refactor raw mempool parsing; Don't block following
- Add a timeout to parse mempool transaction from ZMQ
- Add cache for unsupported transactions when parsing raw mempool

## API

Expand Down

0 comments on commit bd4530b

Please sign in to comment.