今个也不前言了,直接切入主题 !


请大家多关注下我的个人博客,http://xiaorui.cc


同事在做一个关于方便的批量上线初始化的平台,其实我真不想说平台两个字,啥都平台,显得有些不时尚了 。 ansible是个好东西,只是上线打包初始化的任务太耗时间,经常是20分钟左右。这时候需要异步的执行,对于ansible来说,客户端把消耗时间的任务通过signal信号来处理,把结果扔到以jid为文件名的文件里。。。

(这两天写个用tornado 实现ansible的web ui)   

原文:http://rfyiamcool.blog.51cto.com/1030776/1422263


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#xiaorui.cc
import  sys
import  time
import  ansible.runner
runner  =  ansible.runner.Runner(
    module_name = 'shell' ,
    module_args = sys.argv[ 1 ],
    pattern = 'web' ,
)
datastructure  =  runner.run_async( 30 )
print  datastructure
print  datastructure[ 1 ].poll()
time.sleep( 3 )
print  datastructure[ 1 ].poll()
time.sleep( 3 )
print  datastructure[ 1 ].poll()
time.sleep( 3 )
print  datastructure[ 1 ].poll()


终端执行的结果:

1
2
3
4
5
6
[root@ 67  ~] # python cmd.py "sleep 6;echo 1111"
({ 'dark' : {},  'contacted' : { '10.10.10.66' : {u 'started' 1 'invocation' : { 'module_name' 'shell' 'module_args' 'sleep 6;echo 1111' }, u 'results_file' : u '/root/.ansible_async/637134991839' , u 'ansible_job_id' : u '637134991839' }}}, <ansible.runner.poller.AsyncPoller  object  at  0x9d75c2c >)
{ 'dark' : {},  'polled' : { '10.10.10.66' : {u 'started' 1 'invocation' : { 'module_name' 'async_status' 'module_args' : u 'jid=637134991839' }, u 'results_file' : u '/root/.ansible_async/637134991839' , u 'ansible_job_id' : u '637134991839' , u 'changed' False }},  'contacted' : {}}
{ 'dark' : {},  'polled' : { '10.10.10.66' : {u 'started' 1 'invocation' : { 'module_name' 'async_status' 'module_args' : u 'jid=637134991839' }, u 'results_file' : u '/root/.ansible_async/637134991839' , u 'ansible_job_id' : u '637134991839' , u 'changed' False }},  'contacted' : {}}
{ 'dark' : {},  'polled' : {},  'contacted' : { '10.10.10.66' : {u 'changed' True , u 'end' : u '2014-06-05 07:06:19.426764' , u 'ansible_job_id' : u '637134991839' , u 'stdout' : u '1111' , u 'cmd' : u 'sleep 6;echo 1111 ' 'invocation' : { 'module_name' 'async_status' 'module_args' : u 'jid=637134991839' }, u 'start' : u '2014-06-05 07:06:13.420793' , u 'finished' 1 , u 'stderr' : u' ', u' rc ': 0, u' delta ': u' 0 : 00 : 06.005971 '}}}
{ 'dark' : {},  'polled' : {},  'contacted' : { '10.10.10.66' : {u 'changed' True , u 'end' : u '2014-06-05 07:06:19.426764' , u 'ansible_job_id' : u '637134991839' , u 'stdout' : u '1111' , u 'cmd' : u 'sleep 6;echo 1111 ' 'invocation' : { 'module_name' 'async_status' 'module_args' : u 'jid=637134991839' }, u 'start' : u '2014-06-05 07:06:13.420793' , u 'finished' 1 , u 'stderr' : u' ', u' rc ': 0, u' delta ': u' 0 : 00 : 06.005971 '}}}

原文:http://rfyiamcool.blog.51cto.com/1030776/1422263


wKiom1OPRLXS8lMbAASdiMRI_fo505.jpg


在运行的过程中,我强制中断主进程。

因为任务是fork到守候进程,所以任务还是被执行的。

1
2
[root@66 ~] # cat /root/.ansible_async/656009932164
{ "changed" true "end" "2014-06-05 07:10:15.741252" "stdout" "1111" "cmd" "sleep 10;echo 1111 " "start" "2014-06-05 07:10:05.736109" "delta" "0:00:10.005143" "stderr" "" "rc" : 0}

原文:http://rfyiamcool.blog.51cto.com/1030776/1422263


想知道他是怎么异步的获取状态的么? 

比起saltstack这种逆天的工具来说,ansible的返回值显得有点不时尚。 

他的任务结果会输出到.ansible_aysnc的一个token文件里面的。然后你每次去poll任务,他都是拿着这个jid,然后ssh到客户端cat数据,然后把数据json.loads。


当执行异步任务的时候,我监控了22端口的行为数据。

配合上面的python调用ansible api 的那个脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.66:22 -> 10.10.10.67:33046 [A]
 
T 10.10.10.67:33046 -> 10.10.10.66:22 [A]
 
T 10.10.10.67:33046 -> 10.10.10.66:22 [A]
 
T 10.10.10.67:33046 -> 10.10.10.66:22 [A]
 
T 10.10.10.67:33046 -> 10.10.10.66:22 [A]
 
T 10.10.10.67:33046 -> 10.10.10.66:22 [AR
xiaorui.cc


进程的监控,会看到我每次poll数据的时候,都是派生了一个新进程。

1
2
3
4
5
[root@66  /var ] # ps aux|grep python
root      1216  0.0  1.9  74456 20392 ?        Sl   06:18   0:04  /usr/bin/python  /usr/bin/salt-minion  -d
root      6251  0.0  0.2   9312  2512 ?        S    07:45   0:00  /usr/bin/python  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /async_wrapper  701347917770 30  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /command  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /arguments
root      6252  0.0  0.2   9312  2784 ?        S    07:45   0:00  /usr/bin/python  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /async_wrapper  701347917770 30  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /command  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /arguments
root      6253  2.0  0.6  15000  6376 ?        S    07:45   0:00  /usr/bin/python  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /command  /root/ .ansible /tmp/ansible-tmp-1401896304 .86-33623655469433 /ar


看看客户端的那几个临时文件

1
2
3
4
5
6
7
[root@66 ansible-tmp-1401896171.16-200148091205946] # ll
总用量 60
#xiaorui.cc
-rw-r--r--. 1 root root    28 6月   5 07:43 arguments
-rw-r--r--. 1 root root  6159 6月   5 07:43 async_wrapper
-rwxr-xr-x. 1 root root 47795 6月   5 07:43  command
[root@66 ansible-tmp-1401896171.16-200148091205946] #


原文:http://rfyiamcool.blog.51cto.com/1030776/1422263


command是模块,用来执行linux命令的。argument文件里面是给command传递的参数,async_wrapper这个是关键。  是这次任务能异步起来的核心文件。


原文:http://rfyiamcool.blog.51cto.com/1030776/1422263


看这段代码可以看出ansible使用异步的时候,所采用的一些方法。


看了这个脚本,在结合上线咱们ps aux抓到的正在执行的ansible过来的ssh执行的命令。

/usr/bin/python /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/async_wrapper 701347917770 30 /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/command /root/.ansible/tmp/ansible-tmp-1401896304.86-33623655469433/arguments


给async_wrapper传递了四个参数,jid,超时的时间,引用的模块,传递给模块的参数。 jid给aysnc_wrapper,是为了把结果序列化json存入到jid文件里面。给的时间,是用来通过signal信号做超时的限制 !  模块和参数就不讲了,你应该懂!


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/python
# -*- coding: utf-8 -*-
 
 
try :
     import  json
except  ImportError:
     import  simplejson as json
import  shlex
import  os
import  subprocess
import  sys
import  datetime
import  traceback
import  signal
import  time
import  syslog
 
def  daemonize_self():
     # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
     # logger.info("cobblerd started")
     try :
         pid  =  os.fork()
         if  pid >  0 :
             # exit first parent
             sys.exit( 0 )
     except  OSError, e:
         print  >>sys.stderr,  "fork #1 failed: %d (%s)"  %  (e.errno, e.strerror)
         sys.exit( 1 )
 
     # decouple from parent environment
     os.chdir( "/" )
     os.setsid()
     os.umask( 022 )
 
     # do second fork
     try :
         pid  =  os.fork()
         if  pid >  0 :
             # print "Daemon PID %d" % pid
             sys.exit( 0 )
     except  OSError, e:
         print  >>sys.stderr,  "fork #2 failed: %d (%s)"  %  (e.errno, e.strerror)
         sys.exit( 1 )
 
     dev_null  =  file ( '/dev/null' , 'rw' )
     os.dup2(dev_null.fileno(), sys.stdin.fileno())
     os.dup2(dev_null.fileno(), sys.stdout.fileno())
     os.dup2(dev_null.fileno(), sys.stderr.fileno())
 
if  len (sys.argv) <  3 :
     print  json.dumps({
         "failed"  True ,
         "msg"     "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>.  Humans, do not call directly!"
     })
     sys.exit( 1 )
 
jid  =  sys.argv[ 1 ]
time_limit  =  sys.argv[ 2 ]
wrapped_module  =  sys.argv[ 3 ]
argsfile  =  sys.argv[ 4 ]
cmd  =  "%s %s"  %  (wrapped_module, argsfile)
 
syslog.openlog( 'ansible-%s'  %  os.path.basename(__file__))
syslog.syslog(syslog.LOG_NOTICE,  'Invoked with %s'  %  " " .join(sys.argv[ 1 :]))
 
# setup logging directory
logdir  =  os.path.expanduser( "~/.ansible_async" )
log_path  =  os.path.join(logdir, jid)
 
if  not  os.path.exists(logdir):
     try :
         os.makedirs(logdir)
     except :
         print  json.dumps({
             "failed"  1 ,
             "msg"  "could not create: %s"  %  logdir
         })
 
def  _run_command(wrapped_cmd, jid, log_path):
 
     logfile  =  open (log_path,  "w" )
     logfile.write(json.dumps({  "started"  1 "ansible_job_id"  : jid }))
     logfile.close()
     logfile  =  open (log_path,  "w" )
     result  =  {}
 
     outdata  =  ''
     try :
         cmd  =  shlex.split(wrapped_cmd)
         script  =  subprocess.Popen(cmd, shell = False ,
             stdin = None , stdout = logfile, stderr = logfile)
         script.communicate()
         outdata  =  file (log_path).read()
         result  =  json.loads(outdata)
 
     except  (OSError, IOError), e:
         result  =  {
             "failed" 1 ,
             "cmd"  : wrapped_cmd,
             "msg" str (e),
         }
         result[ 'ansible_job_id' =  jid
         logfile.write(json.dumps(result))
     except :
         result  =  {
             "failed"  1 ,
             "cmd"  : wrapped_cmd,
             "data"  : outdata,  # temporary debug only
             "msg"  : traceback.format_exc()
         }
         result[ 'ansible_job_id' =  jid
         logfile.write(json.dumps(result))
     logfile.close()
 
# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process
 
#import logging
#import logging.handlers
 
#logger = logging.getLogger("ansible_async")
#logger.setLevel(logging.WARNING)
#logger.addHandler( logging.handlers.SysLogHandler("/dev/log") )
def  debug(msg):
     #logger.warning(msg)
     pass
 
try :
     pid  =  os.fork()
     if  pid:
         # Notify the overlord that the async process started
 
         # we need to not return immmediately such that the launched command has an attempt
         # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
         # this probably could be done with some IPC later.  Modules should always read
         # the argsfile at the very first start of their execution anyway
         time.sleep( 1 )
         debug( "Return async_wrapper task started." )
         print  json.dumps({  "started"  1 "ansible_job_id"  : jid,  "results_file"  : log_path })
         sys.stdout.flush()
         sys.exit( 0 )
     else :
         # The actual wrapper process
 
         # Daemonize, so we keep on running
         daemonize_self()
 
         # we are now daemonized, create a supervisory process
         debug( "Starting module and watcher" )
 
         sub_pid  =  os.fork()
         if  sub_pid:
             # the parent stops the process after the time limit
             remaining  =  int (time_limit)
 
             # set the child process group id to kill all children
             os.setpgid(sub_pid, sub_pid)
 
             debug( "Start watching %s (%s)" % (sub_pid, remaining))
             time.sleep( 5 )
             while  os.waitpid(sub_pid, os.WNOHANG)  = =  ( 0 0 ):
                 debug( "%s still running (%s)" % (sub_pid, remaining))
                 time.sleep( 5 )
                 remaining  =  remaining  -  5
                 if  remaining < =  0 :
                     debug( "Now killing %s" % (sub_pid))
                     os.killpg(sub_pid, signal.SIGKILL)
                     debug( "Sent kill to group %s" % sub_pid)
                     time.sleep( 1 )
                     sys.exit( 0 )
             debug( "Done in kid B." )
             os._exit( 0 )
         else :
             # the child process runs the actual module
             debug( "Start module (%s)" % os.getpid())
             _run_command(cmd, jid, log_path)
             debug( "Module complete (%s)" % os.getpid())
             sys.exit( 0 )
 
except  Exception, err:
     debug( "error: %s" % (err))
     raise  err
     原文:http: / / rfyiamcool.blog. 51cto .com / 1030776 / 1422263




话说,ansible为了兼容别的语言,还真是做了不少的牺牲。一切都是各种标准输入输出的。。。。哎,如果把ansible api 的async处理方式用在ansible的web平台上,有些不合适。当然,你如果耗得起貌似都ssh过去,认证,执行,然后扔到async_wrapper文件里面的过程,那你.......(nx)。 话说ansible真的不快......你真的可以试试。。。。 

建议大家看看ansible api的源码,实现下returners的方式。  同事已经实现了,其实很简单,有兴趣的朋友,可以在看看我上次的那个ansible runner源码的文章,肯定能找到思路。   还有大家也可以通过修改 async_wrapper的代码实现returner。   这两天有时间会写点 ansible做web平台上遇到的问题。






 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1422263 ,如需转载请自行联系原作者