-
Notifications
You must be signed in to change notification settings - Fork 4
/
k9_speechserver.py
103 lines (95 loc) · 2.92 KB
/
k9_speechserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import sys
import time
from subprocess import Popen
from memory import Memory
from eyes import Eyes
# Module-level instances used by speak(): mem records the "speaking" state,
# and eyes supplies the level that is saved before and restored after speech.
mem = Memory()
eyes = Eyes()
import paho.mqtt.client as mqtt
print("MQTT found...")
from queue import Queue
print("Queues forming...")
# These values control K9's voice.
# speak() passes the SPEED_*, AMP_* and PITCH_* values to espeak's -s, -a and
# -p options respectively. The *_UP / *_DOWN variants are selected by a
# leading ">" or "<" marker on a clause; *_DEFAULT is used otherwise.
SPEED_DEFAULT = 150
SPEED_DOWN = 125
AMP_UP = 100
AMP_DEFAULT = 50
AMP_DOWN = 25
PITCH_DEFAULT = 99
PITCH_DOWN = 89
# The SOX_* values are not passed to any command in the visible code.
# NOTE(review): presumably intended for a sox post-processing stage - confirm.
SOX_VOL_UP = 25
SOX_VOL_DEFAULT = 20
SOX_VOL_DOWN = 15
SOX_PITCH_UP = 100
SOX_PITCH_DEFAULT = 0
SOX_PITCH_DOWN = -100
def _voice_settings(clause: str) -> tuple:
    '''
    Return (text, pitch, speed, amplitude) for a single clause.

    A leading ">" or "<" marker is removed from the text and selects the
    raised or lowered voice; any other clause gets the default voice.
    '''
    if clause.startswith(">"):
        return clause[1:], PITCH_DEFAULT, SPEED_DOWN, AMP_UP
    if clause.startswith("<"):
        return clause[1:], PITCH_DOWN, SPEED_DOWN, AMP_DOWN
    return clause, PITCH_DEFAULT, SPEED_DEFAULT, AMP_DEFAULT


def speak(speech: str) -> None:
    '''
    Break speech up into clauses using | and speak each one with
    its own pitch, speed and amplitude
    to make the voice more John Leeson like
    > at the start of a clause selects the louder, slower voice
    < at the start of a clause selects the lower, quieter voice

    While speaking, the "speaking" state is set to 1.0 and the eyes are
    switched on; both are restored afterwards, even if espeak cannot be
    started or an error interrupts the utterance.
    Blank clauses (including a clause that is only a marker) are skipped.
    '''
    mem.storeState("speaking", 1.0)
    try:
        store_eyes = eyes.get_level()
        try:
            eyes.on()
            print('Speech server:', speech)
            for clause in speech.split("|"):
                if not clause or clause.isspace():
                    continue
                text, pitch, speed, amplitude = _voice_settings(clause)
                if not text.strip():
                    # Nothing left to say once the marker is removed.
                    continue
                # NOTE(review): text beginning with "-" could be parsed by
                # espeak as an option; confirm whether the text should be
                # passed after a "--" separator.
                cmd = ['espeak', '-v', 'en-rp', str(text),
                       '-p', str(pitch), '-s', str(speed),
                       '-a', str(amplitude)]
                # Wait for each clause to finish so clauses never overlap.
                Popen(cmd).wait()
        finally:
            eyes.set_level(store_eyes)
    finally:
        mem.storeState("speaking", 0.0)
def mqtt_callback(client, userdata, message):
    """
    Enables K9 to receive an MQTT message and place it in a queue

    The message payload is decoded as UTF-8 text. A payload that is not
    valid UTF-8 is reported and dropped rather than raising out of the
    callback; nothing is queued for it.
    """
    try:
        payload = message.payload.decode("utf-8")
    except UnicodeDecodeError as err:
        print("Server payload rejected (not valid UTF-8):", err)
        return
    print("Server payload:", payload)
    queue.put(payload)
# Utterances received by mqtt_callback wait here until the main loop
# below hands them to speak().
queue = Queue()
client = mqtt.Client("k9-speech-server")
client.connect("localhost")
client.on_message = mqtt_callback  # attach function to callback
client.subscribe("k9/events/speech", qos=2)
client.loop_start()
print("Speech MQTT interface active")
try:
    while True:
        # Poll the queue five times a second and speak everything waiting.
        time.sleep(0.2)
        while not queue.empty():
            utterance = queue.get()
            if utterance is None:
                continue
            print("Voice server:", utterance)
            speak(utterance)
except KeyboardInterrupt:
    client.loop_stop()
    # The original had this message as a bare string expression, so it
    # was never shown; print it.
    print("K9 silenced and MQTT client stopped")
    sys.exit(0)