Skip to content

Latest commit

 

History

History
1212 lines (970 loc) · 71.4 KB

File metadata and controls

1212 lines (970 loc) · 71.4 KB

十七、处理服务事务

系统组件和它们各自管理的数据对象之间存在大量的交互潜力。虽然我们已经研究出了一些确定传输数据更改或命令消息的机制,但我们还没有开始探索这些交互的细节。简而言之,我们仍然需要解决所有本地 CRUD 操作的数据流(以及消息传输)是什么样子的。

在本章中,我们将介绍以下主题:

  • 工匠创造产品
  • 工匠和中央办公室工作人员激活和停用产品
  • 由工匠和中央办公室员工更改产品数据
  • 工匠删除产品
  • 中央办公室工作人员创造工匠
  • Artisan 和中央办公室工作人员对 Artisan 数据进行更改
  • 中央办公室工作人员删除工匠
  • 通过网络店面创建订单,并将该信息转发给工匠以实现
  • 取消网上店面的订单,并将该信息转发给工匠
  • 工匠完成订单项目

剩余故事

由于我们在第 16 章Artisan Gateway Service中的工作仅(暂定)结束了三个故事,因此仍有几个(十一个)需要解决。RabbitMQSender的实施所采用的 RabbitMQ 消息传输策略也提出了一些问题,即如何传播这些过程所需的一些工件,尤其是签名密钥,还有一个决策有待决定,即 Artisan 网关是将一个消息队列用于 Artisan 的入站流量,还是每个 Artisan 使用一个消息队列,这可能会增加另一个故事:

  • 作为 Artisan,我需要创建并分配给我的消息队列,以便我可以将数据更改发送到 Artisan 网关

仍然挂起的大部分故事都表示一个数据流过程,一个与特定用户在系统上下文中执行的特定操作相关联的数据事务。每个过程又是 CRUD 操作的一些变体,通常按照相关消息的指示创建、更新或删除一个或多个数据对象。在审查针对系统中每个用户角色可用的所有业务对象的所有各种 CRUD 操作的可能性时,出现了五个新的故事:

  • 作为一名技工,我需要能够停用Product对象,以便管理Product可用性(这可以通过一般更新操作处理)

  • 作为一名工匠,我需要在下订单时得到通知,该订单包括我的一项产品供应,以便我能够完成该订单中我的部分(最终,由中央办公室的某些活动触发,创建一个 Artisan residentOrder对象)

  • 作为客户,我需要将我订单的相关部分转发给相应的工匠,以便他们能够完成我订单中的相应部分(前面故事的另一半,但可能会增加一些功能需求)

  • 作为取消订单的客户,我需要将取消订单的相关部分转发给相应的 Artisan,以便他们不会履行其订单部分(本质上是删除 Artisan 常驻者Order-对象,但 Artisan 应用程序端会发出通知)

  • 作为一名技工,我需要在订单被取消时得到通知,其中包括我的一项产品,这样我就可以停止任何与之相关的过程中履行活动,并根据需要更新我的Product状态(同样,前面故事的另一半)

所有这些事务都遵循类似的模式:

  • O****对象的相关消息数据需要一起发送数据,用于创建消息(带DaemonMessage
  • 消息由发送方(RabbitMQSender的实例)发送到Artisan 网关服务
  • 服务读取消息,并调用相应的[process-method],该消息可能会与Artisan 网关数据存储交互。
  • [process-method]本身可能需要发送其他消息,或者返回Artisan 网关服务自身进行进一步的本地处理,或者通过服务返回给 Artisan。发送后续消息的过程将非常相似,新消息的目的地可能会有额外的变化:

因此,主要的变化点在消息数据本身中,这些变化应该由业务规则决定,这些规则围绕着用户角色可以对这些对象做什么。

一点重组

在深入研究单个事务的细节之前,似乎需要对最近的代码进行一些小的重组。RabbitMQSenderDaemonMessage类最初是在hms_core.daemons模块中编写的,因为这似乎是保存它们的一个合乎逻辑的地方,它们仍然与守护进程相关,但也与 Artisan 应用程序的部分相关(可能还有中央办公室应用程序)与各种守护进程类本身没有任何联系。由于我们还发现需要各种对象能够生成消息数据结构,并且感觉应该由不同的抽象来处理,因此将这两个类移动到新的hms_core模块(hms_core.messaging模块)中并以这种方式添加新的抽象是合乎逻辑的,所有与消息传递相关的类都位于一个位置。将自定义异常InvalidMessageError移动到新模块也感觉像是一个明智的步骤,因为它也严格与消息相关。

这些代码移动需要对 Artisan Gateway 服务的主模块进行一些简单的更改,例如从以下位置更改原始导入:

from hms_core.daemons import BaseDaemon, DaemonMessage, \ 
    InvalidMessageError 
from hms_core.daemons import BaseDaemon

对下列事项:

from hms_core.daemons import BaseDaemon
from hms_core.messaging import DaemonMessage, InvalidMessageError

为了使它们仍然有用,在已生成的任何测试脚本中也需要进行类似的更改。

This sort of code reorganization is probably inevitable, at least on a long-term basis: it's just a matter of time before something just doesn't feel right where it lives, and needs to be moved to a better location. In general, the earlier the need for a reorganization like this is caught, the better, as it will tend to be less troublesome or disruptive because there's less chance of broken interactions with code if there's less code to interact with. It also probably goes without saying, but it's always a good idea to rerun any test code that might have been created to assure that nothing is egregiously broken before moving on. In this case, the final test script for the daemon (scratch-space/daemon-artisan-tests.py) revealed some minor issues that had to be resolved—not because of the code-move, but because it wasn't rerun before closing out the code in Chapter 16The Artisan Gateway Service. Still, the issue was caught before it became a real bug.

对象事务的准备

前面的代码重组为我们创建前面提到的新的抽象基类ABC)提供了一个坚实、合理的位置。这个新 ABC 的目标是要求派生类能够提供一个消息数据就绪的数据结构,该结构可以传递给DaemonMessage作为其__init__方法中的数据参数,这两种方法都简化了为任何需要消息的给定对象创建消息的过程,并允许该进程的代码作为单个数据对象类本身的一部分存在。为了与迄今为止在代码中发展的命名约定保持一致,最好将其编写为名为to_message_data的实例方法。考虑的另一个选项是to_message_dict,但该方法名称在其他地方已经存在,并且它与DaemonMessage参数也没有太大关系。

to_message_data方法可以是完全抽象的,抽象方法本身没有提供具体的实现。与hms_sys代码库中迄今定义的许多抽象方法不同,实际上没有任何通用功能可依赖。

就这样,真的。新的 ABC 不需要任何其他让人想到的东西。它只是定义了新方法的需求。它甚至不需要__init__方法,因为不需要作为实例属性值传递任何内容(尽管它仍然会从所有类最终派生的对象类继承__init__方法)。因此,其整个定义如下:

class HasMessageData(metaclass=abc.ABCMeta):
    """
Provides interface requirements, and type-identity for objects that 
are expected to provide a to_message_data method.
"""

    ###################################
    # Abstract methods                #
    ###################################

    @abc.abstractmethod
    def to_message_data(self) -> (dict,):
        """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.

self .............. (HasMessageData instance, required) The 
                    instance to execute against
"""
        raise NotImplementedError(
            '%s.to_message_data has not been implemented as '
            'required by HasMessageData' % 
            (self.__class__.__name__)
        )

没有具体功能的 ABC 定义与 Python 代码所能达到的其他面向对象语言所提供的正式接口声明差不多。它仍然只是一个抽象基类,就像到目前为止为项目构建的其他 ABC 一样,但它所做的只是生成一组功能需求,派生类在实例化之前必须实现这些功能需求。在这种情况下,当我们将HasMessageData应用于hms_core.co_objectshms_artisan.artisan_objects名称空间中已经定义的各种数据对象类(ArtisanProduct两个名称空间中的类,以及hms_artisan名称空间中的Order类)时,这立即建立了这些类实现to_message_data的要求,不管它们是如何实施的。

In hms_sys, since the concrete Artisan, Order, and Product classes all derive from ABCs defined in hms_core, we could actually attach HasMessageData to those ABCs, rather than to the concrete classes. The end result would be the same—the concrete classes would be required to implement to_message_data—and there would be (very slightly) less work. The trade-off would be that any future classes that derived from BaseArtisan, BaseOrder, or BaseProduct would also be required to implement to_message_data, even if there was no need for that functionality. While that doesn't feel horrible, it does impose some functional requirements on future development that may not be warranted. For the time being, since we know that the current concrete classes should derive from HasMessageData, we'll derive them from it directly—if that needs to be changed in the future, it's a safer effort to move the requirement deeper into the inheritance tree.

to_message_data的具体实现在代码中提供了一个逻辑挂钩,用于实现关于每个对象在消息中可以发送什么的业务规则限制。也就是说,Artisan 和 Central Office 用户都不允许更改或设置他们各自控制的具有特定属性的所有对象的所有状态数据。即使在用户类型拥有对象类型(工匠和产品)的情况下,也有其他用户拥有的属性(例如,产品和store_available)。由于to_message_data将用于实际生成消息数据,而这些数据又将用于在每个消息事务的接收端进行更改,因此将其生成的数据结构限制为用户类型可以创建或更改的值的简单权宜之计可防止每个用户对对象数据进行非法更改。在处理每个用户/对象/操作组合的特定事务时,我们将深入研究这一点。

产品对象事务

由于产品数据事务集具有最多的单个事务(七个),我们将从这些事务开始,希望它们能够尽早暴露设计中的任何漏洞。每个事务都与一个原始的迭代故事相关联,并且将调用与事务流程相关的特定故事。用户/对象组合的to_message_data具体实现将在该组合的第一笔事务中定义,并在后续事务细节中根据需要进行细化。还将解决该特定组合的任何其他具体需求。

由于针对任何对象的所有不同操作都需要识别对象,因此所有to_message_data输出中的一个常量是被传输对象的oid属性。它在每项业务中都发挥着重要作用:

  • 创建新对象时,必须在消息中提供oid,这样我们就不会在不同的应用程序安装或环境中使用不同的唯一标识符。oid值的生成已经解决了这一问题,该值从BaseDataObject继承而来,如果oid不存在,则在BaseDataObject中创建。
  • 更新现有对象时,必须提供oid,以便检索和更改原始对象。
  • 删除现有对象时,存在相同的标识需求。必须提供oid以标识要删除的对象。
  • 虽然我们还没有响应消息的用例(或多或少相当于标准 CRUD 操作结构中的 read),但它也需要一个oid值来标识应该获取和返回哪个对象。

Artisan–创建产品

根据前面的列表,工匠需要创建产品的相关故事如下:

  • 作为一名工匠,我需要能够创建Product对象,以便管理我的Product产品

Artisan 用户拥有Product对象的大部分数据点。事实上,他们真正不能创建或更改的唯一属性是store_available标志,该标志控制中央办公室运行的 Web 店面上是否有给定的产品可用。因此,hms_artisan.artisan_objects.Productto_message_data输出与to_data_dict方法非常相似:

def to_message_data(self) -> (dict,):
    """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
    return {
        'oid':str(self.oid),
        # - Properties from BaseProduct:
        'available':self.available,
        'description':self.description,
        'dimensions':self.dimensions,
        'metadata':self.metadata,
        'name':self.name,
        'shipping_weight':self.shipping_weight,
        'summary':self.summary,
        # - Date/time values, since we may well want/need to 
        #   keep those in sync across environments
        'created':datetime.strftime(
            self.created, self.__class__._data_time_string
        ),
        'modified':datetime.strftime(
            self.modified, self.__class__._data_time_string
        ),
    }

createdmodified值包含在此数据结构中,假设它们也应在 Artisan 和中央办公室数据存储中保持同步,如果没有其他内容,则可能,允许 UI 逻辑更容易地检测 UI 在显示实例数据之前需要注意的更改,尽管它几乎肯定需要对所有应用程序和服务实例的公共时间进行一些标准化。

给定一个新的Product对象(new_product和 Artisan 的签名密钥(signing_key,将new_product传输到 Artisan 网关服务变得非常简单:

new_product_message = DaemonMessage(
    'create', new_product.to_message_data(), signing_key
)
# - Assumes that RabbitMQSender has already been configured...
#   it would be slightly more involved if this were the first time 
#   it was used...
sender = RabbitMQSender()
sender.send_message(new_product_message)

接受这些消息并实际创建新的Product的 Artisan 网关方法是ArtisanGatewayDaemon.create_product。由于它是服务中的一种方法,特别是因为它对数据进行了更改(在本例中,创建新数据),因此其进程的日志记录量几乎与进程本身的日志记录量相同,尽管其中大部分是调试日志记录,并且仅当服务配置为记录该级别的事件时才会记录:

def create_product(self, properties:(dict,)) -> None:
    self.info('%s.create_product called' % self.__class__.__name__)
    if type(properties) != dict:
        raise TypeError(
            '%s.create_product expects a dict of Product '
            'properties, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, properties, 
                type(properties).__name__
            )
        )
    self.debug('properties ... %s:' % (type(properties)))
    self.debug(str(properties))
    # - Create the new object...
    new_object = Product.from_data_dict(properties)
    self.debug('New object created successfully')
    #   ...and save it.
    new_object.save()
    self.info(
        'New Product %s created successfully' % new_object.oid
    )

At this point, the various Gateway methods aren't making any determination about whether the incoming message is authorized to make the changes that the method is making. We'll examine this later on.

中央办公室–批准/列出产品

对于能够激活产品的中央办公室员工来说,早期故事集中的相关故事如下:

  • 作为产品经理,我需要能够激活Product对象,以便管理Product可用性

中央办公室拥有产品的store_available标志,因此他们居住在hms_code.co_objects.Productto_message_dict版本至少在最初要简单得多:

def to_message_data(self) -> (dict,):
    """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
    return {
        'oid':str(self.oid),
        # - Properties from BaseProduct:
        'store_available':self.store_available,
        # - Date/time values, since we may well want/need to 
        #   keep those in sync across environments
        'modified':datetime.strftime(
            self.modified, self.__class__._data_time_string
        ),
    }

product_to_activate``Product对象和中央办公室signing_key相关的消息传输与我们之前看到的新产品传输一样简单:

product_message = DaemonMessage(
    'update', product_to_activate.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(product_message)

同样的消息结构和传输过程也将解决中央办公室需要停用产品的问题,这是另一个最初的迭代故事:

  • 作为产品经理,我需要能够停用Product对象,以便管理Product可用性

Artisan Gateway 接受这些消息并更新相关Product的方法是ArtisanGatewayDaemon.update_product。与create_product一样,它通过执行大量日志:

def update_product(self, properties:(dict,)) -> None:
    self.info('%s.update_product called' % self.__class__.__name__)
    if type(properties) != dict:
        raise TypeError(
            '%s.update_product expects a dict of Product '
            'properties, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, properties, 
                type(properties).__name__
            )
        )
    self.debug('properties ... %s:' % (type(properties)))
    self.debug(str(properties))
    # - Retrieve the existing object, and get its data-dict 
    #   representation
    existing_object = Product.get(properties['oid'])
    self.debug(
        'Product %s retrieved successfully' % existing_object.oid
    )
    data_dict = existing_object.to_data_dict()
    # - Update the data-dict with the values from properties
    data_dict.update(properties)
    # - Make sure it's flagged as dirty, so that save will 
    #   *update* instead of *create* the instance-record, 
    #   for data-stores where that applies
    data_dict['is_dirty'] = True
    # - Create a new instance of the class with the revised 
    #   data-dict...
    new_object = Product.from_data_dict(data_dict)
    #   ...and save it.
    new_object.save()
    self.info('Product %s updated successfully' % new_object.oid)

中央办公室-更改产品数据

根据前面的列表,中央办公室需要更改产品数据的相关情况如下:

  • 作为一名产品经理,我需要能够更新Product对象,以便能够管理工匠无法管理的Product信息

假设中央办公室希望能够对特定的产品属性进行更改,而无需通过技工对产品内容进行细微的拼写更正或类似的、简单的更改(这些更改会转移到他们的 Web 店面),这并非不合理。由于没有任何关于涉及哪些属性的可靠定义,让我们假设这些属性包括产品的namedescriptionsummary。在这种情况下,需要修改为hms_code.co_objects.Product创建的to_message_data以包含这些值:

def to_message_data(self) -> (dict,):
    """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
    return {
        'oid':str(self.oid),
        # - Properties from BaseProduct:
        'description':self.description,
        'name':self.name,
        'store_available':self.store_available,
        'summary':self.summary,
        # - Date/time values, since we may well want/need to 
        #   keep those in sync across environments
        'modified':datetime.strftime(
            self.modified, self.__class__._data_time_string
        ),
    }

此实现引入了一个潜在的不必要的副作用:由中央办公室用户执行的任何更新操作都可以一次更新所有这些属性。如果不希望出现这种行为,那么可以采取以下措施:

  • 可以在ArtisanGatewayDaemon中添加其他方法来处理更具体的操作,例如set_product_availability,它只会更改store_available标志值。这可能需要以下方面:

    • DaemonMessage添加相应的允许operation
    • 检查由 Artisan 发起的邮件,以便他们不会意外或故意执行不允许他们执行的存储可用性更改
  • 过滤出站消息数据,以从中删除不适用于特定操作的任何元素,可以作为消息生成的一部分来实现:

    • 可以将 Helper 方法添加到具体的Product类中以执行该过滤
    • UI 可以负责确定应该发送哪种类型的消息,并且可以执行过滤

目前,允许任何更新操作跨多个逻辑操作进行更新似乎没有任何真正的危害,因此暂时可以不进行任何更新。

目前,这个中央办公室角色的变更可以通过审批/挂牌操作使用的相同消息构造、传输和处理流程来处理。这只是数据更新的另一个变体。

Artisan–更新产品数据

根据前面的列表,工匠需要更新产品数据的相关情况如下:

  • 作为一名工匠,我需要能够更新Product对象,以便管理我的Product产品

Artisan 更新和创建事务之间唯一的真正区别是与传出消息相关的operation,我们已经在 ArtisanProduct对象中的to_message_data结果中包含了modified属性:

product_message = DaemonMessage(
    'update', product_to_update.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(product_message)

从流程的角度来看,来自工匠的数据更改与来自中央办公室用户的数据更改相同。他们可以使用相同的ArtisanGatewayDaemon.update_product方法实际执行这些更改,因此不需要新代码。

由于 Artisan 还控制产品可用性标志(available),因此在 Central Office 产品批准列表中注明的相同注意事项将适用于 Artisan 级别。其中包含两个不属于原始迭代故事集的故事,但为了完整起见,应该包括这两个故事:

  • 作为一名工匠,我需要能够激活Product对象,以便管理Product可用性
  • 作为一名工匠,我需要能够停用Product对象,以便管理Product可用性

这些也可以由已经定义的相同的现有数据更新过程来处理,只要不需要将激活/停用更改与数据结构的其他更改隔离开来。即使这样的要求浮出水面,在事务的消息发起端处理它们也是可行的,将消息的内容仅限于识别要激活或停用的产品的active标志和oid

Artisan–删除产品

工匠需要从前面的列表中删除产品的相关故事如下:

  • 作为一名工匠,我需要能够删除Product对象,以便管理我的Product产品

如前所述,删除操作实际上只需要删除项目的oid即可成功执行。任何其他信息都会浪费带宽,但如果不是这样的话,删除的代码实际上只是在消息中再次发送的operation中有所不同:

product_message = DaemonMessage(
    'delete', product_to_delete.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(product_message)

执行一个更紧密关注的消息并不困难——最终,它不需要比更直接地控制消息数据更多的东西,只需要相关的对象 ID 即可。一种方法是直接创建消息数据,如下所示:

message_data = {
    'oid':str(product_to_delete.oid)
}
product_message = DaemonMessage('delete',message_data, signing_key)
sender = RabbitMQSender()
sender.send_message(product_message)

Artisan Gateway(delete_product中对应的删除方法比创建或更新过程中对应的删除方法简单得多,原因相同:真正需要的是要删除数据的对象的oid

def delete_product(self, properties:(dict,)) -> None:
    self.info('%s.delete_product called' % self.__class__.__name__)
    self.debug(str(properties))
    # - Delete the instance-record for the specified object
    Product.delete(properties['oid'])
    self.debug(
        'Product %s deleted successfully' % properties['oid']
    )

Artisan 对象事务

发送Artisan对象消息的过程不会与前面所示的Product对象的示例有很大的不同。createupdate消息的创建和传输通常遵循如下结构:

# - Typical create-object message-creation and -transmission
create_message = DaemonMessage(
    'create', object_to_create.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(create_message)

# - Typical update-object message-creation and -transmission
update_message = DaemonMessage(
    'update', object_to_update.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(update_message)

删除消息通常遵循以下两种结构之一,具体取决于关于发送完整对象数据集的决定,或者仅仅是所需的oid值:

# - Transmit the full object-data-set as the delete-message
delete_message = DaemonMessage(
    'delete', object_to_delete.to_message_data(), signing_key
)
sender = RabbitMQSender()
sender.send_message(delete_message)

# - Transmit *only* the required oid as the delete-message:
message_data = {
    'oid':str(product_to_delete.oid)
}
delete_message = DaemonMessage('delete', message_data, signing_key)
sender = RabbitMQSender()
sender.send_message(delete_message)

Artisan对象,如Product对象,从 Artisan 网关服务中的 CRUD 操作方法来看并不复杂。事实上,除了处理哪些对象的细节,以及与执行这些操作所涉及的各种方法相关联的命令消息的特定预期结构之外,它们与它们的Product对象对应物是相同的。例如,update_artisanArtisan 网关服务的方法如下所示:

def update_artisan(self, properties:(dict,)) -> None:
    self.info('%s.update_artisan called' % self.__class__.__name__)
    if type(properties) != dict:
        raise TypeError(
            '%s.update_artisan expects a dict of Artisan '
            'properties, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, properties, 
                type(properties).__name__
            )
        )
    self.debug('properties ... %s:' % (type(properties)))
    self.debug(str(properties))
    # - Retrieve the existing object, and get its data-dict 
    #   representation
    existing_object = Artisan.get(properties['oid'])
    self.debug(
        'Artisan %s retrieved successfully' % existing_object.oid
    )
    data_dict = existing_object.to_data_dict()
    # - Update the data-dict with the values from properties
    data_dict.update(properties)
    # - Make sure it's flagged as dirty, so that save will 
    #   *update* instead of *create* the instance-record, 
    #   for data-stores where that applies
    data_dict['is_dirty'] = True
    # - Create a new instance of the class with the revised 
    #   data-dict...
    new_object = Artisan.from_data_dict(data_dict)
    #   ...and save it.
    new_object.save()
    self.info('Artisan %s updated successfully' % new_object.oid)

因此,总体而言,各种Artisan操作遵循与Product操作建立的模式相同的模式。

中央办公室-创建工匠

中心办公室员工能够从早期故事集中创建工匠的相关故事如下:

  • 作为 Artisan Manager,我需要能够创建Artisan对象,以便管理 Artisan

Artisan物件是不寻常的,因为它们在逻辑上属于所代表的工匠所有,但它们是由中央办公室创建的。这意味着中央办公室代码库需要两种完全不同的消息格式:一种用于创建Artisan,另一种用于更新。如果我们从一个完整的消息结构开始创建,我们可以更好地评估它在以后的更新过程中是否存在任何风险或复杂性:

def to_message_data(self) -> (dict,):
    """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
    return {
        'oid':str(self.oid),
        # - BaseArtisan-derived items
        'address':self.address.to_dict() if self.address else None,
        'company_name':self.company_name,
        'contact_email':self.contact_email,
        'contact_name':self.contact_name,
        'website':self.website, 
        # - BaseDataObject-derived items
        'created':datetime.strftime(
            self.created, self.__class__._data_time_string
        ),
        'modified':datetime.strftime(
            self.modified, self.__class__._data_time_string
        ),
        # Queue- and signing-key values
        'queue_id':self.queue_id,
        'signing_key':self.signing_key.hex(),
    }

由于创建Artisan的过程几乎肯定涉及创建和存储与 Artisan(queue_id相关联的消息队列标识符和初始signing_key,因此这些值包括在中央办公室的Artisan.to_message_data方法中。我们仍然需要定义如何在Artisan对象中实际创建签名密钥和队列标识符,但必须以某种方式将它们发送到 Artisan 网关,以便它们可用于向 Artisan 应用程序实例发送、接收和验证消息。

从安全角度来看,这些过程非常重要:请记住,签名密钥被视为一个秘密数据值,应该谨慎对待,不能不必要地传输或不注意保护数据。在许多方面,它相当于一个用户密码——一个仅与一个用户关联的秘密值。如果签名密钥是密码,那么队列标识符可以被视为大致等同于用户名数据,该用户名数据可能不完全是机密的,但仍应谨慎对待,因为它可能唯一地标识用户,并与真实机密关联,共同构成一组用户凭据。随着queue_idsigning_key创建和管理的实现细节的展开,我们很可能不得不重新审视这个消息结构,所以现在,我们将保持它的当前状态。

中央办公室-更新 artisan 数据

对于能够更新工匠数据的中央办公室工作人员而言,从早期的故事集中获得的相关故事如下:

  • 作为 Artisan Manager,我需要能够更新Artisan对象,以便管理 Artisan

一旦Artisan对象被创建,它的大部分属性可以说是由该对象所代表的工匠所拥有。当然,从常识的角度来看,Artisan 用户最有可能知道他们的数据是否是最新的,保持数据的最新符合他们的最大利益。也就是说,将queue_idsigning_key属性放在一边,直到它们的流程更加详细,允许中央办公室用户修改 Artisan 数据的风险并不显著,前提是所做的更改可以传播到 Artisan 用户,也可以由 Artisan 用户自己更改。这个场景的警告是oid属性不应该被任何中央办公室或 Artisan 用户更改,但这几乎是不言而喻的。毕竟,它是Artisan对象的唯一标识符,唯一标识符绝不能轻易更改。

考虑到所有这些,尽管随着queue_idsigning_key管理流程的定义和实施,很可能会出现变更,但仍不需要修改中央办公室的Artisan.to_message_data方法来实现这一目标。

中央办公室–删除工匠

对于能够从早期故事集中删除工匠数据的中央办公室员工而言,相关故事如下:

  • 作为 Artisan Manager,我需要能够删除Artisan对象,以便管理 Artisan

尽管删除 Artisan 的过程可能会对删除或至少停用其所有产品产生其他影响,例如,从生成删除命令消息的角度来看,没有任何影响。与Product对象的删除过程一样,真正需要的唯一属性值是要删除的 Artisan 的oid,在该上下文中,关于使用完整消息体或创建用于删除过程目的的特定消息体的任何决定可能也适用于该上下文。

Artisan–更新 Artisan 数据

Artisan 能够更新 Artisan 数据的相关故事(来自早期故事集)为:

  • 作为一名工匠,我需要能够更新自己的Artisan对象,以便在 HMS 中央办公室管理我的信息。

无论围绕着Artisanqueue_idsigning_key属性的过程的最终形状是什么,这些值作为机密,在没有某种保护的情况下,不应该通过开放互联网发送,至少在运动时对它们进行加密。如果没有这些值,Artisan 用户对 Artisan 数据的更改可以不加密地传递,因此 Artisan 更新的基本消息结构几乎与中央办公室命名空间中的等效消息结构重复:

    def to_message_data(self) -> (dict,):
        """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
        return {
            'oid':str(self.oid),
            # - BaseArtisan-derived items
            'address':self.address.to_dict() if self.address else None,
            'company_name':self.company_name,
            'contact_email':self.contact_email,
            'contact_name':self.contact_name,
            'website':self.website, 
            # - BaseDataObject-derived items
            'created':datetime.strftime(
                self.created, self.__class__._data_time_string
            ),
            'modified':datetime.strftime(
                self.modified, self.__class__._data_time_string
            )
        }

在中央办公室和 Artisan 代码库之间,我们允许任一用户类型更改 Artisan 的大部分数据。其中大部分是联系信息的一些变化,没有一个具有任何功能性影响,其余的政策已经制定,如果尚未实施(oid),或者仍在等待进一步定义(queue_idsigning_key)。两种用户类型完全控制这些属性时,最严重的风险似乎是同时发生的冲突更改(可能最好在 UI 级别处理)或正在进行的冲突更改(一个用户更改值,另一个用户更改回值,第一个用户再次更改值,依此类推)。

订单对象事务

自从在artisan_objects模块中定义了具体的Order类以来,订单及其在系统中的对应对象就没有得到过太多的讨论。部分原因是,其他类(特别是ArtisanProduct是源自hms_sys代码库的数据表示。尽管如此,artisan_objects.Order类仍然像预期的那样处于一种完整的状态,具有完整的数据持久性和一个具体的实现,该实现有望处理到目前为止针对它的所有需求。

然而,结果是,订单的几个方面从雷达上消失了。本次迭代的原始故事集仅包括一个订单相关故事,即工匠需要能够更新订单作为履行流程的一部分,而没有为该订单提供任何路径,以便让工匠开始,更不用说在此之前的任何事情了。对于订单在完成之前被取消的可能性也没有任何考虑。考虑到这些项目的客户到网关和网关到 Artisan 的路径,他们添加了四个新的故事,将首先解决。

处理订单也有点复杂,因为 Web 店面系统的细节被故意模糊。有几十种(如果不是几百种的话)可用选项,它们都是用大多数流行/主流语言编写的,并且具有不同程度的可扩展性。没有选择任何一个,而是做出了一个基本假设,即hms_sys集成可以以某种方式完成,这至少可以包括以下可能性:

  • 按计划执行的蛮力流程可以从商店的数据中获取新的原始订单信息,并启动 Artisan Gateway 的订单创建流程
  • 存储系统通过某种小型定制扩展,可以直接或通过消息队列向 Artisan 网关发送创建订单消息,执行其订单创建过程
  • 如果存储系统是用 Python 编写的(此字段中至少有 11 个选项),它实际上可能能够导入所需的任何hms_sys代码,或者添加一些配置,并直接执行相关的hms_sys代码

在真实场景中,跨系统集成可能是一组非常具体的需求,但出于说明的目的,为了保持对项目的关注,这些需求被有意地放在一边。

客户–将订单项目转发给工匠

客户能够将订单项目转发给工匠的相关故事,来自早期故事集:

  • 作为客户,我需要将我订单的相关部分转发给相应的工匠,以便他们能够完成我订单中的相应部分。

订单的生命周期比hms_sys中的任何其他对象都要复杂得多。与Artisan物体或Product物体不同,它们的活动寿命较短;创建、处理一次,然后存档,甚至删除。Artisan相反,一旦创建了对象,则预期只要中央办公室/工匠关系持续,对象就会持续存在。Product对象可能长时间处于活动状态,也可能不处于活动状态,但只要其所属工匠的中央办公室/工匠关系持续,对象也可以持续。在这两种情况下,尽管它们的生命周期长度可能会有很大的变化,但它们基本上是无限期地创建和持久化的(有修改或没有修改)。

相比之下,一个相对简单的Order,通过hms_sys可以支持的一个简单子集,可能看起来像这样:

哪里:

  • 最初的订单(针对Product对象P1P2P3)由网站店面创建,并交给Artisan Gateway分发给相关 Artisan 用户处理
  • Artisan GatewayArtisan 应用程序发送订单消息,该Artisan 应用程序与订单**(Artisan 2在本例中存在,但订单不包含其任何产品)关联:**
    • 产品P1P3订单发送给工匠【1】
    • 产品P2订单一份发送给Artisan#3
  • Artisan#1完成产品P1P1 完成的订单部分),将订单的更新消息发送回Artisan Gateway,在Artisan Gateway中记录并存储该部分的完成情况
  • 对于原始订单中的产品P2,对于Artisan#3,也会发生类似的循环(P2 已完成
  • 最终完成周期(P3 完成由**技工【1】**执行
  • 订单完成所有履行后,可以存档、删除或以任何其他方式处理

由于没有创建 Artisan 网关服务能够访问的具体Order类,因此这是需要做的第一件事。在不确切知道订单数据将如何中继到服务的情况下,但仍然需要能够在以后执行流程的往返测试,除了将其定义为派生自HMSMongoDataObject(与co_objects模块中的其他数据对象类一样)和BaseOrder的基本类外,没有什么可以做的了(来自business_objects模块)。以后可能会对其进行添加或更改,但从这两个类派生Order将提供足够的功能,使其可测试。

在使用 Artisan 应用程序的 Order 类定义完成了所有分析工作之后,对于中央办公室代码(co_objects中的相应类来说,这似乎是一个更好的起点,尽管在这个过程中需要一些修改/转换。首先也是最重要的是,它需要从HMSMongoDataObject而不是JSONFileDataObject派生—但由于这两个类都是从BaseDataObject派生的,因此新Order类的相当一部分已经实现了继承更改。

There's enough common code between the two Order classes that it would almost certainly be worth spending time moving those common items back down into BaseOrder. Designing, or even implementing concrete classes, then gathering their common functionality into common parent classes is just as valid a design or implementation approach as starting from the foundations and building out, though it happened accidentally in this case.

除此之外,我们还需要一种机制,允许 Web 店面系统创建一个Order。到目前为止,我们还没有关于这个过程的任何规范,但这并没有阻止我们创建一个类方法,这个类方法(希望)最终能够在这个能力中使用。出于近期的测试目的,它将被设置为接受衍生为customerBaseCustomer对象和产品标识符列表,并着眼于customer在将来的某个时候被修改。首先,我们所关心的是一个方法,可以调用该方法来创建一个完整的Order,并将相关的Product对象附加到该方法上:

def create_order_from_store(
    cls, customer:(BaseCustomer,str,dict), **order_items
):
    """
Creates and returns a new order-instance, whose state is populated 
with data from the     

customer .......... (Type TBD, required) The customer that placed 
                    the order
order_items ....... (dict [oid:quantity], required) The items and 
                    their quantities in the order
"""

假设店面能够以某种dict值的形式传递订单中的产品标识符及其数量,并且不会跟踪整个Product对象,至少不会与hms_sys代码使用的结构相同,这是相当安全的。考虑到order_itemskeys()中可用的产品oid值列表,在创建时检索要添加到order实例的产品只需将所有可用产品向下过滤到订单中特定项目的集合中,同时保留其关联数量:

    # - Get all the products and quantities identified by the 
    #   incoming oid-values in order_items
    products = {
        product:order_items[str(product.oid)] 
        for product in Product.get()
        if str(product.oid) in order_items.keys()
    ]

这里生成的产品是 dicts,通过字典理解生成,其键是Product对象,值是这些产品在订单中的数量。然后,我们需要获取customer

# TODO: Determine how customer-data is going to be #provided 
# (probably a key/value string, could be a JSON packet 
# that could be converted to a dict), and find or create 
# a customer object if/as needed. In the interim, for 
# testing purposes, accept a BaseCustomer-derived object.
  if not isinstance(customer, BaseCustomer):
      raise NotImplementedError(
          "%s.create_order_from_store doesn't yet accept "
          "customer arguments that aren't BaseCustomer-"
          "derived objects, sorry" % (cls.__name__)
      )

最后,新的Order实例被创建、保存(确保其数据被持久化)并返回(如果调用代码需要在创建后立即引用它):

# - Create the order-instance, making sure it's tagged 
#   as new and not dirty so that the save process will 
#   call _create
new_order = cls(
    customer, is_dirty=False, is_new=True, *products
)
# - Save it and return it
new_order.save()
return new_order

Order类还需要一个to_message_data方法,就像它们的产品和 Artisan 对应物一样,在定义了一个方法后,可以使用与前面建立的基本相同的消息传输过程:

def to_message_data(self) -> (dict,):
    """
Creates and returns a dictionary representation of the instance 
that is safe to be passed to a DaemonMessage instance during or 
after creation as the data of that message.
"""
    return {
        # - Local properties
        'name':self.name,
        'street_address':self.street_address,
        'building_address':self.building_address,
        'city':self.city,
        'region':self.region,
        'postal_code':self.postal_code,
        'country':self.country,
        # - Generate a string:int dict from the UUID:int dict
        'items':{
            str(key):int(self.items[key]) 
            for key in self.items.keys()
        },
        # - Properties from BaseDataObject (through 
        #   HMSMongoDataObject)
        'modified':datetime.strftime(
            self.modified, self.__class__._data_time_string
        ),
        'oid':str(self.oid),
    }

这个过程意味着一个新的故事,它可能主要是 UI 开发所需要的,但它可能会对 Artisan 应用程序的其他设计和实现产生一些影响:

  • 作为一名技工,我需要在下订单时得到通知,其中包括我的一项产品,以便我能够完成订单中我的一部分

由于 Web Storefront 创建新的Order还需要将新的Order对象转发给每个 Artisan(查看订单流程图),并且似乎可以合理地预期只有该流程的 store-to-Gateway 服务部分会调用create_order_from_store,乍一看,这似乎是实现该消息传递的合理场所,但这样做将无法访问服务的日志记录设施,因此两个系统之间的任何通信故障都可能丢失。相反,如果 Web 店面向 Artisan Gateway 发出创建订单消息,则 Gateway 服务可以使用适用的数据调用create_order_from_store,并在执行时根据需要/需要记录事件。为了便于说明,这是将要假设的方法。在本例中,create_order_from_store是完整的,Artisan/Order 消息传递作为网关服务create_order方法的一部分。其代码的第一个主要部分与其他创建过程非常相似:

def create_order(self, properties:(dict,)) -> None:
    self.info('%s.create_order called' % self.__class__.__name__)
    if type(properties) != dict:
        raise TypeError(
            '%s.create_order expects a dict of Order '
            'properties, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, properties, 
                type(properties).__name__
            )
        )
    self.debug('properties ... %s:' % (type(properties)))
    self.debug(str(properties))
# - Create the new object...
    new_order = Order.create_order_from_store(properties)
    self.info(
        'New Order %s created successfully' % new_order.oid
    )

由于create_order_from_store方法已经保存了新的订单,我们不需要在这里保存它,它将已经存在于数据存储中,并且可以在代码中的这一点被其他进程检索到。为了继续,并将必要的Order信息发送给需要了解这些信息的个体技工,我们需要整理哪些产品(以及数量)与系统中的每个技工相关。

Since the Artisan can have a Product, but a Product doesn't keep track of which Artisan they belong to (which might be a good thing to add, in retrospect), the best option we have right now is to load up the Artisan, and search for it for each product. This is not optimal, and definitely worth looking at changing, but it will work for now.

new_order变量保存着一个Order对象,如果用 dict 表示,该对象将如下所示:

{
    'oid':<UUID>,
    'name':<str>,
    # - Shipping-address properties
    'street_address':<str>,
    'building_address':<str> or None,
    'city':<str>,
    'region':<str>,
    'postal_code':<str>,
    'country':<str> or None,
    # - order-items
    'items':{
        <Product object #1>:<int>,
        <Product object #2>:<int>,
        <Product object #3>:<int>,
    },
}

如果以蛮力的方式将其转化为 Artisan/item:quantity 值,则非常简单:

    artisan_orders = {}
    # - Get all the artisans
    all_artisans = Artisan.get()
    # - Sort out which artisan is associated with each item 
    #   in the order, and create or add to a list of 
    #   products:quantities for each
    for product in new_order.products:
        try:
            artisan = [
                candidate for candidate in all_artisans
                if product.oid in [
                    p.oid for p in candidate.products
                ]
            ][0]

如果找到与产品关联的工匠,则需要执行两种情况中的一种:要么artisan已经作为artisan_orders dict中的键存在,在这种情况下,我们只是将项目数据附加到与artisan关联的当前项目列表中,要么他们还没有产品匹配,在这种情况下,我们为artisan创建一个条目,其值是一个包含相关项目数据的列表:

item_data = {
  str(oid):new_order.products[product]
}
if artisan_orders.get(artisan):
   artisan_orders[artisan].append(item_data)
else:
   artisan_orders[artisan] = [item_data]
if artisan_orders.get(artisan):
   artisan_orders[artisan].append(product)
else:
   artisan_orders[artisan] = [product]

虽然这不应该发生,但有可能订单中的产品没有可识别的artisan与之关联。如何处理错误案例的细节可能取决于网络商店系统。即使把这种考虑放在一边,它也应该以某种尚未定义的方式来处理。但是,至少应记录故障:

except IndexError:
   self.error(
       '%s.create_order could not find an '
       'artisan-match for the product %s' % 
       (product.oid)
   )
self.debug('All artisan/product associations handled')

完成此排序后,artisan_ordersdict 将看起来像这样,其中artisan_orders中的每个键都是一个实际的Artisan对象,具有任何此类实例的所有属性和方法,以及产品oid和相关数量:

{
    <Artisan #1>:{
        <str<UUID>>:<int>,
        <str<UUID>>:<int>,
    },
    <Artisan ...>:{
        <str<UUID>>:<int>,
    },
    <Artisan #{whatever}>:{
        <str<UUID>>:<int>,
        <str<UUID>>:<int>,
    },
}

Python dict instances can use almost anything as a key: any immutable built-in type (like str and int values, and even tuple values, but not list or other dict values) can be used as a key in a dict. In addition, instances of user-defined classes, or even those classes themselves, are viable. Instances of built-in classes, or the built-in classes themselves, may not be valid dict keys, though.

使用完整且格式良好的artisan_orders,向每个 Artisan 发送订单消息的过程相对简单,迭代每个 Artisan 密钥,以 Artisan 应用程序的Order类所期望的结构构建消息数据,创建一个DaemonMessage来签名消息,然后发送:

sender = RabbitMQSender()
self.info('Sending order-messages to artisans:')
for artisan in artisan_orders:
# Get the products that this artisan needs to be concerned #with
items = artisan_orders[artisan]
# - Create a message-structure that 
#   artisan_objects.Order.from_message_dict can handle
new_order_data = {
    'target':'order',
    'properties':{
        'name':new_order.name,
        'street_address':new_order.street_address,
                'building_address':new_order.building_address,
                'city':new_order.city,
                'region':new_order.region,
                'postal_code':new_order.postal_code,
                'country':new_order.country,
                'items':items,
                'oid':str(new_order.oid),
            },
        }
        # - Create the signed message
        order_message = DaemonMessage(
            'create', new_order_data, artisan.signing_key
        )

将消息发送到特定的 Artisan 需要另一个更改:RabbitMQSendersend_message方法最初不是用来将消息发送到队列的,而不是它配置的默认队列。出于多种原因,每个技术人员都有自己的消息队列是有意义的,为了使用该特定队列,必须将其作为send_message参数接受。发送消息的网关端调用反映(将artisan.queue_id作为参数传递):

# - Send the message to the artisan
sender.send_message(order_message, artisan.queue_id)
self.info(
    '+- Sent order-message with %d products to '
    'Artisan %s' % (len(items), artisan.oid)
)

RabbitMQSender.send_message中的相关更改并不复杂:只需添加一个可选的queue_name参数,并检查是否提供了该参数,返回到配置的默认队列名称即可:

def send_message(self, message:(DaemonMessage), 
        # Added queue_name
        queue_name:(str,None)=None
    ):
    if type(message) != DaemonMessage:
        raise TypeError(
            '%s.send_message expects a DaemonMessage instance '
            'as its message argument, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, message, 
                type(message).__name__
            )
        )
 # Using the optional queue_name to override the default
    if not queue_name:
        queue_name = self.queue_name
 # - Note that exchange is blank -- we're just using the 
 #   default exchange at this point…
 # - Also note that we're using queue_name instead of the 
 #   original self.queue_name default...
    self.channel.basic_publish(
        exchange='', routing_key=queue_name, 
        body=message.to_message_json()
  )

客户–取消订单

客户能够取消订单的相关故事(来自早期故事集)是:

  • 作为一名取消订单的客户,我需要将取消订单的相关部分转发给相应的技工,以便他们不会履行其订单部分。

订单取消与订单创建有一个共同点:取消的起点应该是客户,几乎可以肯定,因为有些功能可以通过 Web 店面获得。在形成订单创建的相同假设下操作,以便 Web 店面能够向 Artisan Gateway 服务发送消息以指示取消已启动,类似地,在这种情况下,允许网关使用单个消息处理程序方法delete_order进行处理。

delete_order消息处理程序方法最终是它必须执行的两个任务:

  • 给定一个由oid标识的订单,它必须追踪哪些工匠参与了最初的订单。该部分流程可以与create_order中的工匠和产品标识相同。该代码的产品标识方面可能不需要,但包含它不会造成任何伤害,甚至可能在以后利用它来防止取消部分完成的订单。

  • 它必须生成一条消息并发送给与订单关联的 Artisan 相关的每个 Artisan 应用程序:一条删除消息,其中订单的oid作为数据有效负载。

The Artisan/Product association, yielding artisan_orders in the create_order and delete_order code, would probably be worth moving into a common helper method in the ArtisanGatewayDaemon class: it's identical to being written in those methods as things stand right now. With only two instances of that code present right now, and those being close together in the code, it's not an imperative, perhaps, but as long as there are two instances of the same code, any changes to one have to be made to the other as well.

与订单创建过程一样,订单取消也意味着一个新的故事,同样可能主要用于 UI 开发,但这可能会对 Artisan 应用程序产生一些额外的设计和实现影响:

  • 作为一名技工,我需要在包括我的一个产品产品的订单被取消时得到通知,以便我可以停止任何与之相关的过程中履行活动,并根据需要更新我的产品状态

解决这个问题的基础,当它真的变得活跃时,应该是大部分的——如果不是完全的话——作为删除命令的结果。

Artisan–按订单完成项目

Artisan 能够按照订单完成一个项目的相关故事,来自早期故事集:

  • 作为一名工匠,我需要能够更新订单对象,以便在完成订单中我的部分时向中心办公室指示。

最终,工匠完成全部或部分订单的行为只是另一个更新过程,至少从消息传递的角度来看是这样。不过,到目前为止,还没有一种机制来跟踪Order类中的已完成项,这在得到解决之前是有问题的。幸运的是,已完成项目的模型基本上可以与原始订单项目的模型相同——产品oid键和int数量的集合(具体地说是dict)。使用items属性作为模型或指南,将该属性添加到artisan_objects.Order,需要以下内容:

  • 包括fulfilled_items、adict作为__init__中的一个参数,并以与items参数/属性集成相同的方式对其进行集成
  • 为其创建gettersetterdeleter方法
  • 创建与_get_fulfilled_items关联的fulfilled_items属性
  • 确保to_data_dict方法在其输出结果中包含fulfilled_items的表示
  • 确保from_data_dict类方法不需要对传入的fulfilled_items值进行任何特殊处理

由于fulfilled_items将遵循与Orderitems属性相同的约束,因此禁止直接修改fulfilled_items的成员。这项禁令的基本原理是类似的:我们希望严格控制这些成员的修改,以尽可能防止坏数据更改。同时,我们需要允许工匠完成订单项目(同时执行所有相关检查以确保数据更改有效)。

为了方便起见,artisan_objects.Order类需要调用一个方法,允许 Artisan 用户将项目标记为已完成:

def fulfill_items(self, oid:(UUID,str), quantity:(int,)):
    """
Assigns a number of items fulfilled to a given item-oid, and sets the 
is_dirty state of the instance to True
"""

对于 Artisan 来说,订单履行数据是更重要的数据集之一,因此在允许保存更改之前,我们将以几种不同的方式检查每个参数。检查过程从标准类型和值检查开始(去掉错误消息以保持列表简短):

if type(oid) not in (UUID,str):
   raise TypeError() # Expecting a UUID or str-UUID value
if type(oid) != UUID:
   try:
      oid = UUID(oid)
   except:
      raise ValueError() # Could not convert a str value to a UUID
if type(quantity) != int:
   raise TypeError()
if quantity < 0:
   raise ValueError() # Should not fulfill a negative quantity

我们还将检查以确保所完成的项目实际上是订单的一部分:

if oid not in self._items:
   raise RuntimeError(
       '%s.fulfill_item was asked to fulfill an item '
       '(%s) that doesn\'t exist in the order-items' % 
       (self.__class__.__name__, oid)
)

我们将检查以确保履行数量不大于订单中的数量:

if quantity > self._items[oid]:
   raise RuntimeError(
         '%s.fulfill_item was asked to fulfill an item '
         '(%s) in a higher quantity (%d) than was '
         'ordered (%d)' % 
         (
            self.__class__.__name__, oid, quantity, 
            self._items[oid]
         )
   )
# If everything checks out, then update the quantity, etc.
self._fulfilled_items[oid] = quantity
self._set_is_dirty(True)

除了fulfill_items方法之外,还需要对中央办公室Order类(co_objects.Order类)进行类似的更改,以处理实现消息。目前,在我们能够在下一章集中讨论往返消息测试之前,可以通过简单地从artisan_objects.Order复制代码来满足这些要求。

Copying that much code around is another argument for refactoring the Order classes, re-defining BaseOrder, and deriving the concrete classes from it instead. Time and space constraints in this book may not allow for much discussion of this process, but we'll take at least a brief look at it, either during or after testing.

什么时候发送消息?

到目前为止,我们已经花了相当长的时间研究如何生成和发送相关消息,但除了检查订单创建和取消之外,很少涉及何时发生。由于消息直接对应于各种本地 CRUD 操作,因此很容易简单地将消息调用添加到它们已有的_create_update方法中,确保考虑我们在BaseDataObject中定义的is_dirtyis_new标志。然而,在走这条路之前,最好先看看所有的消息传递过程,从发起到完成,以确保它们有一个清晰的过程终止。以Product更新流程为例,我们需要确保避免的场景如下:

哪里:

  1. 一位工匠对他们的一种产品进行了更改:
    • 执行本地数据更改
    • 他们的Artisan 应用程序Artisan 网关发送消息:更新产品“X”
  2. Artisan 网关接收消息:
    • 执行本地数据更改
    • 向相应的Artisan 应用程序发送消息:更新产品“X”
  3. Artisan 应用程序接收消息:
    • 执行本地数据更改,该更改可能没有任何更新的数据
    • Artisan 网关发送消息:更新产品“X”

在最后一步结束时,流程将在没有任何检查流程或退出条件的情况下跳回第二步,并进入一个无限循环的更新消息中,这些消息实际上什么都不做。同样的情况也可能发生在任何一个更新过程中,其中可能有多个数据更改原点在起作用:Artisan对象可以由其代表的工匠和中央办公室工作人员进行更新。Order物品目前是免税的,但不难想象,在将订单发送给工匠后,客户将来需要更改订单,而工匠将完成订单中的物品。

最终,由于各种数据对象类的save方法不知道它们正在执行的数据更改来自何处,因此它们无法决定在执行数据更改后是否应该发送消息。因此,一个可能的解决方案是在每个save中允许(甚至要求)一个提供该信息的附加参数,该参数可用于确定是否需要发送消息。此修改的结构可能如下所示(对于 Artisan 应用程序代码库中的数据对象):

def save(self, origin:(str,)):
    """
Saves the instance's state-data to the back-end data-store by 
creating it if the instance is new, or updating it if the 
instance is dirty
"""
    # - Perform the data-save process as it currently exists
    if self.is_new and origin != 'artisan':
        # - Send "create" message
    elif self.is_dirty and origin != 'artisan':
        # - Send "update" message
    self._set_is_new(False)
    self._set_is_dirty(False)

BaseDataObject(当前定义了save的地方)和覆盖BaseDataObject.save方法的每个具体数据对象之间添加额外的抽象层是可行的。这种抽象——一种额外的 ABC——至少需要在 Artisan 应用程序和 Artisan 网关代码库中创建,而在中央办公室应用程序中也可能需要另一种变体,这取决于尚未充分探索的实现细节。

权衡的结果是,所有数据对象都必须注意其数据更改的来源。这感觉…凌乱、复杂,而且可能很难维持,至少乍一看是这样。

另一种可能是更改DaemonMessage:如果消息本身包含某些内容,例如指示其来源的数据,那么这些消息的处理程序将能够判断在处理数据更改后是否需要发送消息。在该设计场景中,由 Artisan 发出的Product更新消息(包括origin规范)可能如下所示(在转换为 JSON 之前):

{
    'operation':'update',
    'origin':'artisan',
    'data': {
        'target':'product',
        'properties':{
            'oid':str(new_order.oid),
            'name':'Revised Product Name',
            # - Other product-data skipped for brevity
        },
    },
    'signature':'signature hex-string'
}

ArtisanGatewayDaemon服务类中对应的update_product处理程序方法与其他处理程序方法一起,当前期望对dictproperties)进行操作,并由ArtisanGatewayDaemon._handle_message调用,因为服务的main循环读取要操作的消息。我们可以改变单个处理程序方法所期望的,而是传递原始的message(一个DaemonMessage实例),使处理程序方法负责将传入的message分解为properties并像它们已经做的那样对它们进行操作,并让他们负责确定是否需要发送一条消息并发送它。

给定一个带有originDaemonMessage,以及一个全局可访问的值来比较该来源,发送或不发送消息以及在需要时发送消息的决定并不复杂。如果它在网关服务中的任何位置(即,self是服务实例),它看起来或多或少会像这样:

# self.message_origin is an attribute containing 'gateway'
# - message is the incoming DaemonMessage instance
# - message.origin is 'artisan'
# - artisan is the relevant Artisan object
if message.origin == self.message_origin:
    sender = RabbitMQSender()
    outbound_message = DaemonMessage(
        operation=message.operation,
        origin=self.message_origin,
        data=message.data,
        signing_key=self.signing_key
    )
    sender.send_message(order_message, artisan.queue_id)

用于创建outbound_message的数据可能有所不同,这取决于是否使用了新创建或最近更新对象的数据字典或消息字典。

因此,当对传入的message进行操作时:

  • origin已检查
  • 如果该origin是本地的,则使用传入message的原始operation、本地originsigning_key以及任何合适的data创建并发送相应的outbound_message
  • 否则,将跳过整个分支

假设发送者不是在其他地方创建的,那么要添加的代码并不多——只有九行。对DaemonMessage的更改非常简单:添加origin属性,并确保它在任何地方(基本上,在operation属性已经被使用的任何地方)都被解释。在这一点上,这并不代表对现有代码的重大更改,到目前为止,我们只创建了用于订单创建和更新的出站消息。

如果存在症结,则需要获取与操作相关的Artisan实例,以便出站消息可以使用适当的消息队列(artisan.queue_id。然而,无论我们决定采用何种方法,这都是必要的,因此在这种情况下,这可能是一种清洗(这将使我们之前看到的修改save的想法更加复杂)。

即便如此,这也让人觉得是一种可靠的方法。此时对_handle_message的更改主要是参数和变量名的更改:

def _handle_message(self, message:(DaemonMessage,)) -> None:
    self.info(
        '%s._handle_message called:' % self.__class__.__name__
    )
    target = message.data.get('target')
    self.debug('+- target ....... (%s) %s' % (
        type(target).__name__, target)
    )
    self.debug('+- operation .... (%s) %s' % (
        type(message.operation).__name__, message.operation)
    )
    if message.operation == 'create':
        if target == 'artisan':
            self.create_artisan(message)

# ... removed for brevity

    elif message.operation == 'update':
        if target == 'artisan':
            self.update_artisan(message)
        elif target == 'customer':
            self.update_customer(message)
        elif target == 'order':
            self.update_order(message)
        elif target == 'product':
            self.update_product(message)
        else:
            raise RuntimeError(
                '%s error: "%s" (%s) is not a recognized '
                'object-type/target' % 
                (
                    self.__class__.__name__, target, 
                    type(target).__name__
                )
            )

    # ... removed for brevity

    else:
        raise RuntimeError(
            '%s error: "%s" (%s) is not a recognized '
            'operation' % 
            (
                self.__class__.__name__, message.operation, 
                type(message.operation).__name__
            )
        )

处理方法(以update_product为例)基本保持不变:

def update_product(self, message:(DaemonMessage,)) -> None:
    self.info('%s.update_product called' % self.__class__.__name__)
      if type(message) != DaemonMessage:
         raise TypeError(
             '%s.update_product expects a DaemonMessage '
             'instance, but was passed "%s" (%s)' % 
             (
                self.__class__.__name__, message, 
                type(message).__name__
             )
         )

我们仍然需要properties;我们只是在单个处理程序方法中获取它们,而不是在_handle_message中:

properties = message.data.get('properties')
self.debug('properties ... %s:' % (type(properties)))
self.debug(str(properties))

从该点到保存修改的对象,代码保持不变:

#   ... and save it.
new_object.save()
self.info('Product %s updated successfully' % new_object.oid)

然后我们可以检查是否需要发送出站消息,获取相关的Artisan,创建message,然后发送:

if message.origin == self.message_origin:
  # - Acquire the Artisan whose Product this is
  artisan = self.get_artisan_from_product(new_object)
  sender = RabbitMQSender()
  outbound_message = DaemonMessage(
       operation=message.operation,
       origin=message.origin,
       data=message.data,
       signing_key=self.signing_key
   )
   sender.send_message(order_message, artisan.queue_id)

由于从Product获取Artisan将是一个反复出现的主题,因此创建了一个助手方法(get_artisan_from_product来简化该过程。它还强调了最终需要在产品和工匠之间建立更直接的关联,但基于数据对象查询的流程目前就足够了:

def get_artisan_from_product(
       self, product:(UUID,str,BaseProduct)
    ) -> (Artisan):
    # TODO: Add artisan (owner) to Product classes, and use 
    #       that instead. For now, use this approach
    all_artisans = Artisan.get()
    if isinstance(product, BaseProduct):
       product = product.oid
    elif type(product) == str:
       product = UUID(product)
    for artisan in all_artisans:
        if product in [p.oid for p in artisan.products]:
           return artisan

在结束本章之前的最后一个考虑:当我们开始这一块开发时,关于消息队列是要实现为“所有工匠一个”还是“每个工匠一个”,仍然有一个待定的决定。没有做出正式的决定,但在考虑消息传递过程时,可能会出现其他考虑因素:

  • 每个 Artisan 至少需要两个独立的消息队列:一个用于 Artisan 的流量,另一个用于 Artisan 的流量。如果对所有流量实施了单个队列,则:
    • 必须修改代码,以包括一个origin(已完成)和一个destination,以确保网关在队列中丢弃的消息不会被网关读取
    • 即使这样,一条没有被适当的目的地读取和处理的消息几乎肯定会阻止队列中的其他消息被读取和处理,而不会有更多的代码更改和随之而来的复杂性
  • 如果每个 Artisan 都有一个用于入站和出站消息的不同消息队列,那么这一整套复杂问题就会简单地消失。有一些额外的工作需要提供一些方法来识别单个入站和出站队列,但是如果每个队列只处理一个方向上的流量,到一个 Artisan 应用程序和网关服务,这将大大简化事情,并且开发成本应该非常低。
  • 作为一个附带的好处,因为队列中的每条消息都可以立即与队列所属的工匠关联,这仅仅是因为它来自该队列。

拥有多个消息队列的唯一剩余成本是将存在多个队列,而这主要是由消息队列服务器承担的成本。

总结

本章中的开发工作分散在整个系统的代码库中,这主要是因为随着特定功能的展开,出现了一些需求缺口或实现需求和细节。理想情况下,在现实世界中,其中的大部分会在相当早的时候出现,并表示为迭代过程中附加到故事的特定任务,尽管有些任务可能仍然发生。我们在本章和前一章中做出了一些决定,决定了事情需要如何工作,在最初的故事分析练习中可能没有捕捉到这一点。

因此,此时的代码很可能已被破坏。也许彻底崩溃了。尽管如此,在迭代的故事方面还是取得了很多进展,即使其中没有一个可以正式结束:

  • 已经定义了处理系统组件之间所有数据流的基本功能,其中有一些具体实现将作为其他具体实现的起点
  • 完成消息传输和接收所需的更改已确定范围(如果未实现)
  • 对于如何以及何时需要发送这些消息已经有了坚实的(如果是基本的)理解

在大多数迭代故事被认为是完整的之前,还有很多工作要做,即使撇开 UI 的考虑,我们仍然没有可证明的消息流。下一章的重点将是最终确定这些内容,这将采取一种决定性的测试驱动方法,即使这不是一个正式的 TDD 过程。