当我们消除复杂的共享状态并围绕非严格处理进行设计时,我们可以利用并行性来提高性能。在本章中,我们将介绍可用的多进程和多线程技术。Python 库包在应用于允许延迟计算的算法时特别有用。
这里的中心思想是在一个进程内的多个线程或多个进程之间分发一个功能程序。如果我们创建了合理的功能设计,我们可以避免应用程序组件之间的复杂交互;我们有接受参数值并产生结果的函数。这是进程或线程的理想结构。
在本章中,我们将重点讨论几个主题:
- 函数式编程和并发的一般思想。
- 当我们考虑内核、CPU 和 OS 级并行时,并发意味着什么。需要注意的是,并发不会神奇地使一个糟糕的算法更快。
- 使用内置的
multiprocessing和concurrent.futures模块。这些模块允许多种并行执行技术。dask软件包也可以实现这一点。
我们将关注进程级并行性,而不是多线程。使用进程并行性允许我们完全忽略 Python 的全局解释器锁(GIL)。有关 Python 的 GIL 的更多信息,请参见https://docs.python.org/3.3/c-api/init.html#thread-状态和全局解释器锁。
最有效的并发处理发生在正在执行的任务之间没有依赖关系的情况下。开发并发(或并行编程的最大困难是协调对共享资源的更新。
当遵循功能设计模式时,我们倾向于避免有状态的程序。功能设计应尽量减少或消除对共享对象的并发更新。如果我们可以设计以惰性、非严格评估为中心的软件,那么我们也可以设计有助于并行评估的软件。这可能导致令人尴尬的并行设计,其中大部分工作可以同时完成,计算之间很少或没有交互。
操作之间的依赖关系是编程的核心。在2*(3+a)表达式中,必须首先计算(3+a)子表达式。表达式的总值取决于两个操作的正确顺序。
在处理集合时,我们经常会遇到集合中项目之间的处理顺序无关紧要的情况。考虑下面两个例子:
x = list(func(item) for item in y)
x = list(reversed([func(item) for item in y[::-1]])) 尽管func(item)表达式的求值顺序相反,但这两个命令的结果相同。当func(item)的每次评估都是独立的且没有副作用时,这一点就起作用了。
即使是以下命令片段也有相同的结果:
import random
indices = list(range(len(y)))
random.shuffle(indices)
x = [None]*len(y)
for k in indices:
x[k] = func(y[k]) 上例中的评估顺序是随机的。因为每个评估func(y[k])都是独立于所有其他评估的,所以评估的顺序并不重要。许多允许非严格评估的算法就是这种情况。
在一台具有单处理器和单核的小型计算机中,所有计算都通过处理器的一个且唯一的核进行序列化。整个操作系统的使用将通过巧妙的时间切片安排交错多个进程和多个线程。
在一台具有多个 CPU 或单个 CPU 中有多个内核的计算机上,可能存在一些实际的 CPU 指令并发处理。所有其他并发都是通过操作系统级别的时间切片来模拟的。macOS X 笔记本电脑可以有 200 个共享 CPU 的并发进程;这比可用内核的数量多得多。从这一点上,我们可以看出,操作系统时间切片是整个系统的大部分明显并发行为的原因。
让我们考虑一个假设的算法,其复杂性由 EndoT0.席描述。假设有一个包含 1000 字节 Python 代码的内部循环。在处理 10000 个对象时,我们正在执行 1000 亿个 Python 操作。我们可以称之为基本处理预算。我们可以尝试分配我们认为可能有用的尽可能多的进程和线程,但处理预算不能改变。
单个 CPython 字节码没有简单的执行计时。然而,macOS X 笔记本电脑上的长期平均值表明,我们可以预期每秒执行大约 60MB 的代码。这意味着我们的 1000 亿字节码操作将需要大约 1666 秒,即 28 分钟。
如果我们有一台双处理器、四核计算机,那么我们可能会将运行时间减少到原来总时间的 25%:大约 7 分钟。这假定我们可以将工作划分为四个(或更多)独立的操作系统进程。
这里的重要考虑是 1000 亿字节码的总预算不能改变。并行不会神奇地减少工作量。它只能更改时间表,以减少所用的时间。
切换到更好的算法
可以将工作量从 1000 亿次操作减少到 1.33 亿次操作,潜在运行时间约为 2 秒。在四个内核上,我们可能会在 516 毫秒内看到响应。并行性不可能有算法改变所带来的那种戏剧性的改进。
操作系统确保进程之间很少或没有交互。在创建多进程应用程序时,如果多个进程必须交互,则必须显式共享公共 OS 资源。这可以是公共文件、特定的共享内存对象,也可以是进程之间具有共享状态的信号量。过程本身是独立的;它们之间的互动是例外的。
相反,多线程是单个进程的一部分;进程的所有线程共享操作系统资源。我们可以破例获得一些线程本地内存,这些内存可以在不受其他线程干扰的情况下自由写入。在线程本地内存之外,写入内存的操作可以以潜在的不可预测顺序设置进程的内部状态。必须使用显式锁定来避免问题。如前所述,严格来说,指令执行的整个序列很少是并发的。来自并发线程和进程的指令通常以不可预测的顺序在内核之间交错。线程化带来了对共享变量进行破坏性更新的可能性,并且需要通过锁定进行互斥访问。
在裸机硬件级别,存在一些复杂的内存写入情况。有关内存写入问题的更多信息,请访问http://en.wikipedia.org/wiki/Memory_disambiguation 。
在尝试设计多线程应用程序时,并发对象更新的存在可能会造成严重破坏。锁定是避免并发写入共享对象的一种方法。通常避免共享对象是另一种可行的设计技术。第二种避免写入共享对象的技术更适用于函数式编程。
在 CPython 中,GIL 用于确保操作系统线程调度不会干扰 Python 数据结构的内部维护。实际上,GIL 将调度的粒度从机器指令更改为 Python 虚拟机操作组。
GIL 在确保数据结构完整性方面的影响通常可以忽略不计。对性能的最大影响来自算法的选择。
执行大量计算和相对较少 I/O 的程序不会从并发处理中获得太多好处。如果一个计算的预算是 28 分钟,那么以不同的方式交错操作不会产生显著的影响。使用八个磁芯可将时间缩短约八分之一。实际节省的时间取决于操作系统和语言开销,这很难预测。引入并发不会像更好的算法那样对性能产生影响。
当计算涉及大量 I/O 时,将 CPU 处理与 I/O 请求交错将显著提高性能。其思想是在等待操作系统完成其他数据段的 I/O 时,对某些数据段进行计算。因为 I/O 通常需要大量的等待,所以一个八核处理器可以交错处理几十个并发 I/O 请求的工作。
我们有两种交叉计算和 I/O 方法。它们如下:
- 我们可以创建一个处理阶段的管道。单个项目必须经过读取、筛选、计算、聚合和写入的所有阶段。多个并发阶段的思想是在每个阶段都有不同的数据对象。阶段之间的时间切片将允许计算和 I/O 交错。
- 我们可以创建一个并发工作者池,每个人都对一个数据项执行所有处理。将数据项分配给池中的工作人员,并从工作人员收集结果。
这些方法之间的差异并不明显;有一个模糊的中间区域,显然不是一个或另一个。在管道的一个阶段需要一批工人,以使该阶段与其他阶段一样快的情况下,通常会创建混合混合。有一些形式使设计并发程序变得更容易。通信顺序流程(CSP范式可以帮助设计消息传递应用程序。可以使用pycsp之类的包将 CSP 形式主义添加到 Python 中。
I/O 密集型程序通常从并发处理中获得最显著的好处。其思想是交错 I/O 和处理。CPU 密集型程序将从并发处理中获得较小的好处。
并发是非严格评估的一种形式:操作的确切顺序是不可预测的。multiprocessing包引入了Pool对象的概念。Pool对象包含多个工作进程,并希望这些进程同时执行。该软件包允许操作系统调度和时间切片来交叉执行多个进程。目的是使整个系统尽可能繁忙。
为了充分利用这一功能,我们需要将应用程序分解为非严格并发执行有益的组件。整个应用程序必须从可以不确定顺序处理的离散任务构建。
例如,通过 web 抓取从 internet 收集数据的应用程序通常通过并行处理进行优化。我们可以创建一个由几个相同的工作人员组成的Pool对象,实现网站抓取。每个工人都以要分析的 URL 的形式分配任务。
分析多个日志文件的应用程序也是并行化的好选择。我们可以为分析工作者创建一个Pool对象。我们可以将每个日志文件分配给一名分析人员;这允许在Pool对象中的不同工作人员之间并行进行读取和分析。每个工人将同时执行 I/O 和计算。但是,一个工人可以在分析,而其他工人则在等待 I/O 完成。
因为好处取决于难以预测的输入和输出操作的时间,所以多进程总是涉及实验。更改池大小和测量运行时间是设计并发应用程序的重要部分。
下面是一个多进程应用程序的示例。我们将在 web 日志文件中刮取通用日志格式(CLF行。这是 web 服务器访问日志的常用格式。行会很长,但当包装到书的页边时,看起来如下所示:
99.49.32.197 - - [01/Jun/2012:22:17:54 -0400] "GET /favicon.ico
HTTP/1.1" 200 894 "-" "Mozilla/5.0 (Windows NT 6.0)
AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.52
Safari/536.5"我们经常需要分析大量的文件。许多独立文件的存在意味着并发性将对我们的抓取过程有一些好处。
我们将把分析分解为两大功能领域。处理的第一阶段是对日志文件进行必要的解析,以收集相关信息。我们将进一步将解析分解为四个阶段。详情如下:
- 读取多个源日志文件中的所有行。
- 然后,我们从文件集合中的日志条目行创建简单的 namedtuple 对象。
- 更复杂字段(如日期和 URL)的详细信息将被解析。
- 拒绝来自日志的无趣路径;我们还可以将其视为只解析有趣的路径。
一旦经过解析阶段,我们就可以执行大量的分析。为了演示multiprocessing模块,我们将看一个简单的分析,以计算特定路径的出现次数。
第一部分,从源文件读取,涉及最多的输入处理。Python 对文件迭代器的使用将转化为用于数据缓冲的较低级别操作系统请求。每个操作系统请求都意味着进程必须等待数据可用。
显然,我们希望交错其他操作,以便它们不会等待 I/O 完成。我们可以在一个范围内交错操作,从单个行到整个文件。我们将首先考虑交错整个文件,因为这是相对简单的实现。
解析 Apache CLF 文件的功能设计如下所示:
data = path_filter(
access_detail_iter(
access_iter(
local_gzip(filename))))我们已经将较大的解析问题分解为许多函数,这些函数将处理解析问题的每个部分。local_gzip()函数从本地缓存的 GZIP 文件中读取行。access_iter()函数为访问日志中的每一行创建一个NamedTuple对象。access_detail_iter()函数将扩展到一些更难解析的字段。最后,path_filter()函数将丢弃一些没有太多分析价值的路径和文件扩展名。
它有助于将此类设计可视化为处理管道,如下所示:
(local_gzip(filename) | access_iter | access_detail_iter | path_filter) > data这使用管道的 shell 表示法(|)将数据从一个进程传递到另一个进程。内置 Pythonpipes模块有助于构建实际的 shell 管道,以利用操作系统的多进程功能。其他软件包,如pipetools或pipe提供了一种类似的方式来可视化复合功能。
这里是解析大量文件的第一个阶段:读取每个文件并生成一个简单的行序列。由于日志文件是以.gzip格式保存的,我们需要用gzip.open()功能打开每个文件,而不是用io.open()功能或__builtins__.open()功能打开。
local_gzip()函数从本地缓存的文件中读取行,如以下命令片段所示:
from typing import Iterator
def local_gzip(pattern: str) -> Iterator[Iterator[str]]:
zip_logs= glob.glob(pattern)
for zip_file in zip_logs:
with gzip.open(zip_file, "rb") as log:
yield (
line.decode('us-ascii').rstrip()
for line in log)前面的函数遍历与给定模式匹配的所有文件。对于每个文件,生成的值是一个生成器函数,它将遍历该文件中的所有行。我们已经封装了一些内容,包括通配符文件匹配、打开用.gzip格式压缩的日志文件的细节,以及将文件拆分为一系列行,而不包含任何尾随的换行符(\n)。
这里的基本设计模式是生成每个文件的生成器表达式。前面的函数可以重新表述为一个函数和将该特定函数应用于每个文件的映射。当需要识别单个文件时,这在极少数情况下非常有用。在某些情况下,可以对其进行优化,使用yield from使所有不同的日志文件看起来都是一个行流。
还有其他几种方法可以产生类似的输出。例如,这里是前面示例中的内部for循环的替代版本。line_iter()函数还将发出给定文件的行:
def line_iter(zip_file: str) -> Iterator[str]:
log = gzip.open(zip_file, "rb")
return (line.decode('us-ascii').rstrip() for line in log)line_iter()功能应用gzip.open()功能和一些线路清理。我们可以使用映射将line_iter()函数应用于与模式匹配的所有文件,如下所示:
map(line_iter, glob.glob(pattern)) 虽然这种替代映射非常简洁,但它的缺点是,当没有更多引用时,会留下打开的文件对象等待正确的垃圾收集。在处理大量文件时,这似乎是不必要的开销。因此,我们将重点关注前面显示的local_gzip()函数。
先前的替代映射具有与multiprocessing模块工作方式很好地匹配的明显优势。我们可以创建一个工作池,并将任务(如文件读取)映射到进程池。如果我们这样做,我们可以并行读取这些文件;打开的文件对象将是单独进程的一部分。
此设计的扩展将包括第二个功能,使用 FTP 从 web 主机传输文件。由于文件是从 web 服务器收集的,因此可以使用local_gzip()功能对其进行分析。
local_gzip()函数的结果由access_iter()函数用于为源文件中描述文件访问的每一行创建命名偶。
一旦我们有权访问每个日志文件的所有行,我们就可以提取所描述的访问细节。我们将使用正则表达式来分解该行。从那里,我们可以构建一个namedtuple对象。
以下是用于解析 CLF 文件中的行的正则表达式:
import re
format_pat = re.compile(
r"(?P<host>[\d\.]+)\s+"
r"(?P<identity>\S+)\s+"
r"(?P<user>\S+)\s+"
r"\[(?P<time>.+?)\]\s+"
r'"(?P<request>.+?)"\s+'
r"(?P<status>\d+)\s+"
r"(?P<bytes>\S+)\s+"
r'"(?P<referer>.*?)"\s+' # [SIC]
r'"(?P<user_agent>.+?)"\s*'
)我们可以使用这个正则表达式将每一行分解为包含九个单独数据元素的字典。通过将文本转换为NamedTuple对象,可以轻松地处理使用[]和"来划分复杂字段,如time、request、referrer和user_agent参数。
每个单独访问可概括为NamedTuple的子类,如下所示:
from typing import NamedTuple
class Access(NamedTuple):
host: str
identity: str
user: str
time: str
request: str
status: str
bytes: str
referer: str
user_agent: strWe've taken pains to ensure that the NamedTuple field names match the regular expression group names in the (?P<name>) constructs for each portion of the record. By making sure the names match, we can very easily transform the parsed dictionary into a tuple for further processing.
以下是access_iter()函数,它要求每个文件在文件的行上表示为迭代器:
from typing import Iterator
def access_iter(
source_iter: Iterator[Iterator[str]]
) -> Iterator[Access]:
for log in source_iter:
for line in log:
match = format_pat.match(line)
if match:
yield Access(**match.groupdict())local_gzip()功能的输出是一系列序列。外部序列基于单个日志文件。对于每个文件,都有一个嵌套的 iterable 行序列。如果该行与给定的模式匹配,则它是某种类型的文件访问。我们可以从match字典创建一个名为 tuple 的Access。不匹配的行被悄悄地丢弃。
这里的基本设计模式是从解析函数的结果构建一个不可变的对象。在本例中,解析函数是正则表达式匹配器。其他类型的解析将适合此设计模式。
有一些替代方法可以做到这一点。例如,我们可以按如下方式使用map()功能:
def access_builder(line: str) -> Optional[Access]:
match = format_pat.match(line)
if match:
return Access(**match.groupdict())
return None前面的可选函数仅体现了Access对象的基本解析和构造。它将返回一个Access或None对象。下面是我们如何使用此功能将日志文件展平为单个Access对象流:
filter(
None,
map(
access_builder,
(line for log in source_iter for line in log)
)
)这说明了如何将local_gzip()函数的输出转换为Access实例序列。在本例中,我们将access_builder()函数应用于 iterable 结构的嵌套迭代器,该迭代器由读取文件集合而产生。filter()函数从map()函数的结果中删除None对象。
我们在这里的目的是展示我们有许多用于解析文件的函数样式。在第 4 章中处理集合时,我们看到了非常简单的解析。在这里,我们使用各种技术执行更复杂的解析。
先前创建的初始Access对象不会分解构成访问日志行的九个字段中的一些内部元素。我们将从整体分解到高级字段中分别解析这些项。单独执行这些解析操作可以简化处理的每个阶段。它还允许我们替换整个过程的一小部分,而不破坏分析日志的一般方法。
解析的下一阶段的结果对象将是一个NamedTuple子类AccessDetails,它封装了原始的Access元组。它将包含一些单独解析的详细信息的附加字段:
from typing import NamedTuple, Optional
import datetime
import urllib.parse
class AccessDetails(NamedTuple):
access: Access
time: datetime.datetime
method: str
url: urllib.parse.ParseResult
protocol: str
referrer: urllib.parse.ParseResult
agent: Optional[AgentDetails]access属性是原始Access对象,是简单字符串的集合。time属性是解析后的access.time字符串。method、url和protocol属性来自分解access.request字段。referrer属性是经过解析的 URL。
agent属性也可以分解为细粒度字段。一个非传统的浏览器或网站刮板可以产生一个无法解析的agent字符串,所以这个属性被标记为Optional类型提示。
下面是组成NamedTuple类的AgentDetails子类的属性:
class AgentDetails(NamedTuple):
product: str
system: str
platform_details_extensions: str这些字段反映了代理描述的最常见语法。在这方面有相当大的差异,但这一特定的值子集似乎相当普遍。
下面是用于要分解的字段的三个详细程度分析器:
from typing import Tuple, Optional
import datetime
import re
def parse_request(request: str) -> Tuple[str, str, str]:
words = request.split()
return words[0], ' '.join(words[1:-1]), words[-1]
def parse_time(ts: str) -> datetime.datetime:
return datetime.datetime.strptime(
ts, "%d/%b/%Y:%H:%M:%S %z"
)
agent_pat = re.compile(
r"(?P<product>\S*?)\s+"
r"\((?P<system>.*?)\)\s*"
r"(?P<platform_details_extensions>.*)"
)
def parse_agent(user_agent: str) -> Optional[AgentDetails]:
agent_match = agent_pat.match(user_agent)
if agent_match:
return AgentDetails(**agent_match.groupdict())
return None我们已经为 HTTP 请求、时间戳和用户代理信息编写了三个解析器。日志中的请求值通常是三个字的字符串,如GET /some/path HTTP/1.1。parse_request()函数提取这三个空格分隔的值。在不太可能的情况下,路径中有空格,我们将提取第一个单词和最后一个单词作为方法和协议;所有剩余的单词都是路径的一部分。
时间解析委托给datetime模块。我们在parse_time()函数中提供了正确的格式。
解析用户代理是一项挑战。有很多变化;我们为parse_agent()函数选择了一个通用的。如果用户代理文本与给定的正则表达式匹配,我们将使用AgentDetails类的属性。如果用户代理信息与正则表达式不匹配,我们将使用None值。原始文本将在Access对象中提供。
我们将使用这三个解析器从给定的Access对象构建AccessDetails实例。access_detail_iter()函数的主体如下所示:
from typing import Iterable, Iterator
def access_detail_iter(
access_iter: Iterable[Access]
) -> Iterator[AccessDetails]:
for access in access_iter:
try:
meth, url, protocol = parse_request(access.request)
yield AccessDetails(
access=access,
time=parse_time(access.time),
method=meth,
url=urllib.parse.urlparse(url),
protocol=protocol,
referrer=urllib.parse.urlparse(access.referer),
agent=parse_agent(access.user_agent)
)
except ValueError as e:
print(e, repr(access)) 我们使用了与前面的access_iter()功能类似的设计模式。根据解析某个输入对象的结果构建一个新对象。新的AccessDetails对象将包裹上一个Access对象。这种技术允许我们使用不可变对象,但仍然包含更精确的信息。
该函数本质上是从Access对象到AccessDetails对象的映射。以下是使用map()高级功能的替代设计:
from typing import Iterable, Iterator
def access_detail_iter2(
access_iter: Iterable[Access]
) -> Iterator[AccessDetails]:
def access_detail_builder(access: Access) -> Optional[AccessDetails]:
try:
meth, uri, protocol = parse_request(access.request)
return AccessDetails(
access=access,
time=parse_time(access.time),
method=meth,
url=urllib.parse.urlparse(uri),
protocol=protocol,
referrer=urllib.parse.urlparse(access.referer),
agent=parse_agent(access.user_agent)
)
except ValueError as e:
print(e, repr(access))
return None
return filter(
None,
map(access_detail_builder, access_iter)
)我们已经将AccessDetails对象的结构更改为返回单个值的函数。我们可以将该函数映射到原始Access对象的可编辑输入流。我们将看到这与multiprocessing模块的工作方式非常吻合。
在面向对象编程环境中,这些附加解析器可能是类定义的方法函数或属性。使用惰性解析方法的面向对象设计的优点是,除非需要,否则不会解析项。这个特殊的功能设计解析所有东西,假设它将被使用。
可以创建惰性功能设计。它可以依赖三个解析器函数根据需要从给定的Access对象中提取和解析各种元素。我们不使用details.time属性,而是使用parse_time(access.time)参数。语法较长,但它确保只在需要时解析属性。
我们将研究几个针对AccessDetails对象的过滤器。第一个是过滤器的集合,它拒绝许多很少感兴趣的开销文件。第二个过滤器将是分析函数的一部分,我们将在后面介绍。
path_filter()功能是三个功能的组合:
- 排除空路径
- 排除某些特定的文件名
- 排除具有给定扩展名的文件
path_filter()函数的优化版本如下所示:
def path_filter(
access_details_iter: Iterable[AccessDetails]
) -> Iterable[AccessDetails]:
name_exclude = {
'favicon.ico', 'robots.txt', 'index.php', 'humans.txt',
'dompdf.php', 'crossdomain.xml',
'_images', 'search.html', 'genindex.html',
'searchindex.js', 'modindex.html', 'py-modindex.html',
}
ext_exclude = {
'.png', '.js', '.css',
}
for detail in access_details_iter:
path = detail.url.path.split('/')
if not any(path):
continue
if any(p in name_exclude for p in path):
continue
final = path[-1]
if any(final.endswith(ext) for ext in ext_exclude):
continue
yield detail对于每个AccessDetails对象,我们将应用三个过滤测试。如果路径基本上为空,如果路径包含一个排除的名称,或者如果路径的最终名称具有排除的扩展名,则会悄悄忽略该项。如果路径与这些条件中的任何一个都不匹配,那么它可能很有趣,并且是path_filter()函数产生的结果的一部分。
这是一个优化,因为所有测试都是使用命令式的for循环体应用的。
另一种设计可以将每个测试定义为一个单独的一级过滤式函数。例如,我们可能有如下函数来处理空路径:
def non_empty_path(detail: AccessDetails) -> bool:
path = detail.url.path.split('/')
return any(path)此函数仅确保路径包含名称。我们可以使用filter()功能如下:
filter(non_empty_path, access_details_iter) 我们可以为non_excluded_names()和non_excluded_ext()函数编写类似的测试。filter()函数的整个序列如下所示:
filter(non_excluded_ext,
filter(non_excluded_names,
filter(non_empty_path, access_details_iter)))这会将每个filter()函数应用于前一个filter()函数的结果。拒绝空路径;从该子集中,排除的名称和排除的扩展名将被拒绝。我们还可以将前面的示例表述为一系列赋值语句,如下所示:
non_empty = filter(non_empty_path, access_details_iter)
nx_name = filter(non_excluded_names, non_empty)
nx_ext = filter(non_excluded_ext, nx_name)当我们添加新的筛选条件时,这个版本的优点是更容易扩展。
The use of generator functions (such as the filter() function) means that we aren't creating large intermediate objects. Each of the intermediate variables, ne, nx_name, and nx_ext, are proper lazy generator functions; no processing is done until the data is consumed by a client process.
虽然很优雅,但效率低下,因为每个函数都需要解析AccessDetails对象中的路径。为了提高效率,我们需要用lru_cache属性包装path.split('/')函数。
我们将看两个分析函数,它们可以用来过滤和分析单个AccessDetails对象。第一个函数,filter()函数将只通过特定路径。第二个函数将汇总每个不同路径的出现情况。
我们将定义一个小的book_in_path()函数,并将其与内置的filter()函数相结合,以将该函数应用于细节。以下是复合book_filter()功能:
from typing import Iterable, Iterator
def book_filter(
access_details_iter: Iterable[AccessDetails]
) -> Iterator[AccessDetails]:
def book_in_path(detail: AccessDetails) -> bool:
path = tuple(
item
for item in detail.url.path.split('/')
if item
)
return path[0] == 'book' and len(path) > 1
return filter(book_in_path, access_details_iter)我们通过book_in_path()函数定义了一个规则,我们将应用于每个AccessDetails对象。如果路径不是空的,并且路径的第一级属性是book,那么我们对这些对象感兴趣。所有其他AccessDetails对象都可以被悄悄拒绝。
reduce_book_total()函数是我们感兴趣的最终简化:
from collections import Counter
def reduce_book_total(
access_details_iter: Iterable[AccessDetails]
) -> Dict[str, int]:
counts: Dict[str, int] = Counter()
for detail in access_details_iter:
counts[detail.url.path] += 1
return counts此函数将生成一个Counter()对象,显示AccessDetails对象中每条路径的频率。为了关注一组特定的路径,我们将使用reduce_total(book_filter(details))方法。这仅提供给定筛选器传递的项的摘要。
因为Counter对象可以应用于多种类型,所以需要类型提示来提供狭义的规范。在本例中,提示是Dict[str, int]向mypy工具显示将对路径的字符串表示进行计数。
下面是分解日志文件集合的复合analysis()函数:
def analysis(filename: str) -> Dict[str, int]:
"""Count book chapters in a given log"""
details = path_filter(
access_detail_iter(
access_iter(
local_gzip(filename))))
books = book_filter(details)
totals = reduce_book_total(books)
return totalsWe've defined a rule, through the book_in_path() function, which we'll apply to each analysis()函数使用local_gzip()函数处理单个文件名或文件模式。它应用了一组标准的解析函数path_filter()、access_detail_iter()和access_iter(),以创建AccessDetails对象的 iterable 序列。然后,它将我们的分析过滤和还原应用于AccessDetails对象序列。结果是一个显示特定路径访问频率的Counter对象。
保存的.gzip格式日志文件的特定集合总计约 51MB。使用此函数串行处理文件需要 140 秒以上。我们可以使用并发处理做得更好吗?
使用multiprocessing模块的一种优雅方式是创建一个处理Pool对象,并将工作分配给该池中的各个进程。我们将使用操作系统在各个进程之间交错执行。如果每个进程都混合了 I/O 和计算,那么我们应该能够确保处理器非常繁忙。当进程等待 I/O 完成时,其他进程可以进行计算。当 I/O 完成时,一个进程将准备好运行,并且可以与其他进程竞争处理时间。
将工作映射到单独流程的方法如下所示:
import multiprocessing
with multiprocessing.Pool(4) as workers:
workers.map(analysis, glob.glob(pattern)) 我们已经创建了一个具有四个独立进程的Pool对象,并将该Pool对象分配给workers变量。然后,我们使用进程池将一个函数analysis映射到一个可执行的工作队列。workers池中的每个进程都将从 iterable 队列中分配项目。在本例中,队列是glob.glob(pattern)属性的结果,该属性是一系列文件名。
当analysis()函数返回结果时,创建Pool对象的父进程可以收集这些结果。这允许我们创建几个并发构建的Counter对象,并将它们合并为一个单一的复合结果。
如果我们在池中启动p进程,我们的整体应用程序将包括p+1进程。将有一个父进程和p子进程。这通常效果很好,因为在子进程池启动后,父进程将没有什么事情可做。通常,工人将被分配到单独的 CPU(或内核),父级将与Pool对象中的一个子级共享一个 CPU。
The ordinary Linux parent/child process rules apply to the subprocesses created by this module. If the parent crashes without properly collecting the final status from the child processes, then zombie processes can be left running. For this reason, a process Pool object is a context manager. When we use a pool through the with statement, at the end of the context, the children are properly terminated.
默认情况下,Pool对象将根据multiprocessing.cpu_count()函数的值具有多个 worker。这个数字通常是最优的,简单地使用with multiprocessing.Pool() as workers:属性就足够了。
在某些情况下,拥有比 CPU 更多的工人会有所帮助。当每个工人都有 I/O 密集型处理时,这可能是真的。让许多工作进程等待 I/O 完成可以缩短应用程序的运行时间。
如果一个给定的Pool对象有pworker,此映射可以将处理时间缩短到串行处理所有日志所需时间的几乎
。实际上,Pool对象中的父进程和子进程之间的通信涉及一些开销。这些管理费用将限制将工作细分为非常小的并发部分的有效性。
多进程Pool对象有四种类似于映射的方法将工作分配给池:map()、imap()、imap_unordered()和starmap()。其中每一项都是将函数分配给进程池并将数据项映射到该函数的公共主题的变体。他们在分配工作和收集结果的细节上有所不同。
map(function, iterable)方法将 iterable 中的项分配给池中的每个工作者。按照分配给Pool对象的顺序收集完成的结果,以便保留顺序。
imap(function, iterable)方法描述为lazier而非map()。默认情况下,它将 iterable 中的每个项目发送到下一个可用的辅助项。这可能涉及更多的通信开销。因此,建议使用大于 1 的块大小。
imap_unordered(function, iterable)方法与imap()方法类似,但结果的顺序没有保留。允许无序处理映射意味着,随着每个过程的完成,结果将被收集。否则,必须按顺序收集结果。
starmap(function, iterable)方法类似于itertools.starmap()函数。iterable 中的每个项都必须是一个元组;使用*修饰符将元组传递给函数,以便元组的每个值都成为位置参数值。实际上,它正在执行function(*iterable[0])、function(*iterable[1])等等。
以下是上述映射主题的一个变体:
import multiprocessing
pattern = "*.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
result_iter = workers.imap_unordered(
analysis, glob.glob(pattern))
for result in result_iter:
combined.update(result)我们已经创建了一个Counter()函数,用于合并池中每个工作者的结果。我们根据可用 CPU 的数量创建了一个子进程池,并将Pool对象用作上下文管理器。然后,我们将analysis()函数映射到文件匹配模式中的每个文件。来自analysis()函数的结果Counter对象组合成一个结果计数器。
这个版本花了大约 68 秒来分析一批日志文件。使用多个并发进程显著缩短了分析日志的时间。单进程基线时间为 150 秒。其他实验需要使用更大的池大小来运行,以确定需要多少工作人员才能使系统尽可能繁忙。
我们已经使用multiprocessing模块的Pool.map()功能创建了一个两层的 map reduce 流程。第一层是analysis()函数,它在单个日志文件上执行 map reduce。然后,我们将这些减少合并到更高级别的减少操作中。
除了map()函数的变体之外,池还有一个apply(function, *args, **kw)方法,我们可以使用它将一个值传递给工作池。我们可以看到,map()方法实际上只是一个缠绕在apply()方法周围的for循环。例如,我们可以使用以下命令:
list(
workers.apply(analysis, f)
for f in glob.glob(pattern)
)就我们的目的而言,还不清楚这是否是一项重大改进。我们需要做的几乎所有事情都可以表示为一个map()函数。
map()、starmap()和apply()函数的作用是将工作分配给Pool对象中的子流程,然后在该响应准备就绪时从该子流程收集响应。这可能会导致子级等待父级收集结果。_async()函数的变化不会等待子函数完成。这些函数返回一个对象,可以查询该对象以从子进程获取单个结果。
以下是使用map_async()方法的变化:
import multiprocessing
pattern = "*.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
results = workers.map_async(
analysis, glob.glob(pattern))
data = results.get()
for c in data:
combined.update(c)我们已经创建了一个Counter()函数,用于合并池中每个工作者的结果。我们根据可用 CPU 的数量创建了一个子进程池,并将这个Pool对象用作上下文管理器。然后,我们将analysis()函数映射到文件匹配模式中的每个文件。来自map_async()函数的响应是MapResult对象;我们可以查询此结果和工人池的总体状态。在本例中,我们使用get()方法获得Counter对象的序列。
来自analysis()函数的结果Counter对象组合成一个结果Counter对象。此聚合为我们提供了许多日志文件的总体摘要。此处理并不比上一个示例快。map_async()函数的使用允许父进程在等待子进程完成时执行额外的工作。
multiprocessing包支持多种体系结构。我们可以轻松地创建跨多台服务器的多进程结构,并提供正式的身份验证技术来创建必要的安全级别。我们可以使用队列和管道将对象从一个进程传递到另一个进程。我们可以在进程之间共享内存。我们还可以在进程之间共享较低级别的锁,以同步对共享资源(如文件)的访问。
这些体系结构中的大多数都涉及到显式地管理多个工作进程之间的状态。特别是,使用锁和共享内存在本质上是必不可少的,不适合函数式编程方法。
我们可以谨慎地以函数式的方式处理队列和管道。我们的目标是将设计分解为生产者和消费者功能。生产者可以创建对象并将其插入队列。使用者将从队列中取出对象并对其进行处理,可能会将中间结果放入另一个队列中。这将创建一个并发处理器网络,工作负载分布在这些不同的进程之间。使用pycsp包可以简化进程之间基于队列的消息交换。欲了解更多信息,请访问https://pypi.python.org/pypi/pycsp 。
这种设计技术在设计复杂的应用服务器时具有一些优势。各种子进程可以在服务器的整个生命周期中存在,同时处理各个请求。
除了multiprocessing包,我们还可以使用concurrent.futures模块。这还提供了一种将数据映射到线程或进程的并发池的方法。模块 API 相对简单,在许多方面与multiprocessing.Pool()函数的接口相似。
下面是一个示例,说明它们的相似性:
from concurrent.futures import ProcessPoolExecutor
pool_size = 4
pattern = "*.gz"
combined = Counter()
with ProcessPoolExecutor(max_workers=pool_size) as workers:
for result in workers.map(analysis, glob.glob(pattern)):
combined.update(result) 前面的示例和前面的示例之间最重要的变化是我们使用了concurrent.futures.ProcessPoolExecutor对象的实例,而不是multiprocessing.Pool方法。基本的设计模式是使用可用工人池将analysis()函数映射到文件名列表。合并生成的Counter对象以创建最终结果。
concurrent.futures模块的性能与multiprocessing模块几乎相同。
concurrent.futures模块提供了第二种执行器,我们可以在应用程序中使用。我们可以使用ThreadPoolExecutor对象,而不是创建concurrent.futures.ProcessPoolExecutor对象。这将在单个进程中创建一个线程池。
线程池的语法与使用ProcessPoolExecutor对象几乎相同。然而,性能却截然不同。在此日志文件分析示例中,工作主要由 I/O 控制。由于进程中的所有线程共享相同的操作系统调度约束,因此多线程日志文件分析的总体性能与串行处理日志文件的性能大致相同。
使用示例日志文件和运行 macOS X 的小型四核笔记本电脑,这些结果表明了共享 I/O 资源和进程的线程之间的差异:
- 使用
concurrent.futures线程池,运行时间为 168 秒 - 使用进程池,运行时间为 68 秒
在这两种情况下,Pool对象的大小都是 4。单进程单线程基线时间为 150 秒;添加线程使处理运行得更慢。这个结果是典型的做大量输入和输出的程序。多线程可能更适合于线程长时间空闲的用户界面,或者等待用户移动鼠标或触摸屏幕的用户界面。
Pythonthreading包包含许多有助于构建命令式应用程序的构造。本模块的重点不是编写函数式应用程序。我们可以利用queue模块中的线程安全队列将对象从一个线程传递到另一个线程。
threading模块没有一种将工作分配给各个线程的简单方法。API 并不适合函数式编程。
与multiprocessing模块更原始的特性一样,我们可以尝试隐藏锁和队列的状态和命令性质。然而,在concurrent.futures模块中使用ThreadPoolExecutor方法似乎更容易。ProcessPoolExecutor.map()方法为我们提供了一个非常愉快的界面,可以同时处理集合的元素。
使用map()函数原语分配工作似乎很符合我们对函数编程的期望。因此,最好将注意力集中在concurrent.futures模块上,因为它是编写并发功能程序的最易访问的方式。
从函数编程的角度来看,我们已经看到了三种方法可以同时将map()函数概念应用于数据项。我们可以使用以下任何一种:
multiprocessing.Poolconcurrent.futures.ProcessPoolExecutorconcurrent.futures.ThreadPoolExecutor
我们与他们互动的方式几乎相同;这三种方法都有一个map()方法,将函数应用于 iterable 集合的项。这与其他函数式编程技术非常吻合。性能是不同的,因为并发线程和并发进程的性质不同。
在设计过程中,我们的日志分析应用程序分解为两个总体区域:
- 低级解析:这是几乎所有日志分析应用程序都将使用的通用解析
- 更高层次的分析应用程序:这是更具体的过滤和缩减,重点是我们的应用程序需求
下层解析可分解为四个阶段:
- 正在读取多个源日志文件中的所有行。这是从文件名到一系列行的
local_gzip()映射。 - 从文件集合中的日志项行创建简单的命名耦合。这是从文本行到
Access对象的access_iter()映射。 - 解析更复杂字段(如日期和 URL)的详细信息。这是从
Access对象到AccessDetails对象的access_detail_iter()映射。 - 拒绝日志中的无趣路径。我们也可以认为这只是通过有趣的路径。这与其说是映射操作,不如说是一个过滤器。这是捆绑在
path_filter()函数中的过滤器集合。
我们定义了一个整体的analysis()函数,用于解析和分析给定的日志文件。它对低级解析的结果应用高级过滤和归约。它还可以用于文件的通配符集合。
考虑到所涉及的映射数量,我们可以看到几种方法将此问题分解为可以映射到线程或进程池的工作。以下是我们可以考虑的一些设计方案:
- 将
analysis()功能映射到各个文件。在本章中,我们将此作为一致的示例。 - 从整体
analysis()函数中重构local_gzip()函数。我们现在可以将修改后的analysis()函数映射到local_gzip()函数的结果。 - 从整体
analysis()函数中重构access_iter(local_gzip(pattern))函数。我们可以将这个修改后的analysis()函数映射到Access对象的 iterable 序列。 - 将
access_detail_iter(access-iter(local_gzip(pattern)))函数重构为单独的 iterable。然后,我们将映射path_filter()函数和更高级别的过滤器,并根据AccessDetail对象的 iterable 序列进行约简。 - 我们还可以将较低级别的解析重构为独立于较高级别分析的函数。我们可以将分析过滤器和缩减映射到低级解析的输出。
所有这些都是重构示例应用程序的相对简单的方法。使用函数式编程技术的好处是,整个过程的每个部分都可以定义为映射。这使得考虑不同的架构来定位最优设计是切实可行的。
然而,在这种情况下,我们需要将 I/O 处理分发到尽可能多的 CPU 或内核。大多数潜在的重构都将执行父进程中的所有 I/O;这些只会将计算分配到多个并发进程,几乎不会带来什么好处。然后,我们要关注映射,因为这些映射将 I/O 分发到尽可能多的内核。
最小化进程间传递的数据量通常很重要。在本例中,我们只为每个工作进程提供了简短的文件名字符串。得到的Counter对象比每个日志文件中 10 MB 的压缩细节数据要小得多。我们可以通过消除只出现一次的项来进一步减小每个Counter对象的大小,或者我们可以将应用程序限制为仅 20 个最流行的项。
我们可以自由地重新组织这个应用程序的设计,这并不意味着我们应该重新组织设计。我们可以运行一些基准测试实验来证实我们的怀疑,即日志文件解析主要取决于读取文件所需的时间。
在本章中,我们研究了支持多个数据段并发处理的两种方法:
multiprocessing模块:具体地说,Pool类和可供工作人员池使用的各种映射。concurrent.futures模块:具体来说是ProcessPoolExecutor和ThreadPoolExecutor类。这些类还支持一个映射,该映射将在作为线程或进程的工作者之间分配工作。
我们还注意到一些替代方案似乎不适合函数式编程。multiprocessing模块还有许多其他功能,但它们与功能设计不太匹配。类似地,threading和queue模块可用于构建多线程应用程序,但这些功能并不适合函数式程序。
在下一章中,我们将介绍operator模块。这可以用来简化某些类型的算法。我们可以使用内置运算符函数,而不是定义 Lambda 形式。我们还将介绍一些设计灵活决策的技术,以及允许以非严格顺序计算表达式的技术。