Topic 消息处理指南#
介绍#
以下是 Autoware 中主题消息处理的编码指南.它包括比传统方式更推荐的方式,这在 Discussions page 中大致解释.请参阅该页面以了解推荐方式的基本概念. 您可以在 ros2_subscription_examples 中找到本文档引用的示例源代码.
常规消息处理方式#
首先,让我们看看一种常用的处理消息的常规方式. ROS 2 Tutorials 是 ROS 2 应用程序(包括 Autoware)引用最多的参考文献之一. 它隐式地建议订阅收到的每条消息都应该由专用回调函数引用和处理.Autoware 完全遵循这种方式.
steer_sub_ = create_subscription<SteeringReport>(
"input/steering", 1,
[this](SteeringReport::SharedPtr msg) { current_steer_ = msg->steering_tire_angle; });
在上面的代码中,当收到一个名为 input/steering 的主题消息时,一个描述为 {current_steer_ = msg->steering_tier_angle; 作为线程中的回调执行.回调函数总是在收到消息时执行,如果消息并不总是必要的,这会导致计算资源的浪费.此外,唤醒线程会消耗计算开销.
推荐方式#
本节介绍了一种推荐的方法,即仅在需要时使用 Subscription->take() 方法获取消息.
下面给出的示例代码显示,在执行任何回调函数期间都会调用 Subscription->take() 方法.大多数情况下,在接收到的消息被主逻辑消费之前,会调用 Subscription->take()` 方法.
在这种情况下,将从订阅队列(嵌入在订阅对象中的队列)中检索主题消息,而不是使用回调函数.准确地说,您必须对代码进行编程,以便不会自动调用回调函数.
SteeringReport msg;
rclcpp::MessageInfo msg_info;
if (sub_->take(msg, msg_info)) {
// processing and publishing after this
使用这种方式有以下好处.
- 可以减少对订阅回调函数的调用次数
- 无需从主逻辑不消耗的订阅中获取主题消息
- 回调函数没有强制线程唤醒,这会导致多线程编程、数据争用和独占锁定
处理 Topic 消息数据的方式#
本节将介绍 4 种方式,包括推荐的礼仪.
1.通过调用 Subscription->take() 获取数据#
要使用 Subscription->take() 的推荐方式,你基本上需要做以下两件事.
- 防止收到 Topic 消息时调用回调函数
- 当需要 topic 消息时,调用 subscription 对象的 take() 方法
你可以在 ros2_subscription_examples/simple_examples/src/timer_listener.cpp 中看到 take() 方法的典型用法示例.
阻止调用回调函数#
为了防止回调函数被自动调用,回调函数必须属于一个回调组,该回调函数的回调函数未添加到任何执行程序中.
根据 create_subscription_](http://docs.ros.org/en/iron/p/rclcpp/generated/classrclcpp_1_1Node.html) 的 [_API 规范,即使回调函数没有作,也必须将回调函数注册到基于 rclcpp::Subscription 的对象.
以下是 ros2_subscription_examples/simple_examples/src/timer_listener.cpp 中的示例代码片段.
rclcpp::CallbackGroup::SharedPtr cb_group_not_executed = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
auto subscription_options = rclcpp::SubscriptionOptions();
subscription_options.callback_group = cb_group_not_executed;
rclcpp::QoS qos(rclcpp::KeepLast(10));
if (use_transient_local) {
qos = qos.transient_local();
}
sub_ = create_subscription<std_msgs::msg::String>("chatter", qos, not_executed_callback, subscription_options);
在上面的代码中, cb_group_not_executed 是通过使用第二个参数 false 调用 create_callback_group 创建的.属于回调组的任何回调函数都不会被 Executor 调用.
如果 subscription_options 的 callback_group 成员设置为 cb_group_not_executed,则在收到相应的主题消息 not_executed_callback 时,将不会调用 .
create_callback_group 的第二个参数定义如下.
rclcpp::CallbackGroup::SharedPtr create_callback_group(rclcpp::CallbackGroupType group_type, \
bool automatically_add_to_executor_with_node = true)
当 automatically_add_to_executor_with_node 设置为 true 时,添加到执行程序的节点中包含的回调函数将被执行程序自动调用.
调用 Subscription 对象的 take() 方法#
要从基于 Subscription 的对象中获取主题消息,请在预期时间调用 take() 方法.
以下是使用 take() 方法的 ros2_subscription_examples/simple_examples/src/timer_listener.cpp 的示例代码片段.
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (sub_->take(msg, msg_info)) {
RCLCPP_INFO(this->get_logger(), "Catch message");
RCLCPP_INFO(this->get_logger(), "I heard: [%s]", msg.data.c_str());
在上面的代码中,take(msg, msg_info) 由从 rclcpp::Subscription 类实例化的 sub_ 对象调用.它在计时器驱动的回调函数中调用.msg 和 msg_info 分别表示消息正文及其元数据.如果在调用 take(msg, msg_info) 时订阅队列中有消息,则该消息将复制到 msg.
如果成功从订阅中获取消息,则 take(msg, msg_info) 返回 true.在这种情况下,上面的代码从 RCLCPP_INFO 打印出消息的字符串数据.
如果消息未从订阅中获取,则 take(msg, msg_info) 返回 false.
调用 take(msg, msg_info) 时,如果订阅队列的大小大于 1,并且队列中有两条或多条消息,则最早的消息将复制到 msg.如果队列的大小为 1,则始终获取最新消息.
注意
您可以使用 take() 方法的返回值检查是否存在传入消息.但是,您必须注意 take() 方法的破坏性.take() 方法修改订阅队列.此外,take() 方法是不可逆的,并且没有针对 take() 方法的撤消作.仅使用 take() 方法检查传入消息总是会更改订阅队列.如果您想在不更改订阅队列的情况下进行检查,建议使用 rclcpp::WaitSet.有关详细信息,请参阅 [supplement] Use rclcpp::WaitSet.
注意
支持 take() 方法,以仅获取作为进程间通信通过 DDS 传递的消息.您不得将其用于进程内通信,因为进程内通信基于 rclcpp 的另一个软件堆栈.在进程内通信的情况下,请参考 _[补充] 通过进程内communication_获取收到的消息.
1.1 从 Subscription 获取序列化消息#
ROS 2 提供了 Serialized Message 功能,它支持与 Class SerializedMessage 中描述的任意消息类型的通信.它由 Autoware 中的 topic_state_monitor 使用.
您必须使用 take_serialized() 方法而不是 take() 方法,才能从订阅中获取基于 rclcpp::SerializedMessage 的消息.
以下是 ros2_subscription_examples/simple_examples/src/timer_listener_serialized_message.cpp 中的示例代码片段.
// receive the serialized message.
rclcpp::MessageInfo msg_info;
auto msg = sub_->create_serialized_message();
if (sub_->take_serialized(*msg, msg_info) == false) {
return;
}
在上面的代码中,msg 由 create_serialized_message() 创建,用于存储接收到的消息,其类型为 std::shared_ptr<::rclcpp::SerializedMessage> .您可以使用 take_serialized() 方法获取 rclcpp::SerializedMessage 类型的消息.请注意,take_serialized() 方法需要引用类型 data 作为其第一个参数.由于 msg 是一个指针,因此 *msg 应该作为第一个参数传递给 `take_serialized().
注意
ROS 2 的 rclcpp 支持 rclcpp::LoanedMessage 和 rclcpp::SerializedMessage.如果 zero Copy communication via loaned messages 被引入 Autoware,则应使用 take_loaned() 方法通过借出的消息进行通信.在本文档中,省略了对 take_loaned() 方法的解释,因为它在这段时间(2024 年 5 月)不用于 Autoware.
2.获取 Subscription Queue 中存储的多个数据#
如果使用 QoS 设置配置了多个队列大小,则订阅对象可以在其队列中保存多条消息.使用 callback function 的常规方式强制每条消息执行一个 callback function.换句话说,存在一个约束;回调函数的单个周期处理一条消息.请注意,使用传统方式时,如果订阅队列中有一条或多条消息,则采用最早的消息,并分配一个线程来执行回调函数,该函数一直持续到队列为空.
take() 方法将缓解此限制.take() 方法可以多次迭代调用,因此回调函数的单个循环可以处理 take() 方法获取的多个消息.
下面是一个示例代码,摘自 ros2_subscription_examples/simple_examples/src/timer_batch_listener.cpp,它在回调函数的单个循环中调用 take() 方法.
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
while (sub_->take(msg, msg_info))
{
RCLCPP_INFO(this->get_logger(), "Catch message");
RCLCPP_INFO(this->get_logger(), "I heard: [%s]", msg.data.c_str());
在上面的代码中,while(sub->take(msg, msg_info)) 继续从订阅队列中获取消息,直到队列为空.获取的每条消息都按迭代进行处理.
请注意,您必须通过考虑回调函数的频率和消息接收的频率来确定订阅队列的大小.例如,以 10Hz 的频率调用回调函数,以 50Hz 的频率接收 Topic 消息,则订阅队列的大小必须至少为 5,以免丢失收到的消息.
分配线程来执行每条消息的回调函数将导致性能开销.您可以使用本节中介绍的方式来避免意外开销.
当接收频率和消费频率之间存在较大差异时,这种方式将有效.例如,即使以高于 100 Hz 的频率接收到 CAN 报文,用户逻辑也会以较慢的频率(如 10 Hz)使用报文.在这种情况下,用户逻辑应该使用 take() 方法检索所需数量的消息,以避免意外的开销.
3.通过调用 Subscription->take 获取数据,然后调用回调函数#
您可以结合使用 take() (严格来说是 take_type_erased()) 方法和回调函数,以一致的方式处理收到的消息.使用此组合不需要唤醒线程.
以下是 ros2_subscription_examples/simple_examples/src/timer_listener_using_callback.cpp 中的示例代码片段.
auto msg = sub_->create_message();
rclcpp::MessageInfo msg_info;
if (sub_->take_type_erased(msg.get(), msg_info)) {
sub_->handle_message(msg, msg_info);
在上面的代码中,在通过 handle_message() 方法调用已注册的回调函数之前,take_type_erased() 方法会获取一条消息.请注意,您必须使用 take_type_erased() 而不是 take() .take_type_erased() 需要 void 类型的数据作为其第一个参数.你必须使用 get() 方法将类型为 shared_ptr 的 msg void 类型.然后,使用获取的消息调用 handle_message() 方法.在 handle_message() 中调用已注册的回调函数.
你不需要处理传递给 take_type_erased() 和 handle_message() 的消息类型.您可以将 message 变量定义为 auto msg = sub_->create_message();.
你也可以参考 the API document 来了解create_message()、take_type_erased() 和 handle_message().
4.通过回调函数获取数据#
ROS 2 应用程序中通常使用的一种常规方式是使用回调函数的消息引用.如果不使用 automatically_add_to_executor_with_node = false的回调组,当收到 Topic 消息时,执行程序会自动调用已注册的回调函数.
这种方式的优点之一是,您不必关心主题消息是通过进程间还是进程内传递.请记住,take()只能用于通过 DDS 的进程间通信,而rclcpp提供的另一种方式可以通过rclcpp` 用于进程内通信.
附录#
回调函数用于在许多 ROS 2 应用程序中获取主题消息.这就像一条规则或一种习俗.正如本文档页面所解释的,你可以使用 Subscription->take() 方法获取 Topic 消息,而无需调用订阅回调函数.
这种方式也记录在 Template Class Subscription — rclcpp 16.0.8 documentation 中.
许多 ROS 2 用户可能害怕使用 take() 方法,因为他们可能不太熟悉它,并且缺乏关于 take() 的文档,但它在 rclcpp::Executor 实现中被广泛使用,如下所示的 rclcpp/executor.cpp.所以事实证明,无论你是否知道,你都在间接使用 take() 方法.
std::shared_ptr<void> message = subscription->create_message();
take_and_do_error_handling(
"taking a message from topic",
subscription->get_topic_name(),
[&]() {return subscription->take_type_erased(message.get(), message_info);},
[&]() {subscription->handle_message(message, message_info);});
Note
严格来说,take_type_erased() 方法在 executor 中调用,而不是 take() 方法.
但是 take_type_erased() 是 take() 的体现,而 take() 在内部调用 take_type_erased().
如果基于 rclcpp::Executor 的对象(executor)被编程为调用回调函数,则 executor 自己会决定何时执行此作.由于执行程序实质上是在调用尽力而为的回调函数,因此即使收到了消息,也不能保证必须引用或处理该消息.因此,最好直接调用 take() 方法,以确保在预期的时间引用或处理消息.
自 2024 年 5 月起,推荐的方式开始在 Autoware Universe 中使用. 如果您想在 Autoware Universe 中有一个示例,请参阅以下 PR.
feat(tier4_autoware_utils, obstacle_cruise): 通过轮询更改为阅读主题 #6702