XRCE-DDS源码阅读(epromisa实现)
XRCE-DDS源代码的阅读和感悟
MICRO-XRCE-DDS-CLIENT
源码结构
忽略编译后生成的build文件和其他cmake配置文件,可以得到从GitHub上下载的源码目录如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| . ├── CMakeLists.txt ├── colcon.pkg ├── CONTRIBUTING.md ├── CTestConfig.cmake ├── Dockerfile ├── docs │ ├── Client.png │ └── General.png ├── examples │ ├── ContinuousFragment │ ├── CustomTransports │ ├── Deployment │ ├── Discovery │ ├── Discovery_try │ ├── MultiSessionHelloWorld │ ├── PingAgent │ ├── PublishHelloWorld │ ├── PublishHelloWorldBestEffort │ ├── PublishHelloWorldP2P │ ├── ReplyAdder │ ├── RequestAdder │ ├── ShapesDemo │ ├── SubscribeHelloWorld │ ├── SubscribeHelloWorldBestEffort │ ├── SubscribeHelloWorldP2P │ ├── TimeSync │ └── TimeSyncWithCb ├── include │ └── uxr ├── LICENSE ├── PLATFORM_SUPPORT.md ├── QUALITY.md ├── README.md ├── src │ └── c ├── test │ ├── case │ ├── memory │ ├── transport │ └── unitary ├── toolchains │ ├── CMakeLists.txt │ ├── nuttx_toolchain.cmake │ ├── nuttx_toolchain.cmake.in │ └── raspberrypi_toolchain.cmake ├── tree.txt └── VERSIONING.md
|
其中,显然include和src以及example文件夹下的文件对我们比较重要。example中有大量的示例程序,包括发现、可靠发送/接收消息等等。include中包含了XRCE-DDS定义的所需要的头文件。src下面则包括了最主要的实际实现代码
其中src的框架如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| . ├── c │ ├── core │ │ ├── log │ │ │ ├── log.c │ │ │ └── log_internal.h │ │ ├── serialization │ │ │ ├── xrce_header.c │ │ │ ├── xrce_header_internal.h │ │ │ ├── xrce_subheader.c │ │ │ ├── xrce_subheader_internal.h │ │ │ └── xrce_types.c │ │ └── session │ │ ├── common_create_entities.c │ │ ├── common_create_entities_internal.h │ │ ├── create_entities_ref.c │ │ ├── create_entities_xml.c │ │ ├── object_id.c │ │ ├── read_access.c │ │ ├── session.c │ │ ├── session_info.c │ │ ├── session_info_internal.h │ │ ├── session_internal.h │ │ ├── stream │ │ │ ├── common_reliable_stream_internal.h │ │ │ ├── input_best_effort_stream.c │ │ │ ├── input_best_effort_stream_internal.h │ │ │ ├── input_reliable_stream.c │ │ │ ├── input_reliable_stream_internal.h │ │ │ ├── output_best_effort_stream.c │ │ │ ├── output_best_effort_stream_internal.h │ │ │ ├── output_reliable_stream.c │ │ │ ├── output_reliable_stream_internal.h │ │ │ ├── seq_num.c │ │ │ ├── seq_num_internal.h │ │ │ ├── stream_id.c │ │ │ ├── stream_storage.c │ │ │ └── stream_storage_internal.h │ │ ├── submessage.c │ │ ├── submessage_internal.h │ │ └── write_access.c │ ├── profile │ │ ├── discovery │ │ │ ├── discovery.c │ │ │ └── transport │ │ │ ├── udp_transport_datagram_freertos_plus_tcp.c │ │ │ ├── udp_transport_datagram_internal.h │ │ │ ├── udp_transport_datagram_posix.c │ │ │ ├── udp_transport_datagram_posix_nopoll.c │ │ │ └── udp_transport_datagram_windows.c │ │ └── transport │ │ ├── custom │ │ │ └── custom_transport.c │ │ ├── ip │ │ │ ├── ip_freertos_plus_tcp.c │ │ │ ├── ip_posix.c │ │ │ ├── ip_windows.c │ │ │ ├── tcp │ │ │ │ ├── tcp_transport.c │ │ │ │ ├── tcp_transport_freertos_plus_tcp.c │ │ │ │ ├── tcp_transport_internal.h │ │ │ │ ├── tcp_transport_posix.c │ │ │ │ └── tcp_transport_windows.c │ │ │ └── udp │ │ │ ├── udp_transport.c │ │ │ ├── udp_transport_freertos_plus_tcp.c │ │ │ ├── udp_transport_internal.h │ │ │ ├── udp_transport_posix.c │ │ │ ├── udp_transport_posix_nopoll.c │ │ │ └── udp_transport_windows.c │ │ ├── serial │ │ │ ├── serial_transport.c │ │ │ └── serial_transport_posix.c │ │ └── stream_framing │ │ ├── stream_framing_protocol.c │ │ └── stream_framing_protocol.h │ └── util │ ├── ping.c │ ├── time.c │ └── time_internal.h
|
MICRO-XRCE-DDS-AGENT
相关线程
agentthread
由Agent_instance单例调用函数创建并启动。
负责解析命令后启动agentserver。
agentserver是一个继承自Server类的子类,可以是多个类型,跟传输方式有关,UDP、TCP等。
Server类是一个继承Agent和SessoinManager类的子类。
agentserver启动主要负责三件事,一件事初始化server(主要是网络编程,使用poll进行IO复用),一个是初始化input_scheduler设置队列大小和优先级大小,以及初始化output_scheduler。最后一件事是负责初始化五个线程,这五个线程的作用会在下面讲解。
消息包packet的处理借助于类PacketScheduler,这个类包含了两个map类型的成员变量,一个key是优先级,value是一个队列,存放待处理的packet;另一个key是优先级,value表示队列的大小。这里其实是一个高优先级先调度的模型。
error_handler_thread
处理错误的线程 一个条件循环函数
receiver_loop
处理接受消息的线程
线程启动后从传输中接受input_packet包,然后将接受的消息放入input_scheduler调度器,将不同的消息放入不同优先级的队列。
sender_loop
处理发送消息的线程
从output_scheduler中取出优先级最高的队列的队首元素,通过send_message发送出去。
processing_loop
处理调度器中的消息的线程
从inputscheduler中取出优先级最高的队列的队首元素,交给processor来进行处理。
processor_对象是Processor类的对象。该类统一处理所有的packet消息。会根据inputpacket消息的sessionid来确定消息要交由root\处理还是根据clientKey来确定消息交给client来处理。
交由client处理的消息实际是又交给了client自己的session中指定sessionID的队列,根据session的sessionID队列messages_来依次进行处理message。
处理inputmessage的过程就是不断看是否有下一条submessage,然后读取submessage,根据submessage的类型与内容,和DDS进行交互,如有数据回馈则将数据填充组成output_packet,返回给server\的output_scheduler,最后将交互结果返回给上一层。
处理过inputmessage后,考虑对于可靠流的消息,要组织acknack子消息。将acknack消息构成一个output_packet,交付server\的output_scheduler
如果client不存在 还要处理delete子消息的返回。
heartbeat_loop
让processor_检查心跳信号,就是构造心跳信号进行发送,完毕后陷入休眠,休眠结束继续检查,不断循环
原文链接: https://zijian.wang/2022/03/17/XRCE-DDS源码阅读(epromisa实现)/
版权声明: 转载请注明出处.