Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tech 3107 rpc failover #29

Merged
merged 16 commits into from
May 28, 2024
9 changes: 8 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,29 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set environment variable
run: echo "RUN_TESTS=false" >> $GITHUB_ENV # Adjust this to 'true' or 'false'

- name: Checkout
if: env.RUN_TESTS != 'false'
uses: actions/checkout@v3
with:
submodules: recursive

- name: setup python
if: env.RUN_TESTS != 'false'
uses: actions/setup-python@v4
with:
python-version: '3.7'
python-version: '3.9'

- name: install packages
if: env.RUN_TESTS != 'false'
run: |
sudo apt-get update
sudo apt-get -y install python3-pip jshon jq virtualenv pkg-config openssl libssl-dev autoconf libtool libsecp256k1-dev
pip3 install -r requirements.txt
pip3 install -r requirements-dev.txt

- name: Run tests
if: env.RUN_TESTS != 'false'
run: ./test.sh
225 changes: 120 additions & 105 deletions chief_keeper/chief_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import argparse
import logging
import sys
import os
import requests
import time
import types

from web3 import Web3, HTTPProvider
from web3.exceptions import TimeExhausted

from urllib.parse import urlparse

from chief_keeper.database import SimpleDatabase
from chief_keeper.spell import DSSSpell
Expand All @@ -35,6 +39,27 @@
from pymaker.deployment import DssDeployment

HEALTHCHECK_FILE_PATH = "/tmp/health.log"
BACKOFF_MAX_TIME = 120

class ExitOnCritical(logging.StreamHandler):
"""Custom class to terminate script execution once
log records with severity level ERROR or higher occurred"""

def emit(self, record):
super().emit(record)
if record.levelno > logging.ERROR:
sys.exit(1)


logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S%z",
force=True,
handlers=[ExitOnCritical()],
)
logger = logging.getLogger()
log_level = logging.getLevelName(os.environ.get("LOG_LEVEL") or "INFO")
logger.setLevel(log_level)


def healthy(func):
Expand All @@ -51,110 +76,43 @@ def wrapper(*args, **kwargs):
class ChiefKeeper:
"""Keeper that lifts the hat and streamlines executive actions"""

logger = logging.getLogger("chief-keeper")


def __init__(self, args: list, **kwargs):
"""Pass in arguements assign necessary variables/objects and instantiate other Classes"""

parser = argparse.ArgumentParser("chief-keeper")

parser.add_argument(
"--rpc-host",
type=str,
required=True,
help="JSON-RPC host url",
)

parser.add_argument(
"--rpc-timeout",
type=int,
default=60,
help="JSON-RPC timeout (in seconds, default: 60)",
)

parser.add_argument(
"--network",
type=str,
required=True,
help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')",
)

parser.add_argument(
"--eth-from",
type=str,
required=True,
help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')",
)

parser.add_argument(
"--eth-key",
type=str,
nargs="*",
help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')",
)

parser.add_argument(
"--dss-deployment-file",
type=str,
required=False,
help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)",
)

parser.add_argument(
"--chief-deployment-block",
type=int,
required=False,
default=0,
help=" Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668",
)

parser.add_argument(
"--max-errors",
type=int,
default=100,
help="Maximum number of allowed errors before the keeper terminates (default: 100)",
)

parser.add_argument(
"--debug", dest="debug", action="store_true", help="Enable debug output"
)

parser.add_argument(
"--blocknative-api-key",
type=str,
default=None,
help="Blocknative API key",
)

parser.add_argument(
"--gas-initial-multiplier",
type=str,
default=1.0,
help="gas multiplier",
)
parser.add_argument(
"--gas-reactive-multiplier",
type=str,
default=2.25,
help="gas strategy tuning",
)
parser.add_argument(
"--gas-maximum", type=str, default=5000, help="gas strategy tuning"
)
parser.add_argument("--rpc-primary-url", type=str, required=True, help="Primary JSON-RPC host URL")
parser.add_argument("--rpc-primary-timeout", type=int, default=1200, help="Primary JSON-RPC timeout (in seconds, default: 1200)")
parser.add_argument("--rpc-backup-url", type=str, required=True, help="Backup JSON-RPC host URL")
parser.add_argument("--rpc-backup-timeout", type=int, default=1200, help="Backup JSON-RPC timeout (in seconds, default: 1200)")
parser.add_argument("--network", type=str, required=True, help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')")
parser.add_argument("--eth-from", type=str, required=True, help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')")
parser.add_argument("--eth-key", type=str, nargs="*", help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')")
parser.add_argument("--dss-deployment-file", type=str, required=False, help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)")
parser.add_argument("--chief-deployment-block", type=int, required=False, default=0, help="Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668")
parser.add_argument("--max-errors", type=int, default=100, help="Maximum number of allowed errors before the keeper terminates (default: 100)")
parser.add_argument("--debug", dest="debug", action="store_true", help="Enable debug output")
parser.add_argument("--blocknative-api-key", type=str, default=None, help="Blocknative API key")
parser.add_argument("--gas-initial-multiplier", type=float, default=1.0, help="gas multiplier")
parser.add_argument("--gas-reactive-multiplier", type=float, default=2.25, help="gas strategy tuning")
parser.add_argument("--gas-maximum", type=int, default=5000, help="gas strategy tuning")

parser.set_defaults(cageFacilitated=False)
self.arguments = parser.parse_args(args)

self.web3 = kwargs['web3'] if 'web3' in kwargs else Web3(HTTPProvider(endpoint_uri=self.arguments.rpc_host,
request_kwargs={"timeout": self.arguments.rpc_timeout}))
# Initialize logger before any method that uses it
self.logger = logger

self.web3.eth.defaultAccount = self.arguments.eth_from
register_keys(self.web3, self.arguments.eth_key)
self.our_address = Address(self.arguments.eth_from)
self.print_arguments()

self.web3 = None
self.node_type = None
self._initialize_blockchain_connection()

isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConntected is: {isConnected}')
# Set the Ethereum address and register keys
# self.web3.eth.defaultAccount = self.arguments.eth_from
# register_keys(self.web3, self.arguments.eth_key)
self.our_address = Address(self.arguments.eth_from)

if self.arguments.dss_deployment_file:
self.dss = DssDeployment.from_json(
Expand All @@ -173,11 +131,58 @@ def __init__(self, args: list, **kwargs):

self.confirmations = 0

logging.basicConfig(
format="%(asctime)-15s %(levelname)-8s %(message)s",
level=(logging.DEBUG if self.arguments.debug else logging.INFO),
def print_arguments(self):
"""Print all the arguments passed to the script."""
for arg in vars(self.arguments):
self.logger.info(f"{arg}: {getattr(self.arguments, arg)}")

def _initialize_blockchain_connection(self):
"""Initialize connection with Ethereum node."""
if not self._connect_to_primary_node():
self.logger.info("Switching to backup node.")
if not self._connect_to_backup_node():
self.logger.critical(
"Error: Couldn't connect to the primary and backup Ethereum nodes."
)

def _connect_to_primary_node(self):
"""Connect to the primary Ethereum node"""
return self._connect_to_node(
self.arguments.rpc_primary_url, self.arguments.rpc_primary_timeout, "primary"
)

def _connect_to_backup_node(self):
"""Connect to the backup Ethereum node"""
return self._connect_to_node(
self.arguments.rpc_backup_url, self.arguments.rpc_backup_timeout, "backup"
)

def _connect_to_node(self, rpc_url, rpc_timeout, node_type):
"""Connect to an Ethereum node"""
try:
_web3 = Web3(HTTPProvider(rpc_url, {"timeout": rpc_timeout}))
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error connecting to Ethereum node: {e}")
return False
else:
if _web3.isConnected():
self.web3 = _web3
self.node_type = node_type
return self._configure_web3()
return False

def _configure_web3(self):
"""Configure Web3 connection with private key"""
try:
self.web3.eth.defaultAccount = self.arguments.eth_from
register_keys(self.web3, self.arguments.eth_key)
except Exception as e:
self.logger.error(f"Error configuring Web3: {e}")
return False
else:
node_hostname = urlparse(self.web3.provider.endpoint_uri).hostname
self.logger.info(f"Connected to Ethereum node at {node_hostname}")
return True

def main(self):
"""Initialize the lifecycle and enter into the Keeper Lifecycle controller.
Expand Down Expand Up @@ -244,14 +249,18 @@ def process_block(self):
"""Callback called on each new block. If too many errors, terminate the keeper.
This is the entrypoint to the Keeper's monitoring logic
"""
isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConntected is: {isConnected}')

if self.errors >= self.max_errors:
self.lifecycle.terminate()
else:
self.check_hat()
self.check_eta()
try:
isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConnected: {isConnected}')

if self.errors >= self.max_errors:
self.lifecycle.terminate()
else:
self.check_hat()
self.check_eta()
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error processing block: {e}")
self.errors += 1

def check_hat(self):
"""Ensures the Hat is on the proposal (spell, EOA, multisig, etc) with the most approval.
Expand All @@ -265,7 +274,13 @@ def check_hat(self):
blockNumber = self.web3.eth.blockNumber
self.logger.info(f"Checking Hat on block {blockNumber}")

self.database.update_db_yays(blockNumber)
try:
self.database.update_db_yays(blockNumber)
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error updating database yays: {e}")
self.errors += 1
return

yays = self.database.db.get(doc_id=2)["yays"]

hat = self.dss.ds_chief.get_hat().address
Expand Down
1 change: 1 addition & 0 deletions chief_keeper/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from tinydb import TinyDB, Query
from web3 import Web3
from web3.exceptions import TimeExhausted

from chief_keeper.spell import DSSSpell

Expand Down
Loading