ZeroMQ(java)中监控Socket

简介: 基本上ZeroMQ(java)中基本的代码都算是过了一遍了吧,不过觉得它在日志这一块貌似基本没有做什么工作,也就是我们通过日志来知道ZeroMQ都发生了什么事情。。而且由于ZeroMQ中将连接的建立和重连接都进行了隔离,用户不需要做什么事情来维护连接,当然这样做的好处是使程序员的编码工作变少了,但是当然也有不好的地方,那就是用户失去了对ZeroMQ整个运行阶段的控制。

基本上ZeroMQ(java)中基本的代码都算是过了一遍了吧,不过觉得它在日志这一块貌似基本没有做什么工作,也就是我们通过日志来知道ZeroMQ都发生了什么事情。。

而且由于ZeroMQ中将连接的建立和重连接都进行了隔离,用户不需要做什么事情来维护连接,当然这样做的好处是使程序员的编码工作变少了,但是当然也有不好的地方,那就是用户失去了对ZeroMQ整个运行阶段的控制。。

例如,当我们主动去连接一个远程地址,或者连接中断之后,没有日志告诉我们都这些事情发生了。。。当时对于一个消息通信系统来说,这些日志,或者说事件的监控都是至关重要的。

当然没有日志来记录这些内容,并不代表我们就无法知道当前运行情况都发生了什么事情,ZeroMQ中采用了另外一种方法来监控Socket的情况,不过这种方法比较恶心,需要建立额外的Socket来监控自己感兴趣的Socket。。。

这里先不说这些闲话了,来看看ZeroMQ中都定义了哪些监听事件:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. public static final int EVENT_CONNECTED = zmq.ZMQ.ZMQ_EVENT_CONNECTED;   //当主动建立连接建立成功之后的事件  
  2. public static final int EVENT_DELAYED = zmq.ZMQ.ZMQ_EVENT_CONNECT_DELAYED;   //连接延迟  
  3. public static final int EVENT_RETRIED = zmq.ZMQ.ZMQ_EVENT_CONNECT_RETRIED;    //尝试重新连接  
  4. public static final int EVENT_CONNECT_FAILED = zmq.ZMQ.ZMQ_EVENT_CONNECT_FAILED;   //连接失败  
  5.   
  6. public static final int EVENT_LISTENING = zmq.ZMQ.ZMQ_EVENT_LISTENING;    //建立了监听  
  7. public static final int EVENT_BIND_FAILED = zmq.ZMQ.ZMQ_EVENT_BIND_FAILED;  //bind失败  
  8.   
  9. public static final int EVENT_ACCEPTED = zmq.ZMQ.ZMQ_EVENT_ACCEPTED;   //接收到accept事件  
  10. public static final int EVENT_ACCEPT_FAILED = zmq.ZMQ.ZMQ_EVENT_ACCEPT_FAILED;   //accept出错的事件  
  11.   
  12. public static final int EVENT_CLOSED = zmq.ZMQ.ZMQ_EVENT_CLOSED;   //关闭事件  
  13. public static final int EVENT_CLOSE_FAILED = zmq.ZMQ.ZMQ_EVENT_CLOSE_FAILED;     //关闭失败  
  14. public static final int EVENT_DISCONNECTED = zmq.ZMQ.ZMQ_EVENT_DISCONNECTED;   //连接断开  
  15.   
  16. public static final int EVENT_ALL = zmq.ZMQ.ZMQ_EVENT_ALL;   //所有的事件  

上面是定义的所有可能发生的事件,具体每一种事件代表什么意思后面的注释都已经说明了,当然这里面我觉得最重要的事件有连接的断开,连接的建立,以及重连接等事件。。。

接下来我们来看看如何在ZeroMQ(java)中来监控这些事件吧,直接上代码了:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. import org.zeromq.ZMQ;  
  2.   
  3. import zmq.ZMQ.Event;  
  4.   
  5. public class Request {  
  6.     public static void main (String args[]) {  
  7.         ZMQ.Context context = ZMQ.context(1);  
  8.         ZMQ.Socket req = context.socket(ZMQ.REQ);  
  9.           
  10.         req.monitor("inproc://reqmoniter", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED);  //这段代码会创建一个pair类型的socket,专门来接收当前socket发生的事件  
  11.           
  12.         final ZMQ.Socket moniter = context.socket(ZMQ.PAIR);   //这里创建一个pair类型的socket,用于与上面建立的moniter建立连接  
  13.         moniter.connect("inproc://reqmoniter");  //连接当前socket的监听  
  14.           
  15.         new Thread(new Runnable(){  
  16.   
  17.             public void run() {  
  18.                 // TODO Auto-generated method stub  
  19.                 while (true) {  
  20.                     Event event = Event.read(moniter.base());  //从当前moniter里面读取event  
  21.                     System.out.println(event.event +  "  " + event.addr);  
  22.                 }  
  23.             }  
  24.               
  25.         }).start();  
  26.           
  27.         req.connect("tcp://127.0.0.1:5000");  
  28.           
  29.         while (true) {  
  30.             req.send("aaa");  
  31.             req.recv();  
  32.         }  
  33.           
  34.           
  35.     }  
  36. }  
  37.   
  38. package monit;  
  39.   
  40. import org.zeromq.ZMQ;  
  41.   
  42. public class Response {  
  43.     public static void main(String args[]) {  
  44.         final ZMQ.Context context = ZMQ.context(1);  
  45.           
  46.   
  47.       
  48.         ZMQ.Socket response = context.socket(ZMQ.REP);  
  49.         response.bind("tcp://*:5000");  
  50.         while (!Thread.currentThread().isInterrupted()) {  
  51.             response.recv();  
  52.             response.send("hello".getBytes());  
  53.             try {  
  54.                 Thread.currentThread().sleep(100);  
  55.             } catch (InterruptedException e) {  
  56.                 // TODO Auto-generated catch block  
  57.                 e.printStackTrace();  
  58.             }  
  59.         }  
  60.         response.close();  
  61.         context.term();  
  62.     }  
  63. }  

这里代码用req/rep来举例子的,其实用起来还算是比较的简单,当连接建立之后,将会输出:

1  tcp://127.0.0.1:5000

1就是代表连接建立的事件,后面是建立连接的地址,

这个时候如果关闭response,那么将会输出如下:

512  tcp://127.0.0.1:5000

512代表的就是连接断开的事件。。。


嗯,至于说监控的源码是怎么实现的,这里就不细说了,其实还是非常的简单的,如果有兴趣可以自己去看看。。


到此ZeroMQ(java)的代码就算是读的差不多了,算是告一段落吧,接下来好好看看书。。。另外有想 再搞搞C/C++方面的东西。。。初步计划看看Redis的源码吧。。。。

若转载请注明出处!若有疑问,请回复交流!
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
7天前
|
运维 监控 Java
Java 性能监控和故障排查技巧
【4月更文挑战第19天】Java应用的开发与运行需重视性能监控和故障排查,以确保稳定性和可靠性。利用日志记录、系统性能指标和VisualVM等工具进行监控。通过错误日志分析、分阶段测试、内存和线程分析来排查问题。关注性能瓶颈定位,并提前规划、定期监控,提升数据分析能力和团队协作,以优化应用性能。
|
18天前
|
监控 Java Linux
linux下监控java进程 实现自动重启服务
linux下监控java进程 实现自动重启服务
|
4月前
|
Java Android开发
如何在Java中创建Socket连接?
如何在Java中创建Socket连接?
69 0
|
2月前
|
监控 Unix Linux
socket监控进程,并对程序执行有关操作。
socket监控进程,并对程序执行有关操作。
|
2月前
|
Java
[Java]Socket套接字(网络编程入门)
[Java]Socket套接字(网络编程入门)
38 0
|
3月前
|
监控 应用服务中间件
idea debug模式启动Tomcat报错:Error running ‘tomcat8‘: java.net.SocketException “socket closed“
idea debug模式启动Tomcat报错:Error running ‘tomcat8‘: java.net.SocketException “socket closed“
|
3月前
|
Java
基于socket实现java Swing简易聊天室[附完整源码]
基于socket实现java Swing简易聊天室[附完整源码]
|
3月前
|
数据可视化 Java 关系型数据库
基于java Swing 框架使用socket技术开发的即时通讯系统【源码+数据库】
基于java Swing 框架使用socket技术开发的即时通讯系统【源码+数据库】
|
4月前
|
分布式计算 Linux Spark
【已解决】Caused by: java.net.SocketException: Connection reset by peer: socket write error
【已解决】Caused by: java.net.SocketException: Connection reset by peer: socket write error
72 0
|
4月前
|
监控 Java
Zabbix【部署 02】Zabbix-Java-Gateway安装配置使用(使用Zabbix-Java-Gateway通过JMX监控Java应用程序实例分享)
Zabbix【部署 02】Zabbix-Java-Gateway安装配置使用(使用Zabbix-Java-Gateway通过JMX监控Java应用程序实例分享)
90 0