使用 RCLPY 实现动作
创建功能包和节点并修改 setup.py
cd chapt4_ws/
ros2 pkg create example_action_rclpy --build-type ament_python --dependencies rclpy robot_control_interfaces --destination-directory src --node-name action_robot_02 --maintainer-name "fishros" --maintainer-email "fishros@foxmail.com"
# 手动再创建 action_control_02 节点文件
touch src/example_action_rclpy/example_action_rclpy/action_control_02.py
#手动创建机器人类 robot.py
touch src/example_action_rclpy/example_action_rclpy/robot.py
entry_points = {
'console_scripts' : [
'action_robot_02 = example_action_rclpy.action_robot_02:main' ,
'action_control_02 = example_action_rclpy.action_control_02:main'
],
},
编写机器人类
利用 robot_control_interfaces 中的 MoveRobot 接口进行信息传递
利用 Feedback 设置状态,以备后续使用
from robot_control_interfaces.action import MoveRobot
import math
class Robot ():
"""机器人类,模拟一个机器人"""
def __init__ (self) -> None :
self .current_pose_ = 0.0
self .target_pose_ = 0.0
self .move_distance_ = 0.0
self .status_ = MoveRobot.Feedback
def get_status (self):
"""获取状态"""
return self .status_
def get_current_pose (self):
"""获取当前位置"""
return self .current_pose_
def close_goal (self):
"""接近目标"""
return math.fabs( self .target_pose_ - self .current_pose_) < 0.01
def stop_move (self):
"""停止移动"""
self .status_ = MoveRobot.Feedback. STATUS_STOP
def move_step (self):
"""移动一小步"""
direct = self .move_distance_ / math.fabs( self .move_distance_)
step = direct * math.fabs( self .target_pose_ - self .current_pose_) * 0.1
self .current_pose_ += step # 移动一步
print ( f "移动了: { step } 当前位置: {self .current_pose_ } " )
return self .current_pose_
def set_goal (self, distance):
"""设置目标"""
self .move_distance_ = distance
self .target_pose_ += distance # 更新目标位置
if self .close_goal():
self .stop_move()
return False
self .status_ = MoveRobot.Feedback. STATUS_MOVEING # 更新状态为移动
return True
编写机器人节点
execute_callback,执行回调函数 像 VEX 的主循环,但单次处理
#!/usr/bin/env python3
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from example_action_rclpy.robot import Robot
# from rclpy.executors import MultiThreadedExecutor
# from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
class ActionRobot02 ( Node ):
"""机器人端Action服务"""
def __init__ (self,name):
super (). __init__ (name)
self .get_logger().info( f "节点已启动: { name } !" )
self .robot_ = Robot()
self .action_server_ = ActionServer(
self , MoveRobot, 'move_robot' , self .execute_callback
# ,callback_group=MutuallyExclusiveCallbackGroup()
)
def execute_callback (self, goal_handle: ServerGoalHandle):
"""执行回调函数,若采用默认handle_goal函数则会自动调用"""
self .get_logger().info( '执行移动机器人' )
feedback_msg = MoveRobot.Feedback()
self .robot_.set_goal(goal_handle.request.distance)
# rate = self.create_rate(2)
while rclpy.ok() and not self .robot_.close_goal():
# move
self .robot_.move_step()
# feedback
feedback_msg.pose = self .robot_.get_current_pose()
feedback_msg.status = self .robot_.get_status()
goal_handle.publish_feedback(feedback_msg)
# cancel check
if goal_handle.is_cancel_requested:
result = MoveRobot.Result()
result.pose = self .robot_.get_current_pose()
return result
# rate.sleep() # Rate会造成死锁,单线程执行器时不能使用
time.sleep( 0.5 )
goal_handle.succeed()
result = MoveRobot.Result()
result.pose = self .robot_.get_current_pose()
return result
def main (args = None ):
"""主函数"""
rclpy.init( args = args)
action_robot_02 = ActionRobot02( "action_robot_02" )
# 采用多线程执行器解决rate死锁问题
# executor = MultiThreadedExecutor()
# executor.add_node(action_robot_02)
# executor.spin()
rclpy.spin(action_robot_02)
rclpy.shutdown()
编写控制节点
goal_response_callback,收到目标处理结果。
get_result_callback,获取结果反馈。
feedback_callback,接收过程信息。
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入Action接口
from robot_control_interfaces.action import MoveRobot
class ActionControl02 ( Node ):
"""Action客户端"""
def __init__ (self, name):
super (). __init__ (name)
self .get_logger().info( f "节点已启动: { name } !" )
self .action_client_ = ActionClient( self , MoveRobot, 'move_robot' )
self .send_goal_timer_ = self .create_timer( 1 , self .send_goal)
def send_goal (self):
"""发送目标"""
self .send_goal_timer_.cancel()
goal_msg = MoveRobot.Goal()
goal_msg.distance = 5.0
self .action_client_.wait_for_server()
self ._send_goal_future = self .action_client_.send_goal_async(goal_msg,
feedback_callback = self .feedback_callback)
self ._send_goal_future.add_done_callback( self .goal_response_callback)
def goal_response_callback (self, future):
"""收到目标处理结果"""
goal_handle = future.result()
if not goal_handle.accepted:
self .get_logger().info( 'Goal rejected :(' )
return
self .get_logger().info( 'Goal accepted :)' )
self ._get_result_future = goal_handle.get_result_async()
self ._get_result_future.add_done_callback( self .get_result_callback)
def get_result_callback (self, future):
"""获取结果反馈"""
result = future.result().result
self .get_logger().info( f 'Result: { result.pose } ' )
def feedback_callback (self, feedback_msg):
"""获取回调反馈"""
feedback = feedback_msg.feedback
self .get_logger().info( f 'Received feedback: { feedback.pose } ' )
def main (args = None ):
"""主函数"""
rclpy.init( args = args)
action_robot_02 = ActionControl02( "action_control_02" )
rclpy.spin(action_robot_02)
rclpy.shutdown()
运行
cd chapt4_ws/
colcon build --packages-up-to example_action_rclpy
# 运行机器人节点
source install/setup.bash
ros2 run example_action_rclpy action_robot_02
# 新终端
source install/setup.bash
ros2 run example_action_rclpy action_control_02