diff --git a/counterparty-core/counterpartycore/lib/follow.py b/counterparty-core/counterpartycore/lib/follow.py index ca7ba3cd8..b1a3ced27 100644 --- a/counterparty-core/counterpartycore/lib/follow.py +++ b/counterparty-core/counterpartycore/lib/follow.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import struct import threading import time @@ -258,9 +259,10 @@ async def handle(self): logger.trace( f"Processing {len(mempool_block)} transaction(s) from the raw mempool..." ) - mempool.parse_mempool_transactions( + not_supported_tx_hashes = mempool.parse_mempool_transactions( self.db, mempool_block, timestamps=self.mempool_parser.timestamps ) + NotSupportedTransactionsCache().add(not_supported_tx_hashes) else: # sequence topic await self.receive_multipart(self.zmq_sub_socket_sequence, "sequence") @@ -319,6 +321,8 @@ def get_raw_mempool(db): cursor = db.cursor() txhash_list = [] for txid, tx_info in raw_mempool.items(): + if NotSupportedTransactionsCache().is_not_supported(txid): + continue existing_tx_in_mempool = cursor.execute( "SELECT * FROM mempool WHERE tx_hash = ? LIMIT 1", (txid,) ).fetchone() @@ -362,3 +366,37 @@ def stop(self): logger.debug("Stopping RawMempoolParser...") self.stop_event.set() self.join() + + +class NotSupportedTransactionsCache(metaclass=util.SingletonMeta): + def __init__(self): + self.not_suppported_txs = [] + self.cache_path = os.path.join(config.CACHE_DIR, "not_supported_tx_cache.txt") + self.restore() + + def restore(self): + if os.path.exists(self.cache_path): + with open(self.cache_path, "r") as f: + self.not_suppported_txs = [line.strip() for line in f] + logger.debug( + f"Restored {len(self.not_suppported_txs)} not supported transactions from cache" + ) + + def backup(self): + with open(self.cache_path, "w") as f: + f.write("\n".join(self.not_suppported_txs[:200000])) # limit to 200k txs + logger.trace( + f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache" + ) + + def clear(self): + self.not_suppported_txs = [] + if os.path.exists(self.cache_path): + os.remove(self.cache_path) + + def add(self, more_not_supported_txs): + self.not_suppported_txs += more_not_supported_txs + self.backup() + + def is_not_supported(self, tx_hash): + return tx_hash in self.not_suppported_txs diff --git a/counterparty-core/counterpartycore/lib/mempool.py b/counterparty-core/counterpartycore/lib/mempool.py index 2d8f8eb3e..9d10e984d 100644 --- a/counterparty-core/counterpartycore/lib/mempool.py +++ b/counterparty-core/counterpartycore/lib/mempool.py @@ -15,6 +15,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): now = time.time() transaction_events = [] cursor = db.cursor() + not_supported_txs = [] try: with db: # insert fake block @@ -46,6 +47,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): for raw_tx in raw_tx_list: decoded_tx = deserialize.deserialize_tx(raw_tx, use_txid=True) existing_tx = ledger.get_transaction(db, decoded_tx["tx_hash"]) + not_supported_txs.append(decoded_tx["tx_hash"]) if existing_tx: logger.trace(f"Transaction {decoded_tx['tx_hash']} already in the database") continue @@ -82,6 +84,9 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): except exceptions.MempoolError: # save events in the mempool table for event in transaction_events: + if event["tx_hash"] in not_supported_txs: + not_supported_txs.remove(event["tx_hash"]) + if timestamps: event["timestamp"] = timestamps.get(event["tx_hash"], now) else: @@ -105,6 +110,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None): ) logger.trace("Mempool transaction parsed successfully.") util.PARSING_MEMPOOL = False + return not_supported_txs def clean_transaction_events(db, tx_hash): diff --git a/counterparty-core/counterpartycore/server.py b/counterparty-core/counterpartycore/server.py index 087c3358f..284b20d84 100755 --- a/counterparty-core/counterpartycore/server.py +++ b/counterparty-core/counterpartycore/server.py @@ -193,6 +193,10 @@ def initialise_config( os.makedirs(data_dir, mode=0o755) config.DATA_DIR = data_dir + config.CACHE_DIR = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME) + if not os.path.isdir(config.CACHE_DIR): + os.makedirs(config.CACHE_DIR, mode=0o755) + # testnet if testnet: config.TESTNET = testnet @@ -888,6 +892,7 @@ def rollback(block_index=None): try: blocks.rollback(ledger_db, block_index=block_index) dbbuilder.rollback_state_db(state_db, block_index) + follow.NotSupportedTransactionsCache().clear() finally: database.optimize(ledger_db) database.optimize(state_db) diff --git a/release-notes/release-notes-v10.9.0.md b/release-notes/release-notes-v10.9.0.md index 3fa0c031e..5bac7bbac 100644 --- a/release-notes/release-notes-v10.9.0.md +++ b/release-notes/release-notes-v10.9.0.md @@ -16,6 +16,7 @@ - Refactor raw mempool parsing; Don't block following - Add a timeout to parse mempool transaction from ZMQ +- Add cache for unsupported transactions when parsing raw mempool ## API