-
Notifications
You must be signed in to change notification settings - Fork 1
/
MessageBroker.py
152 lines (123 loc) · 4.85 KB
/
MessageBroker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import logging
import json
import threading
import config
import FilesHandler
from Notifiers.MQTT import publisher
from Notifiers.MQTT.Secrets import KILN
from geventwebsocket import WebSocketError
# Module-level logger, named after this module; used by MessageBroker below.
log = logging.getLogger(__name__)
# Uncomment to force DEBUG-level output from this module's logger.
# log.level = logging.DEBUG
class MessageBroker:
    """Relay messages between the controller and front-end observers.

    Observers are objects exposing ``send(message)`` (the error handling in
    ``send_socket`` expects them to raise ``WebSocketError`` when the
    connection is gone).  Outgoing payloads are JSON strings, each wrapped in
    a single-key dict that identifies the message type.  Calls in the other
    direction are forwarded to the controller through a dict of callbacks
    installed with ``set_controller_functions``.
    """

    def __init__(self):
        """Create a broker with no observers and no controller callbacks."""
        self.observers = []
        # Callbacks from Controller.py; installed by set_controller_functions().
        self.controller_callbacks = None
        # Profile most recently passed to new_profile_all().
        self.original_profile = None
        # Profile most recently passed to update_profile_all().
        self.updated_profile = None
        self.fileshandler = FilesHandler.FilesHandler()
        self.pub = publisher.Publisher(KILN)
        # Serialises the observer.send() calls made by send_socket().
        self.lock = threading.Lock()

    # Callback functions for access to Controller.py
    def set_controller_functions(self, contoller_callbacks: dict):
        """Install the controller callbacks.

        The parameter name keeps its historical spelling so that existing
        keyword callers continue to work.

        Args:
            contoller_callbacks: Mapping of name to callable.  This class
                looks up the keys ``'start_stop'``, ``'auto_manual'``,
                ``'set_heat_for_zone'``, ``'set_profile_by_name'`` and
                ``'add_observer'``.
        """
        self.controller_callbacks = contoller_callbacks

    def start_stop_firing(self):
        """Invoke the controller's ``'start_stop'`` callback."""
        self.controller_callbacks['start_stop']()

    def auto_manual(self):
        """Invoke the controller's ``'auto_manual'`` callback."""
        self.controller_callbacks['auto_manual']()

    def set_heat_for_zone(self, heat, zone):
        """Forward ``heat`` and ``zone`` to the ``'set_heat_for_zone'`` callback."""
        self.controller_callbacks['set_heat_for_zone'](heat, zone)

    def set_profile(self, profile_name: str):
        """Forward ``profile_name`` to the ``'set_profile_by_name'`` callback."""
        self.controller_callbacks['set_profile_by_name'](profile_name)

    def add_observer(self, observer):
        """Register ``observer`` and bring it up to date.

        If a profile has been started, it and the stored firing data are sent
        to the new observer before it is added to the broadcast list.  If a
        dynamically updated profile exists it is then re-broadcast to every
        observer.  Finally the controller's ``'add_observer'`` callback runs,
        so ``set_controller_functions`` must have been called beforehand.

        Args:
            observer: Object with a ``send(message)`` method.
        """
        if self.original_profile is not None:
            self.update_profile_and_firing_data(observer, self.original_profile)
        self.observers.append(observer)
        if self.updated_profile is not None:
            self.update_profile_all(self.updated_profile)
        self.controller_callbacks['add_observer']()
        log.info('Added observer.')

    def update_names(self, names: list):
        """Broadcast ``names`` to all observers under the key ``'profile_names'``."""
        names_json = json.dumps({'profile_names': names})
        log.debug("Names %s", names_json)
        self.send_socket(names_json)

    # On adding an observer.
    def update_profile_and_firing_data(self, observer, profile):
        """Send ``profile`` and the stored firing data to a single observer.

        The profile is sent as JSON under the key ``'profile'``; a failure of
        that send is logged and otherwise ignored.  Afterwards, if the files
        handler reports a path, every line of that file is sent verbatim.

        NOTE(review): the per-line sends are neither wrapped in an exception
        handler nor made under ``self.lock``, so an error while replaying
        propagates to the caller.  Left as-is to preserve existing behaviour.

        Args:
            observer: Object with a ``send(message)`` method.
            profile: JSON-serialisable profile data.
        """
        prof_json = json.dumps({'profile': profile})
        try:
            observer.send(prof_json)
        except Exception as ex:
            log.error("Could not send profile to front end: %s", ex)

        # TODO use os.path.getsize and limit the size to around ??20MB - it
        # bombs the browser if too long.
        path = self.fileshandler.get_full_path()
        if path is not None:
            with open(path, 'r') as firing:
                for line in firing:
                    observer.send(line)
                    log.debug('Sent line: %s', line)

    # Send to all observers. Update the original profile start time on start
    # button pressed.
    def new_profile_all(self, profile):
        """Record ``profile`` as the original profile and broadcast it.

        The profile is first handed to the files handler's ``start_firing``,
        then remembered and sent to all observers under the key ``'profile'``.
        """
        self.fileshandler.start_firing(profile)
        self.original_profile = profile
        prof_json = json.dumps({'profile': profile})
        log.info("New %s", prof_json)
        self.send_socket(prof_json)

    # Dynamically updated profile during firing, e.g. when temperature falls
    # behind.
    def update_profile_all(self, profile):
        """Remember ``profile`` as the updated profile and broadcast it.

        The payload is sent under the key ``'profile_update'``.
        """
        self.updated_profile = profile
        prof_json = json.dumps({'profile_update': profile})
        log.info("Update %s", prof_json)
        self.send_socket(prof_json)

    def update_UI_status(self, UI_message: dict):
        """Broadcast ``UI_message`` to all observers under the key ``'status'``."""
        message = json.dumps({'status': UI_message})
        self.send_socket(message)
        log.debug('Status sent: %s', message)

    def update_zones(self, zones_status_array: list):
        """Persist and broadcast the zone status.

        The JSON payload (key ``'zones_status_array'``) is passed to the files
        handler's ``save_update`` and then sent to all observers.
        """
        message = json.dumps({'zones_status_array': zones_status_array})
        self.fileshandler.save_update(message)
        self.send_socket(message)
        log.debug('Zone status sent: %s', message)

    def send_socket(self, message):
        """Send ``message`` to every observer, dropping those that fail.

        An observer whose ``send`` raises ``WebSocketError`` is removed from
        the list.  Other exception types propagate to the caller.

        Args:
            message: Payload passed unchanged to each ``observer.send``.
        """
        # Iterate over a snapshot: removing from the list that is being
        # iterated would skip the observer that follows each removed one.
        for observer in list(self.observers):
            try:
                with self.lock:
                    observer.send(message)
            except WebSocketError as ex:
                try:
                    self.observers.remove(observer)
                except ValueError:
                    # Already removed elsewhere (e.g. by a concurrent
                    # broadcast); nothing left to do.
                    pass
                log.info('Observer deleted, socket error: %s', ex)

    def update_tc_data(self, tc_data: list):
        """Broadcast thermocouple readings and optionally publish them via MQTT.

        The readings are sent to observers under the key
        ``'thermocouple_data'``.  When ``config.mqtt`` is truthy they are also
        passed to ``publish_mqtt``.
        """
        message = json.dumps({'thermocouple_data': tc_data})
        self.send_socket(message)
        if config.mqtt:
            self.publish_mqtt(tc_data)  # TODO Control how often

    def publish_mqtt(self, tc_data: list):
        """Publish one time-stamped message per thermocouple reading.

        Args:
            tc_data: Sequence of mappings, each providing ``'time_ms'`` and
                ``'temperature'``.
        """
        for i, tc in enumerate(tc_data):
            # TODO this needs to come from the zones info
            name = 'Top 55' if i == 0 else 'Bottom 56'
            timestamp = tc['time_ms']
            temperature = tc['temperature']
            message = {name: temperature}
            time_stamped_message = {'ts': timestamp, 'values': message}
            # NOTE(review): str() of a dict is a Python repr (single quotes),
            # not JSON - confirm this is the format the publisher expects.
            self.pub.send_message(str(time_stamped_message))
            log.debug('MQTT message: %s', message)