Merge pull request #2855 from CounterpartyXCP/rawmempool
Add cache for unsupported transactions when parsing raw mempool
ouziel-slama authored Dec 16, 2024
2 parents fc9edba + d60943d commit 00348e0
Showing 6 changed files with 64 additions and 6 deletions.
1 change: 1 addition & 0 deletions counterparty-core/counterpartycore/cli.py
@@ -240,6 +240,7 @@ def float_range_checker(arg):
{"action": "store_true", "default": False, "help": "don't ask for confirmation"},
],
[("--data-dir",), {"default": None, "help": "the path to the data directory"}],
[("--cache-dir",), {"default": None, "help": "the path to the cache directory"}],
[
("--log-file",),
{"nargs": "?", "const": None, "default": False, "help": "log to the specified file"},
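The new `--cache-dir` entry follows the same (flags, kwargs) spec format as the surrounding arguments. A minimal sketch of how such a spec is typically expanded into argparse calls (hypothetical harness, not the project's actual wiring):

import argparse

# Hypothetical harness: each spec entry is a (flags, kwargs) pair passed straight to add_argument().
ARG_SPECS = [
    [("--data-dir",), {"default": None, "help": "the path to the data directory"}],
    [("--cache-dir",), {"default": None, "help": "the path to the cache directory"}],
]

parser = argparse.ArgumentParser()
for flags, kwargs in ARG_SPECS:
    parser.add_argument(*flags, **kwargs)

args = parser.parse_args(["--cache-dir", "/tmp/counterparty-cache"])
print(args.cache_dir)  # /tmp/counterparty-cache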
45 changes: 43 additions & 2 deletions counterparty-core/counterpartycore/lib/follow.py
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import struct
import threading
import time
@@ -190,7 +191,8 @@ def receive_sequence(self, body):
# parse mempool block if needed
if self.need_to_parse_mempool_block():
# parse mempool block
mempool.parse_mempool_transactions(self.db, self.mempool_block)
not_supported = mempool.parse_mempool_transactions(self.db, self.mempool_block)
NotSupportedTransactionsCache().add(not_supported)
self.last_mempool_parsing_time = time.time()
# reset mempool block
self.mempool_block = []
@@ -258,9 +260,10 @@ async def handle(self):
logger.trace(
f"Processing {len(mempool_block)} transaction(s) from the raw mempool..."
)
mempool.parse_mempool_transactions(
not_supported_tx_hashes = mempool.parse_mempool_transactions(
self.db, mempool_block, timestamps=self.mempool_parser.timestamps
)
NotSupportedTransactionsCache().add(not_supported_tx_hashes)
else:
# sequence topic
await self.receive_multipart(self.zmq_sub_socket_sequence, "sequence")
@@ -319,6 +322,8 @@ def get_raw_mempool(db):
cursor = db.cursor()
txhash_list = []
for txid, tx_info in raw_mempool.items():
if NotSupportedTransactionsCache().is_not_supported(txid):
continue
existing_tx_in_mempool = cursor.execute(
"SELECT * FROM mempool WHERE tx_hash = ? LIMIT 1", (txid,)
).fetchone()
@@ -362,3 +367,39 @@ def stop(self):
logger.debug("Stopping RawMempoolParser...")
self.stop_event.set()
self.join()


class NotSupportedTransactionsCache(metaclass=util.SingletonMeta):
def __init__(self):
self.not_supported_txs = []
self.cache_path = os.path.join(
config.CACHE_DIR, f"not_supported_tx_cache.{config.NETWORK_NAME}.txt"
)
self.restore()

def restore(self):
if os.path.exists(self.cache_path):
with open(self.cache_path, "r") as f:
self.not_supported_txs = [line.strip() for line in f]
logger.debug(
f"Restored {len(self.not_supported_txs)} not supported transactions from cache"
)

def backup(self):
with open(self.cache_path, "w") as f:
f.write("\n".join(self.not_suppported_txs[-200000:])) # limit to 200k txs
logger.trace(
f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache"
)

def clear(self):
self.not_supported_txs = []
if os.path.exists(self.cache_path):
os.remove(self.cache_path)

def add(self, more_not_supported_txs):
self.not_supported_txs += more_not_supported_txs
self.backup()

def is_not_supported(self, tx_hash):
return tx_hash in self.not_supported_txs
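The cache is a process-wide singleton: `NotSupportedTransactionsCache()` in `receive_sequence()`, `handle()`, and `get_raw_mempool()` all return the same object, so hashes added while parsing are visible when the raw mempool is next scanned. A minimal sketch of the conventional pattern that `util.SingletonMeta` presumably implements (the real metaclass may differ):

class SingletonMeta(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Build the instance on the first call, return the cached object afterwards.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Cache(metaclass=SingletonMeta):
    def __init__(self):
        self.items = []


assert Cache() is Cache()  # every call site shares the same instance

Note that `is_not_supported()` scans a plain Python list; only the on-disk backup is truncated to the most recent 200,000 hashes.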
6 changes: 6 additions & 0 deletions counterparty-core/counterpartycore/lib/mempool.py
@@ -15,6 +15,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
now = time.time()
transaction_events = []
cursor = db.cursor()
not_supported_txs = []
try:
with db:
# insert fake block
@@ -46,6 +47,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
for raw_tx in raw_tx_list:
decoded_tx = deserialize.deserialize_tx(raw_tx, use_txid=True)
existing_tx = ledger.get_transaction(db, decoded_tx["tx_hash"])
not_supported_txs.append(decoded_tx["tx_hash"])
if existing_tx:
logger.trace(f"Transaction {decoded_tx['tx_hash']} already in the database")
continue
@@ -82,6 +84,9 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
except exceptions.MempoolError:
# save events in the mempool table
for event in transaction_events:
if event["tx_hash"] in not_supported_txs:
not_supported_txs.remove(event["tx_hash"])

if timestamps:
event["timestamp"] = timestamps.get(event["tx_hash"], now)
else:
@@ -105,6 +110,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
)
logger.trace("Mempool transaction parsed successfully.")
util.PARSING_MEMPOOL = False
return not_supported_txs


def clean_transaction_events(db, tx_hash):
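Taken together: every deserialized transaction hash is optimistically appended to `not_supported_txs`, and the `MempoolError` handler removes a hash again as soon as that transaction produces at least one event. The leftover hashes are returned to the caller in `follow.py`, roughly:

# Illustrative only; names simplified from the follow.py changes above.
def process_raw_mempool_block(db, mempool_block):
    # Hashes that produced no Counterparty events come back from the parser...
    not_supported = mempool.parse_mempool_transactions(db, mempool_block)
    # ...and are cached so get_raw_mempool() skips them on the next pass.
    NotSupportedTransactionsCache().add(not_supported)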
14 changes: 10 additions & 4 deletions counterparty-core/counterpartycore/server.py
@@ -136,6 +136,7 @@ def initialise_log_config(

def initialise_config(
data_dir=None,
cache_dir=None,
testnet=False,
testcoin=False,
regtest=False,
@@ -193,6 +194,12 @@ def initialise_config(
os.makedirs(data_dir, mode=0o755)
config.DATA_DIR = data_dir

if not cache_dir:
cache_dir = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME)
if not os.path.isdir(cache_dir):
os.makedirs(cache_dir, mode=0o755)
config.CACHE_DIR = cache_dir

# testnet
if testnet:
config.TESTNET = testnet
@@ -241,10 +248,7 @@ def initialise_config(
config.DATABASE = os.path.join(data_dir, filename)

config.FETCHER_DB_OLD = os.path.join(os.path.dirname(config.DATABASE), f"fetcherdb{network}")
config.FETCHER_DB = os.path.join(
appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME),
f"fetcherdb{network}",
)
config.FETCHER_DB = os.path.join(config.CACHE_DIR, f"fetcherdb{network}")

config.STATE_DATABASE = os.path.join(os.path.dirname(config.DATABASE), f"state{network}.db")

@@ -623,6 +627,7 @@ def initialise_log_and_config(args, api=False):
# Configuration
init_args = {
"data_dir": args.data_dir,
"cache_dir": args.cache_dir,
"testnet": args.testnet,
"testcoin": args.testcoin,
"regtest": args.regtest,
@@ -888,6 +893,7 @@ def rollback(block_index=None):
try:
blocks.rollback(ledger_db, block_index=block_index)
dbbuilder.rollback_state_db(state_db, block_index)
follow.NotSupportedTransactionsCache().clear()
finally:
database.optimize(ledger_db)
database.optimize(state_db)
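When `--cache-dir` is not given, the cache directory defaults to the per-user cache location reported by `appdirs`, and `follow.py` writes its file inside it. A rough illustration of the resulting path (the constant values below are placeholders, not the real config settings):

import os

import appdirs

# Placeholder values; the real ones come from counterpartycore.lib.config.
XCP_NAME, APP_NAME, NETWORK_NAME = "Counterparty", "counterparty", "mainnet"

cache_dir = appdirs.user_cache_dir(appauthor=XCP_NAME, appname=APP_NAME)
# e.g. ~/.cache/counterparty on Linux, ~/Library/Caches/counterparty on macOS
cache_file = os.path.join(cache_dir, f"not_supported_tx_cache.{NETWORK_NAME}.txt")
print(cache_file)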
1 change: 1 addition & 0 deletions counterparty-core/counterpartycore/test/conftest.py
@@ -323,6 +323,7 @@ def api_server_v2(request, cp_server):
"max_log_file_rotations": 20,
"log_exclude_filters": None,
"log_include_filters": None,
"cache_dir": None,
}
server_config = (
default_config
3 changes: 3 additions & 0 deletions release-notes/release-notes-v10.9.0.md
@@ -16,13 +16,16 @@

- Refactor raw mempool parsing; Don't block following
- Add a timeout to parse mempool transaction from ZMQ
- Add cache for unsupported transactions when parsing raw mempool

## API

- Throw Error if BTC Quantity in Dispense isn't enough to Trigger Dispenser

## CLI

- Add `--cache-dir` flag

# Credits

- Ouziel Slama
