-
Notifications
You must be signed in to change notification settings - Fork 6
/
ROSLaunch.py
61 lines (50 loc) · 2.13 KB
/
ROSLaunch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import rospy
from geometry_msgs.msg import PoseStamped
from mavros_msgs.msg import State
from mavros_msgs.srv import CommandBool, SetMode
from nav_msgs.msg import Path
from std_msgs.msg import String
class StrixSwarmNode:
def __init__(self):
# Initialize ROS node and subscribers/publishers
rospy.init_node('strix_swarm_node')
self.state_sub = rospy.Subscriber('/mavros/state', State, self.state_callback)
self.goal_sub = rospy.Subscriber('/strix_swarm/goal', PoseStamped, self.goal_callback)
self.mode_service = rospy.ServiceProxy('/mavros/set_mode', SetMode)
self.arm_service = rospy.ServiceProxy('/mavros/cmd/arming', CommandBool)
self.path_pub = rospy.Publisher('/strix_swarm/path', Path, queue_size=10)
self.status_pub = rospy.Publisher('/strix_swarm/status', String, queue_size=10)
# Set initial state variables
self.current_state = State()
self.current_goal = PoseStamped()
self.is_armed = False
self.current_mode = 'OFFBOARD'
def state_callback(self, state):
self.current_state = state
def goal_callback(self, goal):
self.current_goal = goal
def set_mode(self, mode):
rospy.loginfo('Setting mode: %s', mode)
response = self.mode_service(0, mode)
return response.mode_sent
def arm(self):
rospy.loginfo('Arming...')
response = self.arm_service(True)
return response.success
def run(self):
# Main loop
rate = rospy.Rate(10)
while not rospy.is_shutdown():
# Check if we're armed and in the correct mode
if not self.is_armed and self.current_state.mode != self.current_mode:
self.set_mode(self.current_mode)
if not self.is_armed and self.current_state.mode == self.current_mode:
self.arm()
# Publish the current goal and status
self.path_pub.publish(self.current_goal)
self.status_pub.publish('Flying to goal...')
# Wait for the next iteration
rate.sleep()
if __name__ == '__main__':
node = StrixSwarmNode()
node.run()