Python 脚本学习笔记(五)集中式病毒扫描,端口扫描以及分段数据库操作

简介:

      Clam AntiVirus是一个免费而且开放源码的防毒软件,软件与病毒库的更新由开源社区免费发布,目前ClamdAV主要为Linux、Uinux系统提供病毒扫描查杀pyClamad是一个python的第三方模块,可让python直接使用ClamAV病毒扫描守护进程clamd来实现一个高效的病毒检测功能。


一、实现集中式的病毒扫描

1、安装clamavp clamd 服务的相关程序包

yum install clamav clamd clamav-update -y

chkconfig clamd on

更新病毒库

/usr/bin/freshclam

更改配置文件修改监听地址到所有网络,启动服务

sed -i -e '/^TCPAddr/{ s/127.0.0.1/0.0.0.0/;}' /etc/clamd.conf

/etc/init.d/clamd start


2、安装pyClamd模块

pip2.7  install pyClamd


工作原理:管理服务器通过python发出多线程指令连接业务服务器的3310端口,执行病毒扫描,然后返回结果给管理服务器。 业务服务器必须安装clamd相关程序包,并启动服务监听在3310端口才能正常收到指令;


实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import  time
import  pyclamd
from  threading  import  Thread
 
class  Scan(Thread):  #继承多线程Thread类
 
     def  __init__ ( self ,IP,scan_type, file ):
         """构造方法"""
         Thread.__init__( self )
         self .IP  =  IP
         self .scan_type = scan_type
         self . file  =  file
         self .connstr = ""
         self .scanresult = ""
 
 
     def  run( self ):
         """多进程run方法"""
 
         try :
             cd  =  pyclamd.ClamdNetworkSocket( self .IP, 3310 )
             """探测连通性"""
             if  cd.ping():
                 self .connstr = self .IP + " connection [OK]"
                 """重载clamd病毒特征库"""
                 cd. reload ()
                 """判断扫描模式"""
                 if  self .scan_type = = "contscan_file" :
                     self .scanresult = "{0}\n" . format (cd.contscan_file( self . file ))
                 elif  self .scan_type = = "multiscan_file" :
                     self .scanresult = "{0}\n" . format (cd.multiscan_file( self . file ))
                 elif  self .scan_type = = "scan_file" :
                     self .scanresult = "{0}\n" . format (cd.scan_file( self . file ))
                 time.sleep( 1 )
             else :
                 self .connstr = self .IP + " ping error,exit"
                 return
         except  Exception,e:
             self .connstr = self .IP + " " + str (e)
 
 
IPs = [ '192.168.1.21' , '192.168.1.22' #扫描主机的列表
scantype = "multiscan_file"  #指定扫描模式
scanfile = "/data/www"  #指定扫描路径
i = 1
threadnum = 2  #指定启动的线程数
scanlist  =  []  #存储Scan类线程对象列表
 
for  ip  in  IPs:
 
     """将数据值带入类中,实例化对象"""
     currp  =  Scan(ip,scantype,scanfile)
     scanlist.append(currp)  #追加对象到列表
 
"""当达到指定的线程数或IP列表数后启动线程"""
     if  i % threadnum = = 0  or  i = = len (IPs):
         for  task  in  scanlist:
             task.start()  #启动线程
 
         for  task  in  scanlist:
             task.join()  #等待所有子线程退出,并输出扫描结果
             print  task.connstr  #打印服务器连接信息
             print  task.scanresult  #打印结果信息
         scanlist  =  []   
     i + = 1


二、使用python-nmap模块实现一个高效的端口扫描器

需要依赖nmap和python-nmap;

yum install nmap

pip2.7 install python-nmap


实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import  sys
import  nmap
 
scan_row = []
input_data  =  raw_input ( 'Please input hosts and port: ' )
scan_row  =  input_data.split( " " )
if  len (scan_row)! = 2 :
     print  "Input errors,example \"192.168.1.0/24 80,443,22\""
     sys.exit( 0 )
hosts = scan_row[ 0 ]     #接收用户输入的主机
port = scan_row[ 1 ]     #接收用户输入的端口
 
try :
     nm  =  nmap.PortScanner()     #创建端口扫描对象
except  nmap.PortScannerError:
     print ( 'Nmap not found' , sys.exc_info()[ 0 ])
     sys.exit( 0 )
except :
     print ( "Unexpected error:" , sys.exc_info()[ 0 ])
     sys.exit( 0 )
 
try :
     nm.scan(hosts = hosts, arguments = ' -v -sS -p ' + port)     #调用扫描方法,参数指定扫描主机hosts,nmap扫描命令行参数arguments
except  Exception,e:
     print  "Scan erro:" + str (e)
     
for  host  in  nm.all_hosts():     #遍历扫描主机
     print ( '----------------------------------------------------' )
     print ( 'Host : %s (%s)'  %  (host, nm[host].hostname()))     #输出主机及主机名
     print ( 'State : %s'  %  nm[host].state())     #输出主机状态,如up、down
 
     for  proto  in  nm[host].all_protocols():     #遍历扫描协议,如tcp、udp
         print ( '----------' )
         print ( 'Protocol : %s'  %  proto)     #输入协议名
 
         lport  =  nm[host][proto].keys()     #获取协议的所有扫描端口
         lport.sort()     #端口列表排序
         for  port  in  lport:     #遍历端口及输出端口与状态
             print ( 'port : %s\tstate : %s'  %  (port, nm[host][proto][port][ 'state' ]))


三、实现一个程序完成取MySQL数据导出txt,完成压缩,传FTP服务器,自动删除过期数据。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/local/python27/bin/python2.7
#coding:utf-8
import  os
import  sys
import  pymysql
import  ftplib
import  commands
import  time
import  datetime
 
"""从数据库获取数据"""
def  sql(user,passwd,host,db):
     conn  =  pymysql.connect(host = host,user = user,password = passwd,db = db)
     cur  =  conn.cursor()
     cur.execute( "select count(*) from ucenter_member;" )
     result_num  =  cur.fetchall()
     """由于返回的数据是一个元组,下面的格式转换用于去除括号"""
     total_num  =  int ( str (result_num).lstrip( '((' ).rstrip( ',),)' ))
     """总行数 / 每次取数据的行数 = 需要取的次数 + 1 是因为怕不能整除可以把剩下的数据都取出"""
     linesum  =  (total_num / 5000 + 1 )
     =  0
     while  ( j < linesum ):
         result_num  =  cur.execute( "SELECT id,login,reg_time,last_login_time,type from ucenter_member limit" + ' ' + str ( int (j * 5000 )) + ',' + str ( 5000 ) + ';' )
         data  =  cur.fetchall()
     """定义输出的文件对象"""    
         outfile  =  open ( '/alidata/data_analyse/ucenter-%s' %  time.strftime( '%Y-%m-%d' ,time.localtime(time.time())) + '.txt' , 'a+' )
         for  in  range (result_num):            
             out  =  str (data[i]).strip( '()' ) + '\n'
             outfile.write(out) 
         j + = 1
     outfile.close()      
     outfilename  =  ( 'ucenter-%s' %  time.strftime( '%Y-%m-%d' ,time.localtime(time.time())) + '.txt' )
     return  outfilename
 
"""FTP文件上传函数"""        
def  upload( file ):
     os.chdir( '/alidata/data_analyse/'
     file_path  =  os.path.abspath( file )
     =  open (file_path, 'rb' )
     ftp  =  ftplib.FTP( '115.236.179.166' )
     ftp.login( 'liuyang' , 'liuyang666999' )
     """上传文件,STOR 后面的 %s 定义的是上传后保存的文件名,f为需要上传的文件对象"""
     ftp.storbinary( 'STOR %s' % file ,f)
 
"""文件压缩函数"""
def  gzip(filename):
     os.chdir( '/alidata/data_analyse/' )
     =  commands.getoutput( "zip -9 %s %s"  % (filename + '.zip' ,filename))
     return (filename + '.zip' )
 
"""过期文件删除函数"""
def  Del_file():
     """切换程序的工作目录"""
     os.chdir( '/alidata/data_analyse/' )
     ThreeDaysAgo  =  (datetime.datetime.now()  -  datetime.timedelta(days = 3 ))
     rmtime  =  ThreeDaysAgo.strftime( "%Y-%m-%d" )
     rmfile  =  ( 'ucenter-%s' %  rmtime + '.txt' )
     rmfile2  =  ( 'ucenter-%s' %  rmtime + '.txt.zip' )
     if  os.path.exists(rmfile):
         os.remove(rmfile)
     if  os.path.exists(rmfile2):
         os.remove(rmfile2)
     return
 
if  __name__  = =  '__main__' :
 
     outfilename  =  sql( 'root' , '123456' , '10.1.1.1' , 'hellodb' )
     gzipfile  =  gzip(outfilename)
     starttime  =  datetime.datetime.now()
     upload(gzipfile)
     endtime  =  datetime.datetime.now()
     uptime  =  (endtime  -  starttime).seconds
     with  open ( './history.log' , 'a+' ) as f:
         f.write( 'time:%s,upload cost time:%s'  %  (time.strftime( '%Y-%m-%d %H:%M:%S' ,time.localtime(time.time())),uptime) + '\n' )
     Del_file()



本文转自qw87112 51CTO博客,原文链接:http://blog.51cto.com/tchuairen/1698897


相关文章
|
19天前
|
存储 C语言 Python
【Python】学习笔记day3
【Python】学习笔记day3
26 1
|
20天前
|
Linux Shell Python
Linux执行Python脚本
Linux执行Python脚本
26 1
|
2月前
|
Web App开发 数据采集 自然语言处理
python脚本抢各大平台大额优惠卷
python脚本抢各大平台大额优惠卷
48 0
|
9天前
|
JSON 测试技术 持续交付
自动化测试与脚本编写:Python实践指南
【4月更文挑战第9天】本文探讨了Python在自动化测试中的应用,强调其作为热门选择的原因。Python拥有丰富的测试框架(如unittest、pytest、nose)以支持自动化测试,简化测试用例的编写与维护。示例展示了使用unittest进行单元测试的基本步骤。此外,Python还适用于集成测试、系统测试等,提供模拟外部系统行为的工具。在脚本编写实践中,Python的灵活语法和强大库(如os、shutil、sqlite3、json)助力执行复杂测试任务。同时,Python支持并发、分布式执行及与Jenkins、Travis CI等持续集成工具的集成,提升测试效率和质量。
|
16天前
|
存储 监控 异构计算
【Python】GPU内存监控脚本
【Python】GPU内存监控脚本
|
16天前
|
Ubuntu Unix Linux
【Linux/Ubuntu】Linux/Ubuntu运行python脚本
【Linux/Ubuntu】Linux/Ubuntu运行python脚本
|
24天前
|
XML Shell Linux
性能工具之 JMeter 使用 Python 脚本快速执行
性能工具之 JMeter 使用 Python 脚本快速执行
40 1
性能工具之 JMeter 使用 Python 脚本快速执行
|
26天前
|
算法 搜索推荐 测试技术
python排序算法及优化学习笔记1
python实现的简单的排序算法,以及算法优化,学习笔记1
33 1
|
1月前
|
数据采集 测试技术 Python
Python自动化脚本的魅力与实践
Python自动化脚本的魅力与实践
48 0
|
1月前
|
数据安全/隐私保护 Python
使用Python脚本实现图片合成PDF功能
使用Python脚本实现图片合成PDF功能
26 0