ROS2 Service详解

本文由 ZiQingDream 编写,未经允许禁止转载。

本文作者:工程课代表[B站] / ZiQingDream[Github]

在之前的《ROS2 Executor详解》文章中,我们解释了调度器怎么处理回调和回调组。对于ROS2客户端-服务端架构,服务端符合之前所说的回调机制,这里不再赘述。该文将重心放到客户端的上,ROS2的客户端包含两种写法,同步客户端和异步客户端。

阅读本文需要你有一定ROS2基本理解,至少要有关于客户端-服务端基础知识。

ROS2 服务机制

ROS2官网中关于Service的可视化图,整个处理流程如下(Understanding services — ROS 2 Documentation: Humble documentation):。

ROS2 Service类似于网络通信中“服务端-客户端”架构,客户端发送请求给服务端,服务端(Server)接收到请求,会执行预先注册回调函数。ROS2 Service服务端符合《ROS2 Executor详解》中的回调机制,这里不在赘述。Service模式属于”N-1”模式,多个客户端(Client)发送请求给服务端,服务端每次处理一个请求,并返回相应。(这也同样受回调组的影响)

客户端代码流程一般如下图。在客户端代码中,等待结果返回方式有两种模式:同步模式和异步回调模式。无论哪一种模式,请求的发送都是异步发送的。也就是说在发送完请求后,可以不必等待,去做其他工作,在之后合适的时机去等待、检查结果返回,并对结果做进一步处理。

同步Service-Client模式

实验

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <cinttypes>
#include <memory>

#include "example_interfaces/srv/add_two_ints.hpp"
#include "rclcpp/rclcpp.hpp"

using AddTwoInts = example_interfaces::srv::AddTwoInts;
rclcpp::Node::SharedPtr g_node = nullptr;

// 服务端回调,用于处理客户端请求
// 由于只有一个线程运行,整个服务处理一定要尽快完成
void handle_service(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<AddTwoInts::Request> request,
const std::shared_ptr<AddTwoInts::Response> response)
{
(void)request_header;
RCLCPP_INFO(
g_node->get_logger(),
"request: %" PRId64 " + %" PRId64, request->a, request->b);
response->sum = request->a + request->b;
}

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
g_node = rclcpp::Node::make_shared("minimal_service");
auto server = g_node->create_service<AddTwoInts>("add_two_ints", handle_service);
rclcpp::spin(g_node);
rclcpp::shutdown();
g_node = nullptr;
return 0;
}

同步客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <chrono>
#include <cinttypes>
#include <memory>

#include "example_interfaces/srv/add_two_ints.hpp"
#include "rclcpp/rclcpp.hpp"

using AddTwoInts = example_interfaces::srv::AddTwoInts;

// 注意这里并没有建立一个新的类
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

// 首先建立一个node节点
auto node = rclcpp::Node::make_shared("minimal_client");

// 调用节点API创建一个客户端
auto client = node->create_client<AddTwoInts>("add_two_ints");

// 等待服务端服务可用
while (!client->wait_for_service(std::chrono::seconds(1))) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "client interrupted while waiting for service to appear.");
return 1;
}
RCLCPP_INFO(node->get_logger(), "waiting for service to appear...");
}

// 新建请求,填充数据
auto request = std::make_shared<AddTwoInts::Request>();
request->a = 41;
request->b = 1;

// 异步发送到服务端
auto result_future = client->async_send_request(request);

// 等待服务端处理完成
// 注意这里的node不能被加入到executor,否则程序会报错崩溃
if (rclcpp::spin_until_future_complete(node, result_future) != rclcpp::FutureReturnCode::SUCCESS)
{
// 失败的话,要移除挂起的请求
RCLCPP_ERROR(node->get_logger(), "service call failed :(");
client->remove_pending_request(result_future);
return 1;
}

// 获取客户端结果
auto result = result_future.get();
RCLCPP_INFO(
node->get_logger(), "result of %" PRId64 " + %" PRId64 " = %" PRId64,
request->a, request->b, result->sum);

rclcpp::shutdown();
return 0;
}

同步客户端分析

如之前所述,请求发送是异步的,通过调用async_send_request()函数,请求被通过DDS发送给服务端:

1
auto result_future = client->async_send_request(request);

async_send_request()异步发送的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// src/ros2/rclcpp/rclcpp/include/rclcpp/client.hpp
template<typename ServiceT>
class Client : public ClientBase
{
// 请求和回复是在服务消息中定义的
using Request = typename ServiceT::Request;
using Response = typename ServiceT::Response;

using SharedRequest = typename ServiceT::Request::SharedPtr;
using SharedResponse = typename ServiceT::Response::SharedPtr;

// 服务的Promise,其实相当于std::promise
using Promise = std::promise<SharedResponse>;
using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;

using SharedPromise = std::shared_ptr<Promise>;
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;

// 服务的Future,其实相当于std::future
using Future = std::future<SharedResponse>;
using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;

// 回调类型
using CallbackType = std::function<void (SharedFuture)>;
using CallbackWithRequestType = std::function<void (SharedFutureWithRequest)>;

// detail::FutureAndRequestId 其实是 future和ID的封装
struct FutureAndRequestId
: detail::FutureAndRequestId<std::future<SharedResponse>>
{
using detail::FutureAndRequestId<std::future<SharedResponse>>::FutureAndRequestId;
/// See std::future::share().
SharedFuture share() noexcept {return this->future.share();}
};

FutureAndRequestId async_send_request(SharedRequest request)
{
// 建立promise,并且获取对应future,以便异步操作
Promise promise;
auto future = promise.get_future();

// 执行异步发送,并且这里promise,通过std::move将控制权交出去了
auto req_id = async_send_request_impl(
*request,
std::move(promise));

return FutureAndRequestId(std::move(future), req_id);
}
};

async_send_request()首先建立promise并获取future(注意这里的future带有一个请求ID),以便未来能够通过promise通知结果已经准备好。(如果对这部分不清楚,可以参考std::promise)。消息发送的流程,交给了async_send_request_impl()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename ServiceT>
class Client : public ClientBase
{
using CallbackTypeValueVariant = std::tuple<CallbackType, SharedFuture, Promise>;
using CallbackWithRequestTypeValueVariant = std::tuple<
CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>;

using CallbackInfoVariant = std::variant<
std::promise<SharedResponse>,
CallbackTypeValueVariant,
CallbackWithRequestTypeValueVariant>;
//
int64_t async_send_request_impl(const Request & request, CallbackInfoVariant value)
{
int64_t sequence_number;
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}
pending_requests_.try_emplace(
sequence_number,
std::make_pair(std::chrono::system_clock::now(), std::move(value)));
return sequence_number;
}
};

async_send_request_impl()通过调用RCL层rcl_send_request()发送请求,并返回请求ID。然后以这个ID为键,将请求保留到pending_requests_,表明该请求在处理,之后收到服务端的反馈后,程序会pending_requests_中找到对应的请求Promise,然后通知客户端,结果已经返回。另外如果客户端不想接受反馈(例如出错了),则需要将请求从pending_requests_移除,防止回调被调用,这也很重要。
RCL底层细节暂不关注,请求发出后,返回的future就是等待任务完成的手段,程序通过spin_until_future_complete()等待结果返回:

1
2
3
4
5
6
7
8
9
10
auto request = std::make_shared<AddTwoInts::Request>();
request->a = 41;
request->b = 1;
auto result_future = client->async_send_request(request);
if (rclcpp::spin_until_future_complete(node, result_future) != rclcpp::FutureReturnCode::SUCCESS)
{
RCLCPP_ERROR(node->get_logger(), "service call failed :(");
client->remove_pending_request(result_future);
return 1;
}

这里有一个问题:为什么不能直接调用future.get()?

要回答这个问题,我们需要深入spin_until_future_complete()函数,看下有什么额外的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/ros2/rclcpp/rclcpp/include/rclcpp/executors.hpp
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
rclcpp::ExecutorOptions options;
options.context = node_ptr->get_context();
rclcpp::executors::SingleThreadedExecutor executor(options);
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
}

在spin_until_future_complete()中建立一个单线程执行器,然后将传入的节点加入到单线程执行器,从而提供了与ROS2底层通信交互的能力,该执行器负责驱动底层获取由服务端反馈的消息。具体细节如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// src/ros2/rclcpp/rclcpp/include/rclcpp/executors.hpp
namespace executors
{
using rclcpp::executors::MultiThreadedExecutor;
using rclcpp::executors::SingleThreadedExecutor;

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::executors::spin_node_until_future_complete(
executor,
node_ptr->get_node_base_interface(),
future,
timeout);
}

} // namespace executors

由于节点被加入到临时的单线程执行器中,并且执行器通过节点与ROS2底层进行交互,所以对节点有以下要求:

  1. 该节点必须是创建客户端的节点,否则会阻塞但不会返回;
  2. 该节点在调用前不能加入到其他执行器中,否则就会抛出异常,导致程序终止。
    紧接着控制权就交给了SingleThreadedExecutor的spin_until_future_complete()函数,该函数代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
// 这一段警告写的很清晰,不要在一个callback中,调用spin_node_until_future_complete,否则会引起死锁

// 再进入spin循环之前,先检查一遍,看结果是不是已经反馈了
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

// 等待时间处理,计算结束时间
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

// 进入spin循环中,这段代码我们之前在讲执行器的时候已经很熟悉了
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}

RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// 执行回调检查,看看有没有服务端消息反馈,这里其实就是调用了get_next_executable、execute_any_executable
spin_once_impl(timeout_left);

// 询问是否promise通知我已经消息返回了
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
// 这里返回了成功,表示有结果正常返回
return FutureReturnCode::SUCCESS;
}
// 如果timeout<0,阻塞式等待,没有超时要求
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}

// 判断是否完成了超时
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

等待包含三种状态:

  1. FutureReturnCode::SUCCESS:一切正常,拿到了从服务端反馈的消息
  2. FutureReturnCode::TIMEOUT:超时了,需要继续等待或做错误处理
  3. FutureReturnCode::INTERRUPTED:操作被终端,有可能是ROS2底层出了问题
    在返回不为FutureReturnCode::SUCCESS,要处理好pending_requests_中滞留的请求(移除或继续等待)

有了之前文章中调度器概念后,整个过程并不复杂,就是通过执行器处理底层消息,当服务结果有反馈时,spin_once_impl()的execute_any_executable()触发客户端回调,future对应promise就会被设置,从而future.wait_for就会获取到”有结果到来”的通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// src/ros2/rclcpp/rclcpp/include/rclcpp/client.hpp
template<typename ServiceT>
class Client : public ClientBase
{
public:
void
handle_response(
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) override
{
std::optional<CallbackInfoVariant>
optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
if (!optional_pending_request) {
return;
}
auto & value = *optional_pending_request;
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
std::move(response));
if (std::holds_alternative<Promise>(value)) {
// 这里promise执行了触发
auto & promise = std::get<Promise>(value);
promise.set_value(std::move(typed_response));
} else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
auto & inner = std::get<CallbackTypeValueVariant>(value);
const auto & callback = std::get<CallbackType>(inner);
auto & promise = std::get<Promise>(inner);
auto & future = std::get<SharedFuture>(inner);
promise.set_value(std::move(typed_response));
callback(std::move(future));
} else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
const auto & callback = std::get<CallbackWithRequestType>(inner);
auto & promise = std::get<PromiseWithRequest>(inner);
auto & future = std::get<SharedFutureWithRequest>(inner);
auto & request = std::get<SharedRequest>(inner);
promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
callback(std::move(future));
}
}
};

// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
void
Executor::execute_client(
rclcpp::ClientBase::SharedPtr client)
{
auto request_header = client->create_request_header();
std::shared_ptr<void> response = client->create_response();
take_and_do_error_handling(
"taking a service client response from service",
client->get_service_name(),
[&]() {return client->take_type_erased_response(response.get(), *request_header);},
[&]() {client->handle_response(request_header, response);});
}

void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
......
// 当接收到服务端的反馈消息时,就会执行回调
if (any_exec.client) {
execute_client(any_exec.client);
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
......
}

从上方代码基本可以回答之前的问题了:

由于等待过程涉及到底层服务反馈到来后,才会触发promise.set_value,必须使用spin获取底层的消息,不能直接通过future.get()类函数完成等待,否则就会卡死。

异步Service-Client模式

实验

异步客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <chrono>
#include <cinttypes>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "example_interfaces/srv/add_two_ints.hpp"
#include "rclcpp/rclcpp.hpp"

using AddTwoInts = example_interfaces::srv::AddTwoInts;
using namespace std::chrono_literals;

class ClientNode : public rclcpp::Node
{
public:
explicit ClientNode(const rclcpp::NodeOptions & options = rclcpp::NodeOptions{})
: Node("add_two_ints_async_client", options)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
// 创建client
client_ = create_client<AddTwoInts>("add_two_ints");
// 启动定时器,每隔5s裁剪掉之前请求
timer_ = this->create_wall_timer(
5s,
[this]() {
std::vector<int64_t> pruned_requests;
// Prune all requests older than 5s.
size_t n_pruned = this->client_->prune_requests_older_than(
std::chrono::system_clock::now() - 5s, &pruned_requests);
if (n_pruned) {
RCLCPP_INFO(
this->get_logger(),
"The server hasn't replied for more than 5s, %zu requests were discarded, "
"the discarded requests numbers are:",
n_pruned);
for (const auto & req_num : pruned_requests) {
RCLCPP_INFO(this->get_logger(), "\t%" PRId64, req_num);
}
}
});
}

bool
wait_for_service_server()
{
while (!client_->wait_for_service(std::chrono::seconds(1))) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "client interrupted while waiting for service to appear.");
return false;
}
RCLCPP_INFO(this->get_logger(), "waiting for service to appear...");
}
return true;
}

void
queue_async_request(int64_t a, int64_t b)
{
auto request = std::make_shared<AddTwoInts::Request>();
request->a = a;
request->b = b;

// We give the async_send_request() method a callback that will get executed once the response
// is received.
// This way we can return immediately from this method and allow other work to be done by the
// executor in `spin` while waiting for the response.
using ServiceResponseFuture =
rclcpp::Client<example_interfaces::srv::AddTwoInts>::SharedFutureWithRequest;
auto response_received_callback =
[logger = this->get_logger()](ServiceResponseFuture future) {
auto request_response_pair = future.get();
RCLCPP_INFO(
logger,
"Result of %" PRId64 " + %" PRId64 " is: %" PRId64,
request_response_pair.first->a,
request_response_pair.first->b,
request_response_pair.second->sum);
};
auto result = client_->async_send_request(
request, std::move(response_received_callback));
RCLCPP_INFO(
this->get_logger(),
"Sending a request to the server (request_id =%" PRId64
"), we're going to let you know the result when ready!",
result.request_id);
}

private:
rclcpp::Client<AddTwoInts>::SharedPtr client_;
rclcpp::TimerBase::SharedPtr timer_;
};

bool
read_more(std::string & buffer, const rclcpp::Logger & logger)
{
buffer.clear();
std::getline(std::cin, buffer);
if (std::cin.fail() || std::cin.eof()) {
RCLCPP_INFO(logger, "\nProgram was interrupted, bye!");
return false;
}
return true;
}

std::optional<int64_t>
read_number(std::string & buffer, const rclcpp::Logger & logger)
{
while (1) {
size_t pos;
if (!read_more(buffer, logger)) {
return std::nullopt;
}
if (buffer == "q") {
return std::nullopt;
}
auto ret = std::stoll(buffer, &pos);
if (ret > std::numeric_limits<int64_t>().max()) {
RCLCPP_INFO(
logger,
"The input number should be less or equal than %" PRId64
"\n Please try again: ",
std::numeric_limits<int64_t>().max());
continue;
}
if (pos != buffer.size()) {
RCLCPP_INFO(
logger,
"The input should be a number not: %s\n Please try again: ", buffer.c_str());
continue;
}
return ret;
}
}

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

auto node = std::make_shared<ClientNode>();
const auto & logger = node->get_logger();

// 等待服务端启动建立服务
RCLCPP_INFO(logger, "waiting for service to appear...");
if (!node->wait_for_service_server()) {
return 1;
}
RCLCPP_INFO(logger, "Server is available!!!");
RCLCPP_INFO(logger, "We are going to add two numbers, insert the first one (or 'q' to exit): ");

int64_t a;
int64_t b;

// 启动线程用于spin,注意这里写法,例子里提供了一种停止spin()的方法
// 至此,有两个线程在并行执行:主线程和执行器spin线程
std::promise<void> stop_async_spinner;
std::thread async_spinner_thread(
[stop_token = stop_async_spinner.get_future(), node]() {
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin_until_future_complete(stop_token);
});


while (1) {
// 获取数字,不重要
std::string buffer;
auto optional_number = read_number(buffer, logger);
if (!optional_number) {
break;
}
a = *optional_number;
RCLCPP_INFO(logger, "Insert the second number (or 'q' to exit): ");
optional_number = read_number(buffer, logger);
if (!optional_number) {
break;
}
b = *optional_number;

// 异步发送请求
node->queue_async_request(a, b);
RCLCPP_INFO(
logger,
"You can prepare another request to add two numbers, insert the first one (or 'q' to exit):"
);
}

// 停止spin线程
stop_async_spinner.set_value();
async_spinner_thread.join();
rclcpp::shutdown();
return 0;
}

异步客户端分析

以上代码仍然是ros2 example中异步客户端样例,核心在于:

  1. 主线程(其他线程)负责注册callback并发送请求
  2. 启动独立线程负责执行器spin(),以便在主线程执行其他操作,spin线程完成ROS2消息处理
    异步发送请求代码如下(注意这部分在主线程中完成):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void
queue_async_request(int64_t a, int64_t b)
{
// 首先建立一个请求,填充必要数据
auto request = std::make_shared<AddTwoInts::Request>();
request->a = a;
request->b = b;

// 异步发送请求的callback(lambda),当接收到服务器消息反馈时,就会执行该callback
using ServiceResponseFuture =
rclcpp::Client<example_interfaces::srv::AddTwoInts>::SharedFutureWithRequest;
auto response_received_callback =
[logger = this->get_logger()](ServiceResponseFuture future) {
auto request_response_pair = future.get();
RCLCPP_INFO(
logger,
"Result of %" PRId64 " + %" PRId64 " is: %" PRId64,
request_response_pair.first->a,
request_response_pair.first->b,
request_response_pair.second->sum);
};

// 异步发送请求,只不过这里多了一个callback函数
auto result = client_->async_send_request(
request, std::move(response_received_callback));
RCLCPP_INFO(
this->get_logger(),
"Sending a request to the server (request_id =%" PRId64
"), we're going to let you know the result when ready!",
result.request_id);
}

在发送请求代码中async_send_request()提供回调函数response_received_callback(),用于在接收到服务消息反馈时,触发回调函数调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename ServiceT>
class Client : public ClientBase
template<
typename CallbackT,
typename std::enable_if<
rclcpp::function_traits::same_arguments<
CallbackT,
CallbackWithRequestType
>::value
>::type * = nullptr
>
SharedFutureWithRequestAndRequestId
async_send_request(SharedRequest request, CallbackT && cb)
{
PromiseWithRequest promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
// 这里将回调、请求、future、promise打包在一起
std::make_tuple(
CallbackWithRequestType{std::forward<CallbackT>(cb)},
request,
shared_future,
std::move(promise)));
return SharedFutureWithRequestAndRequestId{std::move(shared_future), req_id};
}
};

具体触发回调的位置,我们之前在同步客户端中已经说明了,具体细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// src/ros2/rclcpp/rclcpp/include/rclcpp/client.hpp
template<typename ServiceT>
class Client : public ClientBase
{
public:
void
handle_response(
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) override
{
std::optional<CallbackInfoVariant>
optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
if (!optional_pending_request) {
return;
}
auto & value = *optional_pending_request;
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
std::move(response));
if (std::holds_alternative<Promise>(value)) {
auto & promise = std::get<Promise>(value);
promise.set_value(std::move(typed_response));
} else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
auto & inner = std::get<CallbackTypeValueVariant>(value);
const auto & callback = std::get<CallbackType>(inner);
auto & promise = std::get<Promise>(inner);
auto & future = std::get<SharedFuture>(inner);
promise.set_value(std::move(typed_response));
callback(std::move(future));
} else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
// 会在这个分支
auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
const auto & callback = std::get<CallbackWithRequestType>(inner);
auto & promise = std::get<PromiseWithRequest>(inner);
auto & future = std::get<SharedFutureWithRequest>(inner);
auto & request = std::get<SharedRequest>(inner);

// 注意这里,先让promise设置值,然后调用callback
promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
callback(std::move(future));
}
}
};

当从服务端接收到回应时,handle_response()被调用,代码通过promise.set_value通知客户端,消息已经被回应了。之后调用callback(),这就是用户注册回调函数,可对结果进一步地处理。另外值得注意的是,用户的回调是在执行器线程中被调用的,自定义的功能代码要做好共享变量的数据同步和保护。


ROS2 Service详解
https://ziqingdream.github.io/2025/04/14/-1-机器人技术栈/-1-ROS2操作系统/-1-ROS2-Humble代码阅读/【2】ROS2 Service详解/
作者
工程课代表[B站] / ZiQingDream[Github]
发布于
2025年4月14日
许可协议