1. 使用 RCLPY 实现动作
  2. 创建功能包和节点并修改 setup.py
cd chapt4_ws/
ros2 pkg create example_action_rclpy --build-type ament_python --dependencies rclpy robot_control_interfaces --destination-directory src --node-name action_robot_02 --maintainer-name "fishros" --maintainer-email "fishros@foxmail.com"
# 手动再创建 action_control_02 节点文件
touch src/example_action_rclpy/example_action_rclpy/action_control_02.py
#手动创建机器人类 robot.py
touch src/example_action_rclpy/example_action_rclpy/robot.py
    entry_points={
        'console_scripts': [
            'action_robot_02 = example_action_rclpy.action_robot_02:main',
            'action_control_02 = example_action_rclpy.action_control_02:main'
        ],
    },
  1. 编写机器人类
    • 利用 robot_control_interfaces 中的 MoveRobot 接口进行信息传递
    • 利用 Feedback 设置状态,以备后续使用
from robot_control_interfaces.action import MoveRobot 
import math
 
class Robot():
    """机器人类,模拟一个机器人"""
 
    def __init__(self) -> None:
        self.current_pose_ = 0.0
        self.target_pose_ = 0.0
        self.move_distance_ = 0.0
        self.status_ = MoveRobot.Feedback
 
    def get_status(self):
        """获取状态"""
        return self.status_
 
    def get_current_pose(self):
        """获取当前位置"""
        return self.current_pose_
 
    def close_goal(self):
        """接近目标"""
        return math.fabs(self.target_pose_ - self.current_pose_) < 0.01
 
    def stop_move(self):
        """停止移动"""
        self.status_ = MoveRobot.Feedback.STATUS_STOP
 
    def move_step(self):
        """移动一小步"""
        direct = self.move_distance_ / math.fabs(self.move_distance_)
        step = direct * math.fabs(self.target_pose_ - self.current_pose_) * 0.1
        self.current_pose_ += step  # 移动一步
        print(f"移动了:{step}当前位置:{self.current_pose_}")
        return self.current_pose_
 
    def set_goal(self, distance):
        """设置目标"""
        self.move_distance_ = distance
        self.target_pose_ += distance  # 更新目标位置
 
        if self.close_goal():
            self.stop_move()
            return False
 
        self.status_ = MoveRobot.Feedback.STATUS_MOVEING  # 更新状态为移动
        return True
  1. 编写机器人节点
    • execute_callback,执行回调函数 像 VEX 的主循环,但单次处理
#!/usr/bin/env python3
 
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from example_action_rclpy.robot import Robot
# from rclpy.executors import MultiThreadedExecutor
# from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
 
class ActionRobot02(Node):
    """机器人端Action服务"""
 
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info(f"节点已启动:{name}!")
 
        self.robot_ = Robot()
 
        self.action_server_ = ActionServer(
            self, MoveRobot, 'move_robot', self.execute_callback
            # ,callback_group=MutuallyExclusiveCallbackGroup()
        )
 
    def execute_callback(self, goal_handle: ServerGoalHandle):
        """执行回调函数,若采用默认handle_goal函数则会自动调用"""
        self.get_logger().info('执行移动机器人')
        feedback_msg = MoveRobot.Feedback()
        self.robot_.set_goal(goal_handle.request.distance)
 
        # rate = self.create_rate(2)
        while rclpy.ok() and not self.robot_.close_goal():
            # move
            self.robot_.move_step()
            # feedback
            feedback_msg.pose = self.robot_.get_current_pose()
            feedback_msg.status = self.robot_.get_status()
            goal_handle.publish_feedback(feedback_msg)
            # cancel check
            if goal_handle.is_cancel_requested:
                result = MoveRobot.Result()
                result.pose = self.robot_.get_current_pose()
                return result
            # rate.sleep() # Rate会造成死锁,单线程执行器时不能使用
            time.sleep(0.5)
 
        goal_handle.succeed()
        result = MoveRobot.Result()
        result.pose = self.robot_.get_current_pose()
        return result
        
def main(args=None):
    """主函数"""
    rclpy.init(args=args)
    action_robot_02 = ActionRobot02("action_robot_02")
    # 采用多线程执行器解决rate死锁问题
    # executor = MultiThreadedExecutor()
    # executor.add_node(action_robot_02)
    # executor.spin()
    rclpy.spin(action_robot_02)
    rclpy.shutdown()
  1. 编写控制节点
    • goal_response_callback,收到目标处理结果。
    • get_result_callback,获取结果反馈。
    • feedback_callback,接收过程信息。
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入Action接口
from robot_control_interfaces.action import MoveRobot
 
class ActionControl02(Node):
    """Action客户端"""
 
    def __init__(self, name):
        super().__init__(name)
        self.get_logger().info(f"节点已启动:{name}!")
        self.action_client_ = ActionClient(self, MoveRobot, 'move_robot')
        self.send_goal_timer_ = self.create_timer(1, self.send_goal)
 
    def send_goal(self):
        """发送目标"""
        self.send_goal_timer_.cancel()
        goal_msg = MoveRobot.Goal()
        goal_msg.distance = 5.0
        self.action_client_.wait_for_server()
        self._send_goal_future = self.action_client_.send_goal_async(goal_msg, 
	        feedback_callback=self.feedback_callback)
        self._send_goal_future.add_done_callback(self.goal_response_callback)
 
    def goal_response_callback(self, future):
        """收到目标处理结果"""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return
        self.get_logger().info('Goal accepted :)')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
 
    def get_result_callback(self, future):
        """获取结果反馈"""
        result = future.result().result
        self.get_logger().info(f'Result: {result.pose}')
 
    def feedback_callback(self, feedback_msg):
        """获取回调反馈"""
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Received feedback: {feedback.pose}')
 
def main(args=None):
    """主函数"""
    rclpy.init(args=args)
    action_robot_02 = ActionControl02("action_control_02")
    rclpy.spin(action_robot_02)
    rclpy.shutdown()
  1. 运行
cd chapt4_ws/
colcon build --packages-up-to example_action_rclpy
# 运行机器人节点
source install/setup.bash 
ros2 run example_action_rclpy action_robot_02
# 新终端
source install/setup.bash 
ros2 run example_action_rclpy action_control_02