XRCE-DDS源码阅读(epromisa实现)
XRCE-DDS源代码的阅读和感悟
MICRO-XRCE-DDS-CLIENT
源码结构
忽略编译后生成的build文件和其他cmake配置文件,可以得到从GitHub上下载的源码目录如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
   | . ├── CMakeLists.txt ├── colcon.pkg ├── CONTRIBUTING.md ├── CTestConfig.cmake ├── Dockerfile ├── docs │   ├── Client.png │   └── General.png ├── examples │   ├── ContinuousFragment │   ├── CustomTransports │   ├── Deployment │   ├── Discovery │   ├── Discovery_try │   ├── MultiSessionHelloWorld │   ├── PingAgent │   ├── PublishHelloWorld │   ├── PublishHelloWorldBestEffort │   ├── PublishHelloWorldP2P │   ├── ReplyAdder │   ├── RequestAdder │   ├── ShapesDemo │   ├── SubscribeHelloWorld │   ├── SubscribeHelloWorldBestEffort │   ├── SubscribeHelloWorldP2P │   ├── TimeSync │   └── TimeSyncWithCb ├── include │   └── uxr ├── LICENSE ├── PLATFORM_SUPPORT.md ├── QUALITY.md ├── README.md ├── src │   └── c ├── test │   ├── case │   ├── memory │   ├── transport │   └── unitary ├── toolchains │   ├── CMakeLists.txt │   ├── nuttx_toolchain.cmake │   ├── nuttx_toolchain.cmake.in │   └── raspberrypi_toolchain.cmake ├── tree.txt └── VERSIONING.md
   | 
 
其中,显然include和src以及example文件夹下的文件对我们比较重要。example中有大量的示例程序,包括发现、可靠发送/接收消息等等。include中包含了XRCE-DDS定义的所需要的头文件。src下面则包括了最主要的实际实现代码
其中src的框架如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
   | . ├── c │   ├── core │   │   ├── log │   │   │   ├── log.c │   │   │   └── log_internal.h │   │   ├── serialization │   │   │   ├── xrce_header.c │   │   │   ├── xrce_header_internal.h │   │   │   ├── xrce_subheader.c │   │   │   ├── xrce_subheader_internal.h │   │   │   └── xrce_types.c │   │   └── session │   │       ├── common_create_entities.c │   │       ├── common_create_entities_internal.h │   │       ├── create_entities_ref.c │   │       ├── create_entities_xml.c │   │       ├── object_id.c │   │       ├── read_access.c │   │       ├── session.c │   │       ├── session_info.c │   │       ├── session_info_internal.h │   │       ├── session_internal.h │   │       ├── stream │   │       │   ├── common_reliable_stream_internal.h │   │       │   ├── input_best_effort_stream.c │   │       │   ├── input_best_effort_stream_internal.h │   │       │   ├── input_reliable_stream.c │   │       │   ├── input_reliable_stream_internal.h │   │       │   ├── output_best_effort_stream.c │   │       │   ├── output_best_effort_stream_internal.h │   │       │   ├── output_reliable_stream.c │   │       │   ├── output_reliable_stream_internal.h │   │       │   ├── seq_num.c │   │       │   ├── seq_num_internal.h │   │       │   ├── stream_id.c │   │       │   ├── stream_storage.c │   │       │   └── stream_storage_internal.h │   │       ├── submessage.c │   │       ├── submessage_internal.h │   │       └── write_access.c │   ├── profile │   │   ├── discovery │   │   │   ├── discovery.c │   │   │   └── transport │   │   │       ├── udp_transport_datagram_freertos_plus_tcp.c │   │   │       ├── udp_transport_datagram_internal.h │   │   │       ├── udp_transport_datagram_posix.c │   │   │       ├── udp_transport_datagram_posix_nopoll.c │   │   │       └── udp_transport_datagram_windows.c │   │   └── transport │   │       ├── custom │   │       │   └── custom_transport.c │   │       ├── ip │   │       │   ├── ip_freertos_plus_tcp.c │   │       │   ├── ip_posix.c │   │       │   ├── ip_windows.c │   │       │   ├── tcp │   │       │   │   ├── tcp_transport.c │   │       │   │   ├── tcp_transport_freertos_plus_tcp.c │   │       │   │   ├── tcp_transport_internal.h │   │       │   │   ├── tcp_transport_posix.c │   │       │   │   └── tcp_transport_windows.c │   │       │   └── udp │   │       │       ├── udp_transport.c │   │       │       ├── udp_transport_freertos_plus_tcp.c │   │       │       ├── udp_transport_internal.h │   │       │       ├── udp_transport_posix.c │   │       │       ├── udp_transport_posix_nopoll.c │   │       │       └── udp_transport_windows.c │   │       ├── serial │   │       │   ├── serial_transport.c │   │       │   └── serial_transport_posix.c │   │       └── stream_framing │   │           ├── stream_framing_protocol.c │   │           └── stream_framing_protocol.h │   └── util │       ├── ping.c │       ├── time.c │       └── time_internal.h
   | 
 
MICRO-XRCE-DDS-AGENT
相关线程
agentthread
由Agent_instance单例调用函数创建并启动。
负责解析命令后启动agentserver。
agentserver是一个继承自Server类的子类,可以是多个类型,跟传输方式有关,UDP、TCP等。
Server类是一个继承Agent和SessoinManager类的子类。
agentserver启动主要负责三件事,一件事初始化server(主要是网络编程,使用poll进行IO复用),一个是初始化input_scheduler设置队列大小和优先级大小,以及初始化output_scheduler。最后一件事是负责初始化五个线程,这五个线程的作用会在下面讲解。
消息包packet的处理借助于类PacketScheduler,这个类包含了两个map类型的成员变量,一个key是优先级,value是一个队列,存放待处理的packet;另一个key是优先级,value表示队列的大小。这里其实是一个高优先级先调度的模型。
error_handler_thread
处理错误的线程 一个条件循环函数
receiver_loop
处理接受消息的线程 
线程启动后从传输中接受input_packet包,然后将接受的消息放入input_scheduler调度器,将不同的消息放入不同优先级的队列。
sender_loop
处理发送消息的线程
从output_scheduler中取出优先级最高的队列的队首元素,通过send_message发送出去。
processing_loop
处理调度器中的消息的线程
从inputscheduler中取出优先级最高的队列的队首元素,交给processor来进行处理。
processor_对象是Processor类的对象。该类统一处理所有的packet消息。会根据inputpacket消息的sessionid来确定消息要交由root\处理还是根据clientKey来确定消息交给client来处理。
交由client处理的消息实际是又交给了client自己的session中指定sessionID的队列,根据session的sessionID队列messages_来依次进行处理message。
处理inputmessage的过程就是不断看是否有下一条submessage,然后读取submessage,根据submessage的类型与内容,和DDS进行交互,如有数据回馈则将数据填充组成output_packet,返回给server\的output_scheduler,最后将交互结果返回给上一层。
处理过inputmessage后,考虑对于可靠流的消息,要组织acknack子消息。将acknack消息构成一个output_packet,交付server\的output_scheduler
如果client不存在 还要处理delete子消息的返回。
heartbeat_loop
让processor_检查心跳信号,就是构造心跳信号进行发送,完毕后陷入休眠,休眠结束继续检查,不断循环
        
            
        
        
          
              原文链接: https://zijian.wang/2022/03/17/XRCE-DDS源码阅读(epromisa实现)/
              版权声明: 转载请注明出处.