socketserver源码阅读

本篇socketserver源码阅读主要目的是熟悉Python中的继承模式

示例代码

1
2
3
4
5
6
7
8
9
10
import socketserver


class MyClass(socketserver.BaseRequestHandler):

def handle(self):
pass

obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)
obj.serve_forever()

阅读入口

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)

从以上这一行代码中可以看出,执行了ThreadingTCPServer类中的__init__方法(实例化了这个类的对象,就从这里为入口点来阅读)

ThreadingTCPServer类图

step by step

step 1

首先,从实例化ThreadingTCPServer这个类的对象可以得知,是执行了这个对象的__init__构造方法,但是我们查看这个类的源码发现,只有一个pass

1
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

但是这个类继承了两个父类,那么必然是执行了父类的__init__方法。在查找父类__init__方法的时候,首先从左边的父类开始查找

step 2

ThreadingTCPServer继承的第一个父类中ThreadingMixIn,没有找到__init__构造方法,接着,Python会去ThreadingTCPServer继承的第二个父类总去查找

step 3

ThreadingTCPServer继承的第二个父类TCPServer找到了__init__构造方法并执行

1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise

在这段构造方法中,前三个参数对应了示例代码中,创建ThreadingTCPServer对象时的三个参数

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)
1
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
  • self —> obj
  • server_address —> (‘127.0.0.1’, 1559)
  • RequestHandlerClass —> MyClass

TCPServer的构造方法拿到这几个参数之后,紧接着又调用了父类的__init__构造方法

step 3.1

TCPServer的父类BaseServer中,__init__构造方法执行了以下内容

1
2
3
4
5
6
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False

创建了两个公有变量和两个私有变量,并且

  • RequestHandlerClass —> MyClass

step 4

TCPServer在执行完父类的构造方法封装了四个字段后,继续向下执行

1
2
self.socket = socket.socket(self.address_family,
self.socket_type)

创建了一个socket对象

1
2
3
4
5
6
7
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise

step 4.1

1
2
3
4
5
6
7
8
9
10
def server_bind(self):
"""Called by constructor to bind the socket.

May be overridden.

"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()

绑定了IP和端口号,self.server_address的值是在执行父类BaseServer的构造方法时创建的

step 4.2

1
2
3
4
5
6
7
def server_activate(self):
"""Called by constructor to activate the server.

May be overridden.

"""
self.socket.listen(self.request_queue_size)

监听socket

至此,TCPServer中的__init__构造方法执行完毕,构造方法执行完毕,就意味着示例代码中的

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)

这一句就执行完毕了

step 5

接下来测试代码走到第10行

1
obj.serve_forever()

ThreadingTCPServer首先在自己的代码块中查找,没有

根据继承的规则,再去左边的父类ThreadingMixIn中查找,没有

之后再去右边的父类TCPServer中查找,没有

最后在TCPServer的父类BaseServer中找到了serve_forever方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.

Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)

while not self.__shutdown_request:
ready = selector.select(poll_interval) # IO多路复用
if ready: # 如果有新的客户端连接,ready就会有值
self._handle_request_noblock()

self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()

step 5.1

1
ready = selector.select(poll_interval)

IO多路复用,这里没有使用self来调,而是使用selector对象来调select方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def select(self, timeout=None):
timeout = None if timeout is None else max(timeout, 0)
ready = []
try:
r, w, _ = self._select(self._readers, self._writers, [], timeout)
except InterruptedError:
return ready
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE

key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready

step 5.2

在有客户端连接进来的时候,ready就有有值,为真,会执行self._handle_request_noblock方法

因为是self.来调用的,根据继承规则,就要回到最开始的类中,再逐级向上查找,所以回到ThreadingTCPServer中查找,没有

根据继承的规则,再去左边的父类ThreadingMixIn中查找,没有

之后再去右边的父类TCPServer中查找,没有

最后在TCPServer的父类BaseServer中找到了_handle_request_noblock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _handle_request_noblock(self):
"""Handle one request, without blocking.

I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be no risk of
blocking in get_request().
"""
try:
request, client_address = self.get_request() # 获取到客户端连接对象socket,和客户端地址
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

step 5.3

1
request, client_address = self.get_request()

根据类图查找,在TCPServer类中

1
2
3
4
5
6
7
def get_request(self):
"""Get the request and client address from the socket.

May be overridden.

"""
return self.socket.accept()

接受客户端连接请求

step 6

_handle_request_noblock方法中又执行了self.process_request方法

回到ThreadingTCPServer中查找,没有

根据继承的规则,在左边的父类ThreadingMixIn中找到该方法

1
2
3
4
5
6
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()

在这个方法中,使用了多线程去执行self.process_request_thread方法

1
2
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))

使用了self调用,还是要从头查找

step 7

ThreadingMixIn类中找到

1
2
3
4
5
6
7
8
9
10
11
12
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.

In addition, exception handling is done here.

"""
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

这个方法应该是用来接收客户端发来的请求以及给客户端发送消息

step 8

1
self.finish_request(request, client_address)

根据类图,在BaseServer中找到

1
2
3
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)

step3中已经封装了RequestHandlerClass = MyClass

相当于执行了MyClass的构造方法

1
MyClass(request, client_address, self)

step 9

MyClass中没有定义构造方法,在MyClass继承的父类socketserver.BaseRequestHandler中查找,找到了构造方法

1
2
3
4
5
6
7
8
9
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()

执行了self.handle()

step 10

执行self.handle()方法时,就会执行MyClass中定义的handle方法

在示例代码中,我们已经自定义了一个方法去实现与客户端的交互,