mooon-agent接收状态机代码摘要

简介:
  • recv_machine.h
 
  1. #ifndef MOOON_AGENT_RECV_MACHINE_H  
  2.  
  3. #define MOOON_AGENT_RECV_MACHINE_H  
  4.  
  5. #include <agent/message.h>  
  6.  
  7. AGENT_NAMESPACE_BEGIN  
  8.  
  9.  
  10.  
  11. class CAgentThread;  
  12.  
  13. class CRecvMachine  
  14.  
  15. {  
  16.  
  17. private:  
  18.  
  19.     /***  
  20.  
  21.       * 接收状态值  
  22.  
  23.       */  
  24.  
  25.     typedef enum recv_state_t  
  26.  
  27.     {  
  28.  
  29.         rs_header, /** 接收消息头状态 */  
  30.  
  31.         rs_body /** 接收消息体状态 */  
  32.  
  33.     }TRecvState;  
  34.  
  35.  
  36.  
  37.     /***  
  38.  
  39.       * 接收状态上下文  
  40.  
  41.       */  
  42.  
  43.     struct RecvStateContext  
  44.  
  45.     {  
  46.  
  47.         const char* buffer; /** 当前的数据buffer */  
  48.  
  49.         size_t buffer_size; /** 当前的数据字节数 */  
  50.  
  51.           
  52.  
  53.         RecvStateContext(const char* buf=NULL, size_t buf_size=0)  
  54.  
  55.          :buffer(buf)  
  56.  
  57.          ,buffer_size(buf_size)  
  58.  
  59.         {  
  60.  
  61.         }  
  62.  
  63.           
  64.  
  65.         RecvStateContext(const RecvStateContext& other)  
  66.  
  67.          :buffer(other.buffer)  
  68.  
  69.          ,buffer_size(other.buffer_size)  
  70.  
  71.         {  
  72.  
  73.         }  
  74.  
  75.           
  76.  
  77.         RecvStateContext& operator =(const RecvStateContext& other)  
  78.  
  79.         {  
  80.  
  81.             buffer = other.buffer;  
  82.  
  83.             buffer_size = other.buffer_size;  
  84.  
  85.             return *this;  
  86.  
  87.         }  
  88.  
  89.     };  
  90.  
  91.       
  92.  
  93. public:  
  94.  
  95.     CRecvMachine(CAgentThread* thread);  
  96.  
  97.     util::handle_result_t work(const char* buffer, size_t buffer_size);  
  98.  
  99.     void reset();  
  100.  
  101.       
  102.  
  103. private:  
  104.  
  105.     void set_next_state(recv_state_t next_state)  
  106.  
  107.     {  
  108.  
  109.         _recv_state = next_state;  
  110.  
  111.         _finished_size = 0;  
  112.  
  113.     }  
  114.  
  115.       
  116.  
  117.     util::handle_result_t handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);  
  118.  
  119.     util::handle_result_t handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);  
  120.  
  121.     util::handle_result_t handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);  
  122.  
  123.          
  124.  
  125. private:   
  126.  
  127.     CAgentThread* _thread; /** 需要通过CAgentThread取得CProcessorManager */  
  128.  
  129.     agent_message_header_t _header; /** 消息头,这个大小是固定的 */  
  130.  
  131.     recv_state_t _recv_state; /** 当前的接收状态 */  
  132.  
  133.     size_t _finished_size; /** 当前状态已经接收到的字节数,注意不是总的已经接收到的字节数,只针对当前状态 */  
  134.  
  135. };  
  136.  
  137.  
  138.  
  139. AGENT_NAMESPACE_END  
  140.  
  141. #endif // MOOON_AGENT_RECV_MACHINE_H  

recv_machine.cpp

 
  1. #include "recv_machine.h" 
  2.  
  3. #include "agent_thread.h" 
  4.  
  5. AGENT_NAMESPACE_BEGIN  
  6.  
  7.  
  8.  
  9. CRecvMachine::CRecvMachine(CAgentThread* thread)  
  10.  
  11.  :_thread(thread)  
  12.  
  13. {  
  14.  
  15.     set_next_state(rs_header);  
  16.  
  17. }  
  18.  
  19.  
  20.  
  21. // 状态机入口函数  
  22.  
  23. // 状态机工作原理:-> rs_header -> rs_body -> rs_header  
  24.  
  25. // -> rs_header -> rs_error -> rs_header  
  26.  
  27. // -> rs_header -> rs_body -> rs_error -> rs_header  
  28.  
  29. // 参数说明:  
  30.  
  31. // buffer - 本次收到的数据,注意不是总的  
  32.  
  33. // buffer_size - 本次收到的数据字节数  
  34.  
  35. util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)  
  36.  
  37. {   
  38.  
  39.     RecvStateContext next_ctx(buffer, buffer_size);   
  40.  
  41.     util::handle_result_t hr = util::handle_continue;  
  42.  
  43.       
  44.  
  45.     // 状态机循环条件为:util::handle_continue == hr  
  46.  
  47.     while (util::handle_continue == hr)  
  48.  
  49.     {   
  50.  
  51.         RecvStateContext cur_ctx(next_ctx);  
  52.  
  53.           
  54.  
  55.         switch (_recv_state)  
  56.  
  57.         {  
  58.  
  59.         case rs_header:  
  60.  
  61.             hr = handle_header(cur_ctx, &next_ctx);  
  62.  
  63.             break;  
  64.  
  65.         case rs_body:  
  66.  
  67.             hr = handle_body(cur_ctx, &next_ctx);  
  68.  
  69.             break;  
  70.  
  71.         default:  
  72.  
  73.             hr = handle_error(cur_ctx, &next_ctx);  
  74.  
  75.             break;  
  76.  
  77.         }  
  78.  
  79.     }  
  80.  
  81.           
  82.  
  83.     return hr;  
  84.  
  85. }  
  86.  
  87.  
  88.  
  89. void CRecvMachine::reset()  
  90.  
  91. {  
  92.  
  93.     set_next_state(rs_header);  
  94.  
  95. }  
  96.  
  97.  
  98.  
  99. // 处理消息头部  
  100.  
  101. // 参数说明:  
  102.  
  103. // cur_ctx - 当前上下文,  
  104.  
  105. // cur_ctx.buffer为当前收到的数据buffer,包含了消息头,但也可能包含了消息体。  
  106.  
  107. // cur_ctx.buffer_size为当前收到字节数  
  108.  
  109. // next_ctx - 下一步上下文,  
  110.  
  111. // 由于cur_ctx.buffer可能包含了消息体,所以在一次接收receive动作后,  
  112.  
  113. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_body的cur_ctx  
  114.  
  115. util::handle_result_t CRecvMachine::handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)  
  116.  
  117. {  
  118.  
  119.     if (_finished_size + cur_ctx.buffer_size < sizeof(agent_message_header_t))  
  120.  
  121.     {  
  122.  
  123.         memcpy(reinterpret_cast<char*>(&_header) + _finished_size  
  124.  
  125.               ,cur_ctx.buffer  
  126.  
  127.               ,cur_ctx.buffer_size);  
  128.  
  129.                 
  130.  
  131.         _finished_size += cur_ctx.buffer_size;  
  132.  
  133.         return util::handle_continue;  
  134.  
  135.     }  
  136.  
  137.     else 
  138.  
  139.     {  
  140.  
  141.         size_t need_size = sizeof(agent_message_header_t) - _finished_size;  
  142.  
  143.         memcpy(reinterpret_cast<char*>(&_header) + _finished_size  
  144.  
  145.               ,cur_ctx.buffer  
  146.  
  147.               ,need_size);  
  148.  
  149.                 
  150.  
  151.         // TODO: Check header here  
  152.  
  153.           
  154.  
  155.         size_t remain_size = cur_ctx.buffer_size - need_size;  
  156.  
  157.         if (remain_size > 0)  
  158.  
  159.         {  
  160.  
  161.             next_ctx->buffer = cur_ctx.buffer + need_size;  
  162.  
  163.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;  
  164.  
  165.         }  
  166.  
  167.           
  168.  
  169.         // 只有当包含消息体时,才需要进行状态切换,  
  170.  
  171.         // 否则维持rs_header状态不变  
  172.  
  173.         if (_header.size > 0)  
  174.  
  175.         {  
  176.  
  177.             // 切换状态  
  178.  
  179.             set_next_state(rs_body);  
  180.  
  181.         }  
  182.  
  183.         else 
  184.  
  185.         {   
  186.  
  187.             CProcessorManager* processor_manager = _thread->get_processor_manager();   
  188.  
  189.             if (!processor_manager->on_message(_header, 0, NULL, 0))  
  190.  
  191.             {  
  192.  
  193.                 return util::handle_error;  
  194.  
  195.             }  
  196.  
  197.         }  
  198.  
  199.                   
  200.  
  201.         return (remain_size > 0)  
  202.  
  203.               ? util::handle_continue // 控制work过程是否继续循环  
  204.  
  205.               : util::handle_finish;  
  206.  
  207.     }  
  208.  
  209. }  
  210.  
  211.  
  212.  
  213. // 处理消息体  
  214.  
  215. // 参数说明:  
  216.  
  217. // cur_ctx - 当前上下文,  
  218.  
  219. // cur_ctx.buffer为当前收到的数据buffer,包含了消息体,但也可能包含了消息头。  
  220.  
  221. // cur_ctx.buffer_size为当前收到字节数  
  222.  
  223. // next_ctx - 下一步上下文,  
  224.  
  225. // 由于cur_ctx.buffer可能包含了消息头,所以在一次接收receive动作后,  
  226.  
  227. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_header的cur_ctx  
  228.  
  229. util::handle_result_t CRecvMachine::handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)  
  230.  
  231. {  
  232.  
  233.     CProcessorManager* processor_manager = _thread->get_processor_manager();  
  234.  
  235.       
  236.  
  237.     if (_finished_size + cur_ctx.buffer_size < _header.size)  
  238.  
  239.     {  
  240.  
  241.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))  
  242.  
  243.         {  
  244.  
  245.             return util::handle_error;  
  246.  
  247.         }  
  248.  
  249.           
  250.  
  251.         _finished_size += cur_ctx.buffer_size;  
  252.  
  253.         return util::handle_continue;  
  254.  
  255.     }  
  256.  
  257.     else 
  258.  
  259.     {  
  260.  
  261.         size_t need_size = _header.size - _finished_size;  
  262.  
  263.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, need_size))  
  264.  
  265.         {  
  266.  
  267.             return util::handle_error;  
  268.  
  269.         }  
  270.  
  271.           
  272.  
  273.         // 切换状态  
  274.  
  275.         set_next_state(rs_header);  
  276.  
  277.           
  278.  
  279.         size_t remain_size = cur_ctx.buffer_size - need_size;  
  280.  
  281.         if (remain_size > 0)  
  282.  
  283.         {  
  284.  
  285.             next_ctx->buffer = cur_ctx.buffer + need_size;  
  286.  
  287.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;  
  288.  
  289.             return util::handle_continue;  
  290.  
  291.         }  
  292.  
  293.  
  294.  
  295.         return util::handle_finish;   
  296.  
  297.     }  
  298.  
  299. }  
  300.  
  301.  
  302.  
  303. util::handle_result_t CRecvMachine::handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)  
  304.  
  305. {  
  306.  
  307.     //AGENT_LOG_ERROR("Network error.\n");  
  308.  
  309.     set_next_state(rs_header); // 无条件切换到rs_header,这个时候应当断开连接重连接  
  310.  
  311.     return util::handle_error;  
  312.  
  313. }  
  314.  
  315.  
  316.  
  317. AGENT_NAMESPACE_END  

 



    本文转自eyjian 51CTO博客,原文链接:http://blog.51cto.com/mooon/910302,如需转载请自行联系原作者



相关文章
mooon-agent接收状态机代码摘要
recv_machine.h 点击(此处)折叠或打开 #ifndef MOOON_AGENT_RECV_MACHINE_H #define MOOON_AGENT_RECV_MACHINE_H #include agent/message.
731 0
mooon-agent发送状态机代码摘要
发送状态机比接收状态机相对要简单 send_machine.h 点击(此处)折叠或打开 #ifndef MOOON_AGENT_SEND_MACHINE_H #define MOOON_AGENT_SEND_MACHINE_H #include agent/config.
669 0
|
6月前
|
Cloud Native
云盾·数据库审计中d100适用于自建和云原生的统一日志审计吗? 客户端或APP端安装Agent是否必要?
云盾·数据库审计中d100适用于自建和云原生的统一日志审计吗? 客户端或APP端安装Agent是否必要?
42 1
|
6月前
|
监控 关系型数据库 MySQL
企业实战(8)CentOS 6.8安装Zabbix-agent 5.0监控主机性能与Mysql数据库
企业实战(8)CentOS 6.8安装Zabbix-agent 5.0监控主机性能与Mysql数据库
|
4月前
|
监控 Java
Pinpoint【部署 02】Pinpoint Agent 安装启动及监控 SpringBoot 项目案例分享(添加快速测试math-game.jar包)
Pinpoint【部署 02】Pinpoint Agent 安装启动及监控 SpringBoot 项目案例分享(添加快速测试math-game.jar包)
76 0
|
9月前
|
监控 网络协议 Shell
使用脚本安装zabbix agent 6.0(新手友好)
使用脚本安装zabbix agent 6.0(新手友好)
|
6月前
|
监控 Linux
CentOS 7安装zabbix-agent 5.0报错:依赖检测失败:libpcre.so.0(64bit)获取GPG密钥失败
CentOS 7安装zabbix-agent 5.0报错:依赖检测失败:libpcre.so.0(64bit)获取GPG密钥失败
132 0
|
6月前
|
监控 关系型数据库 MySQL
Zabbix分布式监控实战(二)——CentOS 7.5安装Zabbix-agent 5.0监控Mysql数据库
Zabbix分布式监控实战(二)——CentOS 7.5安装Zabbix-agent 5.0监控Mysql数据库
|
9月前
|
运维 监控 网络协议
【运维】Zabbix Agent安装——监控服务器冒红灯检查步骤
【运维】Zabbix Agent安装——监控服务器冒红灯检查步骤