在企业环境中,随着用户数量的增长,尝试同时访问 web 应用的用户数量也会增长,这是正常的。这向我们提出了一个有趣的问题,即如何扩展 web 应用以处理用户的大量并发请求。
扩展 web 应用以处理大量用户是一项可以通过多种方式实现的任务,其中最简单的方式之一是添加更多的基础设施并运行更多的应用实例。然而,这种技术虽然简单,但对应用可伸缩性的经济性来说负担很大,因为与大规模运行应用相关的基础设施成本可能是巨大的。我们当然需要精心设计应用,使其能够轻松处理大量并发请求,而不需要频繁地扩展基础设施。
在前一章的基础上,我们将看到如何使用这些技术来构建可处理大量并发请求的可扩展应用,同时还学习一些其他技术,这些技术将帮助我们以毫不费力的方式扩展应用。
在本章中,我们将研究以下技术,以扩展 web 应用以实现大规模请求处理:
- 在 web 应用部署中使用反向代理
- 使用线程池扩展请求处理
- 使用 Python AsyncIO 理解单线程并发代码的概念
可以通过运行以下命令克隆代码示例:
git clone https://github.com/PacktPublishing/Hands-On-Enterprise-Application-Development-with-Python为了成功执行代码示例,需要提供 python-virtualenv包。
多年来,在互联网出现的时代,web 应用架构师通常面临的最常见的问题之一是如何处理日益增长的并发性。随着越来越多的用户上网并使用 web 应用,迫切需要扩展基础设施来管理所有这些请求。
这甚至适用于我们的企业 web 应用。即使我们可以估计一个企业中有多少用户可以同时访问这些 web 应用,但在今后的时间里,还没有一条硬性规定是正确的。随着企业的发展,访问应用的客户机数量也将增加,这将给基础架构带来更大的压力,并增加扩展它的需要。但是,在尝试扩展应用以适应不断增加的客户机数量时,我们有哪些选择呢?让我们看一看。
技术世界提供了许多选项来扩展应用,以适应不断增长的用户群;其中一些选项只是要求增加硬件资源,而其他选项则要求应用在内部处理多个请求。大多数情况下,缩放选项分为两大类:垂直缩放和水平缩放:
让我们看看他们俩,找出他们的利弊:
- 垂直扩展:垂直扩展的整个概念是基于向现有资源添加更多资源的事实。。。
当大多数企业项目求助于使用一个或另一个框架时,这通常决定了在生产阶段如何为应用提供服务,最好还是从表面上看一看,了解如何开发应用,同时保持应用的可伸缩性。
在本节中,我们将了解可以帮助我们构建可伸缩应用的不同技术,即使我们不使用可以为我们实现这一点的每构建一个框架。在本节中,我们将了解如何使用线程/进程池来同时处理多个客户端,以及为什么需要资源池,以及是什么阻止我们启动单独的线程或进程来处理每个其他传入请求。
但在深入研究如何在应用开发中利用线程池或进程池之前,让我们先看看一种简单的方法,通过这种方法,我们可以将传入请求的处理交给后台线程。
以下代码实现了一个简单的套接字服务器,该服务器首先接受传入连接,然后将其交给后台线程进行读写,从而释放主线程以接受其他传入连接:
# simple_socket_thread.py
#!/usr/bin/python3
import socket
import threading
# Let's first create a TCP type Server for handling clients
class Server(object):
"""A simple TCP Server."""
def __init__(self, hostname, port):
"""Server initializer
Keyword arguments:
hostname -- The hostname to use for the server
port -- The port on which the server should bind
"""
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.hostname = hostname
self.port = port
self.bind_connection()
self.listen()
def bind_connection(self):
"""Bind the server to the host."""
self.server.bind((self.hostname, self.port))
def listen(self):
"""Start listening for the incoming connections."""
self.server.listen(10) # Queue a maximum of 10 clients
# Enter the listening loop
while True:
client, client_addr = self.server.accept()
print("Received a connection from %s" % str(client_addr))
client_thread = threading.Thread(target=self.handle_client, args=(client,))
client_thread.daemon = True
client_thread.start()
def handle_client(self, client):
"""Handle incoming client connection.
Keyword arguments:
client -- The client connection socket
"""
print("Accepted a client connection")
while True:
buff = client.recv(1024).decode()
if not buff:
break
print(buff)
print("Client closed the connection")
client.close() # We are done now, let's close the connection
if __name__ == '__main__':
server = Server('localhost', 7000)在这段代码中,我们实现了一个简单的Server类,它初始化机器上基于 TCP 的服务器,准备接受传入的连接。在不转移太多注意力的情况下,让我们试着将注意力集中在这段代码的重要方面,即我们在 listen() 方法下启动服务器的侦听循环。
在listen()方法下,我们首先调用套接字的listen()方法,告诉它最多可以排队 10 个未被接受的连接。一旦达到此限制,服务器将拒绝任何进一步的客户端连接。现在,从这里开始,我们开始一个无限循环,第一次调用套接字的 accept() 方法。对 accept() 方法的调用会一直阻塞,直到客户端尝试建立连接。成功尝试后, accept() 调用返回客户端连接套接字和客户端地址。客户机连接套接字可用于对客户机执行 I/O 操作。
接下来有趣的是:一旦客户机连接被接受,我们就启动一个守护进程线程,负责处理与客户机的通信,并将客户机连接套接字移交给线程。这从本质上释放了我们的主线程来处理客户机套接字的 I/O,因此,我们的主线程现在可以接受更多的客户机。对于连接到我们服务器的所有其他客户端,此过程都将继续。
到目前为止还不错;我们有一个很好的方法来处理传入的客户,随着客户数量的增加,我们的服务可以逐渐扩大。这是一个简单的解决办法,不是吗?很明显,在提出这个解决方案的过程中,我们忽略了这个过程中的一个主要缺陷。该漏洞存在于这样一个事实中:我们没有实现与应用可以启动多少线程来处理传入客户端相关的任何类型的控制。想象一下,如果一百万客户端试图连接到我们的服务器,会发生什么?我们真的会同时运行一百万个线程吗?答案是否定的。
但为什么不可能呢?让我们看一看。
在前面的例子中,我们遇到了一个问题,为什么我们不能有一百万个线程,每个线程处理一个单独的客户端?这将为我们提供大量的并发性和可伸缩性。但是,有许多原因确实阻止我们同时运行一百万个线程。让我们看看阻止我们无限扩展应用的可能原因:
- 资源限制:服务器处理的每个客户端连接都不是免费的。对于每一个新连接的客户端,我们都在消耗机器的一些资源。这些可能包括映射到套接字的文件描述符、用于保存信息的内存量。。。
正如我们在上一节中看到的,我们不需要无限多的线程来处理传入的客户机。我们可以使用有限数量的线程来处理大量客户机。但是,我们如何在应用中实现线程池呢。事实证明,用 Python3 和concurrent.futures模块实现线程池功能非常容易。
以下代码示例修改了现有 TCP 服务器示例,以使用线程池,而不是任意启动无限多个线程来处理传入的客户端连接:
# simple_socket_threadpool.py
#!/usr/bin/python3
from concurrent.futures import ThreadPoolExecutor
import socket
# Let's first create a TCP type Server for handling clients
class Server(object):
"""A simple TCP Server."""
def __init__(self, hostname, port, pool_size):
"""Server initializer
Keyword arguments:
hostname -- The hostname to use for the server
port -- The port on which the server should bind
pool_size -- The pool size to use for the threading executor
"""
# Setup thread pool size
self.executor_pool = ThreadPoolExecutor(max_workers=pool_size)
# Setup the TCP socket server
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.hostname = hostname
self.port = port
self.bind_connection()
self.listen()
def bind_connection(self):
"""Bind the server to the host."""
self.server.bind((self.hostname, self.port))
def listen(self):
"""Start listening for the incoming connections."""
self.server.listen(10) # Queue a maximum of 10 clients
# Enter the listening loop
while True:
client, client_addr = self.server.accept()
print("Received a connection from %s" % str(client_addr))
self.executor_pool.submit(self.handle_client, client)
def handle_client(self, client):
"""Handle incoming client connection.
Keyword arguments:
client -- The client connection socket
"""
print("Accepted a client connection")
while True:
buff = client.recv(1024).decode()
if not buff:
break
print(buff)
print("Client closed the connection")
client.close() # We are done now, let's close the connection
if __name__ == '__main__':
server = Server('localhost', 7000, 20)在本例中,我们修改了 TCP 服务器代码以利用线程池,而不是启动任意数量的线程。让我们来看看我们是如何做到这一点的。
首先,为了利用线程池,我们需要初始化线程池执行器的一个实例。在Server类的__init__方法下,我们首先通过调用线程池执行器的构造函数来初始化线程池执行器:
self.executor_pool = ThreadPoolExecutor(max_workers=pool_size)ThreadPoolExecutor构造函数接受一个max_workers参数,该参数定义ThreadPool中可能有多少并发线程。但是,max_workers参数的最佳值是什么?
一般的经验法则是max_workers=(5 x CPU 核心总数)。这个公式背后的原因是,在 web 应用中,大多数线程通常都在等待 I/O 完成,而少数线程则忙于执行 CPU 限制的操作。
我们创建了一个ThreadPoolExecutor之后的下一件事是将作业提交给它,以便执行器池中的线程可以处理它们。这可以通过使用ThreadPoolExecutor类的 submit 方法来实现。这可以在Server类的listen()方法下看到:
self.executor_pool.submit(self.handle_client, client)ThreadPoolExecutor的submit()方法将线程内要执行的方法的名称以及需要传递给执行方法的参数作为第一个参数。
这非常容易实施,并为我们提供了许多好处,例如:
- 优化利用基础架构提供的资源
- 处理多个请求的能力
- 提高了可扩展性,减少了客户端的等待时间
One important thing to take a note of here is, since the ThreadPoolExecutor utilizes the threads, the CPython implementation might not provide the maximum performance due to the presence of GIL, which doesn't allow the execution of more than one thread at a time. Hence, the performance of the application may vary depending upon the underlying Python implementation being used.
现在,出现的问题是,如果我们想避开全局解释器锁怎么办?在仍然使用 Python 的 CPython 实现的情况下,是否有某种机制?我们在前一章中讨论了这个场景,并解决了使用 Python 的多处理模块代替线程库的问题。
而且,事实证明,使用ProcessPoolExecutor是一项非常简单的壮举。concurrent.futures 包中的底层实现解决了大多数必要问题,并为程序员提供了一个简单易用的抽象。为了了解这一点,让我们修改前面的示例,将ProcessPoolExecutor替换为ThreadPoolExecutor。为此,我们需要做的只是首先从 concurrent.futures 包导入正确的实现,如下所述:
from concurrent.futures import ProcessPoolExecutor我们需要做的下一件事是修改我们的__init__方法来创建进程池,而不是线程池。以下__init__方法的实现说明了我们如何实现这一点:
def __init__(self, hostname, port, pool_size):
"""Server initializer
Keyword arguments:
hostname -- The hostname to use for the server
port -- The port on which the server should bind
pool_size -- The size of the pool to use for the process based executor
"""
# Setup process pool size
self.executor_pool = ProcessPoolExecutor(max_workers=pool_size)
# Setup the TCP socket server
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.hostname = hostname
self.port = port
self.bind_connection()
self.listen()我认为,这是一个简单的过程,现在我们的应用可以使用多进程模型而不是多线程模型。
但是,我们可以保持池大小不变,还是需要更改池大小?
每个进程都有自己的内存空间和需要维护的内部指针,这使得该进程比使用线程实现并发性更重。这就提供了一个减少池大小的理由,以便更大程度地使用底层系统资源。一般来说,对于一个ProcessPoolExecutor,可以通过公式max_workers=*(2 x CPU 核数+1)*来计算max_workers。
这个公式背后的原因可以归因于这样一个事实:在任何给定的时间,我们都可以假设一半的进程将忙于执行网络 I/O,而其他进程可能忙于执行 CPU 密集型任务。
因此,现在我们对如何使用资源池以及为什么与启动任意数量的线程相比,它是一种更好的方法有了充分的了解。但是,这种方法仍然需要大量的上下文切换,并且高度依赖于所使用的底层 Python 实现。但肯定有比这更好的东西。
考虑到这一点,让我们尝试进入 Python 王国的另一个领域,异步编程领域。
在深入研究异步编程这一未知领域之前,让我们先回顾一下为什么使用线程或多进程。
使用线程或多进程的主要原因之一是为了提高并发性,从而提高应用处理更多并发请求的能力。但这样做的代价是资源利用率的提高,运行多线程的能力有限,或者启动更重的进程来适应更高的并发性,以及在共享数据结构之间实现锁的复杂机制。
现在,在构建可伸缩的 web 应用的上下文中,我们与一般用途的应用也有一些主要区别。。。
正如我们最近讨论的,Python 中对异步编程的支持是通过使用事件循环和协同例程实现的。但它们到底是什么?让我们来看一看:
顾名思义,事件循环是一个循环。这个循环的作用是,当一个新任务被执行时,事件循环将这个任务排队。现在,控制从这里转移到事件循环。当事件循环运行时,它检查队列中是否有任务。如果存在任务,控件将切换到该任务。
现在,这里是任务异步执行上下文中有趣的部分。假设事件循环的队列中有两个任务,即任务 A 和任务 B。当事件循环开始执行时,它会检查它所拥有的任务队列的状态。事件队列发现其队列中有任务。因此,事件队列拾取任务 A。现在发生上下文切换。。。
Python AsyncIO 中的协同例程提供了一种轻量级的机制,可以同时执行多个操作。co 例程是作为 Python 中生成器的特殊用例实现的。所以,在我们深入理解什么是协同例程之前,让我们花一点时间来理解生成器。
一般来说,生成器是那些生成某些值的函数。然而,其他函数都是这样做的,那么生成器与常规函数有何不同呢。区别在于一般函数的生命周期与生成器的不同。当我们调用一个函数时,它会产生一些值并返回它,一旦调用移出函数体,函数的作用域就会被破坏。当我们再次调用该函数时,将生成并执行一个新的作用域。
与此相反,当我们调用一个生成器时,生成器可以返回一个值,然后进入暂停状态,控件将返回给调用者。此时,生成器的作用域不会被破坏,它可以从先前留下的位置获取值的生成。这基本上为我们提供了一个函数,通过它我们可以获取或产生一些值。
下面的代码示例演示如何编写简单的生成器函数:
def get_number():
i = 0
while True:
yield i
i = i + 1
num = get_number()
print(next(num))
>>> 0
print(next(num))
>>> 1有趣的是,生成器不会通过反复调用生成器来继续提供下一个结果。为了得到新的结果,我们需要在生成器上使用next()方法。这使我们能够从生成器中产生新的结果。
现在,协同例程实现了生成器的一个特殊用例,在这个用例中,它们不仅可以产生新的结果,还可以接收一些数据。这是由产量和发电机的send()方法组合而成的。
以下代码示例显示了一个简单 co 例程的实现:
def say_hello():
msg = yield "Hello"
yield msg
greeting = say_hello()
next(greeting)
>>> Hello
greeting.send("Joe")
>>> Joe由于协同例程允许暂停和恢复函数,因此延迟生成结果,这使得它成为异步编程用例的一个很好的选择,在异步编程用例中,任务经常被发送到阻塞状态,然后在其操作完成后从那里恢复。
Python AsyncIO 中的任务是一种包装 co 例程的机制。每个任务都有一个与之相关联的结果,该结果可以立即生成,也可以根据任务的类型延迟生成。这一结果被称为未来。
在 AsyncIO 中,任务是未来的一个子类,它围绕着一个共同例程。当协同例程完成生成值时,任务返回并被事件循环标记为已完成,因此从事件队列的任务队列中删除。
现在,我们对与 Python AsyncIO 使用相关的术语有了足够的了解。现在让我们深入了解一些操作,并编写一个简单的程序来了解 Python AsyncIO 的实际工作原理。
现在是时候振作起来,开始潜入 Python 异步编程的世界,并了解 AsyncIO 是如何真正工作的了。
以下代码使用 Python 请求库和 AsyncIO 实现了一个简单的 URL 获取程序:
# async_url_fetch.py
#!/usr/bin/python3
import asyncio
import requests
async def fetch_url(url):
response = requests.get(url)
return response.text
async def get_url(url):
return await fetch_url(url)
def process_results(future):
print("Got results")
print(future.result())
loop = asyncio.get_event_loop()
task1 = loop.create_task(get_url('http://www.google.com'))
task2 = loop.create_task(get_url('http://www.microsoft.com'))
task1.add_done_callback(process_results)
task2.add_done_callback(process_results)
loop.run_forever()这是一个实现 Python 异步 IO 库的小而好的异步程序。现在,让我们花一些时间来理解我们在这里做了什么。
从顶部开始,我们导入了 Python 请求库,从 Python 代码生成 web 请求,还导入了 Python 的 AsyncIO 库。
接下来,我们定义一个名为fetch_url的协同例程。为 AsyncIO 定义共同例程的一般语法要求使用async关键字:
async def fetch_url(url)下一行是另一个名为get_url的 co 例程的定义。我们在get_url例程中所做的是调用另一个共同例程fetch_url,它实际获取 URL。
由于fetch_url是一个阻塞共同例程,因此我们使用await关键字继续调用fetch_url。这意味着该方法可以暂停,直到获得结果:
return await fetch_url(url)程序的下一步是process_results方法的定义。当get_url方法的结果到达时,我们使用此方法作为回调来处理这些结果。此方法采用单个参数,future对象,其中包含对get_url的函数调用结果。
在方法内部,可以通过future对象的results()方法访问未来的结果:
print(future.results())有了这些,我们就有了执行 AsyncIO 事件循环的所有基本机制。现在,是时候实现一个真正的事件循环并向其提交一些任务了。
我们首先通过调用get_event_loop()方法获取 AsyncIO 事件循环。get_event_loop()方法为运行代码的平台返回 AsyncIO 的最佳事件循环实现。
AsyncIO implements multiple event loops which a programmer can use. Usually a simple call to get_event_loop() will return the best event loop implementation for the system the interpreter is running on.
创建循环后,我们现在通过使用create_task()方法向事件循环提交一些任务。这会将任务添加到要执行的事件循环队列中。现在,由于这些任务是异步的,我们不知道哪个任务将首先生成结果,因此我们需要提供回调来处理任务的结果。为此,我们通过 tasksadd_done_callback()方法向 tasks 添加回调:
task1.add_done_callback(process_results)一旦这里的一切都设置好了,我们将事件循环启动到run_forever模式,以便事件循环继续运行并处理新任务。
通过这个,我们完成了一个简单的异步 IO 程序的实现。但是,我们正在尝试构建一个企业级应用。如果我想用 AsyncIO 构建一个企业 web 应用呢?
现在,让我们看看如何使用 AssiCIO 来实现一个简单的异步套接字服务器。
Python 实现提供的 AsyncIO 库提供了许多强大的功能。这些功能之一是接口和管理套接字通信的能力。这使程序员能够实现异步套接字处理,从而允许更多的客户端连接到服务器。
以下代码示例使用基于回调的机制构建了一个简单的套接字处理程序,以处理与客户端的通信:
# async_socket_server.py#!/usr/bin/python3import asyncioclass MessageProtocol(asyncio.Protocol): """An asyncio protocol implementation to handle the incoming messages.""" def connection_made(self, transport): ...大多数时候,当我们通过框架构建一些 web 应用时,框架通常提供一个小型且易于运行的 web 服务器。尽管这些服务器适合在开发环境中使用,以便在开发阶段快速实现更改并通过应用内部的问题进行调试,但这些服务器无法处理生产工作负载。
即使在整个应用都是从头开始开发的情况下,通过使用反向代理将通信代理给 web 应用通常也是一个好主意。但问题是,我们为什么要这样做?我们为什么不直接运行 web 应用,让它处理传入的请求呢。让我们快速了解 web 应用服务的所有职责:
- 传入请求的处理:当新请求到达 web 应用时,web 应用可能需要决定如何处理该请求。如果 web 应用具有可以处理请求的工作人员,则应用将接受请求,将其移交给工作人员,并在工作人员完成处理后返回请求的响应。如果没有工作进程,则 web 应用必须将此请求排入队列以供以后处理。在最坏的情况下,当队列积压超过最大排队客户端数的阈值时,web 应用必须拒绝该请求。
- 服务静态资源:如果 web 应用需要生成动态 HTML 页面,它还可以作为服务器,跨 CSS、Javascript、图像等静态资源发送,从而增加负载。
- 处理加密:大多数 web 应用现在都启用了加密。在这种情况下,我们的 web 应用还需要我们管理加密数据的解析并提供安全连接。
这些都是需要由一个简单的 web 应用服务器来处理的一些职责。我们更需要的是一种机制,通过这种机制,我们可以从 web 应用服务器上卸下大量的这些责任,让它只处理它应该做的基本工作以及它真正发挥作用的地方。
因此,我们要提高 web 应用处理大量客户机的能力的第一步行动是首先减轻它的一些责任。为了实现这一点,我想到了一个简单的选择,即首先在反向代理后面开始运行 web 应用:
那么,反向代理本质上是做什么的呢?反向代理的工作方式是,当客户端请求到达Web 应用服务器时,反向代理截获该请求。根据为将请求与相应后端应用匹配而定义的规则,反向代理然后将该请求转发给。。。
在考虑使用反向代理时,首先想到的一个优点是安全性的提高。这是因为现在我们可以在防火墙后面运行 web 应用,这样就不能直接访问它。反向代理截取请求并将其转发给应用,而不让用户知道他们发出的请求背后发生了什么。
这种对 web 应用的受限访问有助于减少恶意用户可能利用的攻击面,从而侵入 web 应用并访问或修改关键记录。
反向代理服务器还可用于提高 web 应用的连接处理能力。现在,为了加快远程内容的获取,web 浏览器打开到 web 服务器的多个连接以增加资源的并行下载。反向代理可以在 web 应用处理挂起的请求时排队并为连接请求提供服务,从而提高连接接受度并减少应用管理连接状态的负载。
当 web 应用生成对特定客户机请求的响应时,可能会再次出现相同类型的请求,或者再次请求相同的资源。对于每个类似的请求,使用 web 应用一次又一次地生成响应可能不是一个很好的解决方案。
反向代理有时可以帮助理解请求和响应模式,并为它们实现缓存。启用缓存时,当相似的请求再次到达或相同的资源再次被请求时,反向代理可以直接发回缓存的响应,而不是将请求转发给 web 应用,从而减轻 web 应用的大量开销。这将提高 web 应用的性能,缩短客户端的响应时间。
大多数 web 应用都提供两种资源。一种是根据外部输入和保持不变的静态内容(如 CSS 文件、Javascript 文件、图像等)生成的动态响应。
如果我们可以从 web 应用中卸下这些责任中的任何一项,它将提供大量性能增益和改进的可伸缩性。
我们在这里的最佳可能性是将静态资源的服务转移到客户端。反向代理还可以兼作服务器,它可以为客户端提供静态资源,而无需将这些请求转发到 web 应用服务器,从而显著减少等待。。。
通过本章的课程,我们了解了构建 web 应用以处理大量并发请求的不同方法。我们从理解和学习不同的缩放技术开始,例如垂直缩放和水平缩放,并了解每种技术的不同优缺点。然后,我们进一步深入讨论这些主题,以帮助我们提高 web 应用本身处理更多请求的能力。这让我们开始了使用资源池的旅程,以及为什么使用资源池而不是为到达 web 应用的每个新请求任意分配资源是一个好主意。在接下来的旅程中,我们了解了处理传入请求的异步方式,以及为什么异步机制更适合于 I/O 受限的 web 应用的更高可伸缩性。我们通过研究反向代理的使用以及反向代理提供了哪些优势来帮助我们扩展 web 应用,从而结束了关于为大量客户机扩展应用的讨论。
现在,我们已经了解了如何让应用处理大量并发请求,下一章将介绍如何利用我们在本书中学到的不同概念构建演示应用。
- 我们如何使用同一应用的多个实例来服务传入的请求?
- 我们如何实现流程池并通过它们分发客户端请求?
- 我们能否实现一个同时利用进程池和线程池的应用?在实施同样的计划时,我们可能会面临哪些问题?
- 如何使用 AsyncIO 实现基本 web 服务器?


