ROS2 Action详解

本文由 ZiQingDream 编写,未经允许禁止转载。

本文作者:工程课代表[B站] / ZiQingDream[Github]

在《ROS2 Executor详解》文章中,我们理解了调度器怎么处理回调和回调组。在《ROS2 Service详解》文章中,我们解释了同步、异步服务操作流程和底层细节。本文将分析ROS2动作服务器架构,以阐述动作服务器复杂的回调、反馈、中断等操作。
由于动作服务器本质上是“服务+话题”综合体,之前对于回调、服务等机制,这里就不再赘述,仅结合Action的场景,重点关注动作服务器的回调流程。

阅读本文需要你有一定ROS2基本理解,至少要有关于动作服务器基础知识。

动作服务器

实验

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include <functional>
#include <memory>
#include <thread>

#include "example_interfaces/action/fibonacci.hpp"
#include "rclcpp/rclcpp.hpp"
// TODO(jacobperron): Remove this once it is included as part of 'rclcpp.hpp'
#include "rclcpp_action/rclcpp_action.hpp"

class MinimalActionServer : public rclcpp::Node
{
public:
// 类型重命名,可以有效减少代码长度
using Fibonacci = example_interfaces::action::Fibonacci;
using GoalHandleFibonacci = rclcpp_action::ServerGoalHandle<Fibonacci>;

explicit MinimalActionServer(const rclcpp::NodeOptions & options = rclcpp::NodeOptions())
: Node("minimal_action_server", options)
{
using namespace std::placeholders;
// 创建动作服务器服务端:
// 包含有三个回调:handle_goal用于处理请求到达回调
// handle_cancel用于处理取消的回调
// handle_accepted用于处理请求被接收回调
this->action_server_ = rclcpp_action::create_server<Fibonacci>(
this->get_node_base_interface(),
this->get_node_clock_interface(),
this->get_node_logging_interface(),
this->get_node_waitables_interface(),
"fibonacci",
std::bind(&MinimalActionServer::handle_goal, this, _1, _2),
std::bind(&MinimalActionServer::handle_cancel, this, _1),
std::bind(&MinimalActionServer::handle_accepted, this, _1));
}

private:
rclcpp_action::Server<Fibonacci>::SharedPtr action_server_;

rclcpp_action::GoalResponse handle_goal(
const rclcpp_action::GoalUUID & uuid,
std::shared_ptr<const Fibonacci::Goal> goal)
{
RCLCPP_INFO(this->get_logger(), "Received goal request with order %d", goal->order);
(void)uuid;
// 拒绝9000以上的计算
if (goal->order > 9000) {
return rclcpp_action::GoalResponse::REJECT;
}
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
}

rclcpp_action::CancelResponse handle_cancel(
const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
// cancel不做任何处理
RCLCPP_INFO(this->get_logger(), "Received request to cancel goal");
(void)goal_handle;
return rclcpp_action::CancelResponse::ACCEPT;
}

// 异步执行流程
void execute(const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
RCLCPP_INFO(this->get_logger(), "Executing goal");
rclcpp::Rate loop_rate(1);

// 首先获取goal、feedback
const auto goal = goal_handle->get_goal();
auto feedback = std::make_shared<Fibonacci::Feedback>();
auto & sequence = feedback->sequence;
sequence.push_back(0);
sequence.push_back(1);
auto result = std::make_shared<Fibonacci::Result>();

for (int i = 1; (i < goal->order) && rclcpp::ok(); ++i) {
// 检查是否需要取消,执行取消流程,并反馈已经取消完成
if (goal_handle->is_canceling()) {
result->sequence = sequence;
goal_handle->canceled(result);
RCLCPP_INFO(this->get_logger(), "Goal Canceled");
return;
}
// Update sequence
sequence.push_back(sequence[i] + sequence[i - 1]);
// 发布feedback(这个可以不断进行)
goal_handle->publish_feedback(feedback);
RCLCPP_INFO(this->get_logger(), "Publish Feedback");

loop_rate.sleep();
}

// 成功,反馈最后结果
if (rclcpp::ok()) {
result->sequence = sequence;
goal_handle->succeed(result);
RCLCPP_INFO(this->get_logger(), "Goal Succeeded");
}
}

void handle_accepted(const std::shared_ptr<GoalHandleFibonacci> goal_handle)
{
using namespace std::placeholders;
// 接收不能阻塞,这里直接通过新建一个线程,异步处理计算过程(这个过程可能会很耗时)
std::thread{std::bind(&MinimalActionServer::execute, this, _1), goal_handle}.detach();
}
}; // class MinimalActionServer

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);

auto action_server = std::make_shared<MinimalActionServer>();

rclcpp::spin(action_server);

rclcpp::shutdown();
return 0;
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <chrono>
#include <cinttypes>
#include <functional>
#include <future>
#include <memory>
#include <string>

#include "example_interfaces/action/fibonacci.hpp"
#include "rclcpp/rclcpp.hpp"
// TODO(jacobperron): Remove this once it is included as part of 'rclcpp.hpp'
#include "rclcpp_action/rclcpp_action.hpp"

class MinimalActionClient : public rclcpp::Node
{
public:
using Fibonacci = example_interfaces::action::Fibonacci;
using GoalHandleFibonacci = rclcpp_action::ClientGoalHandle<Fibonacci>;

explicit MinimalActionClient(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
: Node("minimal_action_client", node_options), goal_done_(false)
{
// 创建客户端
this->client_ptr_ = rclcpp_action::create_client<Fibonacci>(
this->get_node_base_interface(),
this->get_node_graph_interface(),
this->get_node_logging_interface(),
this->get_node_waitables_interface(),
"fibonacci");

this->timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&MinimalActionClient::send_goal, this));
}

bool is_goal_done() const
{
return this->goal_done_;
}

// 定时回调函数,每隔500ms,由于会取消定时器,其实只执行了一次
void send_goal()
{
using namespace std::placeholders;

this->timer_->cancel();

this->goal_done_ = false;

if (!this->client_ptr_) {
RCLCPP_ERROR(this->get_logger(), "Action client not initialized");
}

// 等待动作服务被建立
if (!this->client_ptr_->wait_for_action_server(std::chrono::seconds(10))) {
RCLCPP_ERROR(this->get_logger(), "Action server not available after waiting");
this->goal_done_ = true;
return;
}

// 建立计算目标
auto goal_msg = Fibonacci::Goal();
goal_msg.order = 10;

RCLCPP_INFO(this->get_logger(), "Sending goal");

// 执行异步发送,注意这里并没有处理future,直接通过回调处理:
// 1. goal_response_callback 目标被接受或拒绝的反馈
// 2. feedback_callback 不断反馈中间结果的反馈
// 3. result_callback 结果反馈
auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
send_goal_options.goal_response_callback =
std::bind(&MinimalActionClient::goal_response_callback, this, _1);
send_goal_options.feedback_callback =
std::bind(&MinimalActionClient::feedback_callback, this, _1, _2);
send_goal_options.result_callback =
std::bind(&MinimalActionClient::result_callback, this, _1);
auto goal_handle_future = this->client_ptr_->async_send_goal(goal_msg, send_goal_options);
}

private:
rclcpp_action::Client<Fibonacci>::SharedPtr client_ptr_;
rclcpp::TimerBase::SharedPtr timer_;
bool goal_done_;

void goal_response_callback(GoalHandleFibonacci::SharedPtr goal_handle)
{
if (!goal_handle) {
RCLCPP_ERROR(this->get_logger(), "Goal was rejected by server");
} else {
RCLCPP_INFO(this->get_logger(), "Goal accepted by server, waiting for result");
}
}

void feedback_callback(
GoalHandleFibonacci::SharedPtr,
const std::shared_ptr<const Fibonacci::Feedback> feedback)
{
RCLCPP_INFO(
this->get_logger(),
"Next number in sequence received: %" PRId32,
feedback->sequence.back());
}

void result_callback(const GoalHandleFibonacci::WrappedResult & result)
{
this->goal_done_ = true;
switch (result.code) {
case rclcpp_action::ResultCode::SUCCEEDED:
break;
case rclcpp_action::ResultCode::ABORTED:
RCLCPP_ERROR(this->get_logger(), "Goal was aborted");
return;
case rclcpp_action::ResultCode::CANCELED:
RCLCPP_ERROR(this->get_logger(), "Goal was canceled");
return;
default:
RCLCPP_ERROR(this->get_logger(), "Unknown result code");
return;
}

RCLCPP_INFO(this->get_logger(), "Result received");
for (auto number : result.result->sequence) {
RCLCPP_INFO(this->get_logger(), "%" PRId32, number);
}
}
}; // class MinimalActionClient

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
auto action_client = std::make_shared<MinimalActionClient>();

while (!action_client->is_goal_done()) {
rclcpp::spin_some(action_client);
}

rclcpp::shutdown();
return 0;
}

动作服务器处理机制

在ROS2官方网站有动作服务器可视化介绍,整个处理流程如图(Understanding actions — ROS 2 Documentation: Humble documentation):

动作服务器正常情况下的复杂交互流程可以总结为以下5个阶段:

  1. Goal Service客户端向Goal Service服务端发送请求
  2. Goal Service服务端反馈目标接受状态:接收或拒绝
  3. Result Service客户端向Result Service服务端发送请求
  4. 动作服务器开始计算,并通过Feedback Topic反馈中间结果
  5. 最终结果计算完毕后,Result Service服务端反馈结果给Result Service客户端
    由以上的交互流程可知:
  • 动作服务器客户端至少包含以下组合:“Goal Service Client + Feedback Subscriber + Result Service Client”;
  • 动作服务器服务端至少包含以下组合:“Goal Service Server + Feedback Publisher + Result Service Server”。

动作服务器拆分

为了清楚说明以上交互逻辑,我们将详细拆解“动作服务器服务端+动作服务器客户端”代码,由于整个流程冗长和复杂,读者可以分部阅读:

动作服务器创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 客户端创建代码
this->client_ptr_ = rclcpp_action::create_client<Fibonacci>(
this->get_node_base_interface(),
this->get_node_graph_interface(),
this->get_node_logging_interface(),
this->get_node_waitables_interface(),
"fibonacci");

// 服务端创建代码
this->action_server_ = rclcpp_action::create_server<Fibonacci>(
this->get_node_base_interface(),
this->get_node_clock_interface(),
this->get_node_logging_interface(),
this->get_node_waitables_interface(),
"fibonacci",
std::bind(&MinimalActionServer::handle_goal, this, _1, _2),
std::bind(&MinimalActionServer::handle_cancel, this, _1),
std::bind(&MinimalActionServer::handle_accepted, this, _1));

动作服务器服务端

其中rclcpp_action::create_server()执行服务端创建,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/create_server.hpp
template<typename ActionT>
typename Server<ActionT>::SharedPtr
create_server(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_interface,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_interface,
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_interface,
const std::string & name,
typename Server<ActionT>::GoalCallback handle_goal,
typename Server<ActionT>::CancelCallback handle_cancel,
typename Server<ActionT>::AcceptedCallback handle_accepted,
const rcl_action_server_options_t & options = rcl_action_server_get_default_options(),
rclcpp::CallbackGroup::SharedPtr group = nullptr)
{
std::weak_ptr<rclcpp::node_interfaces::NodeWaitablesInterface> weak_node =
node_waitables_interface;
std::weak_ptr<rclcpp::CallbackGroup> weak_group = group;
bool group_is_null = (nullptr == group.get());

auto deleter = [weak_node, weak_group, group_is_null](Server<ActionT> * ptr)
{
if (nullptr == ptr) {
return;
}
auto shared_node = weak_node.lock();
if (shared_node) {
// API expects a shared pointer, give it one with a deleter that does nothing.
std::shared_ptr<Server<ActionT>> fake_shared_ptr(ptr, [](Server<ActionT> *) {});

if (group_is_null) {
// Was added to default group
shared_node->remove_waitable(fake_shared_ptr, nullptr);
} else {
// Was added to a specific group
auto shared_group = weak_group.lock();
if (shared_group) {
shared_node->remove_waitable(fake_shared_ptr, shared_group);
}
}
}
delete ptr;
};

std::shared_ptr<Server<ActionT>> action_server(new Server<ActionT>(
node_base_interface,
node_clock_interface,
node_logging_interface,
name,
options,
handle_goal,
handle_cancel,
handle_accepted), deleter);

node_waitables_interface->add_waitable(action_server, group);
return action_server;
}

从create_server()代码中可以看出,程序直接新建Server< ActionT >对象并返回,该对象就是动作服务器内部表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template<typename ActionT>
class Server : public ServerBase, public std::enable_shared_from_this<Server<ActionT>>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Server)

/// Signature of a callback that accepts or rejects goal requests.
using GoalCallback = std::function<GoalResponse(
const GoalUUID &, std::shared_ptr<const typename ActionT::Goal>)>;
/// Signature of a callback that accepts or rejects requests to cancel a goal.
using CancelCallback = std::function<CancelResponse(std::shared_ptr<ServerGoalHandle<ActionT>>)>;
/// Signature of a callback that is used to notify when the goal has been accepted.
using AcceptedCallback = std::function<void (std::shared_ptr<ServerGoalHandle<ActionT>>)>;

/// Construct an action server.
Server(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & name,
const rcl_action_server_options_t & options,
GoalCallback handle_goal,
CancelCallback handle_cancel,
AcceptedCallback handle_accepted
)
: ServerBase(
node_base,
node_clock,
node_logging,
name,
rosidl_typesupport_cpp::get_action_type_support_handle<ActionT>(),
options),
handle_goal_(handle_goal),
handle_cancel_(handle_cancel),
handle_accepted_(handle_accepted)
{
}

virtual ~Server() = default;
...... // 其他代码逻辑
};

在Server构造函数中构建了ServerBase类,这个类继承自rclcpp::Waitable,表示RCL可等待对象,本质上和ROS2话题等类似,可以被调度、等待。从这个角度看,任何一个动作服务器,都是一个可等待的对象。在ServerBase构造函数中,执行服务初始化动作,具体流程如下代码注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/server.hpp
class ServerBase : public rclcpp::Waitable
{
...... // 其他代码
public:
RCLCPP_ACTION_PUBLIC
virtual ~ServerBase();

protected:
RCLCPP_ACTION_PUBLIC
ServerBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & name,
const rosidl_action_type_support_t * type_support,
const rcl_action_server_options_t & options);
...... // 其他代码
};

// src/ros2/rclcpp/rclcpp_action/src/server.cpp
ServerBase::ServerBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & name,
const rosidl_action_type_support_t * type_support,
const rcl_action_server_options_t & options
)
// 这里初始化内部数据结果ServerBaseImpl
: pimpl_(new ServerBaseImpl(
node_clock->get_clock(), node_logging->get_logger().get_child("rclcpp_action")))
{
auto deleter = [node_base](rcl_action_server_t * ptr)
{
if (nullptr != ptr) {
rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node);
if (RCL_RET_OK != ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter");
}
delete ptr;
}
};

// 首先创建一个动作服务器,并清理服务器数据结构
// 值得注意的是这里的pimpl_本质上就是动作服务器(PIMPL模式)
pimpl_->action_server_.reset(new rcl_action_server_t, deleter);
*(pimpl_->action_server_) = rcl_action_get_zero_initialized_server();

rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
rcl_clock_t * rcl_clock = pimpl_->clock_->get_clock_handle();

// 执行动作服务器的初始化(RCL层)
rcl_ret_t ret = rcl_action_server_init(
pimpl_->action_server_.get(), rcl_node, rcl_clock, type_support, name.c_str(), &options);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

ret = rcl_action_server_wait_set_get_num_entities(
pimpl_->action_server_.get(),
&pimpl_->num_subscriptions_,
&pimpl_->num_guard_conditions_,
&pimpl_->num_timers_,
&pimpl_->num_clients_,
&pimpl_->num_services_);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

ServerBase::ServerBase()程序通过rcl_action_server_init()初始化RCL动作服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// src/ros2/rcl/rcl_action/src/rcl_action/action_server_impl.h
/// rcl动作服务器内部实现结构体
typedef struct rcl_action_server_impl_s
{
rcl_service_t goal_service;
rcl_service_t cancel_service;
rcl_service_t result_service;
rcl_publisher_t feedback_publisher;
rcl_publisher_t status_publisher;
rcl_timer_t expire_timer;
char * action_name;
rcl_action_server_options_t options;
// Array of goal handles
rcl_action_goal_handle_t ** goal_handles;
size_t num_goal_handles;
// Clock
rcl_clock_t * clock;
// Wait set records
size_t wait_set_goal_service_index;
size_t wait_set_cancel_service_index;
size_t wait_set_result_service_index;
size_t wait_set_expire_timer_index;
} rcl_action_server_impl_t;

// src/ros2/rcl/rcl_action/include/rcl_action/action_server.h
/// Internal rcl_action implementation struct.
typedef struct rcl_action_server_impl_s rcl_action_server_impl_t;

/// Structure which encapsulates a ROS Action Server.
typedef struct rcl_action_server_s
{
/// Pointer to the action server implementation
rcl_action_server_impl_t * impl;
} rcl_action_server_t;

// src/ros2/rcl/rcl_action/src/rcl_action/action_server.c(注意这里是C代码)
rcl_ret_t
rcl_action_server_init(
rcl_action_server_t * action_server,
rcl_node_t * node,
rcl_clock_t * clock,
const rosidl_action_type_support_t * type_support,
const char * action_name,
const rcl_action_server_options_t * options)
{
......// 参数检查代码
// 为动作服务内部数据结构申请内存
action_server->impl = (rcl_action_server_impl_t *)allocator.allocate(
sizeof(rcl_action_server_impl_t), allocator.state);
RCL_CHECK_FOR_NULL_WITH_MSG(
action_server->impl, "allocating memory failed", return RCL_RET_BAD_ALLOC);

// 对新申请内存进行零初始化
action_server->impl->goal_service = rcl_get_zero_initialized_service();
action_server->impl->cancel_service = rcl_get_zero_initialized_service();
action_server->impl->result_service = rcl_get_zero_initialized_service();
action_server->impl->expire_timer = rcl_get_zero_initialized_timer();
action_server->impl->feedback_publisher = rcl_get_zero_initialized_publisher();
action_server->impl->status_publisher = rcl_get_zero_initialized_publisher();
action_server->impl->action_name = NULL;
action_server->impl->options = *options; // copy options
action_server->impl->goal_handles = NULL;
action_server->impl->num_goal_handles = 0u;
action_server->impl->clock = NULL;

rcl_ret_t ret = RCL_RET_OK;
// 以下部分是通过C宏函数来初始化rcl变量,这里就不展开了
// 初始化action_server->impl->goal_service
SERVICE_INIT(goal);
// 初始化action_server->impl->cancel_service
SERVICE_INIT(cancel);
// 初始化action_server->impl->result_service
SERVICE_INIT(result);

// 初始化action_server->impl->feedback_publisher
PUBLISHER_INIT(feedback);
// 初始化action_server->impl->status_publisher
PUBLISHER_INIT(status);

// Store reference to clock
action_server->impl->clock = clock;

// 初始化action_server->impl->expire_timer超时定时器
ret = rcl_timer_init(
&action_server->impl->expire_timer, action_server->impl->clock, node->context,
options->result_timeout.nanoseconds, NULL, allocator);
if (RCL_RET_OK != ret) {
goto fail;
}
// Cancel timer so it doesn't start firing
ret = rcl_timer_cancel(&action_server->impl->expire_timer);
if (RCL_RET_OK != ret) {
goto fail;
}

// Copy action name
action_server->impl->action_name = rcutils_strdup(action_name, allocator);
if (NULL == action_server->impl->action_name) {
ret = RCL_RET_BAD_ALLOC;
goto fail;
}
return ret;
fail:
{
// Finalize any services/publishers that were initialized and deallocate action_server->impl
rcl_ret_t ret_throwaway = rcl_action_server_fini(action_server, node);
// Since there is already a failure, it is likely that finalize will error on one or more of
// the action servers members
(void)ret_throwaway;
}
return ret;
}

rcl_action_server_init()对动作服务器服务端初始化,该函数在RCL层建立了多个调用对象,如前所述包括:

  • Goal Service Server:目标服务器服务端,用于Goal接收,并确定是接收还是拒绝目标
  • Cancel Service Server:取消服务器服务端,用于Goal取消操作
  • Result Service Server:结果服务器服务端,用于Result请求接收,并反馈最后结果
  • Feedback Publisher:反馈发布者,用于Feedback的不断反馈
  • Status Publiser:状态发布者,用于自身状态发布
  • Expire timer:超时定时器

动作服务器客户端

rclcpp_action::create_client()创建动作服务器客户端,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/create_client.hpp
template<typename ActionT>
typename Client<ActionT>::SharedPtr
create_client(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph_interface,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_interface,
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_interface,
const std::string & name,
rclcpp::CallbackGroup::SharedPtr group = nullptr,
const rcl_action_client_options_t & options = rcl_action_client_get_default_options())
{
std::weak_ptr<rclcpp::node_interfaces::NodeWaitablesInterface> weak_node =
node_waitables_interface;
std::weak_ptr<rclcpp::CallbackGroup> weak_group = group;
bool group_is_null = (nullptr == group.get());

auto deleter = [weak_node, weak_group, group_is_null](Client<ActionT> * ptr)
{
if (nullptr == ptr) {
return;
}
auto shared_node = weak_node.lock();
if (shared_node) {
// API expects a shared pointer, give it one with a deleter that does nothing.
std::shared_ptr<Client<ActionT>> fake_shared_ptr(ptr, [](Client<ActionT> *) {});

if (group_is_null) {
// Was added to default group
shared_node->remove_waitable(fake_shared_ptr, nullptr);
} else {
// Was added to a specific group
auto shared_group = weak_group.lock();
if (shared_group) {
shared_node->remove_waitable(fake_shared_ptr, shared_group);
}
}
}
delete ptr;
};

std::shared_ptr<Client<ActionT>> action_client(
new Client<ActionT>(
node_base_interface,
node_graph_interface,
node_logging_interface,
name,
options),
deleter);

node_waitables_interface->add_waitable(action_client, group);
return action_client;
}

代码基本和动作服务器服务端类似,代码新建的Client< ActionT >对象指针, 同样的每一个客户端也是一个可等待对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/client.hpp
class Client : public ClientBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Client<ActionT>)

using Goal = typename ActionT::Goal;
using Feedback = typename ActionT::Feedback;
using GoalHandle = ClientGoalHandle<ActionT>;
using WrappedResult = typename GoalHandle::WrappedResult;
using GoalResponseCallback = std::function<void (typename GoalHandle::SharedPtr)>;
using FeedbackCallback = typename GoalHandle::FeedbackCallback;
using ResultCallback = typename GoalHandle::ResultCallback;
using CancelRequest = typename ActionT::Impl::CancelGoalService::Request;
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
using CancelCallback = std::function<void (typename CancelResponse::SharedPtr)>;

/// Options for sending a goal.
/**
* This struct is used to pass parameters to the function `async_send_goal`.
*/
struct SendGoalOptions
{
SendGoalOptions()
: goal_response_callback(nullptr),
feedback_callback(nullptr),
result_callback(nullptr)
{
}

/// Function called when the goal is accepted or rejected.
/**
* Takes a single argument that is a goal handle shared pointer.
* If the goal is accepted, then the pointer points to a valid goal handle.
* If the goal is rejected, then pointer has the value `nullptr`.
*/
GoalResponseCallback goal_response_callback;

/// Function called whenever feedback is received for the goal.
FeedbackCallback feedback_callback;

/// Function called when the result for the goal is received.
ResultCallback result_callback;
};

/// Construct an action client.
Client(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & action_name,
const rcl_action_client_options_t & client_options = rcl_action_client_get_default_options()
)
: ClientBase(
node_base, node_graph, node_logging, action_name,
rosidl_typesupport_cpp::get_action_type_support_handle<ActionT>(),
client_options)
{
}
......// 其他代码
};


class ClientBase : public rclcpp::Waitable
{
public:
RCLCPP_ACTION_PUBLIC
virtual ~ClientBase();

/// Enum to identify entities belonging to the action client
enum class EntityType : std::size_t
{
GoalClient,
ResultClient,
CancelClient,
FeedbackSubscription,
StatusSubscription,
};

protected:
RCLCPP_ACTION_PUBLIC
ClientBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & action_name,
const rosidl_action_type_support_t * type_support,
const rcl_action_client_options_t & options);
......// 其他代码逻辑
};

class ClientBaseImpl
{
public:
ClientBaseImpl(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & action_name,
const rosidl_action_type_support_t * type_support,
const rcl_action_client_options_t & client_options)
: node_graph_(node_graph),
node_handle(node_base->get_shared_rcl_node_handle()),
logger(node_logging->get_logger().get_child("rclcpp_action")),
random_bytes_generator(std::random_device{}())
{
std::weak_ptr<rcl_node_t> weak_node_handle(node_handle);
client_handle = std::shared_ptr<rcl_action_client_t>(
new rcl_action_client_t, [weak_node_handle](rcl_action_client_t * client)
{
auto handle = weak_node_handle.lock();
if (handle) {
if (RCL_RET_OK != rcl_action_client_fini(client, handle.get())) {
RCLCPP_ERROR(
rclcpp::get_logger(rcl_node_get_logger_name(handle.get())).get_child("rclcpp_action"),
"Error in destruction of rcl action client handle: %s", rcl_get_error_string().str);
rcl_reset_error();
}
} else {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp_action"),
"Error in destruction of rcl action client handle: "
"the Node Handle was destructed too early. You will leak memory");
}
delete client;
});
*client_handle = rcl_action_get_zero_initialized_client();
rcl_ret_t ret = rcl_action_client_init(
client_handle.get(), node_handle.get(), type_support,
action_name.c_str(), &client_options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "could not initialize rcl action client");
}

ret = rcl_action_client_wait_set_get_num_entities(
client_handle.get(),
&num_subscriptions,
&num_guard_conditions,
&num_timers,
&num_clients,
&num_services);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "could not retrieve rcl action client details");
}
}

size_t num_subscriptions{0u};
size_t num_guard_conditions{0u};
size_t num_timers{0u};
size_t num_clients{0u};
size_t num_services{0u};

// Lock for action_client_
std::recursive_mutex action_client_mutex_;

// next ready event for taking, will be set by is_ready and will be processed by take_data
std::atomic<size_t> next_ready_event;
// used to indicate that next_ready_event has no ready event for processing
static constexpr size_t NO_EVENT_READY = std::numeric_limits<size_t>::max();

rclcpp::Context::SharedPtr context_;
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
// node_handle must be destroyed after client_handle to prevent memory leak
std::shared_ptr<rcl_node_t> node_handle{nullptr};
std::shared_ptr<rcl_action_client_t> client_handle{nullptr};
rclcpp::Logger logger;

using ResponseCallback = std::function<void (std::shared_ptr<void> response)>;

std::map<int64_t, ResponseCallback> pending_goal_responses;
std::mutex goal_requests_mutex;

std::map<int64_t, ResponseCallback> pending_result_responses;
std::mutex result_requests_mutex;

std::map<int64_t, ResponseCallback> pending_cancel_responses;
std::mutex cancel_requests_mutex;

std::independent_bits_engine<
std::default_random_engine, 8, unsigned int> random_bytes_generator;
};

ClientBase::ClientBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
const std::string & action_name,
const rosidl_action_type_support_t * type_support,
const rcl_action_client_options_t & client_options)
: pimpl_(new ClientBaseImpl(
node_base, node_graph, node_logging, action_name, type_support, client_options))
{
}

ClientBase::~ClientBase()
{
clear_on_ready_callback();
}

ClientBase::ClientBase()整个代码结构和ServerBase类似,rcl_action_client_init() 负责初始化客户端底层数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// src/ros2/rcl/rcl_action/src/rcl_action/action_client_impl.h
typedef struct rcl_action_client_impl_s
{
rcl_client_t goal_client;
rcl_client_t cancel_client;
rcl_client_t result_client;
rcl_subscription_t feedback_subscription;
rcl_subscription_t status_subscription;
rcl_action_client_options_t options;
char * action_name;
// Wait set records
size_t wait_set_goal_client_index;
size_t wait_set_cancel_client_index;
size_t wait_set_result_client_index;
size_t wait_set_feedback_subscription_index;
size_t wait_set_status_subscription_index;
} rcl_action_client_impl_t;

// src/ros2/rcl/rcl_action/include/rcl_action/action_client.h
/// Internal action client implementation struct.
typedef struct rcl_action_client_impl_s rcl_action_client_impl_t;

/// Structure which encapsulates a ROS action client.
typedef struct rcl_action_client_s
{
/// Pointer to the action client implementation
rcl_action_client_impl_t * impl;
} rcl_action_client_t;

// src/ros2/rcl/rcl_action/src/rcl_action/action_client.c(注意C代码)
rcl_ret_t
rcl_action_client_init(
rcl_action_client_t * action_client,
rcl_node_t * node,
const rosidl_action_type_support_t * type_support,
const char * action_name,
const rcl_action_client_options_t * options)
{
......// 检查代码逻辑

// 申请客户端内存数据结构存储,并做零初始化
action_client->impl = allocator.allocate(sizeof(rcl_action_client_impl_t), allocator.state);
RCL_CHECK_FOR_NULL_WITH_MSG(
action_client->impl, "allocating memory failed", return RCL_RET_BAD_ALLOC);

// Avoid uninitialized pointers should initialization fail
*action_client->impl = _rcl_action_get_zero_initialized_client_impl();
// Copy action client name and options.
action_client->impl->action_name = rcutils_strdup(action_name, allocator);
if (NULL == action_client->impl->action_name) {
RCL_SET_ERROR_MSG("failed to duplicate action name");
ret = RCL_RET_BAD_ALLOC;
goto fail;
}
action_client->impl->options = *options;

// 初始化action_client->impl->goal_client
CLIENT_INIT(goal);
// 初始化action_client->impl->cancel_client
CLIENT_INIT(cancel);
// 初始化action_client->impl->result_client
CLIENT_INIT(result);

// 初始化action_client->impl->feedback_subscription
SUBSCRIPTION_INIT(feedback);
// 初始化action_client->impl->status_subscription
SUBSCRIPTION_INIT(status);

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Action client initialized");
return ret;
fail:
fini_ret = _rcl_action_client_fini_impl(action_client, node, allocator);
if (RCL_RET_OK != fini_ret) {
RCL_SET_ERROR_MSG("failed to cleanup action client");
ret = RCL_RET_ERROR;
}
return ret;
}

对动作服务器客户端初始化时,我们在RCL底层建立了多个可调用对象,如前所述包括:

  • Goal Service Client:目标服务器客户端,用于发送目标请求、接收请求回复
  • Cancel Service Client:取消服务器客户端,用于发送取消请求
  • Result Service Client:结果服务器客户端,用于发送结果请求,接收结果回复
  • Feedback Subscription:反馈订阅者,用于反馈消息的接收
  • Status Subscription:动作服务器状态订阅者

请求处理过程

动作服务器客户端发送Goal请求

1
2
3
4
5
6
7
8
9
10
11
12
13
auto goal_msg = Fibonacci::Goal();
goal_msg.order = 10;

RCLCPP_INFO(this->get_logger(), "Sending goal");

auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
send_goal_options.goal_response_callback =
std::bind(&MinimalActionClient::goal_response_callback, this, _1);
send_goal_options.feedback_callback =
std::bind(&MinimalActionClient::feedback_callback, this, _1, _2);
send_goal_options.result_callback =
std::bind(&MinimalActionClient::result_callback, this, _1);
auto goal_handle_future = this->client_ptr_->async_send_goal(goal_msg, send_goal_options);

服务端和客户端初始化完毕后,客户端调用async_send_goal()发送请求,参数除了目标消息外,还有回调用的callback。由于代码比较复杂,这里直接在代码中注释逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/client.hpp
template<typename ActionT>
class Client : public ClientBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Client<ActionT>)

using Goal = typename ActionT::Goal;
using Feedback = typename ActionT::Feedback;
using GoalHandle = ClientGoalHandle<ActionT>;
using WrappedResult = typename GoalHandle::WrappedResult;
using GoalResponseCallback = std::function<void (typename GoalHandle::SharedPtr)>;
using FeedbackCallback = typename GoalHandle::FeedbackCallback;
using ResultCallback = typename GoalHandle::ResultCallback;
using CancelRequest = typename ActionT::Impl::CancelGoalService::Request;
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
using CancelCallback = std::function<void (typename CancelResponse::SharedPtr)>;
.....// 其他代码逻辑

std::shared_future<typename GoalHandle::SharedPtr>
async_send_goal(const Goal & goal, const SendGoalOptions & options = SendGoalOptions())
{
// 建立promise,反馈内容GoalHandle
auto promise = std::make_shared<std::promise<typename GoalHandle::SharedPtr>>();
// 获取promise的future,注意这个future回复反馈给用户,以便用户等待goal通知
std::shared_future<typename GoalHandle::SharedPtr> future(promise->get_future());

// 填充目标请求,这里每隔goal都有一个uuid
using GoalRequest = typename ActionT::Impl::SendGoalService::Request;
auto goal_request = std::make_shared<GoalRequest>();
goal_request->goal_id.uuid = this->generate_goal_id();
goal_request->goal = goal;

// 发送goal请求
this->send_goal_request(
std::static_pointer_cast<void>(goal_request),
// Goal响应,由于调用流程在其他地方,我们暂时不谈
[this, goal_request, options, promise](std::shared_ptr<void> response) mutable
{
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
auto goal_response = std::static_pointer_cast<GoalResponse>(response);

// 如果goal被拒绝(goal响应消息中accepted==false)
if (!goal_response->accepted) {
// 通知future goal设置为空
promise->set_value(nullptr);
// 然后调用goal的回调函数,填充为空
if (options.goal_response_callback) {
options.goal_response_callback(nullptr);
}
return;
}

// 到此处goal被接受了
GoalInfo goal_info;
goal_info.goal_id.uuid = goal_request->goal_id.uuid;
goal_info.stamp = goal_response->stamp;
// Do not use std::make_shared as friendship cannot be forwarded.
std::shared_ptr<GoalHandle> goal_handle(
new GoalHandle(goal_info, options.feedback_callback, options.result_callback));
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
goal_handles_[goal_handle->get_goal_id()] = goal_handle;
}

// 新建goal_handle,并且通知promise,goal被接受了
promise->set_value(goal_handle);

// 调用goal的回调,此时参数有效
if (options.goal_response_callback) {
options.goal_response_callback(goal_handle);
}

// 向服务端发送goal result请求
if (options.result_callback) {
this->make_result_aware(goal_handle);
}
});

// 清除不再使用goal,暂不关心
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
auto goal_handle_it = goal_handles_.begin();
while (goal_handle_it != goal_handles_.end()) {
if (!goal_handle_it->second.lock()) {
RCLCPP_DEBUG(
this->get_logger(),
"Dropping weak reference to goal handle during send_goal()");
goal_handle_it = goal_handles_.erase(goal_handle_it);
} else {
++goal_handle_it;
}
}
}

return future;
}

/// \internal
void
make_result_aware(typename GoalHandle::SharedPtr goal_handle)
{
// Avoid making more than one request
if (goal_handle->set_result_awareness(true)) {
return;
}
using GoalResultRequest = typename ActionT::Impl::GetResultService::Request;
auto goal_result_request = std::make_shared<GoalResultRequest>();
goal_result_request->goal_id.uuid = goal_handle->get_goal_id();
try {
// 这里发送结果请求
this->send_result_request(
std::static_pointer_cast<void>(goal_result_request),
[goal_handle, this](std::shared_ptr<void> response) mutable
{
// Wrap the response in a struct with the fields a user cares about
WrappedResult wrapped_result;
using GoalResultResponse = typename ActionT::Impl::GetResultService::Response;
// 处理结果,拷贝到wrapped_result
auto result_response = std::static_pointer_cast<GoalResultResponse>(response);
wrapped_result.result = std::make_shared<typename ActionT::Result>();
*wrapped_result.result = result_response->result;
wrapped_result.goal_id = goal_handle->get_goal_id();
wrapped_result.code = static_cast<ResultCode>(result_response->status);
// 这里将结果也放到goal_handle中
goal_handle->set_result(wrapped_result);
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
goal_handles_.erase(goal_handle->get_goal_id());
});
} catch (rclcpp::exceptions::RCLError & ex) {
// This will cause an exception when the user tries to access the result
goal_handle->invalidate(exceptions::UnawareGoalHandleError(ex.message));
}
}
};

// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/client_goal_handle.hpp
template<typename ActionT>
class ClientGoalHandle
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientGoalHandle)

// A wrapper that defines the result of an action
struct WrappedResult
{
/// The unique identifier of the goal
GoalUUID goal_id;
/// A status to indicate if the goal was canceled, aborted, or suceeded
ResultCode code;
/// User defined fields sent back with an action
typename ActionT::Result::SharedPtr result;
};

using Feedback = typename ActionT::Feedback;
using Result = typename ActionT::Result;
using FeedbackCallback =
std::function<void (
typename ClientGoalHandle<ActionT>::SharedPtr,
const std::shared_ptr<const Feedback>)>;
using ResultCallback = std::function<void (const WrappedResult & result)>;

virtual ~ClientGoalHandle();

.....// 其他代码逻辑

private:
// The templated Client creates goal handles
friend class Client<ActionT>;

ClientGoalHandle(
const GoalInfo & info,
FeedbackCallback feedback_callback,
ResultCallback result_callback);

.....// 其他代码逻辑

GoalInfo info_;

std::exception_ptr invalidate_exception_{nullptr};

bool is_result_aware_{false};
std::promise<WrappedResult> result_promise_;
std::shared_future<WrappedResult> result_future_;

FeedbackCallback feedback_callback_{nullptr};
ResultCallback result_callback_{nullptr};
int8_t status_{GoalStatus::STATUS_ACCEPTED};

std::mutex handle_mutex_;
};

客户端send_goal_request()调用通过Goal Service发送目标请求,具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/ros2/rclcpp/rclcpp_action/src/client.cpp
void
ClientBase::send_goal_request(std::shared_ptr<void> request, ResponseCallback callback)
{
std::unique_lock<std::mutex> guard(pimpl_->goal_requests_mutex);
int64_t sequence_number;
// 发送请求到rcl层
rcl_ret_t ret = rcl_action_send_goal_request(
pimpl_->client_handle.get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send goal request");
}
assert(pimpl_->pending_goal_responses.count(sequence_number) == 0);
// 建立序列号到callback映射
pimpl_->pending_goal_responses[sequence_number] = callback;
}

动作服务器客户端通过ClientBase调用rcl_action_send_goal_request()将goal发送出去,发送后函数返回代表goal的序列号,动作服务器依据此序列号,建立序列号到callback映射,以便在goal反馈后,执行对应callback函数。

动作服务器服务端接收到Goal请求

我们之前说过,动作服务器本身是一个Waitable,当请求到来时,会触发Waitable的数据处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!spinning.load()) {
return;
}
....//其他代码逻辑
if (any_exec.waitable) {
any_exec.waitable->execute(any_exec.data);
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
....//其他代码逻辑
}

bool
Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes)
{
TRACEPOINT(rclcpp_executor_get_next_ready);
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};
......//其他代码逻辑
if (!success) {
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
if (any_executable.waitable) {
any_executable.data = any_executable.waitable->take_data();
success = true;
}
}
......// 其他代码逻辑
if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly ..Check if the callback_group belongs to this executor
if (any_executable.callback_group && any_executable.callback_group->type() == \
CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_executable is executed or when the
// any_executable is destructued
any_executable.callback_group->can_be_taken_from().store(false);
}
}
// If there is no ready executable, return false
return success;
}

当动作服务器客户端请求到来,首先get_next_executable()调用get_next_ready_executable_from_map(),执行Waitable的take_data()首先根据到来请求的类型,做数据处理,然后execute_any_executable()执行Waitable的execute()函数,这两者代码如下,由于代码较长,直接在代码中注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
std::shared_ptr<void>
ServerBase::take_data()
{
size_t next_ready_event = pimpl_->next_ready_event.exchange(ServerBaseImpl::NO_EVENT_READY);

if (next_ready_event == ServerBaseImpl::NO_EVENT_READY) {
throw std::runtime_error("Taking data from action server but no ready event");
}
// 调用take_data_by_entity_id处理数据
return take_data_by_entity_id(next_ready_event);
}

std::shared_ptr<void>
ServerBase::take_data_by_entity_id(size_t id)
{
.....//校验逻辑

std::shared_ptr<ServerBaseData> data_ptr;
// Mark as ready the entity from which we want to take data
switch (static_cast<ActionEventType>(id)) {
// GoalService类型
//
case ActionEventType::GoalService:
{
rcl_ret_t ret;
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header;

std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

// 调用rcl层处理goal请求,并返回request_header
std::shared_ptr<void> message = create_goal_request();
ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(),
&request_header,
message.get());

// 构建ServerBaseData
data_ptr = std::make_shared<ServerBaseData>(
ServerBaseData::GoalRequestData(ret, goal_info, request_header, message));
}
break;
case ActionEventType::ResultService:
{
rcl_ret_t ret;
// 调用rcl层处理goal请求,并返回request_header
rmw_request_id_t request_header;
std::shared_ptr<void> result_request = create_result_request();
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());

// 构建ServerBaseData
data_ptr =
std::make_shared<ServerBaseData>(
ServerBaseData::ResultRequestData(ret, result_request, request_header));
}
break;
case ActionEventType::CancelService:
{
rcl_ret_t ret;
rmw_request_id_t request_header;

// Initialize cancel request
auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();

// 调用rcl层处理goal请求,并返回request_header
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(),
&request_header,
request.get());
// 调用rcl层处理goal请求,并返回request_header
data_ptr =
std::make_shared<ServerBaseData>(
ServerBaseData::CancelRequestData(ret, request, request_header));
}
break;
case ActionEventType::Expired:
{
data_ptr =
std::make_shared<ServerBaseData>(ServerBaseData::GoalExpiredData());
}
break;
}

return std::static_pointer_cast<void>(data_ptr);
}

void
ServerBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action server but 'data' is empty");
}

// 首先获取到ServerBaseData
std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data_in);

// 分发请求
std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
execute_goal_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
execute_cancel_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
execute_result_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
execute_check_expired_goals();
}
},
data_ptr->data);
}

在获取到动作服务器Goal请求后,服务端先建立包含GoalRequestData类型的ServerBaseData,然后调用execute_goal_request_received()执行回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void
ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
{
// 解包数据
std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
const ServerBaseData::GoalRequestData & gData(
std::get<ServerBaseData::GoalRequestData>(data_ptr->data));

rcl_ret_t ret = std::get<0>(gData);
rcl_action_goal_info_t goal_info = std::get<1>(gData);
rmw_request_id_t request_header = std::get<2>(gData);
const std::shared_ptr<void> message = std::get<3>(gData);

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
// no longer alive.
return;
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

// 根据goal信息,获取UUID
GoalUUID uuid = get_goal_id_from_goal_request(message.get());
convert(uuid, &goal_info);

// 调用用户的回调函数
auto response_pair = call_handle_goal_callback(uuid, message);

// 调用rcl给客户端反馈服务端响应
// 注意:给动作服务器客户端反馈是在调用了用户的goal_handle_后,在goal_accepted_前
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
}

if (RCL_RET_OK != ret) {
if (ret == RCL_RET_TIMEOUT) {
RCLCPP_WARN(
pimpl_->logger_,
"Failed to send goal response %s (timeout): %s",
to_string(uuid).c_str(), rcl_get_error_string().str);
rcl_reset_error();
return;
} else {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

const auto status = response_pair.first;

// if goal is accepted, create a goal handle, and store it
if (GoalResponse::ACCEPT_AND_EXECUTE == status || GoalResponse::ACCEPT_AND_DEFER == status) {
RCLCPP_DEBUG(pimpl_->logger_, "Accepted goal %s", to_string(uuid).c_str());
// rcl_action will set time stamp
auto deleter = [](rcl_action_goal_handle_t * ptr)
{
if (nullptr != ptr) {
rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr);
if (RCL_RET_OK != fail_ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter");
}
delete ptr;
}
};
// 当Goal被用户接受,调用rcl创建Goal Handle
rcl_action_goal_handle_t * rcl_handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n");
}

std::shared_ptr<rcl_action_goal_handle_t> handle(new rcl_action_goal_handle_t, deleter);
// Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle;

// 将Goal Handle根据UUID存入Server
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle;
}

// 更新底层rcl,表示Goal正在执行
if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing
ret = rcl_action_update_goal_state(handle.get(), GOAL_EVENT_EXECUTE);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
// publish status since a goal's state has changed (was accepted or has begun execution)
// 将Goal状态发不出去,这个就会使用服务端建立Status Publisher,这个暂不关注
publish_status();

// 调用用户自定义Accepted回调函数
call_goal_accepted_callback(handle, uuid, message);
}
data.reset();
}

如上述代码,执行器在收到Goal后,首先调用call_handle_goal_callback(),该函数内部调用handle_goal_(),就是我们MinimalActionServer::handle_goal()。然后动作服务器给客户端发送响应。同时开始处理Goal目标,如果接受(Accepted)Goal目标,那么就建立Goal Handle,并通过call_goal_accepted_callback()调用用户handle_accepted_()回调。

给动作服务器客户端反馈是在调用了用户的goal_handle_后,在goal_accepted_前

动作服务器服务端发送到Goal回应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template<typename ActionT>
class Server : public ServerBase, public std::enable_shared_from_this<Server<ActionT>>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Server)

/// Signature of a callback that accepts or rejects goal requests.
using GoalCallback = std::function<GoalResponse(
const GoalUUID &, std::shared_ptr<const typename ActionT::Goal>)>;
/// Signature of a callback that accepts or rejects requests to cancel a goal.
using CancelCallback = std::function<CancelResponse(std::shared_ptr<ServerGoalHandle<ActionT>>)>;
/// Signature of a callback that is used to notify when the goal has been accepted.
using AcceptedCallback = std::function<void (std::shared_ptr<ServerGoalHandle<ActionT>>)>;

protected:
std::pair<GoalResponse, std::shared_ptr<void>>
call_handle_goal_callback(GoalUUID & uuid, std::shared_ptr<void> message) override
{
auto request = std::static_pointer_cast<
typename ActionT::Impl::SendGoalService::Request>(message);
auto goal = std::shared_ptr<typename ActionT::Goal>(request, &request->goal);
// 调用用户自定义的回调函数
GoalResponse user_response = handle_goal_(uuid, goal);

// 如果返回状态为GoalResponse::ACCEPT_AND_EXECUTE或者GoalResponse::ACCEPT_AND_DEFER,表示Goal被接受
auto ros_response = std::make_shared<typename ActionT::Impl::SendGoalService::Response>();
ros_response->accepted = GoalResponse::ACCEPT_AND_EXECUTE == user_response ||
GoalResponse::ACCEPT_AND_DEFER == user_response;
return std::make_pair(user_response, ros_response);
}
};

如果Goal被用户决定了Accepted,程序会调用用户自定义的Accepted回调handle_goal_(),也就是MinimalActionServer::handle_accepted(),来做用户自定义后处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
template<typename ActionT>
class Server : public ServerBase, public std::enable_shared_from_this<Server<ActionT>>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Server)

/// Signature of a callback that accepts or rejects goal requests.
using GoalCallback = std::function<GoalResponse(
const GoalUUID &, std::shared_ptr<const typename ActionT::Goal>)>;
/// Signature of a callback that accepts or rejects requests to cancel a goal.
using CancelCallback = std::function<CancelResponse(std::shared_ptr<ServerGoalHandle<ActionT>>)>;
/// Signature of a callback that is used to notify when the goal has been accepted.
using AcceptedCallback = std::function<void (std::shared_ptr<ServerGoalHandle<ActionT>>)>;

protected:
/// \internal
void
call_goal_accepted_callback(
std::shared_ptr<rcl_action_goal_handle_t> rcl_goal_handle,
GoalUUID uuid, std::shared_ptr<void> goal_request_message) override
{
std::shared_ptr<ServerGoalHandle<ActionT>> goal_handle;
std::weak_ptr<Server<ActionT>> weak_this = this->shared_from_this();

std::function<void(const GoalUUID &, std::shared_ptr<void>)> on_terminal_state =
[weak_this](const GoalUUID & goal_uuid, std::shared_ptr<void> result_message)
{
std::shared_ptr<Server<ActionT>> shared_this = weak_this.lock();
if (!shared_this) {
return;
}
// Send result message to anyone that asked
shared_this->publish_result(goal_uuid, result_message);
// Publish a status message any time a goal handle changes state
shared_this->publish_status();
// notify base so it can recalculate the expired goal timer
shared_this->notify_goal_terminal_state();
// Delete data now (ServerBase and rcl_action_server_t keep data until goal handle expires)
std::lock_guard<std::mutex> lock(shared_this->goal_handles_mutex_);
shared_this->goal_handles_.erase(goal_uuid);
};

std::function<void(const GoalUUID &)> on_executing =
[weak_this](const GoalUUID & goal_uuid)
{
std::shared_ptr<Server<ActionT>> shared_this = weak_this.lock();
if (!shared_this) {
return;
}
(void)goal_uuid;
// Publish a status message any time a goal handle changes state
shared_this->publish_status();
};

std::function<void(std::shared_ptr<typename ActionT::Impl::FeedbackMessage>)> publish_feedback =
[weak_this](std::shared_ptr<typename ActionT::Impl::FeedbackMessage> feedback_msg)
{
std::shared_ptr<Server<ActionT>> shared_this = weak_this.lock();
if (!shared_this) {
return;
}
shared_this->publish_feedback(std::static_pointer_cast<void>(feedback_msg));
};

auto request = std::static_pointer_cast<
const typename ActionT::Impl::SendGoalService::Request>(goal_request_message);
auto goal = std::shared_ptr<const typename ActionT::Goal>(request, &request->goal);
goal_handle.reset(
new ServerGoalHandle<ActionT>(
rcl_goal_handle, uuid, goal, on_terminal_state, on_executing, publish_feedback));
{
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
goal_handles_[uuid] = goal_handle;
}

// 调用handle_accepted_,注意这里goal_handle,作为参数传递到函数中
handle_accepted_(goal_handle);
}
};

动作服务器客户端接收到Goal回应

由于动作服务器客户端也是Waitable,其在执行器层处理是一致的,只不过执行器调用的是动作服务器客户端的take_data()和execute()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// src/ros2/rclcpp/rclcpp_action/src/client.cpp
std::shared_ptr<void>
ClientBase::take_data()
{
// next_ready_event is an atomic, caching localy
size_t next_ready_event = pimpl_->next_ready_event.exchange(ClientBaseImpl::NO_EVENT_READY);

if (next_ready_event == ClientBaseImpl::NO_EVENT_READY) {
throw std::runtime_error("Taking data from action client but no ready event");
}

return take_data_by_entity_id(next_ready_event);
}

std::shared_ptr<void>
ClientBase::take_data_by_entity_id(size_t id)
{
std::shared_ptr<ClientBaseData> data_ptr;
rcl_ret_t ret;

// Mark as ready the entity from which we want to take data
switch (static_cast<EntityType>(id)) {
// Goal响应
case EntityType::GoalClient:
{
rmw_request_id_t response_header;
std::shared_ptr<void> goal_response;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);

goal_response = this->create_goal_response();
ret = rcl_action_take_goal_response(
pimpl_->client_handle.get(), &response_header, goal_response.get());
}
data_ptr = std::make_shared<ClientBaseData>(
ClientBaseData::GoalResponseData(
ret, response_header, goal_response));
}
break;
case EntityType::ResultClient:
{
rmw_request_id_t response_header;
std::shared_ptr<void> result_response;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
result_response = this->create_result_response();
ret = rcl_action_take_result_response(
pimpl_->client_handle.get(), &response_header, result_response.get());
}
data_ptr =
std::make_shared<ClientBaseData>(
ClientBaseData::ResultResponseData(
ret, response_header, result_response));
}
break;
case EntityType::CancelClient:
{
rmw_request_id_t response_header;
std::shared_ptr<void> cancel_response;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
cancel_response = this->create_cancel_response();
ret = rcl_action_take_cancel_response(
pimpl_->client_handle.get(), &response_header, cancel_response.get());
}
data_ptr =
std::make_shared<ClientBaseData>(
ClientBaseData::CancelResponseData(
ret, response_header, cancel_response));
}
break;
case EntityType::FeedbackSubscription:
{
std::shared_ptr<void> feedback_message;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
feedback_message = this->create_feedback_message();
ret = rcl_action_take_feedback(
pimpl_->client_handle.get(), feedback_message.get());
}
data_ptr =
std::make_shared<ClientBaseData>(
ClientBaseData::FeedbackReadyData(
ret, feedback_message));
}
break;
case EntityType::StatusSubscription:
{
std::shared_ptr<void> status_message;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
status_message = this->create_status_message();
ret = rcl_action_take_status(
pimpl_->client_handle.get(), status_message.get());
}
data_ptr =
std::make_shared<ClientBaseData>(
ClientBaseData::StatusReadyData(
ret, status_message));
}
break;
}

return std::static_pointer_cast<void>(data_ptr);
}

void
ClientBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action client but 'data' is empty");
}

std::shared_ptr<ClientBaseData> data_ptr = std::static_pointer_cast<ClientBaseData>(data_in);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ClientBaseData::FeedbackReadyData>) {
if (RCL_RET_OK == data.ret) {
this->handle_feedback_message(data.feedback_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::StatusReadyData>) {
if (RCL_RET_OK == data.ret) {
this->handle_status_message(data.status_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::GoalResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_goal_response(data.response_header, data.goal_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::ResultResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_result_response(data.response_header, data.result_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::CancelResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_cancel_response(data.response_header, data.cancel_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response");
}
}
}, data_ptr->data);
}

整个过程类似于服务端,构建ClientBaseData,并在execute()执行handle_goal_response函数,该函数知道了Goal被服务端正确处理。此时客户端调用之前send_goal_request()注册回调函数,然后将请求从pending列表中删除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void
ClientBase::handle_goal_response(
const rmw_request_id_t & response_header,
std::shared_ptr<void> response)
{
std::lock_guard<std::mutex> guard(pimpl_->goal_requests_mutex);
const int64_t & sequence_number = response_header.sequence_number;
if (pimpl_->pending_goal_responses.count(sequence_number) == 0) {
RCLCPP_ERROR(pimpl_->logger, "unknown goal response, ignoring...");
return;
}
// 这里回调send_goal_request() 注册回调函数
pimpl_->pending_goal_responses[sequence_number](response);
pimpl_->pending_goal_responses.erase(sequence_number);
}

我们回顾下send_goal_request()回调函数:

  • 如果Goal被拒绝了,就用nullptr调用options.goal_response_callback()
  • 如果Goal被接受了,就用Goal 反馈调用options.goal_response_callback()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
std::shared_future<typename GoalHandle::SharedPtr>
async_send_goal(const Goal & goal, const SendGoalOptions & options = SendGoalOptions())
{
// Put promise in the heap to move it around.
auto promise = std::make_shared<std::promise<typename GoalHandle::SharedPtr>>();
std::shared_future<typename GoalHandle::SharedPtr> future(promise->get_future());
using GoalRequest = typename ActionT::Impl::SendGoalService::Request;
auto goal_request = std::make_shared<GoalRequest>();
goal_request->goal_id.uuid = this->generate_goal_id();
goal_request->goal = goal;
this->send_goal_request(
std::static_pointer_cast<void>(goal_request),
[this, goal_request, options, promise](std::shared_ptr<void> response) mutable
{
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
auto goal_response = std::static_pointer_cast<GoalResponse>(response);
if (!goal_response->accepted) {
promise->set_value(nullptr);
if (options.goal_response_callback) {
options.goal_response_callback(nullptr);
}
return;
}
GoalInfo goal_info;
goal_info.goal_id.uuid = goal_request->goal_id.uuid;
goal_info.stamp = goal_response->stamp;
// Do not use std::make_shared as friendship cannot be forwarded.
std::shared_ptr<GoalHandle> goal_handle(
new GoalHandle(goal_info, options.feedback_callback, options.result_callback));
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
goal_handles_[goal_handle->get_goal_id()] = goal_handle;
}
promise->set_value(goal_handle);
if (options.goal_response_callback) {
options.goal_response_callback(goal_handle);
}

if (options.result_callback) {
this->make_result_aware(goal_handle);
}
});

....//其他代码逻辑

return future;
}

动作服务器客户端发送Result请求

在调用options.goal_response_callback()响应回调后,make_result_aware()会发送Result请求,并注册Result响应回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/client.hpp
template<typename ActionT>
class Client : public ClientBase
{
void
make_result_aware(typename GoalHandle::SharedPtr goal_handle)
{
void
ClientBase::send_result_request(std::shared_ptr<void> request, ResponseCallback callback)
{
std::lock_guard<std::mutex> guard(pimpl_->result_requests_mutex);
int64_t sequence_number;
rcl_ret_t ret = rcl_action_send_result_request(
pimpl_->client_handle.get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send result request");
}
assert(pimpl_->pending_result_responses.count(sequence_number) == 0);
pimpl_->pending_result_responses[sequence_number] = callback;
}

// Avoid making more than one request
if (goal_handle->set_result_awareness(true)) {
return;
}
using GoalResultRequest = typename ActionT::Impl::GetResultService::Request;
auto goal_result_request = std::make_shared<GoalResultRequest>();
goal_result_request->goal_id.uuid = goal_handle->get_goal_id();
try {
// 异步发送Result请求
this->send_result_request(
std::static_pointer_cast<void>(goal_result_request),
[goal_handle, this](std::shared_ptr<void> response) mutable
{
// Wrap the response in a struct with the fields a user cares about
WrappedResult wrapped_result;
using GoalResultResponse = typename ActionT::Impl::GetResultService::Response;
auto result_response = std::static_pointer_cast<GoalResultResponse>(response);
wrapped_result.result = std::make_shared<typename ActionT::Result>();
*wrapped_result.result = result_response->result;
wrapped_result.goal_id = goal_handle->get_goal_id();
wrapped_result.code = static_cast<ResultCode>(result_response->status);
goal_handle->set_result(wrapped_result);
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
goal_handles_.erase(goal_handle->get_goal_id());
});
} catch (rclcpp::exceptions::RCLError & ex) {
// This will cause an exception when the user tries to access the result
goal_handle->invalidate(exceptions::UnawareGoalHandleError(ex.message));
}
}
};

整个过程和Goal请求类似,这里就不在赘述了,回调函数保存在pimpl_->pending_result_responses。

动作服务器服务端接收Result请求

当动作服务器服务端接收到来自客户端请求,同样会触发execute()执行,进而调用到execute_result_request_received():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
std::shared_ptr<void>
ServerBase::take_data_by_entity_id(size_t id)
{
.......//其他代码逻辑
std::shared_ptr<ServerBaseData> data_ptr;
// Mark as ready the entity from which we want to take data
switch (static_cast<ActionEventType>(id)) {
.......//其他代码逻辑
case ActionEventType::ResultService:
{
rcl_ret_t ret;
// Get the result request message
rmw_request_id_t request_header;
std::shared_ptr<void> result_request = create_result_request();
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());

data_ptr =
std::make_shared<ServerBaseData>(
ServerBaseData::ResultRequestData(ret, result_request, request_header));
}
break;
.......//其他代码逻辑
}

return std::static_pointer_cast<void>(data_ptr);
}

void
ServerBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action server but 'data' is empty");
}

std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data_in);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
execute_goal_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
execute_cancel_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
execute_result_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
execute_check_expired_goals();
}
},
data_ptr->data);
}

execute_result_request_received()处理请求数据,注意当对应的Goal没有找到或者结果已经存在,就直接反馈,否则需要等到结果计算出来,才能回复。这里先把request存起来,因为接下来要发送Feedback。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
void
ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
{
std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
const ServerBaseData::ResultRequestData & gData(
std::get<ServerBaseData::ResultRequestData>(data_ptr->data));

rcl_ret_t ret = std::get<0>(gData);
std::shared_ptr<void> result_request = std::get<1>(gData);
rmw_request_id_t request_header = std::get<2>(gData);

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
// no longer alive.
return;
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

std::shared_ptr<void> result_response;

// 根据uuid检查Goal是否存在
GoalUUID uuid = get_goal_id_from_result_request(result_request.get());
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
bool goal_exists;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) {
// 如果goal不存在,则返回STATUS_UNKNOWN表示错误
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
} else {
// 如果Goal存在,并且结果也存在(多次请求?),也立刻返回结果
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
auto iter = pimpl_->goal_results_.find(uuid);
if (iter != pimpl_->goal_results_.end()) {
result_response = iter->second;
} else {
// 如果Goal存在,且没有结果,等待结果计算
pimpl_->result_requests_[uuid].push_back(request_header);
}
}

// 如果Goal不存在或者Result已存在,立刻发送响应返回
if (result_response) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t rcl_ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get());
if (rcl_ret == RCL_RET_TIMEOUT) {
RCLCPP_WARN(
pimpl_->logger_,
"Failed to send result response %s (timeout): %s",
to_string(uuid).c_str(), rcl_get_error_string().str);
rcl_reset_error();
return;
}
if (RCL_RET_OK != rcl_ret) {
rclcpp::exceptions::throw_from_rcl_error(rcl_ret);
}
}
data.reset();
}

动作服务器服务端计算并Feedback

在动作服务器正在计算时,可以通过publish_feedback()发送中间结果反馈,可以通过succeed()告知计算结果已经完成:

1
2
3
4
5
// 反馈中间结果
goal_handle->publish_feedback(feedback);

// 发送最后计算结果
goal_handle->succeed(result);

其中feedback反馈是通过话题发送回客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
template<typename ActionT>
class ServerGoalHandle : public ServerGoalHandleBase
{
public:
/// Send an update about the progress of a goal.
void
publish_feedback(std::shared_ptr<typename ActionT::Feedback> feedback_msg)
{
auto feedback_message = std::make_shared<typename ActionT::Impl::FeedbackMessage>();
feedback_message->goal_id.uuid = uuid_;
feedback_message->feedback = *feedback_msg;
// 这里publish_feedback_对应于ServerBase::publish_feedback
// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/server.hpp:call_goal_accepted_callback()
publish_feedback_(feedback_message);
}
......//其他代码逻辑
};

//src/ros2/rclcpp/rclcpp_action/src/server.cpp
void
ServerBase::publish_feedback(std::shared_ptr<void> feedback_msg)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback");
}
}

// src/ros2/rcl/rcl_action/src/rcl_action/action_server.c
rcl_ret_t
rcl_action_publish_feedback(
const rcl_action_server_t * action_server,
void * ros_feedback)
{
if (!rcl_action_server_is_valid(action_server)) {
return RCL_RET_ACTION_SERVER_INVALID; // error already set
}
RCL_CHECK_ARGUMENT_FOR_NULL(ros_feedback, RCL_RET_INVALID_ARGUMENT);
// 这里调用rcl层直接发送话题到客户端
rcl_ret_t ret = rcl_publish(&action_server->impl->feedback_publisher, ros_feedback, NULL);
if (RCL_RET_OK != ret) {
return RCL_RET_ERROR; // error already set
}
return RCL_RET_OK;
}

动作服务器客户端接收Feedback

在接收到Feedback的时候,处理过程类似于Goal响应过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// src/ros2/rclcpp/rclcpp_action/src/client.cpp
void
ClientBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action client but 'data' is empty");
}

std::shared_ptr<ClientBaseData> data_ptr = std::static_pointer_cast<ClientBaseData>(data_in);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ClientBaseData::FeedbackReadyData>) {
if (RCL_RET_OK == data.ret) {
this->handle_feedback_message(data.feedback_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback");
}
}
......//其他代码逻辑
}, data_ptr->data);
}

// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/client.hpp
template<typename ActionT>
class Client : public ClientBase
{
void
handle_feedback_message(std::shared_ptr<void> message) override
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
typename FeedbackMessage::SharedPtr feedback_message =
std::static_pointer_cast<FeedbackMessage>(message);
const GoalUUID & goal_id = feedback_message->goal_id.uuid;
if (goal_handles_.count(goal_id) == 0) {
RCLCPP_DEBUG(
this->get_logger(),
"Received feedback for unknown goal. Ignoring...");
return;
}
typename GoalHandle::SharedPtr goal_handle = goal_handles_[goal_id].lock();
// Forget about the goal if there are no more user references
if (!goal_handle) {
RCLCPP_DEBUG(
this->get_logger(),
"Dropping weak reference to goal handle during feedback callback");
goal_handles_.erase(goal_id);
return;
}

// 调用用户自定义feedback回调函数
auto feedback = std::make_shared<Feedback>();
*feedback = feedback_message->feedback;
goal_handle->call_feedback_callback(goal_handle, feedback);
}
};

接收到feedback,客户端会调用用户自定义feedback回调,也就是MinimalActionClient::feedback_callback()。

动作服务器服务端发送Result回应

当最终的结果计算成后,succeed()负责将结果传递给服务端,并反馈给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 
// src/ros2/rclcpp/rclcpp_action/src/server_goal_handle.cpp
void
ServerGoalHandleBase::_succeed()
{
std::lock_guard<std::mutex> lock(rcl_handle_mutex_);
rcl_ret_t ret = rcl_action_update_goal_state(rcl_handle_.get(), GOAL_EVENT_SUCCEED);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/server_goal_handle.hpp
template<typename ActionT>
class ServerGoalHandle : public ServerGoalHandleBase
{
public:
void
succeed(typename ActionT::Result::SharedPtr result_msg)
{
// 更新rcl内部goal状态
_succeed();
auto response = std::make_shared<typename ActionT::Impl::GetResultService::Response>();
response->status = action_msgs::msg::GoalStatus::STATUS_SUCCEEDED;
response->result = *result_msg;

// src/ros2/rclcpp/rclcpp_action/include/rclcpp_action/server.hpp:call_goal_accepted_callback()
on_terminal_state_(uuid_, response);
}
};

on_terminal_state_是在ServerGoalHandle构建的时候填充的,回调函数在call_goal_accepted_callback()定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template<typename ActionT>
class Server : public ServerBase, public std::enable_shared_from_this<Server<ActionT>>
{
public:
/// \internal
void
call_goal_accepted_callback(
std::shared_ptr<rcl_action_goal_handle_t> rcl_goal_handle,
GoalUUID uuid, std::shared_ptr<void> goal_request_message) override
{
std::shared_ptr<ServerGoalHandle<ActionT>> goal_handle;
std::weak_ptr<Server<ActionT>> weak_this = this->shared_from_this();

std::function<void(const GoalUUID &, std::shared_ptr<void>)> on_terminal_state =
[weak_this](const GoalUUID & goal_uuid, std::shared_ptr<void> result_message)
{
std::shared_ptr<Server<ActionT>> shared_this = weak_this.lock();
if (!shared_this) {
return;
}
// Send result message to anyone that asked
shared_this->publish_result(goal_uuid, result_message);
// Publish a status message any time a goal handle changes state
shared_this->publish_status();
// notify base so it can recalculate the expired goal timer
shared_this->notify_goal_terminal_state();
// Delete data now (ServerBase and rcl_action_server_t keep data until goal handle expires)
std::lock_guard<std::mutex> lock(shared_this->goal_handles_mutex_);
shared_this->goal_handles_.erase(goal_uuid);
};
......// 其他代码逻辑
}
};

回调函数中将结果反馈给客户端,并且从goal_handles_移除当前goal,因为已经计算完毕了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void
ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_msg)
{
// 由于要发送结果给客户端,首先检查服务端Goal是否存在
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
bool goal_exists;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}

// 不存在就抛出异常(不应该发生)
if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

{
std::lock_guard<std::recursive_mutex> unordered_map_lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg;

// if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
for (auto & request_header : iter->second) {

// 给客户端发送结果响应
rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get());
if (ret == RCL_RET_TIMEOUT) {
RCLCPP_WARN(
pimpl_->logger_,
"Failed to send result response %s (timeout): %s",
to_string(uuid).c_str(), rcl_get_error_string().str);
rcl_reset_error();
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
}
}
}

动作服务器客户端接收到Result回应

类似的动作服务器Goal处理回应过程,结果回应的处理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// src/ros2/rclcpp/rclcpp_action/src/client.cpp
void
ClientBase::handle_result_response(
const rmw_request_id_t & response_header,
std::shared_ptr<void> response)
{
ResponseCallback response_callback;
{
std::lock_guard<std::mutex> guard(pimpl_->result_requests_mutex);
const int64_t & sequence_number = response_header.sequence_number;
if (pimpl_->pending_result_responses.count(sequence_number) == 0) {
RCLCPP_ERROR(pimpl_->logger, "unknown result response, ignoring...");
return;
}
response_callback = std::move(pimpl_->pending_result_responses[sequence_number]);
pimpl_->pending_result_responses.erase(sequence_number);
}
response_callback(response);
}

void
ClientBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action client but 'data' is empty");
}

std::shared_ptr<ClientBaseData> data_ptr = std::static_pointer_cast<ClientBaseData>(data_in);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
......//其他代码逻辑
if constexpr (std::is_same_v<T, ClientBaseData::ResultResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_result_response(data.response_header, data.result_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response");
}
}
......//其他代码逻辑
}, data_ptr->data);
}

客户端首先擦除了pimpl_->pending_result_responses对应结果响应,然后调用response_callback(),该函数就是客户端注册的回调MinimalActionClient::result_callback(),即用户可以针对结果的回调函数。

动作服务器客户端发送Cancel请求

从之前代码可以看到,除了Goal Service和Result Service,还有一个Cancel Service,由于Cancel对于服务端影响较大,所以这里我们重点关注服务端行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
ServerBase::execute(std::shared_ptr<void> & data_in)
{
if (!data_in) {
throw std::runtime_error("Executing action server but 'data' is empty");
}

std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data_in);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
execute_goal_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
execute_cancel_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
execute_result_request_received(data_in);
}
if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
execute_check_expired_goals();
}
},
data_ptr->data);
}

当接收到Cancel消息后,动作服务器客户端则执行execute_result_request_received(),尝试取消当前正在进行的goal:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void
ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
{
std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
const ServerBaseData::CancelRequestData & gData(
std::get<ServerBaseData::CancelRequestData>(data_ptr->data));

rcl_ret_t ret = std::get<0>(gData);
std::shared_ptr<action_msgs::srv::CancelGoal::Request> request = std::get<1>(gData);
rmw_request_id_t request_header = std::get<2>(gData);


if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
// no longer alive.
return;
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

// Convert c++ message to C message
rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request();
convert(request->goal_info.goal_id.uuid, &cancel_request.goal_info);
cancel_request.goal_info.stamp.sec = request->goal_info.stamp.sec;
cancel_request.goal_info.stamp.nanosec = request->goal_info.stamp.nanosec;

// Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();

// 首先底层判断是否可以取消当前Goal,并且初始化cancel_response
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
// 这段代码在退出的时候执行,销毁cancel_response
RCPPUTILS_SCOPE_EXIT(
{
ret = rcl_action_cancel_response_fini(&cancel_response);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(pimpl_->logger_, "Failed to fini cancel response");
}
});

auto response = std::make_shared<action_msgs::srv::CancelGoal::Response>();

response->return_code = cancel_response.msg.return_code;
auto & goals = cancel_response.msg.goals_canceling;
// 对于每一个Goal,调用用户自定义的取消函数:MinimalActionServer::handle_cancel
// 这里有个疑问,会有多个Goal同时存在?
for (size_t i = 0; i < goals.size; ++i) {
const rcl_action_goal_info_t & goal_info = goals.data[i];
GoalUUID uuid;
convert(goal_info, &uuid);
auto response_code = call_handle_cancel_callback(uuid);
if (CancelResponse::ACCEPT == response_code) {
action_msgs::msg::GoalInfo cpp_info;
cpp_info.goal_id.uuid = uuid;
cpp_info.stamp.sec = goal_info.stamp.sec;
cpp_info.stamp.nanosec = goal_info.stamp.nanosec;
response->goals_canceling.push_back(cpp_info);
}
}

// If the user rejects all individual requests to cancel goals,
// then we consider the top-level cancel request as rejected.
if (goals.size >= 1u && 0u == response->goals_canceling.size()) {
response->return_code = action_msgs::srv::CancelGoal::Response::ERROR_REJECTED;
}

if (!response->goals_canceling.empty()) {
// 有Goal状态改变了,这里通知变更
publish_status();
}

// 发送取消反馈给客户端,告知客户端取消已经完成
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
}

if (ret == RCL_RET_TIMEOUT) {
GoalUUID uuid = request->goal_info.goal_id.uuid;
RCLCPP_WARN(
pimpl_->logger_,
"Failed to send cancel response %s (timeout): %s",
to_string(uuid).c_str(), rcl_get_error_string().str);
rcl_reset_error();
return;
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
data.reset();
}

其中call_handle_cancel_callback(),调用用户自定义的回调函数handle_cancel_(),即用户注册的MinimalActionServer::handle_cancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template<typename ActionT>
class Server : public ServerBase, public std::enable_shared_from_this<Server<ActionT>>
{
/// \internal
CancelResponse
call_handle_cancel_callback(const GoalUUID & uuid) override
{
std::shared_ptr<ServerGoalHandle<ActionT>> goal_handle;
{
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
auto element = goal_handles_.find(uuid);
if (element != goal_handles_.end()) {
goal_handle = element->second.lock();
}
}

CancelResponse resp = CancelResponse::REJECT;
if (goal_handle) {
resp = handle_cancel_(goal_handle);
if (CancelResponse::ACCEPT == resp) {
try {
goal_handle->_cancel_goal();
} catch (const rclcpp::exceptions::RCLError & ex) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"Failed to cancel goal in call_handle_cancel_callback: %s", ex.what());
return CancelResponse::REJECT;
}
}
}
return resp;
}
};

至此动作服务器整体行为已经了解清楚,在细节上还有一些疑问,如Cancel如果发生在边界之类位置,会导致什么后果等,留待以后分析。


ROS2 Action详解
https://ziqingdream.github.io/2025/04/15/-1-机器人技术栈/-1-ROS2操作系统/-1-ROS2-Humble代码阅读/【3】ROS2 Action详解/
作者
工程课代表[B站] / ZiQingDream[Github]
发布于
2025年4月15日
许可协议