python redis链接建立实现分析

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

  今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:

1
2
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:

1
2
  r = redis.StrictRedis(host = xxxx, port = xxxx, db = xxxx)
  r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法

1
2
pool  =  redis.ConnectionPool(host = xxx, port = xxx, db = xxxx)
=  redis.Redis(connection_pool = pool)

这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class  StrictRedis( object ):
........
     def  __init__( self , host = 'localhost' , port = 6379 ,
                  db = 0 , password = None , socket_timeout = None ,
                  socket_connect_timeout = None ,
                  socket_keepalive = None , socket_keepalive_options = None ,
                  connection_pool = None , unix_socket_path = None ,
                  encoding = 'utf-8' , encoding_errors = 'strict' ,
                  charset = None , errors = None ,
                  decode_responses = False , retry_on_timeout = False ,
                  ssl = False , ssl_keyfile = None , ssl_certfile = None ,
                  ssl_cert_reqs = None , ssl_ca_certs = None ):
          if  not  connection_pool:
              ..........
               connection_pool  =  ConnectionPool( * * kwargs)
          self .connection_pool  =  connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    # COMMAND EXECUTION AND PROTOCOL PARSING
     def  execute_command( self * args,  * * options):
         "Execute a command and return a parsed response"
         pool  =  self .connection_pool
         command_name  =  args[ 0 ]
         connection  =  pool.get_connection(command_name,  * * options)   #调用ConnectionPool.get_connection方法获取一个连接
         try :
             connection.send_command( * args)   #命令执行,这里为Connection.send_command
             return  self .parse_response(connection, command_name,  * * options)
         except  (ConnectionError, TimeoutError) as e:
             connection.disconnect()
             if  not  connection.retry_on_timeout  and  isinstance (e, TimeoutError):
                 raise
             connection.send_command( * args)  
             return  self .parse_response(connection, command_name,  * * options)
         finally :
             pool.release(connection)   #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
      class  ConnectionPool( object ):  
        ...........
     def  __init__( self , connection_class = Connection, max_connections = None ,
                  * * connection_kwargs):    #类初始化时调用构造函数
         max_connections  =  max_connections  or  2  * *  31
         if  not  isinstance (max_connections, ( int long ))  or  max_connections <  0 :   #判断输入的max_connections是否合法
             raise  ValueError( '"max_connections" must be a positive integer' )
         self .connection_class  =  connection_class   #设置对应的参数
         self .connection_kwargs  =  connection_kwargs
         self .max_connections  =  max_connections
         self .reset()   #初始化ConnectionPool 时的reset操作
     def  reset( self ):
         self .pid  =  os.getpid()
         self ._created_connections  =  0   #已经创建的连接的计数器
         self ._available_connections  =  []    #声明一个空的数组,用来存放可用的连接
         self ._in_use_connections  =  set ()   #声明一个空的集合,用来存放已经在用的连接
         self ._check_lock  =  threading.Lock()
.......
     def  get_connection( self , command_name,  * keys,  * * options):   #在连接池中获取连接的方法
         "Get a connection from the pool"
         self ._checkpid()
         try :
             connection  =  self ._available_connections.pop()   #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
             会直接调用make_connection方法
         except  IndexError:
             connection  =  self .make_connection()
         self ._in_use_connections.add(connection)    #向代表正在使用的连接的集合中添加元素
         return  connection   
     def  make_connection( self ):  #在_available_connections数组为空时获取连接调用的方法
         "Create a new connection"
         if  self ._created_connections > =  self .max_connections:    #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
             raise  ConnectionError( "Too many connections" )
         self ._created_connections  + =  1    #把代表已经创建的连接的数值+1
         return  self .connection_class( * * self .connection_kwargs)      #返回有效的连接,默认为Connection(**self.connection_kwargs)
     def  release( self , connection):   #释放连接,链接并没有断开,只是存在链接池中
         "Releases the connection back to the pool"
         self ._checkpid()
         if  connection.pid ! =  self .pid:
             return
         self ._in_use_connections.remove(connection)    #从集合中删除元素
         self ._available_connections.append(connection)  #并添加到_available_connections 的数组中
     def  disconnect( self ):  #断开所有连接池中的链接
         "Disconnects all connections in the pool"
         all_conns  =  chain( self ._available_connections,
                           self ._in_use_connections)
         for  connection  in  all_conns:
             connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:

1
2
3
4
5
6
7
class  Connection( object ):
     "Manages TCP communication to and from a Redis server"
     def  __del__( self ):    #对象删除时的操作,调用disconnect释放连接
         try :
             self .disconnect()
         except  Exception:
             pass

核心的链接建立方法是通过socket模块实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
     def  _connect( self ):
         err  =  None
         for  res  in  socket.getaddrinfo( self .host,  self .port,  0 ,
                                       socket.SOCK_STREAM):
             family, socktype, proto, canonname, socket_address  =  res
             sock  =  None
             try :
                 sock  =  socket.socket(family, socktype, proto)
                 # TCP_NODELAY
                 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,  1 )
                 # TCP_KEEPALIVE
                 if  self .socket_keepalive:    #构造函数中默认 socket_keepalive=False,因此这里默认为短连接
                     sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,  1 )
                     for  k, v  in  iteritems( self .socket_keepalive_options):
                         sock.setsockopt(socket.SOL_TCP, k, v)
                 # set the socket_connect_timeout before we connect
                 sock.settimeout( self .socket_connect_timeout)   #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
                 # connect
                 sock.connect(socket_address)
                 # set the socket_timeout now that we're connected
                 sock.settimeout( self .socket_timeout)   #构造函数中默认socket_timeout=None
                 return  sock
             except  socket.error as _:
                 err  =  _
                 if  sock  is  not  None :
                     sock.close()
.....

关闭链接的方法:

1
2
3
4
5
6
7
8
9
10
11
     def  disconnect( self ):
         "Disconnects from the Redis server"
         self ._parser.on_disconnect()
         if  self ._sock  is  None :
             return
         try :
             self ._sock.shutdown(socket.SHUT_RDWR)   #先shutdown再close
             self ._sock.close()
         except  socket.error:
             pass
         self ._sock  =  None

        
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1583541,如需转载请自行联系原作者

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4天前
|
存储 NoSQL Redis
在Python Web开发过程中,为什么Redis运行速度快
【5月更文挑战第15天】Redis在Python Web开发中运行速度快,原因包括:1) 丰富数据类型满足多样化需求;2) 简单数据模型提升查询效率;3) 单线程模型结合非阻塞I/O实现高效处理;4) 持久化机制保证数据安全;5) 二进制协议与管道技术优化网络通信。这些因素共同确保Redis能处理大量请求并保持高性能。
24 1
|
4天前
|
存储 消息中间件 缓存
Redis的高性能使得它非常适合用于实时分析场景
【5月更文挑战第15天】Redis在Python Web开发中扮演关键角色,常用于缓存系统,提高数据读取速度;会话管理,存储用户信息;分布式锁,确保数据一致性;排行榜和计数,利用有序集合和哈希结构;消息队列,基于列表结构实现异步处理;实时分析,高效处理实时数据。其丰富的数据结构和高性能使其在多种场景下应用广泛。
12 3
|
2天前
|
数据采集 人工智能 数据挖掘
「一行分析」利用12000条招聘数据分析Python学习方向和就业方向
「一行分析」利用12000条招聘数据分析Python学习方向和就业方向
|
2天前
|
JSON JavaScript 数据格式
利用 python 分析基金,合理分析数据让赚钱赢在起跑线!(1)
利用 python 分析基金,合理分析数据让赚钱赢在起跑线!(1)
|
4天前
|
存储 数据挖掘 数据处理
使用Python将数据表中的浮点数据转换为整数:详细教程与案例分析
使用Python将数据表中的浮点数据转换为整数:详细教程与案例分析
7 2
|
4天前
|
语音技术 开发者 Python
python之pyAudioAnalysis:音频特征提取分析文档示例详解
python之pyAudioAnalysis:音频特征提取分析文档示例详解
18 0
|
4天前
|
数据可视化 大数据 Python
python大数据分析处理
python大数据分析处理
14 0
|
4天前
|
NoSQL 网络协议 Java
Redis客户端Lettuce深度分析介绍(上)
Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis。本文通过分析Lettuce的特性和内部实现(基于6.0版本),及其与Jedis的对照比较,对这两种客户端,以及Redis服务端进行深度探讨。
|
4天前
|
机器学习/深度学习 人工智能 大数据
AI时代Python金融大数据分析实战:ChatGPT让金融大数据分析插上翅膀
AI时代Python金融大数据分析实战:ChatGPT让金融大数据分析插上翅膀
|
4天前
|
Linux 数据安全/隐私保护 iOS开发
如何将python命令链接到Python3
如何将python命令链接到Python3
11 0