From bd4530b6f286c5f4235fd2cda53d3d6a45c0d8aa Mon Sep 17 00:00:00 2001 From: Ouziel Slama Date: Mon, 16 Dec 2024 15:31:49 +0000 Subject: [PATCH 1/4] Add cache for unsupported transactions when parsing raw mempool --- .../counterpartycore/lib/follow.py | 40 ++++++++++++++++++- .../counterpartycore/lib/mempool.py | 6 +++ counterparty-core/counterpartycore/server.py | 5 +++ release-notes/release-notes-v10.9.0.md | 1 + 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/counterparty-core/counterpartycore/lib/follow.py b/counterparty-core/counterpartycore/lib/follow.py index ca7ba3cd8..b1a3ced27 100644 --- a/counterparty-core/counterpartycore/lib/follow.py +++ b/counterparty-core/counterpartycore/lib/follow.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import struct import threading import time @@ -258,9 +259,10 @@ async def handle(self): logger.trace( f"Processing {len(mempool_block)} transaction(s) from the raw mempool..." ) - mempool.parse_mempool_transactions( + not_supported_tx_hashes = mempool.parse_mempool_transactions( self.db, mempool_block, timestamps=self.mempool_parser.timestamps ) + NotSupportedTransactionsCache().add(not_supported_tx_hashes) else: # sequence topic await self.receive_multipart(self.zmq_sub_socket_sequence, "sequence") @@ -319,6 +321,8 @@ def get_raw_mempool(db): cursor = db.cursor() txhash_list = [] for txid, tx_info in raw_mempool.items(): + if NotSupportedTransactionsCache().is_not_supported(txid): + continue existing_tx_in_mempool = cursor.execute( "SELECT * FROM mempool WHERE tx_hash = ? LIMIT 1", (txid,) ).fetchone() @@ -362,3 +366,37 @@ def stop(self): logger.debug("Stopping RawMempoolParser...") self.stop_event.set() self.join() + + +class NotSupportedTransactionsCache(metaclass=util.SingletonMeta): + def __init__(self): + self.not_suppported_txs = [] + self.cache_path = os.path.join(config.CACHE_DIR, "not_supported_tx_cache.txt") + self.restore() + + def restore(self): + if os.path.exists(self.cache_path): + with open(self.cache_path, "r") as f: + self.not_suppported_txs = [line.strip() for line in f] + logger.debug( + f"Restored {len(self.not_suppported_txs)} not supported transactions from cache" + ) + + def backup(self): + with open(self.cache_path, "w") as f: + f.write("\n".join(self.not_suppported_txs[:200000])) # limit to 200k txs + logger.trace( + f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache" + ) + + def clear(self): + self.not_suppported_txs = [] + if os.path.exists(self.cache_path): + os.remove(self.cache_path) + + def add(self, more_not_supported_txs): + self.not_suppported_txs += more_not_supported_txs + self.backup() + + def is_not_supported(self, tx_hash): + return tx_hash in self.not_suppported_txs diff --git a/counterparty-core/counterpartycore/lib/mempool.py b/counterparty-core/counterpartycore/lib/mempool.py index 2d8f8eb3e..9d10e984d 100644 --- a/counterparty-core/counterpartycore/lib/mempool.py +++ b/counterparty-core/counterpartycore/lib/mempool.py @@ -15,6 +15,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): now = time.time() transaction_events = [] cursor = db.cursor() + not_supported_txs = [] try: with db: # insert fake block @@ -46,6 +47,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): for raw_tx in raw_tx_list: decoded_tx = deserialize.deserialize_tx(raw_tx, use_txid=True) existing_tx = ledger.get_transaction(db, decoded_tx["tx_hash"]) + not_supported_txs.append(decoded_tx["tx_hash"]) if existing_tx: logger.trace(f"Transaction {decoded_tx['tx_hash']} already in the database") continue @@ -82,6 +84,9 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): except exceptions.MempoolError: # save events in the mempool table for event in transaction_events: + if event["tx_hash"] in not_supported_txs: + not_supported_txs.remove(event["tx_hash"]) + if timestamps: event["timestamp"] = timestamps.get(event["tx_hash"], now) else: @@ -105,6 +110,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): ) logger.trace("Mempool transaction parsed successfully.") util.PARSING_MEMPOOL = False + return not_supported_txs def clean_transaction_events(db, tx_hash): diff --git a/counterparty-core/counterpartycore/server.py b/counterparty-core/counterpartycore/server.py index 087c3358f..284b20d84 100755 --- a/counterparty-core/counterpartycore/server.py +++ b/counterparty-core/counterpartycore/server.py @@ -193,6 +193,10 @@ def initialise_config( os.makedirs(data_dir, mode=0o755) config.DATA_DIR = data_dir + config.CACHE_DIR = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME) + if not os.path.isdir(config.CACHE_DIR): + os.makedirs(config.CACHE_DIR, mode=0o755) + # testnet if testnet: config.TESTNET = testnet @@ -888,6 +892,7 @@ def rollback(block_index=None): try: blocks.rollback(ledger_db, block_index=block_index) dbbuilder.rollback_state_db(state_db, block_index) + follow.NotSupportedTransactionsCache().clear() finally: database.optimize(ledger_db) database.optimize(state_db) diff --git a/release-notes/release-notes-v10.9.0.md b/release-notes/release-notes-v10.9.0.md index 3fa0c031e..5bac7bbac 100644 --- a/release-notes/release-notes-v10.9.0.md +++ b/release-notes/release-notes-v10.9.0.md @@ -16,6 +16,7 @@ - Refactor raw mempool parsing; Don't block following - Add a timeout to parse mempool transaction from ZMQ +- Add cache for unsupported transactions when parsing raw mempool ## API From 92adf9c17460058131d272a049f1a43fa4551537 Mon Sep 17 00:00:00 2001 From: Ouziel Slama Date: Mon, 16 Dec 2024 15:35:51 +0000 Subject: [PATCH 2/4] one file by network --- counterparty-core/counterpartycore/lib/follow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/counterparty-core/counterpartycore/lib/follow.py b/counterparty-core/counterpartycore/lib/follow.py index b1a3ced27..eac58bcde 100644 --- a/counterparty-core/counterpartycore/lib/follow.py +++ b/counterparty-core/counterpartycore/lib/follow.py @@ -371,7 +371,9 @@ def stop(self): class NotSupportedTransactionsCache(metaclass=util.SingletonMeta): def __init__(self): self.not_suppported_txs = [] - self.cache_path = os.path.join(config.CACHE_DIR, "not_supported_tx_cache.txt") + self.cache_path = os.path.join( + config.CACHE_DIR, f"not_supported_tx_cache.{config.NETWORK_NAME}.txt" + ) self.restore() def restore(self): From 642a6d464d73bdc2099ff5e4a6e071e3a4596d84 Mon Sep 17 00:00:00 2001 From: Ouziel Slama Date: Mon, 16 Dec 2024 17:12:00 +0000 Subject: [PATCH 3/4] tweaks --- counterparty-core/counterpartycore/lib/follow.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/counterparty-core/counterpartycore/lib/follow.py b/counterparty-core/counterpartycore/lib/follow.py index eac58bcde..266a50bb7 100644 --- a/counterparty-core/counterpartycore/lib/follow.py +++ b/counterparty-core/counterpartycore/lib/follow.py @@ -191,7 +191,8 @@ def receive_sequence(self, body): # parse mempool block if needed if self.need_to_parse_mempool_block(): # parse mempool block - mempool.parse_mempool_transactions(self.db, self.mempool_block) + not_supported = mempool.parse_mempool_transactions(self.db, self.mempool_block) + NotSupportedTransactionsCache().add(not_supported) self.last_mempool_parsing_time = time.time() # reset mempool block self.mempool_block = [] @@ -386,7 +387,7 @@ def restore(self): def backup(self): with open(self.cache_path, "w") as f: - f.write("\n".join(self.not_suppported_txs[:200000])) # limit to 200k txs + f.write("\n".join(self.not_suppported_txs[-200000:])) # limit to 200k txs logger.trace( f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache" ) From d60943de8222984bce011810d893e03cdf83f311 Mon Sep 17 00:00:00 2001 From: Ouziel Slama Date: Mon, 16 Dec 2024 17:20:56 +0000 Subject: [PATCH 4/4] Add --cache-dir flag --- counterparty-core/counterpartycore/cli.py | 1 + counterparty-core/counterpartycore/server.py | 13 +++++++------ counterparty-core/counterpartycore/test/conftest.py | 1 + release-notes/release-notes-v10.9.0.md | 2 ++ 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/counterparty-core/counterpartycore/cli.py b/counterparty-core/counterpartycore/cli.py index b6e90fd82..156a45c7d 100755 --- a/counterparty-core/counterpartycore/cli.py +++ b/counterparty-core/counterpartycore/cli.py @@ -240,6 +240,7 @@ def float_range_checker(arg): {"action": "store_true", "default": False, "help": "don't ask for confirmation"}, ], [("--data-dir",), {"default": None, "help": "the path to the data directory"}], + [("--cache-dir",), {"default": None, "help": "the path to the cache directory"}], [ ("--log-file",), {"nargs": "?", "const": None, "default": False, "help": "log to the specified file"}, diff --git a/counterparty-core/counterpartycore/server.py b/counterparty-core/counterpartycore/server.py index 284b20d84..371211b85 100755 --- a/counterparty-core/counterpartycore/server.py +++ b/counterparty-core/counterpartycore/server.py @@ -136,6 +136,7 @@ def initialise_log_config( def initialise_config( data_dir=None, + cache_dir=None, testnet=False, testcoin=False, regtest=False, @@ -193,9 +194,11 @@ def initialise_config( os.makedirs(data_dir, mode=0o755) config.DATA_DIR = data_dir - config.CACHE_DIR = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME) - if not os.path.isdir(config.CACHE_DIR): + if not cache_dir: + cache_dir = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME) + if not os.path.isdir(cache_dir): os.makedirs(config.CACHE_DIR, mode=0o755) + config.CACHE_DIR = cache_dir # testnet if testnet: @@ -245,10 +248,7 @@ def initialise_config( config.DATABASE = os.path.join(data_dir, filename) config.FETCHER_DB_OLD = os.path.join(os.path.dirname(config.DATABASE), f"fetcherdb{network}") - config.FETCHER_DB = os.path.join( - appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME), - f"fetcherdb{network}", - ) + config.FETCHER_DB = os.path.join(config.CACHE_DIR, f"fetcherdb{network}") config.STATE_DATABASE = os.path.join(os.path.dirname(config.DATABASE), f"state{network}.db") @@ -627,6 +627,7 @@ def initialise_log_and_config(args, api=False): # Configuration init_args = { "data_dir": args.data_dir, + "cache_dir": args.cache_dir, "testnet": args.testnet, "testcoin": args.testcoin, "regtest": args.regtest, diff --git a/counterparty-core/counterpartycore/test/conftest.py b/counterparty-core/counterpartycore/test/conftest.py index 2e402cb37..4b3e70dc9 100644 --- a/counterparty-core/counterpartycore/test/conftest.py +++ b/counterparty-core/counterpartycore/test/conftest.py @@ -323,6 +323,7 @@ def api_server_v2(request, cp_server): "max_log_file_rotations": 20, "log_exclude_filters": None, "log_include_filters": None, + "cache_dir": None, } server_config = ( default_config diff --git a/release-notes/release-notes-v10.9.0.md b/release-notes/release-notes-v10.9.0.md index 5bac7bbac..9a222deda 100644 --- a/release-notes/release-notes-v10.9.0.md +++ b/release-notes/release-notes-v10.9.0.md @@ -24,6 +24,8 @@ ## CLI +- Add `--cache-dir` flag + # Credits - Ouziel Slama