Code: Select all
import rclpy
from rclpy.node import Node
from geographic_msgs.msg import GeoPointStamped
from mavros_msgs.msg import PositionTarget
from mavros_msgs.srv import CommandBool, SetMode
class Arac:
# arm operation
# System.sleep(50000000)
# codes
def __init__(self,mode):
self.mode = mode
class FollowMe(Node):
def __init__(self):
super().__init__('follow_me_node')
# Publisher for setting target position
self.target_pub = self.create_publisher(PositionTarget, '/mavros/setpoint_raw/local', 10)
# Service clients
self.arming_client = self.create_client(CommandBool, '/mavros/cmd/arming')
self.mode_client = self.create_client(SetMode, '/mavros/set_mode')
# Subscriber for target GPS position
self.create_subscription(GeoPointStamped, '/target_position', self.target_callback, 10)
self.target_position = GeoPointStamped()
self.timer = self.create_timer(0.1, self.publish_target_position) # 10 Hz
# Wait for service availability
while not self.arming_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Arming service not available, waiting...')
while not self.mode_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Mode service not available, waiting...')
# Arm and set to GUIDED mode
self.arm()
self.set_guided_mode()
def arm(self):
self.get_logger().info("Arming drone")
arm_req = CommandBool.Request()
arm_req.value = True
future = self.arming_client.call_async(arm_req)
rclpy.spin_until_future_complete(self, future)
if future.result().success:
self.get_logger().info('Drone armed successfully!')
else:
self.get_logger().error('Failed to arm drone.')
self.get_logger().info("Setting mode to GUIDED")
mode_req = SetMode.Request()
mode_req.custom_mode = 'GUIDED'
self.mode_client.call_async(mode_req)
def set_guided_mode(self):
self.get_logger().info("Setting mode to GUIDED")
mode_req = SetMode.Request()
mode_req.custom_mode = 'GUIDED'
self.mode_client.call_async(mode_req)
def target_callback(self, msg):
self.target_position = msg
def publish_target_position(self):
target = PositionTarget()
target.coordinate_frame = PositionTarget.FRAME_LOCAL_NED
target.type_mask = PositionTarget.IGNORE_VX | PositionTarget.IGNORE_VY | PositionTarget.IGNORE_VZ \
| PositionTarget.IGNORE_AFX | PositionTarget.IGNORE_AFY | PositionTarget.IGNORE_AFZ \
| PositionTarget.IGNORE_YAW | PositionTarget.IGNORE_YAW_RATE
# Convert GPS to local coordinates here (this is simplified)
target.position.x = self.target_position.position.latitude
target.position.y = self.target_position.position.longitude
target.position.z = 15.0 # desired altitude
self.target_pub.publish(target)
def main(args=None):
rclpy.init(args=args)
follow_me = FollowMe()
try:
rclpy.spin(follow_me)
except KeyboardInterrupt:
pass
follow_me.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()