ROS源代码阅读——publish底层实现部分

在之前对ROS的源码的学习中,基本弄清楚了ROS的topic通信方式中,节点发布/订阅的机制和原理,可以说解释了节点与master之间的交流方式。但是对于节点与节点之间通信的具体过程,却一笔带过,所以这次通过再次阅读这部分源码,来明确节点在advertise之后,注册完成之后,publish消息时底层通信的逻辑和形式。

本次源码阅读主要通过dfs的方式来进行。首先是ROS wiki提供的发布者节点发布信息时的最表层调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include "ros/ros.h"
#include "std_msgs/String.h"
#include <sstream>

int main(int argc, char **argv)
{
ros::init(argc, argv, "talker");

ros::NodeHandle n;

ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);

ros::Rate loop_rate(10);
int count = 0;
while (ros::ok())
{
std_msgs::String msg;
std::stringstream ss;
ss << "hello world " << count;
msg.data = ss.str();
ROS_INFO("%s", msg.data.c_str());
chatter_pub.publish(msg);
ros::spinOnce();
loop_rate.sleep();
}

return 0;
}

可以看到,其中调用publish函数的正是之前我们使用nodehandle对象的advertise注册发布者时所返回的publisher类对象chatterpub,传入的参数时我们自定义的msg类型。
我们在roscpp包中寻找相关的代码,如下是在publisher.h中所定义的两种重载的模板函数,分别用来应传入参数类型不同的的两种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
······
//共享指针类型的引用传参
template <typename M>
void publish(const boost::shared_ptr<M>& message) const
{
using namespace serialization;

if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
return;
}

if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}

ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(*message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(*message),
"Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
mt::datatype<M>(*message), mt::md5sum<M>(*message),
impl_->datatype_.c_str(), impl_->md5sum_.c_str());

SerializedMessage m;
m.type_info = &typeid(M);
m.message = message;

// 这里使用ref函数是给bind绑定的参数传入ref()引用包装类型,使参数为引用传递;
// 使用bind应该是为了把message绑定给serializeMessage
publish(boost::bind(serializeMessage<M>, boost::ref(*message)), m);
}
// M类型的引用传参 泛型
template <typename M>
void publish(const M& message) const
{
using namespace serialization;
namespace mt = ros::message_traits;

if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
return;
}

if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}

ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(message),
"Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
mt::datatype<M>(message), mt::md5sum<M>(message),
impl_->datatype_.c_str(), impl_->md5sum_.c_str());

SerializedMessage m;
publish(boost::bind(serializeMessage<M>, boost::ref(message)), m);
}
······

他们最终都调用的publish函数在publisher.cpp中实现,具体如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    ········
// publish函数
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
// impl是否存在
if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
// impl是否可用
if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
// topicManager的单例调用publish
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
}
···········

其中的impl是publisher类在传参构造时的生成一个内部类对象,一般是由nodehandle类中的advertise函数,获取到ops的相关信息并传进来生成的impl
对象。也就是说在注册成功后才会存在的一个对象。
显然之后,Publisher.publish()函数让TopicManager的唯一实例调用了它的publish方法,我们在TopicManger.h函数中找到了相关代码,这也是一个模板函数,将message绑定给serializeMessage后,将topic、bind、m传给后面的具体实现函数
1
2
3
4
5
6
7
8
9
10
········
template<typename M>
void publish(const std::string& topic, const M& message)
{
using namespace serialization;

SerializedMessage m;
publish(topic, boost::bind(serializeMessage<M>, boost::ref(message)), m);
}
········

具体实现函数在topicmanager.cpp中。大概如我注释写的那样的流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
······
// 发布消息
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
// 递归互斥量,允许多次加锁在统一线程内
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
// 是否shutdown
if (isShuttingDown())
{
return;
}
// 寻找publication是否已注册
PublicationPtr p = lookupPublicationWithoutLock(topic);
// 当publication有订阅者或者有latch属性,就执行下列代码
if (p->hasSubscribers() || p->isLatching())
{
ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

// Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
// do a no-copy publish.
// 确认我们要发布给的订阅者是什么类别,如果是进程内的,并且使用者相同的c++类型,我们就可以进行no-copy模式的publish
// 这里定义两个标志nocopy和serialize
bool nocopy = false;
bool serialize = false;
// 如果msg的指针已经有了,而且我们有他的类型我们就可以使用no-copy的publish,否则我们就要进行序列化了
// We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
if (m.type_info && m.message)
{
// 注意这里表面传值,实际是引用传参,会修改serialize和nocopy的值,
p->getPublishTypes(serialize, nocopy, *m.type_info);
}
else
{
serialize = true;
}
// 如果nocopy为false,就说明publish的消息没有进程内同c++的订阅者,我们就要reset他的msg,并初始化type_info ?
if (!nocopy)
{
m.message.reset();
m.type_info = 0;
}
// 如果序列化为true,并且p设置了latch属性,就设置msg,进行序列化
if (serialize || p->isLatching())
{
SerializedMessage m2 = serfunc();
m.buf = m2.buf;
m.num_bytes = m2.num_bytes;
m.message_start = m2.message_start;
}
// 让publication执行发布命令
p->publish(m);
// 如果我们序列化了msg,那么显然我们就要进行进程间通信,利用的就是poll_manager的信号槽机制,调用signal信号
// If we're not doing a serialized publish we don't need to signal the pollset. The write()
// call inside signal() is actually relatively expensive when doing a nocopy publish.
if (serialize)
{
poll_manager_->getPollSet().signal();
}
}
//如果publication既没有订阅者,也没有latch属性,就执行自增序列就好,sequence一般是分布式系统的唯一id,这里我先猜测他是用来表明新的发布的吧。
else
{
p->incrementSequence();
}
}
······

解释一下其中的一些变量。

  1. 首先是topicmanager本身,该类是每一个节点用来管理topic通信的类,采用的设计模式是单例模式,也就是说在同一个节点,或者说进程中,仅有一个topicmanager类对象,他管理了所有的topic相关的函数调用,它的初始化是在init函数中实现的。
  2. 然后介绍下加锁的对象,是advertised_topic_mutex这个信号量,它的类型是递归互斥量,在同一个线程中,它是允许多次加锁的
  3. Publication类是在ROS的topic通信中极其重要的类,PublicationPtr是一个智能指针(boost::make_shared)。Publication的每个对象,在单个节点中,可以理解为topic的存在,也就是说每一个publication类的对象,都对应了一个topic,在同一个节点的topic通信中,不同的publisher如果往同一个topic上发送消息,都是通过同一个pubication的对象来实现的。而之所以使用智能指针,就是为了实现内存的自动管理,方便pubication的自动销毁。lookupPublicationWithoutLock()函数就是在topicmanager已存在的pubication中寻找当前topic是否已经具有相应的publication,一般如果具有impl_,那就必然能找到publication,因为我们的impl_的各个属性(包括topic)都是在advertise注册时从publication那里得到的。
  4. publication的latch属性。这个属性的含义是是否为订阅者保留最后一次发送的信息。如果该属性设为true,当新的订阅者订阅时,就必然会收到之前发布者所发布的最后一条消息。

接下来,在topicmanager函数中,会检测一下publish消息所要发送的对象是在进程内还是在进程间。我个人认为其中比较关键的就是getPublishType函数和publish函数这两部分。
首先,这一段的逻辑首先讲清楚是这个样子的:

  1. 定义nocpy和serialize两个bool型变量,来分别代表有进程内同C++类型订阅者和进程间订阅者
  2. 调用publication的getPublishTypes()函数,该函数原型如下:void getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti);这里注意使用了引用传参,也就是说会将serialize和nocopy的值进行更改。
  3. 如果nocopy为false,就会执行第一个if判断的语句,将m进行一个初始化(存疑)操作,然后判断是否需要序列化,如果需要,就对它进行序列化的赋值操作。
  4. 使用publication的publish函数进行发布消息
  5. 判断是否进行了序列化,如果进行了,就调用pollSet的信号函数触发槽函数。
    我们先来dfs一下getPublishTypes()函数,首先是调用了publication类的函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    ·····················
    /**
    * 得到publish的type,传入三个参数,是否序列化,是否无需拷贝,type_info
    * 这里调用了subscriberLink的getPublishTypes,得到它的serialize和nocopy,然后用一个或运算来返回给topicmanager是否需要进行nocopy和serialize
    */
    void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
    {
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    V_SubscriberLink::const_iterator it = subscriber_links_.begin();
    V_SubscriberLink::const_iterator end = subscriber_links_.end();
    for (; it != end; ++it)
    {
    const SubscriberLinkPtr& sub = *it;
    bool s = false;
    bool n = false;
    // 这里的详细说明见下文
    sub->getPublishTypes(s, n, ti);
    serialize = serialize || s;
    nocopy = nocopy || n;

    if (serialize && nocopy)
    {
    break;
    }
    }
    }
    ···················
    这里的sub是一个subscriberLink类的父类指针,该类本身没有实现getPublishTypes()函数,而是以虚函数的方式实现的,它的子类IntraProcessPSubscriberLink类也进行了实现,具体代码如下:
    virtual void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti) { (void)ti; ser = true; nocopy = false; }
    这是一个虚函数而不是纯虚函数,纯虚函数父类不会实现,只是在声明后加一个=0,如virtual void enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy) = 0;。虚函数一定会有一个自己的实现,在用父类指针进行调用函数时,实现了该函数的子类对象会调用自己的函数,没有实现该函数的子类对象会调用父类的函数,也就是说在这里,如果sub为transportPublisherLink对象,就会像上述代码一样,设置序列化为true,nocopy为false。否则,就会像下面的代码一样。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 这里会调用subscription的的getPublisType函数来修改值
    void IntraProcessSubscriberLink::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
    {
    boost::recursive_mutex::scoped_lock lock(drop_mutex_);
    if (dropped_)
    {
    return;
    }

    subscriber_->getPublishTypes(ser, nocopy, ti);
    }
    这里有一个很容易误会的地方就是subscriber_的类型,很容易以为她是subscription类,事实上,它是IntraProcessPublisherLinkPtr subscriber_定义的,所以实际这里调用了IntraProcessPublisherLink类中的方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    void IntraProcessPublisherLink::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
    {
    boost::recursive_mutex::scoped_lock lock(drop_mutex_);
    // 如果连接已经dropped了,就全都false,无需序列化也无需查看是否nocopy直接return就好
    // dropoed是IntraProcessPublisherLink的一个成员变量,表示当前link是否已经失效
    if (dropped_)
    {
    ser = false;
    nocopy = false;
    return;
    }
    // parent_是父类PublisherLink定义的一个弱指针,指向intraprocessPublisher的所属subscription,所以这里实际是调用了subscription的这个函数,使用lock是因为weak指针的特性,只有使用lock才能短暂拥有对象。
    SubscriptionPtr parent = parent_.lock();
    if (parent)
    {
    parent->getPublishTypes(ser, nocopy, ti);
    }
    else
    {
    // 如果parent不存在,就说明没有内部订阅类,序列化设置为true,nocopy设置为false
    ser = true;
    nocopy = false;
    }
    }
    在这里首先会判断当前link是否dropped掉了,如果没有,那就说明还在订阅,就尝试获取它的parent,也就是订阅的subscription,如果存在,就调用subscription的getPublishType()函数,否则,就说明他没有进程内订阅,就直接设置序列化为真,nocopy为假就ok了。至于subscription类的函数实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 遍历callback_队列,比较传进来的type_info是否和callback_中的相同,如果相同,就将nocopy设置为true,表示不需要进行序列化,否则就是serialize为true,需要序列化,直到两个参数都为ture
    void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
    {
    boost::mutex::scoped_lock lock(callbacks_mutex_);
    for (V_CallbackInfo::iterator cb = callbacks_.begin();
    cb != callbacks_.end(); ++cb)
    {
    const CallbackInfoPtr& info = *cb;
    if (info->helper_->getTypeInfo() == ti)
    {
    nocopy = true;
    }
    else
    {
    ser = true;
    }

    if (nocopy && ser)
    {
    return;
    }
    }
    }
    callbacks_容器就是一个放置了CallbackInfo的容器,它包含了subscription所订阅的message的类型等消息。具体细节会在subscribe中进行讨论。
    接下来就调用了publication对象的publish函数,函数具体代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    ········
    void Publication::publish(SerializedMessage& m)
    {
    if (m.message)
    {
    // 给subscriber_link_加上区域锁
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    // 遍历subscriber_links_,如果找到的subscriber_links_是进程内的,就直接enqueueMessage信息,遍历完之后进行message.reset();
    V_SubscriberLink::const_iterator it = subscriber_links_.begin();
    V_SubscriberLink::const_iterator end = subscriber_links_.end();
    for (; it != end; ++it)
    {
    const SubscriberLinkPtr& sub = *it;
    if (sub->isIntraprocess())
    {
    sub->enqueueMessage(m, false, true);
    }
    }
    //
    m.message.reset();
    }
    // 如果buff有内容,就把m放入publish_queue中
    if (m.buf)
    {
    boost::mutex::scoped_lock lock(publish_queue_mutex_);
    publish_queue_.push_back(m);
    }
    }
    ··········
    在这段代码中,可以看到他在检测publication所管理的subscriber_links是否是本进程内的,如果是的话,就会直接把message进行入队,然后这里虽然是一个SubscriberLinkptr指针对象调用的enqueueMessage,实际上在SubscriberLink类中这只是一个虚函数,具体实现是由IntraProcessSubscriberLink类和TransportSubscriberLink类实现的。我们先介绍前者的实现,具体代码是这个样子的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
    {
    boost::recursive_mutex::scoped_lock lock(drop_mutex_);
    if (dropped_)
    {
    return;
    }

    ROS_ASSERT(subscriber_);
    // subscriber_的类型其实是IntralProcessPublisherLink,这里的声明误导性很强
    subscriber_->handleMessage(m, ser, nocopy);
    }
    在这里最让人难受的就是subscriber_的理解,它的声明出现在intra_processSubscriber_link.h文件中。
    IntraProcessPublisherLinkPtr subscriber_
    可以这样理解,发布者向订阅者发布信息时,会publication会先从自己的订阅列表中找出在进程内的订阅者,然后就会让IntraProcessSubscriberLink对象来进行enqueueMessage操作,这个入队操作到下面实际又交给了IntraProcessPublisherLink对象调用handleMessage,它的实现代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
    {
    boost::recursive_mutex::scoped_lock lock(drop_mutex_);
    if (dropped_)
    {
    return;
    }

    stats_.bytes_received_ += m.num_bytes;
    stats_.messages_received_++;

    SubscriptionPtr parent = parent_.lock();

    if (parent)
    {
    stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this());
    }
    }
    显然到这里,m这个message就已经成功交给了SubscriptionPtr,接下来就是Subscription来调用handleMessage来进行反序列化和进行回调函数的调用了。暂停一下,我们回头看看进程间通信又是如何进行到这一步的。
    在上面我们进行publication类的publish时,进程内通信直接进行了enqueueMessage,之后并没有结束,我们判断message如果buff不为空,就会让他把信息放入publish_queue_中,这是一个在publication中声明的队列。
    紧接着我们判断message是否进行了序列化,如果进行序列化,显然就要进行进程间通信,就调用poll_manager_的单例,使用getPollSet()函数获得PollSet对象,并调用signal()函数,这个函数是用来搞pipe的,具体细节我不太领会,需要在学习一些Unix网络编程的知识。
    我们看一下message 放入了publis_queue_后是如何发送给其他进程间的subscriber的。我们可以通过SourceTrail这个工具清楚看出,另一个使用了了publis_queue_的函数正是publication中的processPublishQueue函数。这个函数是由TopicManager类的processPublishQueues函数调用的,TopicManager类的processPublishQueues函数就是一个循环,会把当前节点的所有publication,调用processPublishQueue函数,进行处理,发布信息。具体内容如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ··········
    // 处理publish队列
    void TopicManager::processPublishQueues()
    {
    boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
    // 遍历已发布节点,对每个publication调用processPublishQueue函数。
    V_Publication::iterator it = advertised_topics_.begin();
    V_Publication::iterator end = advertised_topics_.end();
    for (; it != end; ++it)
    {
    const PublicationPtr& pub = *it;
    pub->processPublishQueue();
    }
    }
    ··········
    而它的调用,则是在topicManager的start函数中就使用了【注意是使用而不是调用】,topicmanager的实例的初始化就是由start函数进行的,而它的start也正是在整个节点进行初始化时,由init()函数调用的ros::start()调用的。我们到topicManager的start函数中去看,看到了processPublishQueue()是如何进行使用的:
    1
    2
    // 让poll_manager单例监听processPublishQueues函数
    poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
    可以看到在这里他是通过pollManager的单例对象来进行使用的,这个函数是一个监听函数,它的内部实现是这个样子的:
    1
    2
    3
    4
    5
    boost::signals2::connection PollManager::addPollThreadListener(const VoidFunc& func)
    {
    boost::recursive_mutex::scoped_lock lock(signal_mutex_);
    return poll_signal_.connect(func);
    }
    这里的poll_signal_是一个boosts::signal2::signal对象,这个signal2实现了线程安全的信号槽机制,也就是说,我们通过connect()函数可以将任何一个函数作为参数绑定到signal定义的对象上(这里时poll_signal_),我们可以绑定任意多个函数,作为槽,然后我们可以通过poll_signal_()这个函数,来触发和它绑定的所有函数。这里其实就是将processPublishQueues函数和这个信号进行了绑定,那么这个信号量是在哪里触发的呢?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void PollManager::threadFunc()
    {
    disableAllSignalsInThisThread();

    while (!shutting_down_)
    {
    {
    boost::recursive_mutex::scoped_lock lock(signal_mutex_);
    poll_signal_();
    }

    if (shutting_down_)
    {
    return;
    }

    poll_set_.update(100);
    }
    }
    在这个threadFunc()函数中,我们调用了poll_signal_信号量,从而触发了和他绑定的一系列函数,从SourceTrail工具我们可以看到有若干函数和他绑定。但重要的时,这个threadFunc函数又是在何时调用的呢?
    1
    2
    3
    4
    5
    void PollManager::start()
    {
    shutting_down_ = false;
    thread_ = boost::thread(&PollManager::threadFunc, this);
    }
    是在PollManager初始化时候和一个线程绑定了。也就是说我们在初始化Nodehandle之后,就建立了一个线程,线程轮转,就是在进行信号触发,然后执行processPublishQueues函数。
    我们跟着processPublishQueues函数向下看,我们可以看到publication的processPublishQueues函数是这个样子的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    void Publication::processPublishQueue()
    {
    V_SerializedMessage queue;
    {
    boost::mutex::scoped_lock lock(publish_queue_mutex_);

    if (dropped_)
    {
    return;
    }
    // 把publish_queue_中的信息全部放到queue中取,并清空publish_queue
    queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
    publish_queue_.clear();
    }
    // 如果queue为空,则返回
    if (queue.empty())
    {
    return;
    }
    // 把queue中的所有消息,入队
    V_SerializedMessage::iterator it = queue.begin();
    V_SerializedMessage::iterator end = queue.end();
    for (; it != end; ++it)
    {
    enqueueMessage(*it);
    }
    }
    这里同样有一个enqueueMessage,这个是由publication本身实现的
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    bool Publication::enqueueMessage(const SerializedMessage& m)
    {
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    // 如果publication已经drop了。返回false
    if (dropped_)
    {
    return false;
    }

    ROS_ASSERT(m.buf);
    //自增序列
    uint32_t seq = incrementSequence();
    if (has_header_)
    {
    // If we have a header, we know it's immediately after the message length
    // Deserialize it, write the sequence, and then serialize it again.
    namespace ser = ros::serialization;
    std_msgs::Header header;
    ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
    ser::deserialize(istream, header);
    header.seq = seq;
    ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
    ser::serialize(ostream, header);
    }
    // 遍历subscriberLinks,分别将message放入subscriberlink队列
    for(V_SubscriberLink::iterator i = subscriber_links_.begin();
    i != subscriber_links_.end(); ++i)
    {
    const SubscriberLinkPtr& sub_link = (*i);
    sub_link->enqueueMessage(m, true, false);
    }
    // 如果有latch属性,把m赋值给last_message
    if (latch_)
    {
    last_message_ = m;
    }

    return true;
    }
    这里虽然同样有一个subscriberLink指针调用了enqueueMessage函数,但是实际上确实transportSubcriberLink对象进行调用的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
    {
    (void)nocopy;
    // 如过ser为0,说明是进程内通信,说明不符,直接返回
    if (!ser)
    {
    return;
    }

    {
    boost::mutex::scoped_lock lock(outbox_mutex_);
    // 得到publication的最大队列
    int max_queue = 0;
    if (PublicationPtr parent = parent_.lock())
    {
    max_queue = parent->getMaxQueue();
    }

    ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
    // 看队列满了没有 outbox_是一个存放序列化信息的队列,如果满了,就会把队列最前面的信息pop掉,并更新queue_full_标志
    if (max_queue > 0 && (int)outbox_.size() >= max_queue)
    { //
    if (!queue_full_)
    {
    ROS_DEBUG("Outgoing queue full for topic [%s]. "
    "Discarding oldest message",
    topic_.c_str());
    }

    outbox_.pop(); // toss out the oldest thing in the queue to make room for us
    queue_full_ = true;
    }
    else // 没有满就继续标志false
    {
    queue_full_ = false;
    }
    // 放入消息
    outbox_.push(m);
    }
    // 先不立即写入,因为需要去函数里设置一些标志
    startMessageWrite(false);
    // 更新一些状态信息
    stats_.messages_sent_++;
    stats_.bytes_sent_ += m.num_bytes;
    stats_.message_data_sent_ += m.num_bytes;
    }
    在这里主要利用outbox_队列来维护要发的信息,queue_full_来标志队列满了没有。然后startMessageWrite函数来执行具体往connection中写数据的过程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // 开始向connection中写信息
    void TransportSubscriberLink::startMessageWrite(bool immediate_write)
    {
    boost::shared_array<uint8_t> dummy;
    SerializedMessage m(dummy, (uint32_t)0);

    {
    boost::mutex::scoped_lock lock(outbox_mutex_);
    if (writing_message_ || !header_written_)
    {
    return;
    }
    // 如果outbox不为空,就获得信息,并且把writing_message标志置为true,然后pop掉outbox队列中的信息
    if (!outbox_.empty())
    {
    writing_message_ = true;
    m = outbox_.front();
    outbox_.pop();
    }
    }
    // connection具体过程。
    if (m.num_bytes > 0)
    {
    connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
    }
    }

文章目录