XRCE-DDS源码阅读(epromisa实现)

XRCE-DDS源码阅读(epromisa实现)

XRCE-DDS源代码的阅读和感悟

MICRO-XRCE-DDS-CLIENT

源码结构

忽略编译后生成的build文件和其他cmake配置文件,可以得到从GitHub上下载的源码目录如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
.
├── CMakeLists.txt
├── colcon.pkg
├── CONTRIBUTING.md
├── CTestConfig.cmake
├── Dockerfile
├── docs
│   ├── Client.png
│   └── General.png
├── examples
│   ├── ContinuousFragment
│   ├── CustomTransports
│   ├── Deployment
│   ├── Discovery
│   ├── Discovery_try
│   ├── MultiSessionHelloWorld
│   ├── PingAgent
│   ├── PublishHelloWorld
│   ├── PublishHelloWorldBestEffort
│   ├── PublishHelloWorldP2P
│   ├── ReplyAdder
│   ├── RequestAdder
│   ├── ShapesDemo
│   ├── SubscribeHelloWorld
│   ├── SubscribeHelloWorldBestEffort
│   ├── SubscribeHelloWorldP2P
│   ├── TimeSync
│   └── TimeSyncWithCb
├── include
│   └── uxr
├── LICENSE
├── PLATFORM_SUPPORT.md
├── QUALITY.md
├── README.md
├── src
│   └── c
├── test
│   ├── case
│   ├── memory
│   ├── transport
│   └── unitary
├── toolchains
│   ├── CMakeLists.txt
│   ├── nuttx_toolchain.cmake
│   ├── nuttx_toolchain.cmake.in
│   └── raspberrypi_toolchain.cmake
├── tree.txt
└── VERSIONING.md

其中,显然include和src以及example文件夹下的文件对我们比较重要。example中有大量的示例程序,包括发现、可靠发送/接收消息等等。include中包含了XRCE-DDS定义的所需要的头文件。src下面则包括了最主要的实际实现代码

其中src的框架如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
.
├── c
│   ├── core
│   │   ├── log
│   │   │   ├── log.c
│   │   │   └── log_internal.h
│   │   ├── serialization
│   │   │   ├── xrce_header.c
│   │   │   ├── xrce_header_internal.h
│   │   │   ├── xrce_subheader.c
│   │   │   ├── xrce_subheader_internal.h
│   │   │   └── xrce_types.c
│   │   └── session
│   │   ├── common_create_entities.c
│   │   ├── common_create_entities_internal.h
│   │   ├── create_entities_ref.c
│   │   ├── create_entities_xml.c
│   │   ├── object_id.c
│   │   ├── read_access.c
│   │   ├── session.c
│   │   ├── session_info.c
│   │   ├── session_info_internal.h
│   │   ├── session_internal.h
│   │   ├── stream
│   │   │   ├── common_reliable_stream_internal.h
│   │   │   ├── input_best_effort_stream.c
│   │   │   ├── input_best_effort_stream_internal.h
│   │   │   ├── input_reliable_stream.c
│   │   │   ├── input_reliable_stream_internal.h
│   │   │   ├── output_best_effort_stream.c
│   │   │   ├── output_best_effort_stream_internal.h
│   │   │   ├── output_reliable_stream.c
│   │   │   ├── output_reliable_stream_internal.h
│   │   │   ├── seq_num.c
│   │   │   ├── seq_num_internal.h
│   │   │   ├── stream_id.c
│   │   │   ├── stream_storage.c
│   │   │   └── stream_storage_internal.h
│   │   ├── submessage.c
│   │   ├── submessage_internal.h
│   │   └── write_access.c
│   ├── profile
│   │   ├── discovery
│   │   │   ├── discovery.c
│   │   │   └── transport
│   │   │   ├── udp_transport_datagram_freertos_plus_tcp.c
│   │   │   ├── udp_transport_datagram_internal.h
│   │   │   ├── udp_transport_datagram_posix.c
│   │   │   ├── udp_transport_datagram_posix_nopoll.c
│   │   │   └── udp_transport_datagram_windows.c
│   │   └── transport
│   │   ├── custom
│   │   │   └── custom_transport.c
│   │   ├── ip
│   │   │   ├── ip_freertos_plus_tcp.c
│   │   │   ├── ip_posix.c
│   │   │   ├── ip_windows.c
│   │   │   ├── tcp
│   │   │   │   ├── tcp_transport.c
│   │   │   │   ├── tcp_transport_freertos_plus_tcp.c
│   │   │   │   ├── tcp_transport_internal.h
│   │   │   │   ├── tcp_transport_posix.c
│   │   │   │   └── tcp_transport_windows.c
│   │   │   └── udp
│   │   │   ├── udp_transport.c
│   │   │   ├── udp_transport_freertos_plus_tcp.c
│   │   │   ├── udp_transport_internal.h
│   │   │   ├── udp_transport_posix.c
│   │   │   ├── udp_transport_posix_nopoll.c
│   │   │   └── udp_transport_windows.c
│   │   ├── serial
│   │   │   ├── serial_transport.c
│   │   │   └── serial_transport_posix.c
│   │   └── stream_framing
│   │   ├── stream_framing_protocol.c
│   │   └── stream_framing_protocol.h
│   └── util
│   ├── ping.c
│   ├── time.c
│   └── time_internal.h

MICRO-XRCE-DDS-AGENT

相关线程

agentthread

由Agent_instance单例调用函数创建并启动。

负责解析命令后启动agentserver

agentserver是一个继承自Server类的子类,可以是多个类型,跟传输方式有关,UDP、TCP等。

Server类是一个继承Agent和SessoinManager类的子类。

agentserver启动主要负责三件事,一件事初始化server(主要是网络编程,使用poll进行IO复用),一个是初始化input_scheduler设置队列大小和优先级大小,以及初始化output_scheduler。最后一件事是负责初始化五个线程,这五个线程的作用会在下面讲解。

input_scheduler和output_scheduler

消息包packet的处理借助于类PacketScheduler,这个类包含了两个map类型的成员变量,一个key是优先级,value是一个队列,存放待处理的packet;另一个key是优先级,value表示队列的大小。这里其实是一个高优先级先调度的模型。

error_handler_thread

处理错误的线程 一个条件循环函数

receiver_loop

处理接受消息的线程

线程启动后从传输中接受input_packet包,然后将接受的消息放入input_scheduler调度器,将不同的消息放入不同优先级的队列。

sender_loop

处理发送消息的线程

从output_scheduler中取出优先级最高的队列的队首元素,通过send_message发送出去。

processing_loop

处理调度器中的消息的线程

从inputscheduler中取出优先级最高的队列的队首元素,交给processor来进行处理。

processor_对象是Processor类的对象。该类统一处理所有的packet消息。会根据inputpacket消息的sessionid来确定消息要交由root\处理还是根据clientKey来确定消息交给client来处理。

交由client处理的消息实际是又交给了client自己的session中指定sessionID的队列,根据session的sessionID队列messages_来依次进行处理message。

处理inputmessage的过程就是不断看是否有下一条submessage,然后读取submessage,根据submessage的类型与内容,和DDS进行交互,如有数据回馈则将数据填充组成output_packet,返回给server\的output_scheduler,最后将交互结果返回给上一层。

处理过inputmessage后,考虑对于可靠流的消息,要组织acknack子消息。将acknack消息构成一个output_packet,交付server\的output_scheduler

如果client不存在 还要处理delete子消息的返回。

heartbeat_loop

让processor_检查心跳信号,就是构造心跳信号进行发送,完毕后陷入休眠,休眠结束继续检查,不断循环