本文主要分析 redis-py 源码,阐述 redis-py 连接过程和如何获取响应
先来看一下 redis-py 的项目结构,结构比较简单,也很清晰。
. ├── __init__.py ├── _compat.py python2.x向后兼容的内部模块 ├── client.py 实现redis客户端 ├── connection.py 实现连接池类,连接类和解析类 ├── exceptions.py 异常模块 ├── lock.py 实现共享、分布式锁 ├── sentinel.py redis高可用客户端 └── utils.py 一些其他模块我们从一个完整的请求开始阅读源码
In [1]: import redis In [2]: r = redis.Redis(host='localhost', port=6379, db=1) In [3]: r.ping() Out[3]: True根据示例代码,将分三个阶段解读源码:实例化、发送命令、获取响应
实例化的过程就是创建一个客户端的过程,使用起来比较方便,只需要实例化 Redis 类即可
In [1]: import redis In [2]: r = redis.Redis(host='localhost', port=6379, db=0)那么 Redis 类的初始化都做了什么呢?看下面的源码:
class Redis(object): """ Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server """ def __init__(self, host='localhost', port=6379, db=0, connection_pool=None, max_connections=None): if not connection_pool: kwargs = { 'db': db, 'host': host, 'port': port, 'max_connections': max_connextions, ... } connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool上面的代码我省略了一些本次分析不需要的代码,目的是让我们的分析过程更明确。 我们在实例化 Redis 类时只传入了 redis-server 的地址参数,未指定连接池类,所以初始化的过程中实际上会先创建一个连接池。显然,我们不能忽略连接池的创建细节。
连接池在初始化时会接收一个类,默认是 Connection类,然后设置最大连接数,最后需要调用 reset 函数,reset 函数则设置了进程id,线程锁和一些其他属性。 到现在,还没有真正建立连接。
实例化结束之后,我们获得了 Redis 的一个实例,但是还没有真正的建立连接,现在我直接调用客户端实例的 ping 方法,返回 True,我们先看一下 ping() 发生了什么?
In [3]: r.ping() Out[3]: True调用 ping() 之后,立即返回了 True。这个过程需要分开讨论,先解决发送命令。
ping() 方法直接调用了 execute_command(),Redis 实现的所有的命令都是通过execute_command 发出去的。总结一下上面的代码:
获取连接对象发送命令解析响应释放连接下面的源码虽然简化了很多,但还是比较长,简单的说一下:
获取连接对象,先检查一下是否在当前进程然后从当前可用连接队列中弹出一个连接,如果没有,则创建一个新的连接实例创建新的连接实例,实际上是创建一个socket连接对象。用连接实例的 connect() 方法,目的是确保连接可用通过 can_read() 方法检查是否有未读数据来确定该连接是否准备好发送命令然后返回该连接实例。 # 这里省略了很多细节,感兴趣的可以看一下源码 def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() try: connection = self._avaliable_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) try: connection.connect() except: self.release(connection) raise def make_connection(self): "Create a new connection" if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") self._created_connections += 1 return self.connection_class(**self.connection_kwargs) class Connection(object): "Manages TCP communication to and from a redis server" descritption_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>" def __init__(self, host='localhost', port=6379, db=0, parser_class=DefaultParser, socket_read_size=65536, **kwargs): self.host = host self.port = int(port) self.db = db self._parser = parser_class(socket_read_size=socket_read_size) self._sock = None self._description_args = { 'host': self.host, 'port': self.port, 'db': self.db } def connect(self): "Connects to the Redis server if not already connected" if self._sock: return sock = self._connect() self._sock = sock self.on_connect() def _connect(self): "Create a TCP socket" # 简化一下socket连接过程 for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM): family, socktype, proto, cannoname, socket_address = res sock = socket.socket(family, socktype, proto) sock.setsockopt(socket.IPPROTO_TCP, sockte.TCP_NODELAY, 1) sock.connect(socket_address) return sock def on_connect(self): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) # 对服务器返回的数据进行解析 if self.db: self.send_command('SELECT', self.db)上面说了一大堆,可能已经忘了我们走到哪一步了,我们在调用客户端实例的 ping() 方法过程中,先去建立一个真正的连接,现在我们已经有了真实的连接对象,也就是socket对象,那么,接下来我们调用 send_command() 方法去发送我们的 ping 命令,直接看源码:
def send_command(self, *args): "Pack and send a command to the redis server" self.send_packed_command(self.pack_command(*args)) def pack_command(self, *args): "Pack a series of arguments into the Redis protocol" return [b'*1\r\n*\r\nping\r\n'] def send_packed_command(self, command): "Send an already packed command to the Redis server" for item in command: self._sock.sendall(item)命令打包函数的具体细节可以以后研究,现在我们知道 send_command() 函数会把命令参数打包,返回一个列表,列表元素是一个字节类型的字符串,字符串包含了我们要发送的命令,然后调用 send_packed_command() 函数,发送命令的函数我省略了细节,最后,通过套接字的 sendall() 方法发送出去,到此为止,我们的 ping 命令已经发给了 redis 服务器。
上面,我们的ping命令已经发送给服务器了,那么如何获取响应呢?看看上面的 execute_command() 方法,return self.parse_response(connection, command_name, **options),所以,响应是在 Redis 的实例方法 parse_response() 返回。继续看源码
def parse_response(self, connection, command_name, **options): "Parses a response from the redis server" return connection.read_response() def read_response(self): "Read the response from previously send command" return self._parser.read_response()响应需要解析对象去获取,_parser 可能是 HiredisParser 对象,也可能是 PthoneParser 对象,取决于是否安装了 hiredis。hiredis 对于大批量的数据解析可以提升10倍的速度,这里我们先看一下未使用 hiredis 是如何获取响应的。
# parser对象已经在建立连接的时候动态添加了 _sock 和 _buffer 属性 def read_response(self): response = self._buffer.readline() # io.BytesIO() byte, response = byte_to_chr(response[0]), response[1:] if isinstance(response, bytes): response = self.encoder.decode(response) return response解析响应的过程实际上就是从内存读取响应的过程,当内存中没有数据时,会直接调用 socket.recv() 方法获取数据,然后写入内存。
数据获取完成之后,就需要把当前的连接释放掉,具体的操作就是把这个连接对象放回到连接池中。
def release(self, connection): "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) self._available_connections.append(connection)这样,一次完整的请求结束了。。。
