-
Notifications
You must be signed in to change notification settings - Fork 15
System monitor #148
base: master
Are you sure you want to change the base?
System monitor #148
Changes from all commits
be39855
c3c8217
20b7602
3879a90
6a7ba12
4933b82
51e6ad7
212aef5
20cc80a
2ed0cc7
485f054
8d9602a
9754e10
bca8853
a573cb0
5ec78d4
49f3270
840cd2a
da8c4f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
This is a system monitor for viewing available user slots, appointments, and other data related to Teos. Data is loaded and searched using Elasticsearch and visualized using Kibana to produce something like this: | ||
|
||
![Dashboard example](https://ibb.co/ypBtfdM) | ||
|
||
### Prerequisites | ||
|
||
Need to already be running a bitcoin node and a Teos watchtower. (See: https://github.com/talaia-labs/python-teos) | ||
|
||
### Installation | ||
|
||
Install and run both Elasticsearch and Kibana, which both need to be running for this visualization tool to work. | ||
|
||
https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html | ||
https://www.elastic.co/guide/en/kibana/current/install.html | ||
|
||
### Dependencies | ||
|
||
Install the dependencies by running: | ||
|
||
```pip install -r requirements.txt``` | ||
|
||
### Config | ||
|
||
It is also required to create a config file in this directory. `sample-monitor.conf` in this directory provides an example. | ||
orbitalturtle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Create a file named `monitor.conf` in this directory with the correct configuration values, including the correct host and port where Elasticsearch and Kibana are running, either on localhost or on another host. | ||
|
||
### Run it | ||
|
||
Follow the same instructions as shown here for running the module: https://github.com/talaia-labs/python-teos/blob/master/INSTALL.md | ||
|
||
In short, run it with: | ||
|
||
```python3 -m monitor.monitor_start``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import os | ||
|
||
MONITOR_DIR = os.path.expanduser("~/.teos_monitor/") | ||
MONITOR_CONF = "monitor.conf" | ||
|
||
MONITOR_DEFAULT_CONF = { | ||
"ES_HOST": {"value": "localhost", "type": str}, | ||
"ES_PORT": {"value": 9200, "type": int}, | ||
"KIBANA_HOST": {"value": "localhost", "type": str}, | ||
"KIBANA_PORT": {"value": 5601, "type": int}, | ||
"API_BIND": {"value": "localhost", "type": str}, | ||
"API_PORT": {"value": 9814, "type": int}, | ||
} | ||
orbitalturtle marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
import json | ||
import time | ||
|
||
from elasticsearch import Elasticsearch, helpers | ||
from elasticsearch.client import IndicesClient | ||
|
||
from cli import teos_cli | ||
from common.logger import Logger | ||
|
||
LOG_PREFIX = "System Monitor" | ||
logger = Logger(actor="Data loader", log_name_prefix=LOG_PREFIX) | ||
|
||
|
||
class DataLoader: | ||
""" | ||
The :class:`DataLoader` is in charge of the monitor's Elasticsearch functionality for loading and searching through data. | ||
|
||
Args: | ||
es_host (:obj:`str`): The host Elasticsearch is listening on. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aren't ES host and port only used to create the ElasticSearch object? If so they don't need to be stored. |
||
es_port (:obj:`int`): The port Elasticsearch is listening on. | ||
api_host (:obj:`str`): The host Teos is listening on. | ||
api_port (:obj:`int`): The port Teos is listening on. | ||
log_file (:obj:`int`): The path to the log file ES will pull data from. | ||
|
||
Attributes: | ||
es_host (:obj:`str`): The host Elasticsearch is running on. | ||
es_port (:obj:`int`): The port Elasticsearch is runnning on. | ||
cloud_id (:obj:`str`): Elasticsearch cloud id, if Elasticsearch Cloud is being used. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be removed |
||
es (:obj:`Elasticsearch <elasticsearch.Elasticsearch>`): The Elasticsearch client for searching for data to be visualized. | ||
index_client (:obj:`IndicesClient <elasticsearch.client.IndiciesClient>`): The index client where log data is stored. | ||
log_path (:obj:`str`): The path to the log file where log file will be pulled from and analyzed by ES. | ||
api_host (:obj:`str`): The host Teos is listening on. | ||
api_port (:obj:`int`): The port Teos is listening on. | ||
|
||
""" | ||
|
||
def __init__(self, es_host, es_port, api_host, api_port, log_file): | ||
self.es_host = es_host | ||
self.es_port = es_port | ||
self.es = Elasticsearch([ | ||
{'host': self.es_host, 'port': self.es_port} | ||
]) | ||
self.index_client = IndicesClient(self.es) | ||
self.log_path = log_file | ||
self.api_host = api_host | ||
self.api_port = api_port | ||
|
||
def start(self): | ||
"""Loads data to be visualized in Kibana""" | ||
|
||
if self.index_client.exists("logs"): | ||
self.delete_index("logs") | ||
|
||
# Pull the watchtower logs into Elasticsearch. | ||
self.create_index("logs") | ||
log_data = self.load_logs(self.log_path) | ||
self.index_data_bulk("logs", log_data) | ||
|
||
# Grab the other data we need to visualize a graph. | ||
self.load_and_index_other_data() | ||
|
||
def create_index(self, index): | ||
""" | ||
Create index with a particular mapping. | ||
|
||
Args: | ||
index (:obj:`str`): Index the mapping is in. | ||
|
||
""" | ||
|
||
body = { | ||
"mappings": { | ||
"properties": { | ||
"doc.time": { | ||
"type": "date", | ||
"format": "epoch_second||strict_date_optional_time||dd/MM/yyyy HH:mm:ss" | ||
}, | ||
"doc.error.code": { | ||
"type": "integer" | ||
}, | ||
"doc.watcher_appts": { | ||
"type": "integer" | ||
}, | ||
"doc.responder_appts": { | ||
"type": "integer" | ||
} | ||
} | ||
} | ||
} | ||
|
||
resp = self.index_client.create(index, body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resp is not being used. If there's not return nor checks to be done, you may just call the method. |
||
|
||
# TODO: Logs are constantly being updated. Keep that data updated | ||
def load_logs(self, log_path): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be static |
||
""" | ||
Reads teos log into a list. | ||
|
||
Args: | ||
log_path (:obj:`str`): The path to the log file. | ||
|
||
Returns: | ||
:obj:`list`: A list of logs in dict form. | ||
|
||
Raises: | ||
FileNotFoundError: If path doesn't correspond to an existing log file. | ||
|
||
""" | ||
|
||
# Load the initial log file. | ||
logs = [] | ||
with open(log_path, "r") as log_file: | ||
for log in log_file: | ||
log_data = json.loads(log.strip()) | ||
logs.append(log_data) | ||
|
||
return logs | ||
|
||
# TODO: Throw an error if the file is empty or if data isn't JSON-y. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would also raise a FileNotFoundError if the log file is not found but you're not handling exceptions in the main method. Not a big deal since we should move from this approach to an online approach, but have in mind adding a general try/catch if you're bubbling exceptions up. |
||
|
||
def load_and_index_other_data(self): | ||
""" | ||
Loads and indexes the rest of the data into Elasticsearch that we'll need to visualize using Kibana. | ||
|
||
""" | ||
|
||
# Grab # of appointments in watcher and responder | ||
num_appts = self.get_num_appointments() | ||
watcher_appts = num_appts[0] | ||
responder_appts = num_appts[1] | ||
|
||
# self.es.search for the watcher_appts doc... if it exists, then update the item. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this an old line / comment? |
||
|
||
# index current number of appointments in watcher and responder | ||
self.index_item("logs", "watcher_appts", watcher_appts) | ||
self.index_item("logs", "responder_appts", responder_appts) | ||
|
||
def index_item(self, index, field, value): | ||
""" | ||
Indexes logs in elasticsearch so they can be searched. | ||
|
||
Args: | ||
index (:obj:`str`): The index to which we want to load data. | ||
field (:obj:`str`): The field of the data to be loaded. | ||
value (:obj:`str`): The value of the data to be loaded. | ||
|
||
""" | ||
|
||
body = { | ||
"doc.{}".format(field): value, | ||
"doc.time": time.time() | ||
} | ||
|
||
resp = self.es.index(index, body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, resp not being used. |
||
|
||
@staticmethod | ||
def gen_data(index, data): | ||
""" | ||
Formats logs so it can be sent to Elasticsearch in bulk. | ||
|
||
Args: | ||
log_data (:obj:`list`): A list of logs in dict form. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The args do not match |
||
|
||
Yields: | ||
:obj:`dict`: A dict conforming to the required format for sending data to elasticsearch in bulk. | ||
""" | ||
|
||
for log in data: | ||
yield { | ||
"_index": index, | ||
"doc": log | ||
} | ||
|
||
def index_data_bulk(self, index, data): | ||
""" | ||
Indexes logs in elasticsearch so they can be searched. | ||
|
||
Args: | ||
index (:obj:`str`): The index to which we want to load data. | ||
data (:obj:`list`): A list of data in dict form. | ||
|
||
Returns: | ||
response (:obj:`tuple`): The first value of the tuple equals the number of the logs data was entered successfully. If there are errors the second value in the tuple includes the errors. | ||
|
||
Raises: | ||
elasticsearch.helpers.errors.BulkIndexError: Returned by Elasticsearch if indexing log data fails. | ||
|
||
""" | ||
|
||
response = helpers.bulk(self.es, self.gen_data(index, data)) | ||
|
||
# The response is a tuple of two items: 1) The number of items successfully indexed. 2) Any errors returned. | ||
if (response[0] <= 0): | ||
logger.error("None of the logs were indexed. Log data might be in the wrong form.") | ||
|
||
return response | ||
|
||
def get_num_appointments(self): | ||
""" | ||
Gets number of appointments the tower is storing in the watcher and responder, so we can load this data into Elasticsearch. | ||
|
||
Returns: | ||
:obj:`list`: A list where the 0th element describes # of watcher appointments and the 1st element describes # of responder appointments. | ||
""" | ||
|
||
teos_url = "http://{}:{}".format(self.api_host, self.api_port) | ||
|
||
resp = teos_cli.get_all_appointments(teos_url) | ||
|
||
response = json.loads(resp) | ||
|
||
watcher_appts = len(response.get("watcher_appointments")) | ||
responder_appts = len(response.get("responder_trackers")) | ||
|
||
return [watcher_appts, responder_appts] | ||
|
||
def delete_index(self, index): | ||
""" | ||
Deletes the chosen index of Elasticsearch. | ||
|
||
Args: | ||
index (:obj:`str`): The ES index to delete. | ||
""" | ||
|
||
results = self.index_client.delete(index) | ||
|
||
# For testing purposes... | ||
def search_logs(self, field, keyword, index): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not seem to be used anymore. If the only purpose is testing, maybe it could be part of a local temp file or the testing suite. |
||
""" | ||
Searches Elasticsearch for data with a certain field and keyword. | ||
|
||
Args: | ||
field (:obj:`str`): The search field. | ||
keyword (:obj:`str`): The search keyword. | ||
index (:obj:`str`): The index in Elasticsearch to search through. | ||
|
||
Returns: | ||
:obj:`dict`: A dict describing the results, including the first 10 docs matching the search words. | ||
""" | ||
|
||
body = { | ||
"query": {"match": {"doc.{}".format(field): keyword}} | ||
} | ||
results = self.es.search(body, index) | ||
|
||
return results | ||
|
||
|
||
def get_all_logs(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method does not seem to be used anywhere. |
||
""" | ||
Retrieves all logs in the logs index of Elasticsearch. | ||
|
||
Returns: | ||
:obj:`dict`: A dict describing the results, including the first 10 docs. | ||
""" | ||
|
||
body = { | ||
"query": { "match_all": {} } | ||
} | ||
results = self.es.search(body, "logs") | ||
|
||
results = json.dumps(results, indent=4) | ||
|
||
return results | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The README would need some work, but I'll leave the nits for the final version.