使用Zabbix监控RabbitMQ

简介:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
写这个的思路
 
监控脚本主要包括三个部分,监控overview,监控当前主机的节点信息,还有监控各个队列
根据网上的脚本进行了修改,新增加了很多监控项目,把原来脚本中的 filter 去掉了
这里顺便提一下,对于网上的各种代码,不能拿来就用,要结合自身的需求对代码进行分析,
也可以提升自己的编码能力,如果只是一味地拿来就用,那永远也得不到提高。
 
 
rabbitmq.queues[{ #VHOSTNAME},queue_messages_ready,{#QUEUENAME}]
 
 
key  =  '"rabbitmq.queues[{0},queue_{1},{2}]"' . format (queue[ 'vhost' ], item, queue[ 'name' ])
value  =  queue.get(item,  0 )
rabbitmq.queues[{ #VHOSTNAME},queue_messages_ready,{#QUEUENAME}]
UserParameter = rabbitmq.discovery_queue, / usr / bin / python  / opt / app / zabbix / sbin / rabbitmq_status.py  - - check = list_queues
UserParameter = rabbitmq.queues, / usr / bin / python  / opt / app / zabbix / sbin / rabbitmq_status.py  - - check = queues
UserParameter = rabbitmq[ * ], / usr / bin / python  / opt / app / zabbix / sbin / rabbitmq_status.py  - - check = $ 1  - - metric = $ 2
value  =  queue[ 0 ].get( 'message_stats' , {}).get( 'deliver_get' 0 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env /usr/bin/python
'''Python module to query the RabbitMQ Management Plugin REST API and get
results that can then be used by Zabbix.
https://github.com/jasonmcintosh/rabbitmq-zabbix
'''
'''
     This script is tested on RabbitMQ 3.5.3 
'''
import  json
import  optparse
import  socket
import  urllib2
import  subprocess
import  tempfile
import  os
import  logging
  
logging.basicConfig(filename = '/var/log/rabbitmq_zabbix.log' , level = logging.WARNING,  format = '%(asctime)s %(levelname)s: %(message)s' )
  
class  RabbitMQAPI( object ):
     '''Class for RabbitMQ Management API'''
  
     def  __init__( self , user_name = 'guest' , password = 'guest' , host_name = '',
                  protocol = 'http' , port = 15672 , conf = '/usr/local/zabbix/etc/zabbix_agentd.conf' , senderhostname = None ):
         self .user_name  =  user_name
         self .password  =  password
         self .host_name  =  host_name  or  socket.gethostname()
         self .protocol  =  protocol
         self .port  =  port
         self .conf  =  conf  or  '/usr/local/zabbix/etc/zabbix_agentd.conf'
         self .senderhostname  =  senderhostname  if  senderhostname  else  host_name
  
     def  call_api( self , path):
         '''
            All URIs will server only resource of type application/json,and will require HTTP basic authentication. The default username and password is guest/guest.  /%sf is encoded for the default virtual host '/' 
         '''
         url  =  '{0}://{1}:{2}/api/{3}' . format ( self .protocol,  self .host_name,  self .port, path)
         password_mgr  =  urllib2.HTTPPasswordMgrWithDefaultRealm()
         password_mgr.add_password( None , url,  self .user_name,  self .password)
         handler  =  urllib2.HTTPBasicAuthHandler(password_mgr)
         logging.debug( 'Issue a rabbit API call to get data on '  +  path)
######## json.loads()  transfer json data to python data
######## json.dump()   transfer python data to json data
         return  json.loads(urllib2.build_opener(handler). open (url).read())
  
     def  list_queues( self ):
         ''' curl -i -u guest:guest http://localhost:15672/api/queues  
         return a list 
         '''
         queues  =  []
         for  queue  in  self .call_api( 'queues' ):
             logging.debug( "Discovered queue "  +  queue[ 'name' ])
             element  =  { '{#VHOSTNAME}' : queue[ 'vhost' ],
                        '{#QUEUENAME}' : queue[ 'name' ]
                       }
             queues.append(element)
             logging.debug( 'Discovered queue ' + queue[ 'vhost' ] + '/' + queue[ 'name' ])
         return  queues
  
     def  list_nodes( self ):
         '''Lists all rabbitMQ nodes in the cluster'''
         nodes  =  []
         for  node  in  self .call_api( 'nodes' ):
             # We need to return the node name, because Zabbix
             # does not support @ as an item parameter
             name  =  node[ 'name' ].split( '@' )[ 1 ]
             element  =  { '{#NODENAME}' : name,
                        '{#NODETYPE}' : node[ 'type' ]}
             nodes.append(element)
             logging.debug( 'Discovered nodes ' + name + '/' + node[ 'type' ])
         return  nodes
  
     def  check_queue( self ):
         '''Return the value for a specific item in a queue's details.'''
         return_code  =  0
         #### use tempfile module to create a file on memory, will not be deleted when it is closed , because 'delete' argument is set to False
         rdatafile  =  tempfile.NamedTemporaryFile(delete = False )
         for  queue  in  self .call_api( 'queues' ):
             self ._get_queue_data(queue, rdatafile)
         rdatafile.close()
         return_code  =  self ._send_queue_data(rdatafile)
         #### os.unlink is used to remove a file
         os.unlink(rdatafile.name)
         return  return_code
  
     def  _get_queue_data( self , queue, tmpfile):
         '''Prepare the queue data for sending'''
         '''
           ### one single  queue's information like this #####
           ### curl -i -u guest:guest http://localhost:15672/api/queues   dumps a list   ###
{"memory":32064,"message_stats":{"ack":3870,"ack_details":{"rate":0.0},"deliver":3871,"deliver_details":{"rate":0.0},"deliver_get":3871,"deliver_get_details":{"rate":0.0},"disk_writes":3870,"disk_writes_details":{"rate":0.0},"publish":3870,"publish_details":{"rate":0.0},"redeliver":1,"redeliver_details":{"rate":0.0}},"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"idle_since":"2016-03-01 22:04:22","consumer_utilisation":"","policy":"","exclusive_consumer_tag":"","consumers":4,"recoverable_slaves":"","state":"running","messages_ram":0,"messages_ready_ram":0,"messages_unacknowledged_ram":0,"messages_persistent":0,"message_bytes":0,"message_bytes_ready":0,"message_bytes_unacknowledged":0,"message_bytes_ram":0,"message_bytes_persistent":0,"disk_reads":0,"disk_writes":3870,"backing_queue_status":{"q1":0,"q2":0,"delta":["delta",0,0,0],"q3":0,"q4":0,"len":0,"target_ram_count":"infinity","next_seq_id":3870,"avg_ingress_rate":0.060962064328682466,"avg_egress_rate":0.060962064328682466,"avg_ack_ingress_rate":0.060962064328682466,"avg_ack_egress_rate":0.060962064328682466},"name":"app000","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"rabbit@test2"}
         '''
         for  item  in  'memory' , 'messages' , 'messages_ready' , 'messages_unacknowledged' , 'consumers'  ]:
             #key = rabbitmq.queues[/,queue_memory,queue.helloWorld]
             key  =  '"rabbitmq.queues[{0},queue_{1},{2}]"' . format (queue[ 'vhost' ], item, queue[ 'name' ])
             ### if item is in queue,value=queue[item],else value=0
             value  =  queue.get(item,  0 )
             logging.debug( "SENDER_DATA: - %s %s"  %  (key,value))
             tmpfile.write( "- %s %s\n"  %  (key, value))
         ##  This is a non standard bit of information added after the standard items
         for  item  in  [ 'deliver_get' 'publish' ]:
             key  =  '"rabbitmq.queues[{0},queue_message_stats_{1},{2}]"' . format (queue[ 'vhost' ], item, queue[ 'name' ])
             value  =  queue.get( 'message_stats' , {}).get(item,  0 )
             logging.debug( "SENDER_DATA: - %s %s"  %  (key,value))
             tmpfile.write( "- %s %s\n"  %  (key, value))
  
     def  _send_queue_data( self , tmpfile):
         '''Send the queue data to Zabbix.'''
         '''Get key value from temp file. '''
         args  =  '/usr/local/zabbix/sbin/zabbix_sender -c {0} -i {1}'
         if  self .senderhostname:
             args  =  args  +  " -s "  +  self .senderhostname
         return_code  =  0
         process  =  subprocess.Popen(args. format ( self .conf, tmpfile.name),
                                            shell = True , stdout = subprocess.PIPE,
                                            stderr = subprocess.PIPE)
         out, err  =  process.communicate()
         logging.debug( "Finished sending data" )
         return_code  =  process.wait()
         logging.info( "Found return code of "  +  str (return_code))
         if  return_code ! =  0 :
             logging.warning(out)
             logging.warning(err)
         else :
             logging.debug(err)
             logging.debug(out)
         return  return_code
  
     def  check_aliveness( self ):
         '''Check the aliveness status of a given vhost. '''
         '''virtual host '/' should be encoded as '/%2f' ''' 
         return  self .call_api( 'aliveness-test/%2f' )[ 'status' ]
  
     def  check_overview( self , item):
         '''First, check the overview specific items'''
         ''' curl -i -u guest:guest http://localhost:15672/api/overview   '''
         ## rabbitmq[overview,connections]
         if    item  in  'channels' , 'connections' , 'consumers' , 'exchanges' , 'queues'  ]: 
           return  self .call_api( 'overview' ).get( 'object_totals' ).get(item, 0 )
         ## rabbitmq[overview,messages]
         elif  item  in  'messages' , 'messages_ready' , 'messages_unacknowledged'  ]:
           return  self .call_api( 'overview' ).get( 'queue_totals' ).get(item, 0 )
         elif  item  = =  'message_stats_deliver_get' :
           return  self .call_api( 'overview' ).get( 'message_stats' , {}).get( 'deliver_get' , 0 )
         elif  item  = =  'message_stats_publish' :
           return  self .call_api( 'overview' ).get( 'message_stats' , {}).get( 'publish' , 0 )
         elif  item  = =  'message_stats_ack' :
           return  self .call_api( 'overview' ).get( 'message_stats' , {}).get( 'ack' , 0 )
         elif  item  = =  'message_stats_redeliver' :
           return  self .call_api( 'overview' ).get( 'message_stats' , {}).get( 'redeliver' , 0 )
         elif  item  = =  'rabbitmq_version' :
           return  self .call_api( 'overview' ).get( 'rabbitmq_version' 'None' )
  
     def  check_server( self ,item,node_name):
         '''Return the value for a specific item in a node's details. '''
         '''curl -i -u guest:guest http://localhost:15672/api/nodes'''
         '''return a list'''
         # hostname     hk-prod-mq1.example.com
         # self.call_api('nodes')[0]['name']   rabbit@hk-prod-mq1
         node_name  =  node_name.split( '.' )[ 0 ]
         for  nodeData  in  self .call_api( 'nodes' ):
             if  node_name  in  nodeData[ 'name' ]:
                 return  nodeData.get(item, 0 )
         return  'Not Found'
  
  
def  main():
     '''Command-line parameters and decoding for Zabbix use/consumption.'''
     choices  =  [ 'list_queues' 'list_nodes' 'queues' 'check_aliveness' ,
                'overview' , 'server' ]
     parser  =  optparse.OptionParser()
     parser.add_option( '--username' help = 'RabbitMQ API username' ,
                       default = 'guest' )
     parser.add_option( '--password' help = 'RabbitMQ API password' ,
                       default = 'guest' )
     parser.add_option( '--hostname' help = 'RabbitMQ API host' ,
                       default = socket.gethostname())
     parser.add_option( '--protocol' help = 'RabbitMQ API protocol (http or https)' ,
                       default = 'http' )
     parser.add_option( '--port' help = 'RabbitMQ API port' type = 'int' ,
                       default = 15672 )
     parser.add_option( '--check' type = 'choice' , choices = choices,
                       help = 'Type of check' )
     parser.add_option( '--metric' help = 'Which metric to evaluate' , default = '')
     parser.add_option( '--node' help = 'Which node to check (valid for --check=server)' )
     parser.add_option( '--conf' , default = '/usr/local/zabbix/etc/zabbix_agentd.conf' )
     parser.add_option( '--senderhostname' , default = ' ', help=' Allows including a sender parameter on calls to zabbix_sender')
     (options, args)  =  parser.parse_args()
     if  not  options.check:
         parser.error( 'At least one check should be specified' )
     logging.debug( "Started trying to process data" )
     api  =  RabbitMQAPI(user_name = options.username, password = options.password,
                       host_name = options.hostname, protocol = options.protocol, port = options.port,
                       conf = options.conf, senderhostname = options.senderhostname)
  
     if  options.check  = =  'list_queues' :
         print  json.dumps({ 'data' : api.list_queues()},indent = 4 ,separators = ( ',' , ':' ))
     elif  options.check  = =  'list_nodes' :
         print  json.dumps({ 'data' : api.list_nodes()},indent = 4 ,separators = ( ',' , ':' ))
     elif  options.check  = =  'queues' :
         print  api.check_queue()
     elif  options.check  = =  'check_aliveness' :
         print  api.check_aliveness()
     elif  options.check  = =  'overview' :
     #rabbitmq[overview,connections]
     #--check=overview   --metric=connections
         if  not  options.metric:
             parser.error( 'Missing required parameter: "metric"' )       
         else :
             if  options.node:
                 print  api.check_overview(options.metric)
             else :
                 print  api.check_overview(options.metric)
     elif  options.check  = =  'server' :
     #rabbitmq[server,sockets_used]
     #--check=server   --metric=sockets_used
          if  not  options.metric:
             parser.error( 'Missing required parameter: "metric"' )
          else :
             if  options.node:
                 print  api.check_server(options.metric,options.node)
             else :
                 print  api.check_server(options.metric,api.host_name)
   
  
if  __name__  = =  '__main__' :
     main()
1
2
3
4
5
6
7
8
9
修改的地方
1.default = socket.gethostbyname(socket.gethostname())
2.logging .warning(args) debug不能显示,不知道什么原因
3.args  =  '/usr/local/zabbix/bin/zabbix_sender -c {0} -i {1}' . format ( self .conf, tmpfile.name)
4.args  =  args  +  " -s "  +  self .senderhostname  +  " -z "  +  serverip
5.subprocess .Popen()
process  =  subprocess.Popen(args.shell = True , stdout = subprocess.PIPE,stderr = subprocess.PIPE)
out, err  =  process.communicate()
return_code  =  process.wait()



原文http://john88wang.blog.51cto.com/2165294/1745824

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
存储 SQL 监控
修改Zabbix源码实现监控数据双写,满足业务需求!
虽然对接Elasticsearch后有诸多好处,但是它不往数据库写历史数据了,同时还不再计算趋势数据了。有这么一个场景...
修改Zabbix源码实现监控数据双写,满足业务需求!
|
8天前
|
数据采集 监控 数据库
OceanBase社区版可以通过Zabbix监控
OceanBase社区版可以通过Zabbix监控
94 4
|
5月前
|
监控 关系型数据库 机器人
小白带你学习linux的监控平台zabbix
小白带你学习linux的监控平台zabbix
150 0
|
7月前
|
监控 关系型数据库 MySQL
企业实战(8)CentOS 6.8安装Zabbix-agent 5.0监控主机性能与Mysql数据库
企业实战(8)CentOS 6.8安装Zabbix-agent 5.0监控主机性能与Mysql数据库
|
8天前
|
监控 网络协议 Unix
centos7 zabbix安装客户端agent -配置监控远程主机 在需要监控的电脑上安装
centos7 zabbix安装客户端agent -配置监控远程主机 在需要监控的电脑上安装
34 0
|
8天前
|
数据采集 监控 数据库
请问OceanBase社区版能否通过zabbix监控,然后将报错信息展现到grafana?
【2月更文挑战第25天】请问OceanBase社区版能否通过zabbix监控,然后将报错信息展现到grafana?
27 2
|
7月前
|
监控
zabbix如何添加自定义监控项
zabbix如何添加自定义监控项
295 0
|
8天前
|
消息中间件 Web App开发 监控
mqtt数据问题之如何实现webRTC 协议的监控视频压测
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
68 0
|
8天前
|
监控 Cloud Native 关系型数据库
使用 Grafana 统一监控展示 - 对接 Zabbix
使用 Grafana 统一监控展示 - 对接 Zabbix

推荐镜像

更多