在本章中,我们将编写 Python 3.x 代码,用通过加密连接(TLS 1.2)传递的 MQTT 消息控制车辆。我们将编写能够在不同流行物联网平台上运行的代码,例如 Raspberry Pi 3 板。我们将了解如何利用我们对 MQTT 协议的了解来构建基于需求的解决方案。我们将学习使用最新版本的 EclipsePAHOMQTT Python 客户端库。我们将深入了解以下内容:
- 了解使用 MQTT 控制车辆的要求
- 定义主题和命令
- 学习使用 Python 的好处
- 使用 Python3.x 和 PEP405 创建虚拟环境
- 了解虚拟环境的目录结构
- 激活虚拟环境
- 停用虚拟环境
- 为 Python 安装 paho mqtt
- 使用 paho MQTT 将客户端连接到安全 MQTT 服务器
- 理解回调
- 使用 Python 订阅主题
- 为将用作客户端的 IoT 板配置证书
- 创建一个类来表示车辆
- 用 Python 接收消息
- 处理对循环方法的多个调用
在前三章中,我们详细了解了 MQTT 的工作原理。我们了解了如何在 MQTT 客户机和 MQTT 服务器之间建立连接。我们了解了订阅主题过滤器和发布者向特定主题发送消息时发生的情况。我们安装了一个 Mosquitto 服务器,然后保护了它。
现在,我们将使用 Python 作为主要编程语言来生成 MQTT 客户机,这些客户机将充当发布者和订阅者。我们将把一个 Python MQTT 客户机连接到 MQTT 服务器,并处理命令,用 MQTT 消息控制一辆小型车辆。小型车辆复制了现实道路车辆中的许多功能。
我们将使用 TLS 加密和 TLS 身份验证,因为我们不希望任何 MQTT 客户端能够向我们的车辆发送命令。我们希望我们的 Python 3.x 代码在许多平台上运行,因为我们将使用相同的代码库来控制使用以下物联网板的车辆:
- 树莓皮 3 B 型+
- 高通 DragonBoard 410c
- 小猎犬骨黑
- MinnowBoard 大菱鲆四核
- 拉特潘达 2G
- 上核 4GB
- 平方
根据平台的不同,每辆车都将提供额外的功能,因为某些电路板比其他电路板更强大。但是,我们将重点关注基本特性,以保持示例的简单性,并将重点放在 MQTT 上。然后,我们将能够使用此项目作为其他解决方案的基线,这些解决方案要求我们在运行 Python 3.x 代码的 IoT 板上运行代码,必须连接到 MQTT 服务器并处理命令。
在为车辆供电的电路板上运行的代码必须能够处理在特定主题的消息中接收到的命令。我们将在有效负载中使用 JSON 字符串。
同样用 Python 编写的客户端应用程序必须能够控制一个或多个车辆。我们还将用 Python 编写客户机应用程序,它将使用 JSON 字符串将 MQTT 消息发布到每个车辆的主题中。客户端应用程序必须显示执行每个命令的结果。每当命令成功执行时,每辆车必须向特定主题发布消息。
我们将使用以下主题名称发布车辆的命令:vehicles/vehiclename/commands,其中vehiclename必须替换为分配给车辆的唯一名称。例如,如果我们指定vehiclepi01作为由 Raspberry Pi 3 B+型电路板驱动的车辆的名称,我们将不得不向vehicles/vehiclepi01/commands主题发布命令。在此板上运行的 Python 代码将订阅此主题,以接收带有命令的消息并对其作出反应。
我们将使用以下主题名称使车辆发布成功执行命令的详细信息:vehicles/vehiclename/executedcommands,其中vehiclename必须替换为分配给车辆的唯一名称。例如,如果我们指定vehiclebeagle03作为由 BeagleBone 黑板驱动的车辆的名称,则希望接收有关成功处理命令的信息的客户端必须订阅vehicles/vehiclebeagle03/executedcommands主题。
命令将以带有键值对的 JSON 字符串形式发送。键必须等于 CMD,并且该值必须指定以下任何有效命令。当命令需要其他参数时,参数名称必须包含在下一个键中,此参数的值必须包含在该键的值中:
TURN_ON_ENGINE:打开车辆发动机。TURN_OFF_ENGINE:关闭车辆发动机。LOCK_DOORS:关闭并锁定车门。UNLOCK_DOORS:解锁并打开车门。PARK:停车。PARK_IN_SAFE_PLACE:将车辆停放在为车辆配置的安全位置。TURN_ON_HEADLIGHTS:打开车头灯。TURN_OFF_HEADLIGHTS:关闭车辆前照灯。TURN_ON_PARKING_LIGHTS:打开车辆的停车灯,也称为侧灯。TURN_OFF_PARKING_LIGHTS:关闭车辆的驻车灯,也称为侧灯。ACCELERATE:加速车辆,即踩下油门。BRAKE:制动车辆,即踩下制动踏板。ROTATE_RIGHT:使车辆向右旋转。我们必须在 degrees(度数)关键点的值中指定希望车辆向右旋转的度数。ROTATE_LEFT:使车辆向左旋转。我们必须在 degrees(度数)键的值中指定希望车辆向左旋转的度数。SET_MAX_SPEED:设置车辆允许的最大速度。我们必须在 MPH 键的值中指定所需的最大速度(以英里/小时为单位)。SET_MIN_SPEED:设置我们允许车辆行驶的最低速度。我们必须在 MPH 键的值中指定所需的最小速度(以英里/小时为单位)。
下一行显示了打开车辆发动机的指令的有效负载示例:
{"CMD": "TURN_ON_ENGINE"}下一行显示了将车辆最大速度设置为每小时 5 英里的命令的有效负载示例:
{"CMD": "SET_MAX_SPEED", "MPH": 5}我们有所有必要的细节来开始用 Python 编码。
在接下来的章节中,我们将编写不同的 Python 代码片段,这些代码将订阅主题并向主题发布消息。每当我们想要隔离一个需要额外包的环境时,使用 Python 虚拟环境是很方便的。Python3.3 引入了轻量级虚拟环境,并在 Python3.4 中进行了改进。我们将使用这些虚拟环境,因此,您需要 Python3.4 或更高版本。您可以阅读更多关于 PEP 405 Python 虚拟环境的信息,该虚拟环境引入了venv模块,这里:https://www.python.org/dev/peps/pep-0405 。
本书的所有示例都在 macOS 和 Linux 上的 Python3.6.2 上进行了测试。这些例子也在书中提到的物联网板及其最流行的操作系统上进行了测试。例如,所有示例都在 Raspbian 上进行了测试。Raspbian 基于 DebianLinux,因此,所有 Linux 指令都适用于 Raspbian。
如果您决定使用流行的virtualenv(https://pypi.python.org/pypi/virtualenv )第三方虚拟环境构建器或 Python IDE 提供的虚拟环境选项,只要确保在必要时使用适当的机制激活虚拟环境,而不是按照前面介绍的步骤激活用集成在 Python 中的venv模块生成的虚拟环境。
我们使用venv创建的每个虚拟环境都是一个独立的环境,在其站点目录(文件夹)中都有自己独立的 Python 安装包。当我们在 Python3.4 及更高版本中使用venv创建虚拟环境时,pip包含在新的虚拟环境中。在 Python 3.3 中,创建虚拟环境后需要手动安装pip。请注意,提供的指令与 Python3.4 或更高版本兼容,包括 Python3.6.x。以下命令假定您在 Linux、macOS 或 Windows 上安装了 Python 3.5.x 或更高版本。
首先,我们必须为轻量级虚拟环境选择目标文件夹或目录。下面是我们将在 Linux 和 macOS 示例中使用的路径。虚拟环境的目标文件夹将是主目录中的HillarMQTT/01文件夹。例如,如果我们在 macOS 或 Linux 中的主目录为/Users/gaston,则将在/Users/gaston/HillarMQTT/01中创建虚拟环境。可以在每个命令中用所需路径替换指定路径:
~/HillarMQTT/01下面是我们将在 Windows 示例中使用的路径。虚拟环境的目标文件夹将是我们的用户配置文件文件夹中的HillarMQTT\01文件夹。例如,如果我们的用户配置文件文件夹为C:\Users\gaston,则将在C:\Users\gaston\HillarMQTT\01内创建虚拟环境。可以在每个命令中用所需路径替换指定路径:
%USERPROFILE%\HillarMQTT\01在 Windows PowerShell 中,前面的路径为:
$env:userprofile\HillarMQTT\01现在,我们必须使用-m选项,后跟venv模块名称和所需路径,使 Python 以脚本的形式运行该模块,并在指定路径中创建虚拟环境。根据创建虚拟环境的平台不同,说明也不同。
在 Linux 或 macOS 中打开终端并执行以下命令以创建虚拟环境:
python3 -m venv ~/HillarMQTT/01在 Windows 中,在命令提示符下,执行以下命令以创建虚拟环境:
python -m venv %USERPROFILE%\HillarMQTT\01如果要使用 Windows PowerShell,请执行以下命令以创建虚拟环境:
python -m venv $env:userprofile\HillarMQTT\01前面的命令都不会产生任何输出。脚本创建了指定的目标文件夹,并通过调用ensurepip安装了pip,因为我们没有指定--without-pip选项。
指定的目标文件夹有一个新的目录树,其中包含 Python 可执行文件和指示它是 PEP405 虚拟环境的其他文件。
在虚拟环境的根目录中,pyenv.cfg配置文件为虚拟环境指定了不同的选项,它的存在表明我们在虚拟环境的根文件夹中。在 Linux 和 macOS 中,文件夹将包含以下主要子文件夹:bin、include、lib、lib/python3.6和lib/python3.6/site-packages。请注意,文件夹名称可以根据特定的 Python 版本而有所不同。在 Windows 中,文件夹将包含以下主要子文件夹:Include、Lib、Lib\site-packages和Scripts。每个平台中虚拟环境的目录树与这些平台中 Python 安装的布局相同。
以下屏幕截图显示了为 macOS 和 Linux 平台中的01虚拟环境生成的目录树中的文件夹和文件:
以下屏幕截图显示了为 Windows 中的虚拟环境生成的目录树中的主要文件夹:
激活虚拟环境后,我们将在虚拟环境中安装第三方软件包,根据平台和特定 Python 版本,模块将位于lib/python3.6/site-packages或Lib\site-packages文件夹中。可执行文件将根据平台复制到bin或Scripts文件夹中。我们安装的软件包不会对其他虚拟环境或我们的基本 Python 环境进行更改。
现在我们已经创建了一个虚拟环境,我们将运行一个特定于平台的脚本来激活它。激活虚拟环境后,我们将安装仅在此虚拟环境中可用的软件包。这样,我们将使用一个隔离的环境,在这个环境中,我们安装的所有软件包都不会影响我们的主 Python 环境。
在 Linux 或 macOS 的终端中运行以下命令。请注意,如果在终端会话中不启动与默认 shell 不同的 shell,则此命令的结果将是准确的。如果您有疑问,请检查您的终端配置和首选项:
echo $SHELL该命令将显示您在终端中使用的外壳的名称。在 macOS 中,默认值为/bin/bash,这意味着您正在使用 bash shell。根据 shell 的不同,您必须运行不同的命令来激活 Linux 或 macOS 中的虚拟环境。
如果您的终端配置为在 Linux 或 macOS 中使用 bash shell,请运行以下命令以激活虚拟环境。该命令也适用于zshshell:
source ~/HillarMQTT/01/bin/activate如果您的终端配置为使用csh或tcsh外壳,请运行以下命令以激活虚拟环境:
source ~/HillarMQTT/01/bin/activate.csh如果您的终端配置为使用fish外壳,请运行以下命令激活虚拟环境:
source ~/HillarMQTT/01/bin/activate.fish激活虚拟环境后,命令提示符将显示括在括号中的虚拟环境根文件夹名称,作为默认提示符的前缀,以提醒我们正在虚拟环境中工作。在这种情况下,我们将看到**(01)**作为命令提示符的前缀,因为激活的虚拟环境的根文件夹是01。
以下屏幕截图显示了在执行之前显示的命令后,在带有bash外壳的 macOS High Sierra 终端中激活的虚拟环境:
从前面的截图可以看出,虚拟环境激活后,提示从Gastons-MacBook-Pro:~ gaston$变为(01) Gastons-MacBook-Pro:~ gaston$。
在 Windows 中,可以在命令提示符下运行批处理文件,也可以运行 Windows PowerShell 脚本来激活虚拟环境。
如果希望使用命令提示符,请在 Windows 命令行中运行以下命令以激活虚拟环境:
%USERPROFILE%\HillarMQTT\01\Scripts\activate.bat以下屏幕截图显示了在执行之前显示的命令后,在 Windows 10 命令提示符下激活的虚拟环境:
从前面的截图可以看出,虚拟环境激活后,提示从C:\Users\gaston变为(01) C:\Users\gaston。
如果您更喜欢 Windows PowerShell,请启动它并运行以下命令以激活虚拟环境。请注意,必须在 Windows PowerShell 中启用脚本执行才能运行脚本:
cd $env:USERPROFILE
.\HillarMQTT\01\Scripts\Activate.ps1如果收到与以下类似的错误,则表示未启用脚本执行:
C:\Users\gaston\HillarMQTT\01\Scripts\Activate.ps1 : File C:\Users\gaston\HillarMQTT\01\Scripts\Activate.ps1 cannot be loaded because running scripts is disabled on this system. For more information, see about_Execution_Policies at
http://go.microsoft.com/fwlink/?LinkID=135170.
At line:1 char:1
+ C:\Users\gaston\HillarMQTT\01\Scripts\Activate.ps1
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ CategoryInfo : SecurityError: (:) [], PSSecurityException
+ FullyQualifiedErrorId : UnauthorizedAccessWindows PowerShell 默认执行策略为Restricted。此策略允许执行单个命令,但不运行脚本。因此,如果要使用 Windows PowerShell,必须更改策略以允许执行脚本。确保您了解允许您运行未签名脚本的 Windows PowerShell 执行策略的风险非常重要。有关不同政策的更多信息,请查看以下网页:https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_execution_policies?view=powershell-6。
以下屏幕截图显示了执行前面显示的命令后在 Windows 10 PowerShell 中激活的虚拟环境:
停用使用前面介绍的过程生成的虚拟环境非常容易。停用将删除在环境变量中所做的所有更改,并将提示更改回其默认消息。停用虚拟环境后,将返回默认的 Python 环境。
在 macOS 或 Linux 中,只需键入deactivate并按输入。
在命令提示符下,您必须运行包含在Scripts文件夹中的deactivate.bat批处理文件。在我们的示例中,此文件的完整路径为%USERPROFILE%\HillarMQTT\01\Scripts\deactivate.bat。
在 Windows PowerShell 中,您必须运行Scripts文件夹中的Deactivate.ps1脚本。在我们的示例中,此文件的完整路径为$env:userprofile\HillarMQTT\01\Scripts\Deactivate.ps1。请记住,必须在 Windows PowerShell 中启用脚本执行才能运行脚本。
The instructions in the next sections assume that the virtual environment we have created is activated.
EclipsePAHO 项目提供了 MQTT 的开源客户端实现。该项目包括一个 Python 客户端,也称为 Paho Python 客户端或 Eclipse Paho MQTT Python 客户端库。这个 Python 客户机是由 Mosquitto 项目提供的,最初被称为 MosquittoPython 客户机。以下是 Eclipse 泛美卫生组织项目的网页:http://www.eclipse.org/paho 。以下是 Eclipse Paho MQTT Python 客户端库版本 1.3.1 的网页,即paho-mqtt模块版本 1.3.1:https://pypi.python.org/pypi/paho-mqtt/1.3.1 。
我们可以在许多支持 Python 3.x 或更高版本的现代物联网板中使用paho-mqtt。我们只需要确保安装了pip就可以更容易地安装paho-mqtt。您可以使用开发计算机运行示例或上述任何板。
在继续执行下一步之前,请确保在前面步骤中创建的虚拟环境已激活。
如果要使用 IoT 板运行示例,请确保在 SSH 终端或板上运行的终端窗口中运行所有命令。如果使用开发计算机,请在 macOS 或 Linux 的终端上运行命令,或在 Windows 的命令提示符下运行命令。
现在,我们将使用pip安装程序安装paho-mqtt1.3.1。我们只需要在 SSH 终端或本地终端窗口中运行以下命令,该窗口用于主板,或在用于安装软件包的计算机上运行:
pip install paho-mqtt==1.3.1一些物联网板具有操作系统,需要您在运行之前的命令之前安装pip。在带有 Raspbian 的 Raspberry Pi 3 板上,pip已经安装。如果您正在使用计算机,Python 安装通常包括pip。
如果您在 Windows 中的默认文件夹中安装了 Python,并且没有使用 Python 虚拟环境,则必须在管理员命令提示符下运行上一个命令。如果您没有在 Raspbian 中使用 Python 虚拟环境,则必须以sudo作为前缀运行上一个命令:sudo pip install paho-mqtt。但是,如前所述,强烈建议使用虚拟环境。
输出的最后几行将指示paho-mqtt包版本 1.3.1 已成功安装。输出将类似于以下几行,但不完全相同,因为它将根据运行命令的平台而有所不同:
Collecting paho-mqtt==1.3.1
Downloading paho-mqtt-1.3.1.tar.gz (80kB)
100% |################################| 81kB 1.2MB/s
Installing collected packages: paho-mqtt
Running setup.py install for paho-mqtt ... done
Successfully installed paho-mqtt-1.3.1首先,我们将使用paho-mqtt创建一个连接到 MOSQUITO MQTT 服务器的 MQTT 客户机。我们将编写几行 Python 代码来建立安全连接并订阅主题。
在第 3 章保护 MQTT 3.1.1 Mosquitto 服务器中,我们保护了我们的 Mosquitto 服务器,因此,我们将使用我们创建的数字证书对客户端进行身份验证。大多数情况下,我们将使用使用 TLS 的 MQTT 服务器,因此,了解如何建立与 TLS 和 TLS 身份验证的连接是一个好主意。与 MQTT 服务器建立不安全连接更容易,但这不是我们在开发使用 MQTT 的应用程序时所面临的最常见的场景。
首先,我们需要将第 3 章中创建的用于保护 MQTT 3.1.1 Mosquitto 服务器的以下文件复制到我们将用于运行 Python 脚本的计算机或设备上的目录中。我们将文件保存在名为mqtt_certificates的目录中。在要用作本例 MQTT 客户机的计算机或板上创建一个board_certificates目录。将以下三个文件复制到此新目录:
ca.crt:证书颁发机构证书文件board001.crt:客户端证书文件board001.key:客户端密钥
现在,我们将在主虚拟环境文件夹中创建一个名为config.py的新 Python 文件。以下几行显示了此文件的代码,该文件定义了许多配置值,这些值将用于建立与 MOSQUITO MQTT 服务器的连接。这样,所有配置值都包含在特定的 Python 脚本中。您必须用创建的board_certificates目录的路径替换certificates_path字符串中的/Users/gaston/board_certificates值。此外,将mqtt_server_host的值替换为 MOSQUITO 服务器或您可能决定使用的任何其他 MQTT 服务器的 IP 地址或主机名。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的config.py文件中:
import os.path
# Replace /Users/gaston/python_certificates with the path
# in which you saved the certificate authority file,
# the client certificate file and the client key
certificates_path = "/Users/gaston/python_certificates"
ca_certificate = os.path.join(certificates_path, "ca.crt")
client_certificate = os.path.join(certificates_path, "board001.crt")
client_key = os.path.join(certificates_path, "board001.key")
# Replace 192.168.1.101 with the IP or hostname for the Mosquitto
# or other MQTT server
# Make sure the IP or hostname matches the value
# you used for Common Name
mqtt_server_host = "192.168.1.101"
mqtt_server_port = 8883
mqtt_keepalive = 60代码声明用字符串初始化的certificates_path变量,该字符串指定保存证书颁发机构文件、客户端证书文件和客户端密钥的路径(ca.crt、board001.crt和board001.key。然后,代码声明了以下字符串变量,以及配置 TLS 和 TLS 客户端身份验证所需的证书和密钥文件的完整路径:ca_certificate、client_certificate和client_key。
调用os.path.join可以很容易地将certificates_path变量中指定的路径与文件名连接起来,并生成完整路径。os.path.join函数适用于任何平台,因此,我们不必担心是使用斜杠(/)还是反斜杠(\)将路径与文件名连接起来。有时,我们可以在 Windows 中开发和测试代码,然后在可以使用不同 Unix 或 Linux 风格(如 Raspbian 或 Ubuntu)的 IoT 板上运行代码。os.path.join的使用使我们在不同平台之间切换的场景中的工作更容易。
mqtt_server_host、mqtt_server_port和mqtt_keepalive变量指定 MQTT 服务器(MOSQUITO 服务器)的 IP 地址、我们要使用的端口(8883)以及保持活动选项的秒数。用 MQTT 服务器的 IP 地址替换192.168.1.101非常重要。我们为mqtt_server_port指定8883,因为我们使用 TLS,这是 MQTT over TLS 的默认端口,正如我们在第 3 章中所了解的,保护 MQTT 3.1.1 Mosquitto 服务器。
现在,我们将在主虚拟环境文件夹中创建一个名为subscribe_with_paho.py的新 Python 文件。以下几行显示了该文件的代码,该文件与我们的 Mosquito MQTT 服务器建立连接,订阅vehicles/vehiclepi01/tests主题筛选器,并打印订阅的主题筛选器中接收的所有消息。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的subscribe_with_paho.py文件中:
from config import *
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Result from connect: {}".format(
mqtt.connack_string(rc)))
# Subscribe to the vehicles/vehiclepi01/tests topic filter
client.subscribe("vehicles/vehiclepi01/tests", qos=2)
def on_subscribe(client, userdata, mid, granted_qos):
print("I've subscribed with QoS: {}".format(
granted_qos[0]))
def on_message(client, userdata, msg):
print("Message received. Topic: {}. Payload: {}".format(
msg.topic,
str(msg.payload)))
if __name__ == "__main__":
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
client.tls_set(ca_certs = ca_certificate,
certfile=client_certificate,
keyfile=client_key)
client.connect(host=mqtt_server_host,
port=mqtt_server_port,
keepalive=mqtt_keepalive)
client.loop_forever()请注意,该代码与paho-mqtt版本 1.3.1 兼容。paho-mqtt的早期版本与代码不兼容。因此,请确保按照前面介绍的步骤安装paho-mqtt版本 1.3.1。
前面的代码使用最近安装的paho-mqtt版本 1.3.1 模块与 MQTT 服务器建立加密连接,订阅vehicles/vehiclepi01/tests主题过滤器,并在我们接收主题中的消息时运行代码。我们将使用此代码来理解paho-mqtt的基础知识。该代码是订阅主题过滤器的 MQTT 客户机的一个非常简单的版本,我们将在下一节中对其进行改进。
第一行导入我们在前面编码的config.py文件中声明的变量。第二行将paho.mqtt.client作为mqtt导入。这样,无论何时使用mqtt别名,我们都将引用paho.mqtt.client。
当我们声明一个函数时,我们将这个函数作为参数传递给另一个函数或方法,或者我们将这个函数赋给一个属性,然后一些代码在某个时候调用这个函数;这种机制称为**回调。**使用名称回调是因为代码在某个时间回调函数。paho-mqtt版本 1.3.1 包要求我们处理许多回调,因此,了解它们是如何工作的非常重要。
代码声明了我们稍后指定为回调的以下三个函数:
on_connect:当 MQTT 客户端从 MQTT 服务器接收到CONNACK响应时,即与 MQTT 服务器成功建立连接时,将调用此函数。on_subscribe:当 MQTT 客户端收到 MQTT 服务器的SUBACK响应时,即订阅成功完成时,调用此函数。on_message:当 MQTT 客户端从 MQTT 服务器接收到PUBLISH消息时,将调用此函数。每当 MQTT 服务器根据客户机的订阅发布消息时,都将调用此函数。
下表总结了将根据从 MQTT 服务器收到的响应调用的函数:
| 来自 MQTT 服务器的响应 | 将被调用的函数 |
| CONNACK | on_connnect |
| SUBACK | on_subscribe |
| PUBLISH | on_message |
主块的代码创建表示 MQTT 客户机的mqtt.Client类(paho.mqtt.client.Client的实例。我们使用这个实例与我们的 MQTT 服务器:MOSQUITO 进行通信。如果我们使用默认参数创建新实例,我们将使用 MQTT 版本 3.1。我们希望使用 MQTT 版本 3.11,因此,我们指定了mqtt.MQTTv311作为协议参数的值。
然后,代码将函数分配给属性。下表总结了这些作业:
| 属性 | 分配功能 |
| client.on_connect | on_connect |
| client.on_message | on_message |
| client.on_subscribe | on_subscribe |
对client.tls_set方法的调用配置加密和身份验证选项。在运行client.connect方法之前调用此方法非常重要。我们在ca_certs、certfile和keyfile参数中指定证书颁发机构证书文件、客户端证书和客户端密钥的完整字符串路径。ca_certs参数名称有点混乱,但我们只需要指定证书颁发机构证书文件的字符串路径,而不是多个证书。
最后,主块调用client.connect方法并指定host、port和keepalive参数的值。这样,代码要求 MQTT 客户机建立到指定 MQTT 服务器的连接。
connect方法以异步执行方式运行,因此,它是一个非阻塞调用。
成功建立与 MQTT 服务器的连接后,将执行client.on_connect属性中指定的回调,即on_connect函数。此函数接收在客户机参数中与 MQTT 服务器建立连接的mqtt.Client实例。
如果要与不使用 TLS 的 MQTT 服务器建立连接,则不需要调用client.tls_set方法。此外,您需要使用适当的端口,而不是使用 TLS 时指定的8883端口。请记住,当您不使用 TLS 时,默认端口为1883。
代码使用"vehicles/vehiclepi01/tests"作为参数调用client.subscribe方法以订阅此特定的单个主题,并将qos参数设置为2以请求 2 的 QoS 级别。
在这种情况下,我们将只订阅一个主题。然而,非常重要的是要知道,我们并不局限于订阅单个主题过滤器;我们可以通过一次调用subscribe方法订阅许多主题过滤器。
MQTT 服务器通过SUBACK响应确认成功订阅指定主题过滤器后,将执行client.on_subscribe属性中指定的回调,即on_subscribe函数。此函数接收granted_qos参数中的整数列表,该参数提供 MQTT 服务器为每个主题筛选器订阅请求授予的 QoS 级别。on_subscribe函数中的代码显示 MQTT 服务器为我们指定的主题过滤器授予的 QoS 级别。在本例中,我们只订阅了一个主题过滤器,因此,代码从接收到的granted_qos数组中获取第一个值。
每当接收到与我们订阅的主题过滤器匹配的新消息时,client.on_messsage属性中指定的回调将被执行,即on_message函数。此函数在客户端参数中接收与 MQTT 服务器建立连接的mqtt.Client实例,在msg参数中接收mqtt.MQTTMessage实例。mqtt.MQTTMessage类描述传入消息。
在这种情况下,无论何时执行on_message函数,msg.topic中的值总是与"vehicles/vehiclepi01/tests"匹配,因为我们只订阅了一个主题,没有其他主题名称与主题过滤器匹配。但是,如果我们订阅了一个或多个主题过滤器,其中可能有多个主题匹配,则始终需要通过检查msg.topic属性的值来检查发送消息的主题。
on_message函数中的代码打印已接收消息的主题msg.topic,以及消息有效负载的字符串表示,即msg.payload属性。
最后,主块调用client.loop_forever方法,该方法在无限块循环中为我们调用loop方法。此时,我们只希望在程序中运行 MQTT 客户机循环。我们将收到主题与我们订阅的主题匹配的消息。
loop方法负责处理网络事件,即确保与 MQTT 服务器进行通信。您可以将loop方法视为同步您的电子邮件客户端以接收传入消息并在发件箱中发送消息的等效方法。
确保此示例中可能要使用的 MOSQUITO 服务器或任何其他 MQTT 服务器正在运行。然后,在要用作 MQTT 客户机并使用 Linux 或 macOS 的任何计算机或设备上执行以下行以启动示例:
python3 subscribe_with_paho.py在 Windows 中,必须执行以下行:
python subscribe_with_paho.py如果您看到类似于以下行的带有SSLError的回溯,则表示 MQTT 服务器主机名或 IP 与生成名为server.crt的服务器证书文件时为Common Name属性指定的值不匹配。确保您检查 MQTT 服务器(Mosquitto 服务器)的 IP 地址,并使用为Common Name指定的适当 IP 地址或主机名再次生成服务器证书文件和密钥,如第 3 章、保护 MQTT 3.1.1 Mosquitto 服务器中所述,如果您正在使用我们生成的自签名证书。如果您正在使用自签名证书、IP 地址和 DHCP 服务器,还要检查 DHCP 服务器是否未更改 Mosquitto 服务器的 IP 地址:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/gaston/HillarMQTT/01/lib/python3.6/site-packages/paho/mqtt/client.py", line 612, in connect
return self.reconnect()
File "/Users/gaston/HillarMQTT/01/lib/python3.6/site-packages/paho/mqtt/client.py", line 751, in reconnect
self._tls_match_hostname()
File "/Users/gaston/HillarMQTT/01/lib/python3.6/site-packages/paho/mqtt/client.py", line 2331, in _tls_match_hostname
raise ssl.SSLError('Certificate subject does not match remote hostname.')现在,按照以下步骤使用 MQTT.fx GUI 实用程序将两条消息发布到vehicles/vehiclepi01/tests主题:
- 启动 MQTT.fx 并按照我们在第 3 章中学习的步骤建立与 MQTT 服务器的连接,以保护 MQTT 3.1.1 Mosquitto 服务器。
- 点击发布,在发布按钮左侧的下拉菜单中输入
vehicles/vehiclepi01/tests。 - 单击发布按钮右侧的 QoS 2。
- 在发布按钮下的文本框中输入以下文本:
{"CMD": " UNLOCK_DOORS"}。然后,单击“发布”按钮。MQTT.fx 将把输入的文本发布到指定的主题。 - 在发布按钮下的文本框中输入以下文本:
{"CMD": "TURN_ON_HEADLIGHTS"}。然后,单击“发布”按钮。MQTT.fx 将把输入的文本发布到指定的主题。
如果不想使用 MQTT.fx 实用程序,可以运行两个mosquitto_pub命令来生成将消息发布到主题的 MQTT 客户端。您只需在 macOS 或 Linux 中打开另一个终端,或在 Windows 中打开另一个命令提示符,转到安装 MOSQUITO 的目录,然后运行以下命令。在这种情况下,不需要指定-d选项。将192.168.1.101替换为 MQTT 服务器的 IP 或主机名。记住用在board_certificates目录中创建的这些文件的完整路径替换ca.crt、board001.crt和board001.key。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的script_01.txt文件中:
mosquitto_pub -h 192.168.1.101 -V mqttv311 -p 8883 --cafile ca.crt --cert board001.crt --key board001.key -t vehicles/vehiclepi01/tests -m '{"CMD": "UNLOCK_DOORS"}' -q 2 --tls-version tlsv1.2
mosquitto_pub -h 192.168.1.101 -V mqttv311 -p 8883 --cafile ca.crt --cert board001.crt --key board001.key -t vehicles/vehiclepi01/tests -m '{"CMD": "TURN_ON_HEADLIGHTS"}' -q 2 --tls-version tlsv1.2转到执行 Python 脚本的设备和窗口。您将看到以下输出:
Result from connect: Connection Accepted.
I've subscribed with QoS: 2
Message received. Topic: vehicles/vehiclepi01/tests. Payload: b'{"CMD": "UNLOCK_DOORS"}'
Message received. Topic: vehicles/vehiclepi01/tests. Payload: b'{"CMD": "TURN_ON_HEADLIGHTS"}'Python 程序成功地建立了与 MQTT 服务器的安全加密连接,并成为vehicles/vehiclepi01/tests主题的订户,授予的 QoS 级别为 2。程序显示了它在vehicles/vehiclepi01/tests主题中收到的两条消息。
按Ctrl+C停止程序的执行。生成的 MQTT 客户端将关闭与 MQTT 服务器的连接。您将看到类似于以下输出的错误消息,因为循环执行被中断:
Traceback (most recent call last):
File "subscribe_with_paho.py", line 33, in <module>
client.loop_forever()
File "/Users/gaston/HillarMQTT/01/lib/python3.6/site-packages/paho/mqtt/client.py", line 1481, in loop_forever
rc = self.loop(timeout, max_packets)
File "/Users/gaston/HillarMQTT/01/lib/python3.6/site-packages/paho/mqtt/client.py", line 988, in loop
socklist = select.select(rlist, wlist, [], timeout)
KeyboardInterrupt现在,我们将编写 Python 代码,以便在不同的 IoT 板上工作。当然,您可以使用单个开发计算机或开发板。不需要在不同的设备上运行代码。我们只是想确保我们能够编写能够在不同设备上运行的代码。
请记住将我们在上一章中创建的文件复制到计算机或设备上的一个目录中,该目录将表示控制车辆的电路板,我们将使用该目录运行 Python 脚本。如果您将使用到目前为止一直在使用的同一台计算机或设备,则无需执行下一步。
我们将文件保存在名为mqtt_certificates的目录中。在要用作本例 MQTT 客户机的计算机或板上创建一个board_certificates目录。将以下三个文件复制到此新目录:
ca.crt:证书颁发机构证书文件board001.crt:客户端证书文件board001.key:客户端密钥
我们将创建以下两个类:
Vehicle:此类将表示车辆,并提供在需要处理命令时调用的方法。为了使示例保持简单,我们的方法将只打印在每个方法被调用到控制台输出后车辆执行的操作。当调用每个方法时,代表车辆的真实类将与发动机、灯、执行器、传感器和车辆的其他不同组件交互。VehicleCommandProcessor:此类将表示一个命令处理器,该处理器将与 MQTT 服务器建立连接,订阅一个主题,其中 MQTT 客户端将接收带有命令的消息,分析传入消息,并将命令的执行委托给Vehicle类的关联实例。VehicleCommandProcessor类将声明许多静态方法,我们将这些方法指定为 MQTT 客户机的回调。
在主虚拟环境文件夹中创建一个名为vehicle_commands.py的新 Python 文件。以下几行声明了许多变量,这些变量的值标识了车辆支持的每个命令。此外,代码声明了许多变量,其中的键字符串用于指定命令,键用于指定成功执行的命令。所有这些变量都是用大写字母定义的,因为我们将它们用作常量。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_commands.py文件中:
# Key strings
COMMAND_KEY = "CMD"
SUCCESFULLY_PROCESSED_COMMAND_KEY = "SUCCESSFULLY_PROCESSED_COMMAND"
# Command strings
# Turn on the vehicle's engine.
CMD_TURN_ON_ENGINE = "TURN_ON_ENGINE"
# Turn off the vehicle's engine
CMD_TURN_OFF_ENGINE = "TURN_OFF_ENGINE"
# Close and lock the vehicle's doors
CMD_LOCK_DOORS = "LOCK_DOORS"
# Unlock and open the vehicle's doors
CMD_UNLOCK_DOORS = "UNLOCK_DOORS"
# Park the vehicle
CMD_PARK = "PARK"
# Park the vehicle in a safe place that is configured for the vehicle
CMD_PARK_IN_SAFE_PLACE = "PARK_IN_SAFE_PLACE"
# Turn on the vehicle's headlights
CMD_TURN_ON_HEADLIGHTS = "TURN_ON_HEADLIGHTS"
# Turn off the vehicle's headlights
CMD_TURN_OFF_HEADLIGHTS = "TURN_OFF_HEADLIGHTS"
# Turn on the vehicle's parking lights, also known as sidelights
CMD_TURN_ON_PARKING_LIGHTS = "TURN_ON_PARKING_LIGHTS"
# Turn off the vehicle's parking lights, also known as sidelights
CMD_TURN_OFF_PARKING_LIGHTS = "TURN_OFF_PARKING_LIGHTS"
# Accelerate the vehicle, that is, press the gas pedal
CMD_ACCELERATE = "ACCELERATE"
# Brake the vehicle, that is, press the brake pedal
CMD_BRAKE = "BRAKE"
# Make the vehicle rotate to the right. We must specify the degrees
# we want the vehicle to rotate right in the value for the DEGREES key
CMD_ROTATE_RIGHT = "ROTATE_RIGHT"
# Make the vehicle rotate to the left. We must specify the degrees
# we want the vehicle to rotate left in the value for the DEGREES key
CMD_ROTATE_LEFT = "ROTATE_LEFT"
# Set the maximum speed that we allow to the vehicle. We must specify
# the desired maximum speed in miles per hour in the value for the MPH key
CMD_SET_MAX_SPEED = "SET_MAX_SPEED"
# Set the minimum speed that we allow to the vehicle. We must specify
# the desired minimum speed in miles per hour in the value for the MPH key
CMD_SET_MIN_SPEED = "SET_MIN_SPEED"
# Degrees key
KEY_DEGREES = "DEGREES"
# Miles per hour key
KEY_MPH = "MPH"COMMAND_KEY变量定义了键字符串,该字符串定义了代码将理解为命令的内容。每当我们收到包含指定键字符串的消息时,我们知道字典中与该键相关联的值将指示该消息希望处理板中运行的代码的命令。MQTT 客户机不会将消息作为字典接收,因此,当消息不仅仅是字符串时,有必要将它们从字符串转换为字典。
SUCCESSFULLY_PROCESSED_COMMAND_KEY变量定义键字符串,该字符串定义代码将在发布到相应主题的响应消息中用作成功处理的命令键的内容。每当我们发布包含指定密钥字符串的消息时,我们知道字典中与该密钥相关联的值将指示电路板已成功处理的命令。
在主虚拟环境文件夹中创建一个名为vehicle_mqtt_client.py的新 Python 文件。以下几行声明了必要的导入和我们在上一个示例中用于建立与 MQTT 服务器的连接的相同变量。然后,这些行声明了Vehicle类。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
class Vehicle:
def __init__(self, name):
self.name = name
self.min_speed_mph = 0
self.max_speed_mph = 10
def print_action_with_name_prefix(self, action):
print("{}: {}".format(self.name, action))
def turn_on_engine(self):
self.print_action_with_name_prefix("Turning on the engine")
def turn_off_engine(self):
self.print_action_with_name_prefix("Turning off the engine")
def lock_doors(self):
self.print_action_with_name_prefix("Locking doors")
def unlock_doors(self):
self.print_action_with_name_prefix("Unlocking doors")
def park(self):
self.print_action_with_name_prefix("Parking")
def park_in_safe_place(self):
self.print_action_with_name_prefix("Parking in safe place")
def turn_on_headlights(self):
self.print_action_with_name_prefix("Turning on headlights")
def turn_off_headlights(self):
self.print_action_with_name_prefix("Turning off headlights")
def turn_on_parking_lights(self):
self.print_action_with_name_prefix("Turning on parking lights")
def turn_off_parking_lights(self):
self.print_action_with_name_prefix("Turning off parking
lights")
def accelerate(self):
self.print_action_with_name_prefix("Accelerating")
def brake(self):
self.print_action_with_name_prefix("Braking")
def rotate_right(self, degrees):
self.print_action_with_name_prefix("Rotating right {}
degrees".format(degrees))
def rotate_left(self, degrees):
self.print_action_with_name_prefix("Rotating left {}
degrees".format(degrees))
def set_max_speed(self, mph):
self.max_speed_mph = mph
self.print_action_with_name_prefix("Setting maximum speed to {}
MPH".format(mph))
def set_min_speed(self, mph):
self.min_speed_mph = mph
self.print_action_with_name_prefix("Setting minimum speed to {}
MPH".format(mph))与前一个示例一样,在主虚拟环境文件夹中名为config.py的 Python 文件中定义了与 Mosquito MQTT 服务器建立连接的所有配置值。如果您想在不同的设备上运行此示例,则必须使用适当的值创建一个新的config.py文件,并将从config模块导入值的行更改为使用新的配置文件。不要忘记用您创建的board_certificates目录的路径替换certificates_path字符串中的/Users/gaston/board_certificates值。此外,将mqtt_server_host的值替换为 MOSQUITO 服务器或您可能决定使用的其他 MQTT 服务器的 IP 地址或主机名。
我们必须在 name required 参数中指定车辆名称。构造函数,即__init__方法,将收到的名称保存在具有相同名称的属性中。然后,构造函数设置两个属性的初始值:min_speed_mph和max_speed_mph。这些属性确定车辆的最小和最大速度值,以英里/小时表示。
Vehicle类声明print_action_with_name_prefix方法,该方法接收包含action参数中正在执行的操作的字符串,并以name属性中保存的值作为前缀打印该字符串。该类中定义的其他方法调用print_action_with_name_prefix方法以打印指示车辆正在执行的操作的消息,并以车辆名称作为前缀。
我们将使用最近安装的paho-mqtt版本 1.3.1 模块订阅特定主题,并在收到主题中的消息时运行代码。我们将在主虚拟环境文件夹中的同一个 Python 文件中创建一个名为vehicle_mqtt_client.py的VehicleCommandProcessor类。此类将表示与先前编码的Vehicle类实例关联的命令处理器,配置 MQTT 客户端和对客户端的订阅,并声明在触发某些与 MQTT 相关的事件时将执行的回调的代码。
我们将把VehicleCommandProcessor类的代码分成许多代码片段,以便更容易理解每个代码部分。您必须将下一行添加到现有的vehicle_mqtt_client.pyPython 文件中。下面几行声明了VehicleCommandProcessor类及其构造函数,即__init__方法。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
class VehicleCommandProcessor:
commands_topic = ""
processed_commands_topic = ""
active_instance = None
def __init__(self, name, vehicle):
self.name = name
self.vehicle = vehicle
VehicleCommandProcessor.commands_topic = \
"vehicles/{}/commands".format(self.name)
VehicleCommandProcessor.processed_commands_topic = \
"vehicles/{}/executedcommands".format(self.name)
self.client = mqtt.Client(protocol=mqtt.MQTTv311)
VehicleCommandProcessor.active_instance = self
self.client.on_connect = VehicleCommandProcessor.on_connect
self.client.on_subscribe = VehicleCommandProcessor.on_subscribe
self.client.on_message = VehicleCommandProcessor.on_message
self.client.tls_set(ca_certs = ca_certificate,
certfile=client_certificate,
keyfile=client_key)
self.client.connect(host=mqtt_server_host,
port=mqtt_server_port,
keepalive=mqtt_keepalive)我们必须在name和vehicle必需参数中为命令处理器和命令处理器将控制的Vehicle实例指定一个名称。构造函数,即__init__方法,将收到的name和vehicle保存在同名的属性中。然后,构造函数设置commands_topic和processed_commands_topic类属性的值。根据前面讨论的规范,构造函数使用收到的name来确定命令和成功处理的命令的主题名称。MQTT 客户端将接收保存在command_topic类属性中的主题名称中的消息,并将消息发布到保存在processed_commands_topic类属性中的主题名称中。
然后,构造函数创建表示 MQTT 客户机的mqtt.Client类(paho.mqtt.client.Client的实例,我们将使用该实例与 MQTT 服务器通信。代码将此实例分配给client属性(self.client。与前面的示例一样,我们希望使用 MQTT 版本 3.11,因此,我们指定了mqtt.MQTTv311作为协议参数的值。
代码还将对该实例的引用保存在active_instance类属性中,因为我们必须在静态方法中访问该实例,构造函数将该实例指定为 MQTT 客户机触发的不同事件的回调。我们希望所有与车辆命令处理器相关的方法都在VehicleCommandProcessor类中。
然后,代码将静态方法分配给self.client实例的属性。下表总结了这些作业:
| 属性 | 指定静态法 |
| client.on_connect | VehicleCommandProcessor.on_connect |
| client.on_message | VehicleCommandProcessor.on_message |
| client.on_subscribe | VehicleCommandProcessor.on_subscribe |
静态方法既不接收self也不接收cls作为第一个参数,因此,我们可以使用它们作为具有所需数量参数的回调。注意,我们将在接下来的段落中对这些静态方法进行编码和分析。
对self.client.tls_set方法的调用配置加密和身份验证选项。最后,构造函数调用client.connect方法并指定host、port和keepalive参数的值。这样,代码要求 MQTT 客户机建立到指定 MQTT 服务器的连接。请记住,connect方法以异步执行方式运行,因此,它是一个非阻塞调用。
如果要与不使用 TLS 的 MQTT 服务器建立连接,则需要删除对self.client.tls_set方法的调用。此外,您需要使用适当的端口,而不是使用 TLS 时指定的8883端口。请记住,当您不使用 TLS 时,默认端口为1883。
以下几行声明了属于VehicleCommandProcessor类的on_connect静态方法。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
@staticmethod
def on_connect(client, userdata, flags, rc):
print("Result from connect: {}".format(
mqtt.connack_string(rc)))
# Check whether the result form connect is the CONNACK_ACCEPTED
connack code
if rc == mqtt.CONNACK_ACCEPTED:
# Subscribe to the commands topic filter
client.subscribe(
VehicleCommandProcessor.commands_topic,
qos=2)成功建立与 MQTT 服务器的连接后,将执行self.client.on_connect属性中指定的回调,即on_connect静态方法(用@staticmethod装饰符标记)。此静态方法接收在客户机参数中与 MQTT 服务器建立连接的mqtt.Client实例。
代码检查提供 MQTT 服务器返回的CONNACK代码的rc参数的值。如果该值与mqtt.CONNACK_ACCEPTED匹配,则表示 MQTT 服务器接受了连接请求,因此,代码调用VehicleCommandProcessor.commands_topic作为参数的client.subscribe方法订阅commands_topic类属性中指定的主题,并为订阅指定 2 的 QoS 级别。
以下几行声明了属于VehicleCommandProcessor类的on_subscribe静态方法。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
@staticmethod
def on_subscribe(client, userdata, mid, granted_qos):
print("I've subscribed with QoS: {}".format(
granted_qos[0]))on_subscribe静态方法显示 MQTT 服务器为我们指定的主题筛选器授予的 QoS 级别。在本例中,我们只订阅了一个主题过滤器,因此,代码从接收到的granted_qos数组中获取第一个值。
以下几行声明了属于VehicleCommandProcessor类的on_message静态方法。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
@staticmethod
def on_message(client, userdata, msg):
if msg.topic == VehicleCommandProcessor.commands_topic:
print("Received message payload:
{0}".format(str(msg.payload)))
try:
message_dictionary = json.loads(msg.payload)
if COMMAND_KEY in message_dictionary:
command = message_dictionary[COMMAND_KEY]
vehicle =
VehicleCommandProcessor.active_instance.vehicle
is_command_executed = False
if KEY_MPH in message_dictionary:
mph = message_dictionary[KEY_MPH]
else:
mph = 0
if KEY_DEGREES in message_dictionary:
degrees = message_dictionary[KEY_DEGREES]
else:
degrees = 0
command_methods_dictionary = {
CMD_TURN_ON_ENGINE: lambda:
vehicle.turn_on_engine(),
CMD_TURN_OFF_ENGINE: lambda:
vehicle.turn_off_engine(),
CMD_LOCK_DOORS: lambda: vehicle.lock_doors(),
CMD_UNLOCK_DOORS: lambda:
vehicle.unlock_doors(),
CMD_PARK: lambda: vehicle.park(),
CMD_PARK_IN_SAFE_PLACE: lambda:
vehicle.park_in_safe_place(),
CMD_TURN_ON_HEADLIGHTS: lambda:
vehicle.turn_on_headlights(),
CMD_TURN_OFF_HEADLIGHTS: lambda:
vehicle.turn_off_headlights(),
CMD_TURN_ON_PARKING_LIGHTS: lambda:
vehicle.turn_on_parking_lights(),
CMD_TURN_OFF_PARKING_LIGHTS: lambda:
vehicle.turn_off_parking_lights(),
CMD_ACCELERATE: lambda: vehicle.accelerate(),
CMD_BRAKE: lambda: vehicle.brake(),
CMD_ROTATE_RIGHT: lambda:
vehicle.rotate_right(degrees),
CMD_ROTATE_LEFT: lambda:
vehicle.rotate_left(degrees),
CMD_SET_MIN_SPEED: lambda:
vehicle.set_min_speed(mph),
CMD_SET_MAX_SPEED: lambda:
vehicle.set_max_speed(mph),
}
if command in command_methods_dictionary:
method = command_methods_dictionary[command]
# Call the method
method()
is_command_executed = True
if is_command_executed:
VehicleCommandProcessor.active_instance.
publish_executed_command_message(message_dictionary)
else:
print("I've received a message with an
unsupported command.")
except ValueError:
# msg is not a dictionary
# No JSON object could be decoded
print("I've received an invalid message.")每当我们订阅的commands_topic类属性中保存的主题中接收到新消息时,就会执行self.client.on_messsage属性中指定的回调,即之前编码的on_message静态方法(用@staticmethod装饰符标记)。此静态方法在客户机参数中接收与 MQTT 服务器建立连接的mqtt.Client实例,在msg参数中接收mqtt.MQTTMessage实例。
mqtt.MQTTMessage类描述传入消息。
msg.topic属性表示接收消息的主题。因此,静态方法检查msg.topic属性是否与commands_topic类属性中的值匹配。在这种情况下,无论何时执行on_message方法,msg.topic中的值总是与 topic 类属性中的值匹配,因为我们只订阅了一个 topic。但是,如果我们订阅了多个主题,则始终需要检查发送消息的主题和接收消息的主题。因此,我们加入代码是为了清楚地了解如何检查topic中接收到的消息。
该代码打印已接收消息的有效负载,即msg.payload属性。然后,代码将json.loads函数反序列化msg.payload的结果分配给 Python 对象,并将结果分配给message_dictionary局部变量。如果msg.payload的内容不是 JSON,将捕获一个ValueError异常,代码将打印一条消息,表明该消息不包含有效的命令,并且静态方法中不再执行代码。如果msg.payload的内容是 JSON,我们将在message_dictionary局部变量中有一个字典。
然后,代码检查保存在COMMAND_KEY字符串中的值是否包含在message_dictionary字典中。如果表达式的计算结果为True,则表示转换为字典的 JSON 消息包含我们必须处理的命令。但是,在处理该命令之前,我们必须检查哪个是该命令,因此,有必要检索与键相关联的值,该值相当于保存在COMMAND_KEY字符串中的值。当值是我们作为需求分析的任何命令时,代码能够运行特定的代码。
代码使用引用活动VehicleCommandProcessor实例的active_instance类属性,根据必须处理的命令调用关联车辆的必要方法。我们必须将回调声明为静态方法,因此,我们使用这个 class 属性来访问活动实例。成功处理命令后,代码将is_command_executed标志设置为True。最后,代码检查该标志的值,如果它等于True,则代码调用active_instance类属性中保存的VehicleCommandProcessor实例的publish_executed_command_message。
当然,在实际示例中,我们应该添加更多验证。前面的代码经过简化,使我们能够继续关注 MQTT。
以下几行声明了属于VehicleCommandProcessor类的publish_executed_command_message方法。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
def publish_executed_command_message(self, message):
response_message = json.dumps({
SUCCESFULLY_PROCESSED_COMMAND_KEY:
message[COMMAND_KEY]})
result = self.client.publish(
topic=self.__class__.processed_commands_topic,
payload=response_message)
return resultpublish_executed_command_message方法接收已通过消息参数中的命令接收的消息字典。该方法调用json.dumps函数将字典序列化为 JSON 格式的字符串,并显示指示命令已成功处理的响应消息。最后,代码使用processed_commands_topic变量作为主题参数和payload参数中的 JSON 格式字符串(response_message)调用client.publish方法。
在这种情况下,我们不评估从publish方法收到的响应。此外,我们正在为指定所需服务质量的qos参数使用默认值。因此,我们将以等于 0 的 QoS 级别发布此消息。在第 5 章中,我们将使用 Python测试和改进我们的车辆控制解决方案,我们将使用更高级的场景,其中我们将添加代码来检查方法的结果,并将代码添加到成功发布消息时触发的on_publish回调中,正如我们在前面的示例中所做的。在这种情况下,我们仅对通过命令接收的消息使用 QoS 级别 2。
以下几行声明了属于VehicleCommandProcessor类的process_incoming_commands方法。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
def process_incoming_commands(self):
self.client.loop()process_incoming_commands方法调用 MQTT 客户机的loop方法,并确保与 MQTT 服务器进行通信。将对loop方法的调用视为同步邮箱。将发送任何要在传出框中发布的挂起消息,任何传入消息将到达收件箱,并且将触发我们之前分析的事件。这样,车辆命令处理器将接收消息并处理命令。
最后,以下几行声明了主要代码块。您必须将这些行添加到现有的vehicle_mqtt_client.pyPython 文件中。样本的代码文件包含在mqtt_python_gaston_hillar_04_01文件夹中的vehicle_mqtt_client.py文件中:
if __name__ == "__main__":
vehicle = Vehicle("vehiclepi01")
vehicle_command_processor = VehicleCommandProcessor("vehiclepi01",
vehicle)
while True:
# Process messages and the commands every 1 second
vehicle_command_processor.process_incoming_commands()
time.sleep(1)__main__方法创建一个名为 vehicle 的Vehicle类实例,其中"vehiclepi01"作为 name 参数的值。下一行创建一个名为vehicle_command_processor的VehicleCommandProcessor类实例,其中"vehiclepi01"和之前创建的Vehicle实例X作为name和vehicle参数的值。这样,vehicle_command_processor将把命令的执行委托给vehicle中的实例方法。
VehicleCommandProcessor类的构造函数将订阅 MQTT 服务器中的vehicles/vehiclepi01/commands主题,因此,我们必须向该主题发布消息,以便发送代码将处理的命令。每当成功处理命令时,新消息将发布到vehicles/vehiclepi01/executedcommands主题。因此,我们必须订阅此主题以检查车辆执行的命令。
while 循环调用vehicle_command_processor.process_commands方法并休眠一秒钟。process_commands方法调用 MQTT 客户机的 loop 方法,并确保与 MQTT 服务器进行通信。
还有一个线程接口,我们可以通过调用 MQTT 客户机的loop_start方法来运行该接口。这样,我们可以避免多次调用循环方法。然而,我们调用 loop 方法可以更容易地调试代码,并理解在后台如何工作。我们将在第 5 章中使用线程接口,使用 Python测试和改进我们的车辆控制解决方案。
让我们看看您是否能正确回答以下问题:
-
以下哪一个 Python 模块是 Paho Python 客户端?
paho-mqttpaho-client-pippaho-python-client
-
要与使用 TLS 的 MQTT 服务器建立连接,在调用
connect之前必须调用paho.mqtt.client.Client实例的哪种方法?connect_with_tlstls_setconfigure_tls
-
paho.mqtt.client.Client实例与 MQTT 服务器建立连接后,将调用分配给以下哪个属性的回调?on_connectionon_connectconnect_callback
-
paho.mqtt.client.Client实例从其订阅的主题过滤器之一接收到消息后,将调用分配给以下哪个属性的回调?on_message_arrivedon_messagemessage_arrived_callback
-
paho.mqtt.client.Client实例的以下哪种方法在无限阻塞循环中为我们调用循环方法?infinite_looploop_while_trueloop_forever
正确答案包含在附录解决方案中。
在本章中,我们分析了使用 MQTT 消息控制车辆的需求。我们定义了我们将使用的主题以及作为消息有效载荷一部分的命令,以控制车辆。然后,我们与 pahopython 客户机一起编写将 MQTT 客户机连接到 MQTT 服务器的 Python 代码。
我们了解了调用 Paho Python 客户端所需的方法及其参数。我们分析了回调的工作原理,并编写了订阅主题过滤器以及接收和处理消息的代码。
我们用 Python 编写了处理车辆命令的代码。该代码能够在不同的物联网平台上运行,包括 Raspberry Pi 3 系列板、高通 DragonBoard、BeagleBone Black、MinnowBoard Turbot、LattePanda、UP squared,以及任何能够执行 Python 3.6.x 代码的计算机。我们使用 Python 处理 MQTT 客户机的网络循环。
现在我们已经了解了使用 Python 处理 MQTT 的基本知识,我们将使用 MQTT 消息和 Python 代码来使用和改进我们的车辆控制解决方案,我们将利用其他 MQTT 功能,这些是我们将在第 5 章、中讨论的主题用 Python测试并改进我们的车辆控制解决方案。




