1. 使用 RCLPY 实现服务
  2. 创建功能包和节点:--node-name service_server_02 会帮你创建好节点文件和添加执行文件 但是只支持一个节点文件,需要额外手动创建一个 example_interfaces 是用到的接口文件
cd chapt3/chapt3_ws/src
ros2 pkg create example_service_rclpy --build-type ament_python --dependencies rclpy example_interfaces  --node-name service_server_02
 
cd example_service_rclpy/example_service_rclpy/
touch service_client_02.py
  1. 服务端实现:Node.create_service
    • srv_type – 服务类型。就是接口类型
    • srv_name (str) – 服务的名称。
    • callback (Callable[[~SrvType.Request, ~SrvType.Response], None]) – 当服务器收到服务请求时调用的用户定义回调函数。
    • qos_profile (QoSProfile) – 应用于服务服务器的服务质量配置文件。
    • callback_group (Optional[CallbackGroup]) – 服务服务器的回调组。如果为 None,则使用节点的默认回调组。
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
# 导入接口
from example_interfaces.srv import AddTwoInts
 
class ServiceServer02(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" % name)
        self.add_ints_server_ = self.create_service(AddTwoInts,"add_two_ints_srv", self.handle_add_two_ints) 
 
    def handle_add_two_ints(self,request, response):
        self.get_logger().info(f"收到请求,计算{request.a} + {request.b}")
        response.sum = request.a + request.b
        return response
        
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = ServiceServer02("service_server_02")  # 新建一个节点
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy
  1. 客户端实现
    • srv_type – 服务类型。
    • srv_name (str) – 服务的名称。
    • qos_profile (QoSProfile) – 应用于服务客户端的服务质量配置文件。
    • callback_group (Optional[CallbackGroup]) – 服务客户端的回调组。如果为 None,则使用节点的默认回调组。
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
# 导入接口
from example_interfaces.srv import AddTwoInts
 
class ServiceClient02(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" % name)
        self.client_ = self.create_client(AddTwoInts,"add_two_ints_srv") 
 
    def result_callback_(self, result_future):
        response = result_future.result()
        self.get_logger().info(f"收到返回结果:{response.sum}")
    
    def send_request(self, a, b):
        while rclpy.ok() and self.client_.wait_for_service(1)==False:
            self.get_logger().info(f"等待服务端上线....")
            
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        self.client_.call_async(request).add_done_callback(self.result_callback_)
        
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = ServiceClient02("service_client_02")  # 新建一个节点
    # 调用函数发送请求
    node.send_request(3,4)
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy
  1. 修改 setup.py
    entry_points={
        'console_scripts': [
            "service_client_02 = example_service_rclpy.service_client_02:main",
            "service_server_02 = example_service_rclpy.service_server_02:main"
        ],
    },
  1. 运行:build + source + run
colcon build --packages-select example_service_rclpy
 
source install/setup.bash
ros2 run example_service_rclpy service_client_02
 
source install/setup.bash
ros2 run example_service_rclpy service_server_02
  1. 测试:使用 rqt 查看 Node Graph