Skip to content

Latest commit

 

History

History
854 lines (593 loc) · 61.6 KB

File metadata and controls

854 lines (593 loc) · 61.6 KB

十三、并发

并发是一种让计算机同时做(或看起来做)多项事情的艺术。历史上,这意味着邀请处理器每秒多次在不同任务之间切换。在现代系统中,它也可以字面上意味着在不同的处理器内核上同时做两件或更多的事情。

并发本质上不是一个面向对象的主题,但是 Python 的并发系统是建立在我们在本书中介绍的面向对象结构之上的。本章将向您介绍以下主题:

  • 线程
  • 多处理
  • 期货
  • 异步

并发是复杂的。基本概念相当简单,但可能出现的 bug 却很难追踪。然而,对于许多项目来说,并发性是获得所需性能的唯一途径。想象一下,如果 web 服务器在前一个请求完成之前无法响应用户的请求!我们不会详细讨论它有多难(需要另一本完整的书),但我们将了解如何在 Python 中实现基本并发,以及一些最常见的陷阱。

螺纹

大多数情况下,创建并发是为了在程序等待 I/O 发生时继续工作。例如,服务器可以在等待前一个请求的数据到达时开始处理新的网络请求。交互式程序可能会在等待用户按键时渲染动画或执行计算。请记住,虽然一个人每分钟可以键入 500 多个字符,但计算机每秒可以执行数十亿条指令。因此,即使在快速打字的情况下,单个按键之间也会发生大量的处理。

从理论上讲,在程序中管理所有这些活动之间的切换是可能的,但实际上不可能做到正确。相反,我们可以依靠 Python 和操作系统来处理棘手的切换部分,同时创建看起来独立但同时运行的对象。这些对象称为线程;在 Python 中,它们有一个非常简单的 API。让我们来看一个基本的例子:

from threading import Thread

class InputReader(Thread):
    def run(self):
        self.line_of_text = input()

print("Enter some text and press enter: ")
thread = InputReader()
thread.start()

count = result = 1
while thread.is_alive():
    result = count * count
    count += 1

print("calculated squares up to {0} * {0} = {1}".format(
    count, result))
print("while you typed '{}'".format(thread.line_of_text))

此示例运行两个线程。你能看见他们吗?每个程序都有一个线程,称为主线程。从一开始执行的代码就发生在这个线程中。更明显的是,第二个线程作为InputReader类存在。

要构造线程,我们必须扩展Thread类并实现run方法。run方法中的任何代码(或从该方法中调用的代码)都在单独的线程中执行。

在我们调用对象上的start()方法之前,新线程不会开始运行。在这种情况下,线程立即暂停以等待键盘输入。同时,原始线程在调用start点继续执行。它开始计算while循环中的平方。while循环中的条件检查InputReader线程是否已经退出其run方法;一旦这样做,它就会向屏幕输出一些摘要信息。

如果我们运行示例并键入字符串“hello world”,则输出如下所示:

Enter some text and press enter:
hello world
calculated squares up to 1044477 * 1044477 = 1090930114576
while you typed 'hello world'

当然,在键入字符串时,您将计算或多或少的平方,因为数字与我们的相对键入速度以及我们正在运行的计算机的处理器速度有关。

当我们调用start方法时,线程才开始以并发模式运行。如果我们想取出并发调用以查看其比较结果,我们可以在最初调用thread.start()的地方调用thread.run()。结果表明:

Enter some text and press enter:
hello world
calculated squares up to 1 * 1 = 1
while you typed 'hello world'

在这种情况下,线程永远不会激活,while循环永远不会执行。我们在打字的时候闲置着,浪费了大量的 CPU 资源。

有很多不同的模式可以有效地使用线程。我们不会涵盖所有的方法,但我们会看一个常见的方法,这样我们就可以了解join方法。让我们检查一下加拿大各省首都的当前温度:

from threading import Thread
import json
from urllib.request import urlopen
import time

CITIES = [
    'Edmonton', 'Victoria', 'Winnipeg', 'Fredericton',
    "St. John's", 'Halifax', 'Toronto', 'Charlottetown',
    'Quebec City', 'Regina'
]

class TempGetter(Thread):
 def __init__(self, city):
 super().__init__()
 self.city = city

    def run(self):
        url_template = (
            'http://api.openweathermap.org/data/2.5/'
            'weather?q={},CA&units=metric')
        response = urlopen(url_template.format(self.city))
        data = json.loads(response.read().decode())
        self.temperature = data['main']['temp']

threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    thread.start()

for thread in threads:
 thread.join()

for thread in threads:
    print(
        "it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
    "Got {} temps in {} seconds".format(
    len(threads), time.time() - start))

此代码在启动前构造 10 个线程。注意我们如何重写构造函数将它们传递到Thread对象,记住调用super以确保Thread正确初始化。请注意:新线程尚未运行,因此__init__方法仍在主线程内部执行。我们在一个线程中构造的数据可以从其他正在运行的线程访问。

在 10 个线程启动之后,我们再次循环它们,对每个线程调用join()方法。该方法本质上说是“在执行任何操作之前等待线程完成”。我们依次称之为十次;在所有十个线程完成之前,for 循环不会退出。

此时,我们可以打印存储在每个线程对象上的温度。再次注意,我们可以从主线程访问在线程中构造的数据。在线程中,默认情况下共享所有状态。

在我的 100 mbit 连接上执行此代码大约需要十分之二秒:

it is 5°C in Edmonton
it is 11°C in Victoria
it is 0°C in Winnipeg
it is -10°C in Fredericton
it is -12°C in St. John's
it is -8°C in Halifax
it is -6°C in Toronto
it is -13°C in Charlottetown
it is -12°C in Quebec City
it is 2°C in Regina
 Got 10 temps in 0.18970298767089844 seconds

如果我们在一个线程中运行这段代码(通过将start()调用更改为run()并注释掉join()调用),则需要将近 2 秒的时间,因为每个 0.2 秒的请求必须在下一个开始之前完成。这 10 倍的加速显示了并发编程是多么有用。

线程的许多问题

线程可能很有用,特别是在其他编程语言中,但现代 Python 程序员出于几个原因倾向于避免使用它们。正如我们将看到的,还有其他并发编程的方法正在受到 Python 开发人员的更多关注。在继续讨论更突出的主题之前,让我们先讨论一下这些陷阱。

共享内存

线程的主要问题也是它们的主要优势。线程可以访问所有内存,从而访问程序中的所有变量。这很容易导致程序状态不一致。你有没有遇到过一个房间,一盏灯有两个开关,两个不同的人同时打开它们?每个人(线程)都希望他们的操作能够打开指示灯(变量),但结果值(指示灯关闭)与这些预期不一致。现在想象一下,如果这两个线程在银行账户之间转移资金,或者在一辆车上管理巡航控制系统。

在线程编程中,这个问题的解决方案是“同步”对读取或写入共享变量的任何代码的访问。有几种不同的方法可以做到这一点,但我们在这里不讨论它们,所以我们可以关注更多的 python 构造。同步解决方案是可行的,但很容易忘记应用它。更糟糕的是,由于不恰当地使用同步而导致的错误确实很难追踪,因为线程执行操作的顺序不一致。我们不能轻易地重现这个错误。通常,使用已经适当使用锁的轻量级数据结构强制线程之间的通信是最安全的。Python 提供了queue.Queue类来实现这一点;它的功能与我们将在下一节讨论的multiprocessing.Queue基本相同。

在某些情况下,这些缺点可能会被允许共享内存的一个优点所抵消:它的速度很快。如果多个线程需要访问一个巨大的数据结构,共享内存可以快速提供这种访问。然而,在 Python 中,运行在不同 CPU 核上的两个线程不可能同时执行计算,这一事实通常会抵消这一优势。这就引出了线程的第二个问题。

全局解释器锁

为了高效地管理内存、垃圾收集和对库中机器代码的调用,Python 有一个名为全局解释器锁GIL的实用程序。这是不可能关闭的,这意味着线程在 Python 中没有用处,因为它们在其他语言中擅长一件事:并行处理。GIL 的主要作用是防止任何两个线程同时工作,即使它们有工作要做。在这种情况下,“工作”意味着使用 CPU,因此多线程访问磁盘或网络是完全可以的;一旦线程开始等待某些内容,就会释放 GIL。

GIL 被高度贬低,大多数人不了解它是什么,也不了解它给 Python 带来的所有好处。如果我们的语言没有这样的限制,那当然很好,但是 Python 参考开发人员已经确定,至少现在,它带来的价值大于成本。它使参考实现更易于维护和开发,并且在最初开发 Python 的单核处理器时代,它实际上使解释器更快。然而,GIL 的最终结果是它限制了线程给我们带来的好处,而没有降低成本。

虽然 GIL 是大多数人使用的 Python 参考实现中的一个问题,但它在一些非标准实现(如 IronPython 和 Jython)中得到了解决。不幸的是,在发布时,这些都不支持 Python3。

线程开销

与我们稍后将讨论的异步系统相比,线程的最后一个限制是维护线程的成本。每个线程占用一定数量的内存(在 Python 进程和操作系统内核中)来记录该线程的状态。在线程之间切换也会占用(少量)CPU 时间。这项工作在没有任何额外编码的情况下无缝地进行(我们只需调用start(),其余的都会处理),但这项工作仍然必须在某个地方进行。

通过对工作负载进行结构化,使线程可以重用以执行多个作业,可以在一定程度上缓解这一问题。Python 提供了一个ThreadPool特性来处理这个问题。它是作为多处理库的一部分提供的,其行为与我们稍后将讨论的ProcessPool相同,因此我们将把讨论推迟到下一节。

多处理

多处理 API 最初设计为模仿线程 API。然而,它已经发展,在 Python3 的最新版本中,它更有力地支持更多特性。多处理库是在 CPU 密集型作业需要并行进行且多核可用时设计的(考虑到目前可以以 35 美元的价格购买四核 Raspberry Pi,通常有多核可用)。当进程的大部分时间都在等待 I/O(例如,网络、磁盘、数据库或键盘)时,多处理是没有用的,但它们是并行计算的方式。

多处理模块启动新的操作系统进程来完成这项工作。在 Windows 机器上,这是一个相对昂贵的操作;在 Linux 上,进程在内核中的实现方式与线程相同,因此开销仅限于在每个进程中运行单独的 Python 解释器的成本。

让我们尝试使用与threadingAPI 提供的结构类似的结构来并行计算繁重的操作:

from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
 print(os.getpid())
        for i in range(200000000):
            pass

if __name__ == '__main__':
 procs =  [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
 p.start()
    for p in procs:
 p.join()
    print('work took {} seconds'.format(time.time() - t))

这个示例只占用 CPU 2 亿次迭代。你可能不认为这是有用的工作,但这是一个寒冷的日子,我欣赏我的笔记本电脑在这种负载下产生的热量。

API 应该是熟悉的;我们实现了一个子类Process(而不是Thread,并实现了一个run方法。此方法在执行某些密集(如果被误导)工作之前打印出进程 ID(操作系统分配给机器上每个进程的唯一编号)。

请特别注意模块级代码周围的if __name__ == '__main__':防护,该防护可防止模块在导入时运行,而不是作为程序运行。一般来说,这是一种很好的做法,但在某些操作系统上使用多处理时,这是必不可少的。在幕后,为了执行run()方法,多处理可能必须在新进程内导入模块。如果我们允许整个模块在该点执行,它将开始递归地创建新进程,直到操作系统耗尽资源。

我们为机器上的每个处理器核心构造一个进程,然后启动并加入这些进程。在我 2014 年的四核笔记本电脑上,输出如下所示:

6987
6988
6989
6990
work took 12.96659541130066 seconds

前四行是在每个MuchCPU实例中打印的进程 ID。最后一行显示,在我的机器上,2 亿次迭代可以在大约 13 秒内运行。在这 13 秒钟内,我的进程监视器显示我的四个内核都在 100%运行。

如果我们将MuchCPU中的multiprocessing.Process改为子类threading.Thread,则输出如下:

7235
7235
7235
7235
work took 28.577413082122803 seconds

这一次,四个线程在同一个进程内运行,运行时间几乎是原来的三倍。这是全局解释器锁的成本;在其他语言或 Python 实现中,线程版本的运行速度至少与多处理版本一样快,我们可能期望它的运行时间是多处理版本的四倍,但请记住,我的笔记本电脑上运行着许多其他程序。在多处理版本中,这些程序还需要共享四个 CPU。在线程版本中,这些程序可以使用其他三个 CPU。

多处理池

一般来说,没有理由拥有比计算机上处理器更多的进程。这有几个原因:

  • 只有cpu_count()进程可以同时运行
  • 每个进程都使用 Python 解释器的完整副本来消耗资源
  • 进程之间的通信代价高昂
  • 创建进程需要非零的时间

考虑到这些限制,在程序启动时最多创建cpu_count()进程,然后让它们根据需要执行任务是有意义的。实现一系列基本的通信过程并不困难,但调试、测试和正确操作可能会很棘手。当然,Python 就是 Python,我们不必做所有这些工作,因为 Python 开发人员已经以多处理池的形式为我们完成了这些工作。

池的主要优点是,它们可以减少计算主进程中执行的代码以及子进程中运行的代码的开销。与多处理模拟的线程 API 一样,通常很难记住谁在执行什么。池抽象限制了不同进程中的代码相互交互的位置数量,使跟踪变得更加容易。

  • 池还无缝地隐藏在进程之间传递数据的进程。使用池看起来很像函数调用;将数据传递到函数中,它在另一个或多个进程中执行,当工作完成时,返回一个值。重要的是要理解,在引擎盖下,为了支持这一点,正在做大量的工作:一个进程中的对象正在被酸洗并传递到管道中。
  • 另一个进程从管道中检索数据并取消勾选。在子流程中完成工作并产生结果。结果被腌制并传递到管道中。最终,原始进程将其取消勾选并返回。

所有这些酸洗和将数据传递到管道中都需要时间和内存。因此,将传入池和从池返回的数据的数量和大小保持在最小是理想的,并且只有在需要对相关数据进行大量处理时才有利于使用池。

有了这些知识,使所有这些机器工作的代码出人意料地简单。让我们看一下计算随机数列表中所有素因子的问题。这是各种加密算法中常见且昂贵的一部分(更不用说对这些算法的攻击了!)。它需要多年的处理能力来破解用于保护您银行账户的超大数字。下面的实现虽然可读,但效率不高,但这没关系,因为我们希望看到它使用大量的 CPU 时间:

import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__':
 pool = Pool()

    to_factor = [
        random.randint(100000, 50000000) for i in range(20)
    ]
 results = pool.map(prime_factor, to_factor)
 for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))

让我们关注并行处理方面,因为计算因子的蛮力递归算法非常清晰。我们首先构造一个多处理池实例。默认情况下,此池为运行它的计算机中的每个 CPU 核心创建一个单独的进程。

map方法接受一个函数和一个 iterable。池对 iterable 中的每个值进行 pickle,并将其传递到一个可用进程中,该进程对其执行函数。当该流程完成其工作时,它会对结果的因子列表进行 pickle 处理,并将其传递回池中。一旦所有池都完成了处理工作(这可能需要一些时间),结果列表就会传递回原始流程,该流程一直在耐心等待所有这些工作的完成。

使用类似的map_async方法通常更有用,即使进程仍在工作,该方法也会立即返回。在这种情况下,results 变量将不是一个值列表,而是一个稍后通过调用results.get()返回值列表的承诺。这个 promise 对象还有ready()wait()等方法,可以让我们检查是否所有结果都已经存在。

或者,如果我们事先不知道想要得到结果的所有值,我们可以使用apply_async方法对单个作业进行排队。如果池中有一个进程尚未运行,它将立即启动;否则,它将保留任务,直到有可用的空闲进程。

池也可以是closed,它拒绝执行任何进一步的任务,但处理队列中当前的所有内容;或者terminated,它更进一步,拒绝启动队列中仍然存在的任何作业,尽管仍然允许完成当前正在运行的任何作业。

排队

如果我们需要更多地控制流程之间的通信,我们可以使用QueueQueue数据结构用于将消息从一个进程发送到一个或多个其他进程。任何可酸洗的对象都可以发送到Queue中,但请记住,酸洗可能是一项昂贵的操作,因此请将此类对象保持在较小的范围内。为了说明队列,让我们为文本内容构建一个小型搜索引擎,将所有相关条目存储在内存中。

这不是构建基于文本的搜索引擎的最明智的方法,但我已经使用这种模式来查询数字数据,这些数据需要使用 CPU 密集型进程来构建图表,然后呈现给用户。

这个特定的搜索引擎并行扫描当前目录中的所有文件。为 CPU 上的每个核心构造一个进程。其中每一个都被指示将一些文件加载到内存中。让我们看一下进行加载和搜索的函数:

def search(paths, query_q, results_q):
    lines = []
    for path in paths:
        lines.extend(l.strip() for l in path.open())

 query = query_q.get()
    while query:
 results_q.put([l for l in lines if query in l])
 query = query_q.get()

请记住,此函数是在与主线程不同的进程中运行的(实际上,它是在cpucount()不同的进程中运行的)。它传递一个path.path对象列表和两个multiprocessing.Queue对象列表;一个用于传入查询,另一个用于发送传出结果。这些队列与我们在第 6 章Python 数据结构中讨论的Queue类具有类似的接口。但是,他们正在做额外的工作来 pickle 队列中的数据,并通过管道将其传递到子流程中。这两个队列在主进程中设置,并通过管道传递到子进程内的搜索函数中。

搜索代码是相当愚蠢的,无论是在效率和能力方面;它循环存储在内存中的每一行,并将匹配的行放入一个列表中。该列表被放置在队列上并传递回主进程。

让我们看一下主流程,它设置了以下队列:

if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    from path import path
    cpus = cpu_count()
    pathnames = [f for f in path('.').listdir() if f.isfile()]
    paths = [pathnames[i::cpus] for i in range(cpus)]
 query_queues = [Queue() for p in range(cpus)]
 results_queue = Queue()

 search_procs = [
 Process(target=search, args=(p, q, results_queue))
 for p, q in zip(paths, query_queues)
 ]
    for proc in search_procs: proc.start()

为了便于描述,我们假设cpu_count是四。注意导入语句是如何放置在if保护中的吗?这是一个小型优化,可防止在某些操作系统上的每个子流程(不需要它们的地方)中导入它们。我们列出当前目录中的所有路径,然后将列表分成四个大致相等的部分。我们还构建了一个包含四个Queue对象的列表,将数据发送到每个子流程。最后,我们构造了一个single结果队列;这将传递到所有四个子流程中。它们中的每一个都可以将数据放入队列,并在主进程中进行聚合。

现在让我们看一下使搜索实际发生的代码:

    for q in query_queues:
 q.put("def")
 q.put(None)  # Signal process termination

    for i in range(cpus):
 for match in results_queue.get():
            print(match)
    for proc in search_procs: proc.join()

这段代码只对"def"执行一次搜索(因为它是一个充满 Python 文件的目录中的常见短语!)。在一个更适合生产的系统中,我们可能会将一个套接字连接到此搜索代码。在这种情况下,我们必须更改进程间协议,以便返回队列上的消息包含足够的信息来标识结果附加到的查询。

队列的这种使用实际上是分布式系统的本地版本。想象一下,如果搜索被发送到多台计算机,然后重新组合。我们在这里不讨论它,但多处理模块包含一个管理器类,它可以从前面的代码中提取大量样板文件。甚至还有一个版本的multiprocessing.Manager可以管理远程系统上的子流程,以构建基本的分布式应用程序。如果您有兴趣进一步了解这一点,请查看 Python 多处理文档。

多重处理的问题

与线程一样,多处理也有问题,其中一些问题我们已经讨论过了。没有实现并发的最佳方法;在 Python 中尤其如此。我们总是需要检查并行问题,找出许多可用的解决方案中哪一个是该问题的最佳解决方案。有时候,没有最好的解决办法。

在多处理的情况下,主要的缺点是在进程之间共享数据非常昂贵。正如我们所讨论的,进程之间的所有通信,无论是通过队列、管道还是更隐式的机制,都需要对对象进行酸洗。过度酸洗很快就占据了加工时间。当在进程之间传递相对较小的对象并且需要在每个进程上完成大量工作时,多处理效果最佳。另一方面,如果流程之间不需要通信,那么使用该模块可能没有任何意义;我们可以启动四个单独的 Python 进程并独立使用它们。

多处理的另一个主要问题是,与线程一样,很难判断变量或方法在哪个进程中被访问。在多进程处理中,如果从另一个进程访问变量,通常会覆盖当前运行进程中的变量,而另一个进程保留旧值。这真的很难维护,所以不要这样做。

期货

让我们开始研究一种更异步的并发方式。根据我们需要什么样的并发性(倾向于 I/O 而不是倾向于 CPU),我们可以选择多处理或线程。它们并不能完全解决意外改变共享状态的问题,但它们允许我们构造代码,以便在这样做时更容易跟踪。未来在不同的线程或进程之间提供不同的边界。与多处理池类似,它们对于“呼叫和应答”类型的交互非常有用,在这种交互中,处理可以发生在另一个线程中,然后在将来的某个时候(毕竟它们的名称很恰当),您可以向它询问结果。它实际上只是多处理池和线程池的包装器,但它提供了更干净的 API 并鼓励编写更好的代码。

future 是一个基本上包装函数调用的对象。该函数调用在线程或进程的后台运行。future 对象具有检查 future 是否已完成以及在完成后获取结果的方法。

让我们做另一个文件搜索示例。在最后一节中,我们实现了一个版本的unix grep命令。这一次,让我们做一个简单版本的find命令。示例将在整个文件系统中搜索包含给定字符串的路径:

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from os.path import sep as pathsep
from collections import deque

def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
        full_path = str(p.absolute())
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
        if query_string in full_path:
                print(full_path)

    return subdirs

query = '.py'
futures = deque()
basedir = Path(pathsep).absolute()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures.append(
        executor.submit(find_files, basedir, query))
    while futures:
        future = futures.popleft()
        if future.exception():
            continue
        elif future.done():
            subdirs = future.result()
            for subdir in subdirs:
                futures.append(executor.submit(
                    find_files, subdir, query))
        else:
            futures.append(future)

此代码由一个名为find_files的函数组成,该函数在单独的线程(或进程,如果我们使用ProcessPoolExecutor的话)中运行。这个函数没有什么特别之处,但请注意它如何不访问任何全局变量。与外部环境的所有交互都传递到函数中或从函数返回。这不是一个技术要求,但它是最好的方法,让你的大脑在你的脑袋里编程时与未来。

在没有适当同步的情况下访问外部变量会导致所谓的竞争条件。例如,假设两个并发写入尝试递增一个整数计数器。它们同时启动,并且都将值读取为 5。然后它们都增加值,并将结果写回 6。但是,如果两个进程试图增加一个变量,那么预期的结果是它将增加 2,因此结果应该是 7。现代智慧认为,避免这样做的最简单方法是尽可能多地保持状态私有,并通过已知的安全结构(如队列)共享它们。

我们在开始之前设置了几个变量;对于本例,我们将搜索包含字符'.py'的所有文件。我们有一系列的未来,我们将很快讨论。basedir变量指向文件系统的根目录;在 Unix 机器上有'/',在 Windows 上可能有C:\

首先,让我们学习一门关于搜索理论的短期课程。该算法并行实现了广度优先搜索。它不是使用深度优先搜索递归地搜索每个目录,而是将当前文件夹中的所有子目录添加到队列中,然后添加每个文件夹的所有子目录,依此类推。

程序的核心部分称为事件循环。我们可以构造一个ThreadPoolExecutor作为一个上下文管理器,以便在完成时自动清理它并关闭它的线程。它需要一个max_workers参数来指示一次运行的线程数;如果提交的作业超过这个数量,它会将其余作业排队,直到有一个工作线程可用。当使用ProcessPoolExecutor时,这通常受限于机器上的 CPU 数量,但对于线程,这可能会更高,具体取决于一次有多少 CPU 在等待 I/O。每个线程都会占用一定的内存,所以不应该太高;在磁盘速度(而不是并行请求的数量)成为瓶颈之前,不需要那么多线程。

构建执行器之后,我们将使用根目录向其提交作业。submit()方法立即返回一个Future对象,它承诺最终会给我们一个结果。未来被放在队列上。然后循环重复地从队列中删除第一个未来并检查它。如果它仍在运行,它将被添加回队列的末尾。否则,我们检查函数是否通过调用future.exception()引发异常。如果是这样的话,我们就忽略它(这通常是一个权限错误,尽管一个真正的应用程序需要更加小心异常是什么)。如果我们没有在这里检查此异常,它将在我们调用result()时引发,并且可以通过正常的try进行处理。。。except机构。

假设没有异常发生,我们可以调用result()获取函数调用的返回值。由于该函数返回非符号链接的子目录列表(我防止无限循环的惰性方式),result()返回相同的内容。这些新的子目录被提交给执行者,产生的未来被抛到队列中,以便在以后的迭代中搜索它们的内容。

因此,这就是开发基于未来的 I/O 绑定应用程序所需的全部内容。实际上,它使用的是我们已经讨论过的同一个线程或进程 API,但它提供了一个更容易理解的接口,并且更容易看到并发运行的函数之间的边界(只是不要试图从将来的内部访问全局变量!)。

异步

AsyncIO 是 Python 并发编程的最新技术。它将未来和事件循环的概念与我们在第 9 章迭代器模式中讨论的协同过程相结合。其结果与编写并发代码时可能得到的结果一样优雅、易于理解,尽管这并不意味着什么!

AsyncIO 可以用于几个不同的并发任务,但它是专门为网络 I/O 设计的。大多数网络应用程序,尤其是服务器端的应用程序,需要花费大量时间等待数据从网络传入。这可以通过在单独的线程中处理每个客户机来解决,但是线程会占用内存和其他资源。AsyncIO 使用协同路由而不是线程。

该库还提供了自己的事件循环,避免了前面示例中需要几行长的 while 循环。然而,事件循环是有代价的。当我们在事件循环的异步任务中运行代码时,该代码必须立即返回,既不阻塞 I/O,也不阻塞长时间运行的计算。在编写我们自己的代码时,这是一件小事,但这意味着任何阻塞 I/O 的标准库或第三方函数都必须创建非阻塞版本。

AsyncIO 通过创建一组协程来解决这个问题,这些协程使用yield from语法立即将控制返回到事件循环。事件循环负责检查阻塞调用是否已完成并执行任何后续任务,就像我们在上一节中手动执行的那样。

异步操作

阻塞函数的一个典型示例是time.sleep调用。让我们使用此调用的异步版本来说明异步 IO 事件循环的基本原理:

import asyncio
import random

@asyncio.coroutine
def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
 yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))

@asyncio.coroutine
def five_sleepers():
    print("Creating five tasks")
 tasks = [
 asyncio.async(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
 yield from asyncio.sleep(2)
    print("Waking and waiting for five tasks")
 yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")

这是一个相当基本的示例,但它涵盖了异步 IO 编程的几个特性。最容易理解的是它的执行顺序,即从下到上的顺序。

最后第二行获取事件循环,并指示它运行 future,直到完成。所讨论的未来被命名为five_sleepers。一旦 future 完成了它的工作,循环将退出,我们的代码将终止。作为异步程序员,我们不需要知道太多关于run_until_complete调用内部发生的事情,但要知道很多事情正在发生。这是我们在上一章中编写的 futures 循环的增强型协程版本,它知道如何处理迭代、异常、函数返回、并行调用等等。

现在更仔细地看一看未来。忽略几个段落的装饰;我们会回去的。协同程序首先构建了random_sleep未来的五个实例。结果的未来被包装在一个asyncio.async任务中,该任务将它们添加到循环的任务队列中,以便它们可以在控制返回到事件循环时并发执行。

每当我们调用yield from时,就会返回该控件。在本例中,我们调用yield from asyncio.sleep将此协同路由的执行暂停两秒钟。在此中断期间,事件循环执行它已排队的任务;即五种random_sleep期货。这些协程每次打印一条开始消息,然后在特定的时间内将控制发送回事件循环。如果random_sleep内的任何睡眠呼叫短于两秒,事件循环将控制权传递回相关的未来,该未来将在返回之前打印其唤醒消息。当five_sleepers内的睡眠调用被唤醒时,它执行到调用的下一个 yield,等待剩余的random_sleep任务完成。当所有睡眠调用完成执行后,random_sleep任务返回,这会将它们从事件队列中移除。一旦这五个都完成,asyncio.wait调用和five_sleepers方法也会返回。最后,由于事件队列现在为空,run_until_complete调用可以终止,程序结束。

asyncio.coroutine装饰器主要只是记录了这个协同程序是用来作为事件循环中的未来的。在这种情况下,程序在没有 decorator 的情况下可以正常运行。然而,asyncio.coroutine装饰符也可以用来包装一个正常的函数(一个不能产生的函数),这样它就可以被视为一个未来的函数。在这种情况下,整个函数在将控制返回到事件循环之前执行;装饰器只是强制函数实现协程 API,以便事件循环知道如何处理它。

解读异步未来

AsyncIO 协程按顺序执行每一行,直到遇到yield from语句,此时它将控制权返回到事件循环。然后,事件循环执行准备运行的任何其他任务,包括原始协同程序正在等待的任务。无论该子任务何时完成,事件循环都会将结果发送回协程,以便它能够继续执行,直到遇到另一个yield from语句或返回。

这允许我们编写同步执行的代码,直到我们明确地需要等待某些东西。这消除了线程的不确定性行为,因此我们几乎不需要担心共享状态。

提示

避免从协同程序内部访问共享状态仍然是一个好主意。它使您的代码更容易推理。更重要的是,即使理想情况下所有异步执行都发生在协程内部,但现实情况是某些未来都是在线程或进程的后台执行的。坚持“不共享”的理念,以避免大量困难的 bug。

此外,AsyncIO 允许我们在一个协程中收集代码的逻辑部分,即使我们正在其他地方等待其他工作。作为一个具体的例子,尽管random_sleep协程中的yield from asyncio.sleep调用允许大量事件在事件循环中发生,但协程本身看起来似乎一切都井然有序。AsyncIO 模块的主要优点是能够读取相关的异步代码片段,而不必担心等待任务完成的机器。

用于联网的异步 IO

AsyncIO 是专门设计用于网络套接字的,所以让我们实现一个 DNS 服务器。更准确地说,让我们实现 DNS 服务器的一个极其基本的功能。

域名系统的基本目的是将域名(如 www.amazon.com)转换为 IP 地址(如 72.21.206.6)。它必须能够执行多种类型的查询,并知道如何联系其他 DNS 服务器,如果它没有所需的答案。我们不会实现这些,但以下示例能够直接响应标准 DNS 查询,以查找我最近三位雇主的 IP:

import asyncio
from contextlib import suppress

ip_map = {
    b'facebook.com.': '173.252.120.6',
    b'yougov.com.': '213.52.133.246',
    b'wipo.int.': '193.5.93.80'
}

def lookup_dns(data):
    domain = b''
    pointer, part_length = 13, data[12]
    while part_length:
        domain += data[pointer:pointer+part_length] + b'.'
        pointer += part_length + 1
        part_length = data[pointer - 1]

    ip = ip_map.get(domain, '127.0.0.1')

    return domain, ip

def create_response(data, ip):
    ba = bytearray
    packet = ba(data[:2]) + ba([129, 128]) + data[4:6] * 2
    packet += ba(4) + data[12:]
    packet += ba([192, 12, 0, 1, 0, 1, 0, 0, 0, 60, 0, 4])
    for x in ip.split('.'): packet.append(int(x))
    return packet

class DNSProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

 def datagram_received(self, data, addr):
        print("Received request from {}".format(addr[0]))
        domain, ip = lookup_dns(data)
        print("Sending IP {} for {} to {}".format(
            domain.decode(), ip, addr[0]))
 self.transport.sendto(
 create_response(data, ip), addr)

loop = asyncio.get_event_loop()
transport, protocol = loop.run_until_complete(
 loop.create_datagram_endpoint(
 DNSProtocol, local_addr=('127.0.0.1', 4343)))
print("DNS Server running")

with suppress(KeyboardInterrupt):
 loop.run_forever()
transport.close()
loop.close()

本例设置了一个字典,它将一些域无声地映射到 IPv4 地址。然后是两个函数,它们从二进制 DNS 查询数据包中提取信息并构造响应。我们不会讨论这些;如果您想了解更多关于 DNS 的请阅读 RFC(“征求意见”,定义大多数互联网协议的格式)1034 和 1035。

您可以通过在另一个终端中运行以下命令来测试此服务:

nslookup -port=4343 facebook.com localhost

让我们开始进餐吧。异步 IO 网络围绕着紧密相连的传输和协议概念展开。协议是一个类,它具有在相关事件发生时调用的特定方法。由于 DNS 运行在UDP用户数据报协议的顶部;我们将协议类构建为DatagramProtocol的子类。这个类有各种各样的事件可以响应;我们特别感兴趣的是发生的初始连接(仅为了存储传输以备将来使用)和datagram_received事件。对于 DNS,必须解析并响应每个接收到的数据报,此时交互结束。

因此,当接收到数据报时,我们处理数据包,查找 IP,并使用我们不讨论的函数构造响应(它们是家族中的害群之马)。然后,我们指示底层传输使用其sendto方法将生成的数据包发送回请求客户端。

传输基本上表示通信流。在本例中,它将在事件循环中的 UDP 套接字上发送和接收数据的所有麻烦都抽象出来。例如,与 TCP 套接字和子进程交互有类似的传输。

UDP 传输是通过调用循环的create_datagram_endpoint协同路由来构建的。这将构造适当的 UDP 套接字并开始侦听它。我们向它传递套接字需要监听的地址,重要的是,传递我们创建的协议类,以便传输在接收数据时知道调用什么。

由于初始化套接字的过程需要花费大量的时间,并且会阻塞事件循环,create_datagram_endpoint函数是一个协程。在我们的示例中,我们在等待初始化时实际上不需要做任何事情,因此我们将调用包装在loop.run_until_complete中。事件循环负责管理未来,当它完成时,它返回两个值的元组:新初始化的传输和从我们传入的类构造的协议对象。

在幕后,传输在事件循环上设置了一个任务,用于侦听传入的 UDP 连接。然后,我们所要做的就是启动事件循环并调用loop.run_forever(),以便任务能够处理这些数据包。当数据包到达时,它们按照协议进行处理,一切正常。

唯一需要注意的另一件重要事情是,当我们完成传输(事实上,事件循环)时,它们应该是关闭的。在本例中,代码在没有对close()的两次调用的情况下运行得很好,但是如果我们正在动态构造传输(或者只是进行适当的错误处理!),我们需要更加注意它。

您可能已经失望地看到,在设置协议类和底层传输时需要多少样板文件。AsyncIO 在这两个称为流的关键概念之上提供了一个抽象。在下一个示例中,我们将看到 TCP 服务器中的流示例。

使用执行器包装阻塞代码

AsyncIO 提供了自己版本的 futures 库,允许我们在没有适当的非阻塞调用时在单独的线程或进程中运行代码。这本质上允许我们将线程和进程与异步模型结合起来。此功能的一个更有用的应用程序是,当应用程序具有 I/O 绑定和 CPU 绑定的突发活动时,可以充分利用这两个方面。I/O 绑定部分可以发生在事件循环中,而 CPU 密集型工作可以剥离到不同的进程中。为了说明这一点,让我们使用 AsyncIO 实现“作为服务排序”:

import asyncio
import json
from concurrent.futures import ProcessPoolExecutor

def sort_in_process(data):
    nums = json.loads(data.decode())
    curr = 1
    while curr < len(nums):
        if nums[curr] >= nums[curr-1]:
            curr += 1
        else:
            nums[curr], nums[curr-1] = \
                nums[curr-1], nums[curr]
            if curr > 1:
                curr -= 1

    return json.dumps(nums).encode()

@asyncio.coroutine
def sort_request(reader, writer):
    print("Received connection")
 length = yield from reader.read(8)
 data = yield from reader.readexactly(
 int.from_bytes(length, 'big'))
 result = yield from asyncio.get_event_loop().run_in_executor(
 None, sort_in_process, data)
    print("Sorted list")
    writer.write(result)
    writer.close()   
    print("Connection closed")     

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
server = loop.run_until_complete(
 asyncio.start_server(sort_request, '127.0.0.1', 2015))
print("Sort Service running")

loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

这是一个很好的代码实现一些非常愚蠢想法的例子。将排序作为一种服务的整个想法是相当荒谬的。使用我们自己的排序算法而不是调用 Python 的sorted更糟糕。我们使用的算法称为 gnome 排序,或者在某些情况下称为“愚蠢排序”。这是一个用纯 Python 实现的慢速排序算法。我们定义了自己的协议,而不是使用野外存在的许多完全合适的应用程序协议之一。甚至使用多处理来实现并行性的想法在这里也可能受到怀疑;我们仍然会将所有数据传入和传出子流程。有时候,从你正在编写的程序中退一步,问问自己是否在努力实现正确的目标是很重要的。

但让我们看看这种设计的一些智能功能。首先,我们将字节传入和传出子流程。这比在主进程中解码 JSON 要聪明得多。这意味着(相对昂贵的)解码可以在不同的 CPU 上进行。此外,pickle JSON 字符串通常比 pickle 列表小,因此进程之间传递的数据更少。

第二,这两种方法是非常线性的;看起来代码正在一行接一行地执行。当然,在 AsyncIO 中,这是一种错觉,但我们不必担心共享内存或并发原语。

溪流

前面的示例现在看起来应该很熟悉了,因为它有一个类似于其他 AsyncIO 程序的样板文件。然而,也有一些不同之处。你会注意到我们叫的是start_server而不是create_server。此方法钩住 AsyncIO 的流,而不是使用底层传输/协议代码。我们不需要传入协议类,而是可以传入一个普通的协同路由,它接收读写器参数。它们都表示可以像文件或套接字一样从中读取和写入的字节流。其次,因为这是一个 TCP 服务器而不是 UDP 服务器,所以在程序完成时需要进行一些套接字清理。此清理是一个阻塞调用,因此我们必须在事件循环上运行wait_closed协程。

流很容易理解。Reading 是一个潜在的阻塞调用,因此我们必须使用yield from来调用它。写作不会阻碍;它只是将数据放在一个队列中,异步 IO 在后台发送该队列。

sort_request方法中的代码发出两个读取请求。首先,它从线路中读取 8 个字节,并使用大端符号将它们转换为整数。此整数表示客户端打算发送的数据字节数。所以在下一次调用readexactly时,它读取了那么多字节。readreadexactly之间的区别在于前者将读取到请求的字节数,而后者将缓冲读取,直到接收到所有字节数,或者直到连接关闭。

遗嘱执行人

现在让我们看一下执行者代码。我们进口与上一节中使用的完全相同的ProcessPoolExecutor。请注意,我们不需要它的特殊 AsyncIO 版本。事件循环有一个方便的run_in_executor协程,我们可以使用它来运行未来。默认情况下,循环在ThreadPoolExecutor中运行代码,但如果愿意,我们可以传入不同的执行器。或者,正如我们在本例中所做的,当我们通过调用loop.set_default_executor()来设置事件循环时,我们可以设置不同的默认值。

正如您可能从上一节中回忆到的,没有太多关于与执行人一起使用期货的样板文件。但是,当我们将它们与 AsyncIO 一起使用时,根本就没有!协同程序在将来自动包装函数调用并将其提交给执行者。我们的代码阻塞,直到将来完成,而事件循环继续处理其他连接、任务或未来。当将来完成时,协程将唤醒并继续将数据写回客户机。

您可能想知道,与其在事件循环中运行多个进程,不如在不同的进程中运行多个事件循环。答案是:“也许”。然而,根据具体的问题空间,我们最好使用单个事件循环运行程序的独立副本,而不是尝试使用主多处理进程协调所有内容。

在本节中,我们讨论了 AsyncIO 的大部分要点,本章还介绍了许多其他并发原语。并发是一个很难解决的问题,没有一个解决方案适合所有的用例。设计并发系统最重要的部分是决定可用的工具中哪一个是解决问题的正确工具。我们已经看到了几种并发系统的优点和缺点,现在对不同类型需求的更好选择有了一些见解。

案例研究

为了结束本章和本书,让我们构建一个基本的图像压缩工具。它将拍摄黑白图像(每像素 1 位,开或关),并尝试使用称为游程编码的非常基本的压缩形式对其进行压缩。你可能会发现黑白图像有点牵强。如果是这样,您在的时间不够 http://xkcd.com

我在本章的示例代码中包含了一些黑白 BMP 图像示例(它们很容易读入数据,并留下了很多改进文件大小的机会)。

我们将使用一种称为游程编码的简单技术来压缩图像。这项技术基本上采用一个比特序列,并用重复的比特数替换任何重复比特串。例如,字符串 000011100 可以替换为 04 12 03,以指示 4 个零后面跟着 2 个 1,然后再跟着 3 个零。为了让事情变得更有趣,我们将把每一行分成 127 位的块。

我没有随意挑选 127 位。127 个不同的值可以被编码成 7 位,这意味着如果一行包含全部 1 或全部 0,我们可以将其存储在一个字节中;第一位表示它是一行 0 还是一行 1,其余 7 位表示该位中存在多少位。

将图像分块还有另一个好处;我们可以并行处理单个块,而不需要它们相互依赖。然而,也有一个主要的缺点;如果一次运行中只有几个 1 或 0,那么它将占用压缩文件中的more空间。当我们将长运行分解为块时,我们可能会创建更多这样的小运行,并使文件的大小膨胀。

在处理文件时,我们必须考虑压缩文件中字节的确切布局。我们的文件将在文件开头存储两个字节的小端整数,表示完成文件的宽度和高度。然后它将写入表示每行 127 位块的字节。

现在,在我们开始设计一个并发系统来构建这样的压缩映像之前,我们应该问一个基本问题:这个应用程序是 I/O 绑定的还是 CPU 绑定的?

老实说,我的回答是“我不知道”。我不确定该应用程序是否会花费更多时间从磁盘加载数据并将其写回,还是在内存中进行压缩。我怀疑它原则上是一个 CPU 受限的应用程序,但一旦我们开始将图像字符串传递到子进程中,我们可能会失去并行性的任何好处。这个问题的最佳解决方案可能是编写一个 C 或 Cython 扩展,但让我们看看在纯 Python 中我们能走多远。

我们将使用自下而上的设计来构建这个应用程序。这样,我们就有了一些构建块,可以将它们组合成不同的并发模式,以查看它们之间的比较情况。让我们从使用运行长度编码压缩 127 位块的代码开始:

from bitarray import bitarray
def compress_chunk(chunk):
    compressed = bytearray()
    count = 1
    last = chunk[0]
    for bit in chunk[1:]:
        if bit != last:
            compressed.append(count | (128 * last))
            count = 0
            last = bit
        count += 1
    compressed.append(count | (128 * last))
    return compressed

此代码使用bitarray类来处理单个零和一。它作为第三方模块分发,您可以使用命令pip install bitarray安装。传递到compress_chunks中的块就是这个类的一个实例(尽管这个例子也适用于布尔值列表)。在这种情况下,位数组的主要好处是,当在进程之间对它们进行酸洗时,它们会占用布尔值列表或 1 和 0 的 bytestring 的八分之一空间。因此,它们腌制得更快。它们也比大量的按位操作更容易使用(双关语)。

该方法使用运行长度编码压缩数据,并返回包含压缩数据的 bytearray。其中,位数组类似于 1 和 0 的列表,bytearray 类似于字节对象的列表(当然,每个字节包含 8 个 1 或 0)。

执行压缩的算法非常简单(尽管我想指出,我花了两天的时间来实现和调试它。简单易懂并不一定意味着易于编写!)。它首先将last变量设置为当前运行中的位类型(无论是True还是False。然后,它在位上循环,对每个位进行计数,直到找到一个不同的位。当它这样做时,它会根据last变量包含的内容,将字节最左边的位(128 位)设为零或一,从而构造一个新字节。然后重置计数器并重复该操作。循环完成后,它为最后一次运行创建最后一个字节,并返回结果。

在创建构建块时,让我们创建一个函数来压缩一行图像数据:

def compress_row(row):
    compressed = bytearray()
    chunks = split_bits(row, 127)
    for chunk in chunks:
        compressed.extend(compress_chunk(chunk))
    return compressed

此函数接受名为 row 的位数组。它使用一个我们将很快定义的函数将其拆分为块,每个块的宽度为 127 位。然后,它使用先前定义的compress_chunk压缩这些块中的每一个,将结果连接到一个bytearray,并返回。

我们将split_bits定义为一个简单的生成器:

def split_bits(bits, width):
    for i in range(0, len(bits), width):
        yield bits[i:i+width]

现在,由于我们还不确定这是否会在线程或进程中更有效地运行,让我们将这些函数包装在一个方法中,该方法在提供的执行器中运行所有内容:

def compress_in_executor(executor, bits, width):
    row_compressors = []
    for row in split_bits(bits, width):
 compressor = executor.submit(compress_row, row)
        row_compressors.append(compressor)

    compressed = bytearray()
    for compressor in row_compressors:
 compressed.extend(compressor.result())
    return compressed

这个例子几乎不需要解释;它使用我们已经定义的相同的split_bits函数(为自下而上设计欢呼!),根据图像的宽度将输入的位分成行。

请注意,此代码将压缩任何位序列,尽管它会膨胀,而不是压缩位值频繁变化的二进制数据。黑白图像绝对是有问题的压缩算法的最佳候选。现在,让我们创建一个函数,该函数使用第三方枕头模块加载图像文件,将其转换为位,并对其进行压缩。我们可以使用古老的评论声明轻松地在遗嘱执行人之间切换:

from PIL import Image
def compress_image(in_filename, out_filename, executor=None):
    executor = executor if executor else ProcessPoolExecutor()
    with Image.open(in_filename) as image:
        bits = bitarray(image.convert('1').getdata())
        width, height = image.size

    compressed = compress_in_executor(executor, bits, width)

    with open(out_filename, 'wb') as file:
        file.write(width.to_bytes(2, 'little'))
        file.write(height.to_bytes(2, 'little'))
        file.write(compressed)

def single_image_main():
    in_filename, out_filename = sys.argv[1:3]
    #executor = ThreadPoolExecutor(4)
 executor = ProcessPoolExecutor()
    compress_image(in_filename, out_filename, executor)

image.convert()调用将图像更改为黑白(一位)模式,而getdata()将返回这些值的迭代器。我们将结果打包到一个位数组中,以便它们能够更快地通过线路传输。当我们输出压缩文件时,我们首先写入图像的宽度和高度,然后是压缩数据,它以字节数组的形式到达,可以直接写入二进制文件。

编写完所有这些代码后,我们终于能够测试线程池或进程池是否为我们提供了更好的性能。我创建了一个大的(7200 x 5600 像素)黑白图像,并在两个池中运行它。在我的系统上,ProcessPool处理图像大约需要 7.5 秒,而ThreadPool持续处理大约需要 9 秒。因此,正如我们所怀疑的,在进程之间来回酸洗位和字节的成本消耗了在多个处理器上运行所获得的几乎所有效率收益(尽管查看我的 CPU 监视器,它确实充分利用了我机器上的所有四个核)。

因此,似乎压缩单个图像在单独的进程中最有效,但这仅仅是因为我们在父进程和子进程之间来回传递了大量数据。当进程之间传递的数据量非常低时,多处理更有效。

因此,让我们扩展应用程序,并行压缩目录中的所有位图。我们必须传递到子进程的唯一内容是文件名,因此与使用线程相比,我们应该获得一个速度增益。另外,有点疯狂,我们将使用现有的代码来压缩单个图像。这意味着我们将在每个子流程中运行ProcessPoolExecutor,以创建更多的子流程。我不建议在现实生活中这样做!

from pathlib import Path
def compress_dir(in_dir, out_dir):
    if not out_dir.exists():
        out_dir.mkdir()

 executor = ProcessPoolExecutor()
    for file in (
            f for f in in_dir.iterdir() if f.suffix == '.bmp'):
        out_file = (out_dir / file.name).with_suffix('.rle')
 executor.submit(
 compress_image, str(file), str(out_file))

def dir_images_main():
    in_dir, out_dir = (Path(p) for p in sys.argv[1:3])
    compress_dir(in_dir, out_dir)

这段代码使用了我们之前定义的compress_image函数,但在每个图像的单独进程中运行它。它不会将执行器传递给函数,因此新进程开始运行后,compress_image会创建一个ProcessPoolExecutor

既然我们正在 executors 内部运行 executors,那么我们可以使用线程和进程池的四种组合来压缩图像。它们各自具有完全不同的计时模式:

|   |

每个映像的进程池

|

每个映像的线程池

| | --- | --- | --- | | 每行工艺池 | 42 秒 | 53 秒 | | 每行线程池 | 34 秒 | 64 秒 |

正如我们所期望的,对每个图像使用线程,并且再次对每行使用线程是最慢的,因为 GIL 阻止我们并行执行任何工作。考虑到我们在使用单个图像时对每一行使用单独的处理时速度稍快,您可能会惊讶地发现,如果我们在单独的处理中处理每个图像,则对行使用ThreadPool功能会更快。花点时间去理解为什么会这样。

我的机器只有四个处理器核。每个图像中的每一行都在一个单独的池中处理,这意味着所有这些行都在争夺处理能力。当只有一个图像时,我们通过并行运行每一行来获得(非常适度的)加速。但是,当我们增加一次处理的图像数量时,将所有行数据传入和传出子流程的成本会占用其他每个图像的处理时间。因此,如果我们可以在一个单独的处理器上处理每个图像,其中唯一需要放入子进程管道的就是两个文件名,那么我们将获得一个稳定的加速。

因此,我们看到不同的工作负载需要不同的并发范例。即使我们只是在使用期货,我们也必须就使用哪种执行人做出明智的决定。

还请注意,对于通常大小的映像,程序运行速度足够快,以至于我们使用哪种并发结构都无关紧要。事实上,即使我们根本不使用任何并发性,我们最终也可能获得大致相同的用户体验。

这个问题也可以通过直接使用线程和/或多处理模块来解决,尽管可能需要编写更多的样板代码。您可能想知道异步 IO 在这里是否有用。答案是:“可能不会”。大多数操作系统都没有从文件系统进行非阻塞读取的好方法,因此库最终还是将所有调用包装在了未来。

为了完整起见,这里是我用来解压 RLE 图像以确认算法正常工作的代码(事实上,直到我修复了压缩和解压中的错误,我仍然不确定它是否完美。我应该使用测试驱动开发!):

from PIL import Image
import sys

def decompress(width, height, bytes):
    image = Image.new('1', (width, height))

    col = 0
    row = 0
    for byte in bytes:
        color = (byte & 128) >> 7
        count = byte & ~128
        for i in range(count):
            image.putpixel((row, col), color)
            row += 1
        if not row % width:
            col += 1
            row = 0
    return image

with open(sys.argv[1], 'rb') as file:
    width = int.from_bytes(file.read(2), 'little')
    height = int.from_bytes(file.read(2), 'little')

    image = decompress(width, height, file.read())
    image.save(sys.argv[2], 'bmp')

这段代码相当简单。每次运行都用一个字节编码。它使用一些位数学来提取像素的颜色和运行的长度。然后,它设置图像中运行的每个像素,以适当的间隔递增下一个像素的行和列以进行检查。

练习

在本章中,我们讨论了几种不同的并发范例,但仍然不清楚每种范例何时有用。正如我们在案例研究中所看到的,在实施一种策略之前,最好先尝试几种不同的策略。

Python3 中的并发性是一个巨大的主题,这样大的一本书不可能涵盖所有需要了解的内容。作为您的第一个练习,我鼓励您查看几个第三方库,它们可能提供额外的上下文:

  • execnet,一个允许本地和远程无共享并发的库
  • 并行 python,一种可以并行执行线程的替代解释器
  • Cython 是一种与 python 兼容的语言,可编译为 C 语言,并具有原语来释放 gil 并利用完全并行多线程。
  • pypystm,在 Python 解释器的超快 PyPy 实现之上的软件事务内存的实验实现
  • 格温特

如果您在最近的应用程序中使用过线程,请查看代码,看看是否可以通过使用 futures 使其更可读、更不容易出现错误。比较线程和多处理未来,看看是否可以通过使用多个 CPU 获得任何好处。

尝试为一些基本 HTTP 请求实现异步 IO 服务。您可能需要在 web 上查找 HTTP 请求的结构;它们是相当简单的 ASCII 数据包来解密。如果您能够让 web 浏览器呈现一个简单的 get 请求,那么您将对异步 IO 网络传输和协议有很好的了解。

确保在访问共享数据时了解线程中发生的争用条件。试着想出一个程序,使用多个线程设置共享值,使数据故意损坏或无效。

还记得我们在第 6 章Python 数据结构中为案例研究介绍的链接收集器吗?您可以通过并行请求使其运行得更快吗?使用原始线程、期货或异步 IO 更好吗?

尝试直接使用线程或多处理编写运行长度编码示例。你有速度提升吗?代码是更容易推理还是更难推理?有没有办法通过使用并发或并行来加速解压脚本?

总结

本章以一个不太面向对象的主题结束我们对面向对象编程的探索。并发是一个困难的问题,我们只触及了表面。虽然进程和线程的底层操作系统抽象不提供远程面向对象的 API,但 Python 提供了一些非常好的面向对象抽象。线程和多处理包都为底层机制提供了面向对象的接口。Futures 能够将许多混乱的细节封装到单个对象中。AsyncIO 使用协同路由对象使我们的代码看起来像是同步运行的,同时将丑陋而复杂的实现细节隐藏在一个非常简单的循环抽象后面。

感谢您阅读Python 3 面向对象编程第二版。我希望您已经享受了这段旅程,并渴望在未来的所有项目中开始实现面向对象的软件!