渡渡鸟的审核不是基于纸质的:审核员拥有一台平板电脑,审核员可以在其中记录所有产品并创建报告。但是直到2020年,比萨店的修改才完全在纸上进行-只是因为这样变得越来越容易。当然,这会导致数据不准确,错误和丢失-人们会犯错误,纸张会丢失,甚至还有更多。我们决定解决此问题并改进平板电脑方式。该实现决定使用DDD。我们是如何做到的,我们将进一步告诉您。
首先,简要介绍业务流程以了解上下文。考虑产品流程图以及其中的修订内容,然后继续介绍许多技术细节。
产品移动方案以及为何需要修订
我们的网络中有600多家比萨店(而且这个数字还将继续增长)。每天,每种原料中都存在原材料的移动:从产品的准备和销售,按有效期开始的配料冲销到原材料向链中其他比萨店的移动。比萨店的天平经常包含约120种产品生产所必需的物品,此外还包含许多消耗品,家用材料和化学药品,以保持比萨店的清洁。所有这些都需要“核算”,以便知道哪些原材料丰富而哪些缺少。
“会计”描述了比萨店中原材料的任何流动。交货是资产负债表上的一个加号,而冲销则是一个减号。例如,当我们订购披萨时,出纳员接受订单并将其发送进行处理。然后将面团rolled出并塞入奶酪,番茄酱和意大利辣香肠等配料。所有这些产品已投入生产-已注销。同样,当到期日期结束时,可能会发生注销。
由于交货和注销,形成了“仓库余额”。该报告根据信息系统中的操作反映了资产负债表上有多少原材料。这就是“结算余额”。但是有一个“实际价值”-现在实际上库存了多少原材料。
修订版
要计算实际值,请使用“修订”(它们也称为“库存”)。
审核有助于准确计算采购的原材料数量。太多的购买将冻结营运资金,冲销多余产品的风险将会增加,这也导致损失。不仅原料过剩是危险的,而且原料过剩也很危险-这可能导致某些产品的生产停顿,从而导致收入减少。审计有助于了解由于原材料损失和未确认损失而导致企业获得的利润减少了多少,并且可以降低成本。
修订版会共享其数据,并适当考虑进一步处理,例如建筑报告。
修订过程中的问题或旧修订的工作方式
修订是一个费力的过程。它花费大量时间,并包括以下几个阶段:计数和固定原材料的剩余量,按存储区域汇总原材料的结果,然后将结果输入Dodo IS信息系统。
以前,审计是使用笔和纸的形式进行的,上面有原材料清单。手动汇总,核对结果并将结果传输到Dodo IS时,可能会出错。在全面审核中,计算了超过100种原材料,并且计算本身通常是在傍晚或清晨进行,这可能会导致浓度下降。
如何解决问题
我们的线程游戏团队正在开发比萨店的会计。我们决定启动一个名为“审计员的平板电脑”的项目,该项目将简化对比萨饼店的审计。我们决定在自己的信息系统Dodo IS中进行所有操作,该系统中实现了会计的主要组件,因此我们不需要与第三方系统集成。此外,我们在场的所有国家/地区都将能够使用该工具,而无需诉诸其他集成。
甚至在开始该项目之前,我们团队中的成员都讨论了在实践中应用DDD的愿望。幸运的是,其中一个项目已经成功应用了此方法,因此我们有一个示例可供您查看-这是“收银台”项目。
在本文中,我将讨论我们在开发中使用的战术DDD模式:聚合,命令,域事件,应用程序服务和有界上下文集成。我们不会描述DDD的战略模式和基本原理,否则本文将很长。我们已经在文章“您可以在10分钟内了解有关域驱动设计的内容吗?”
新版本的修订
在开始审核之前,您需要知道要精确计算的内容。为此,我们需要修订模板。它们由“办公室管理员”角色配置。修订模板是InventoryTemplate实体。它包含以下字段:
- 模板标识符;
- 比萨店ID;
- 模板名称;
- 修订类别:每月,每周,每天;
- 单位;
- 储存区和该储存区中的原材料
对于这个实体,已经实现了CRUD功能,我们将不对其进行详细介绍。
审核员获得模板列表后,即可开始审核。通常在关闭比萨店时会发生这种情况。目前,没有订单,原料也没有移动-您可以可靠地获取余额数据。
开始审核时,审核员选择一个区域,例如冰箱,然后开始计算那里的原材料。在冰箱里,他看到5包奶酪,每包10公斤,将10公斤* 5输入计算器,按“输入更多”。然后,他在顶部架子上看到另外2个包装,然后单击“添加”。结果,他进行了2次测量-每次测量50公斤和20公斤。
测光我们称检查员在某个区域输入的原材料数量,但不一定是总数。审核员可以输入两个1公斤的测量值,也可以输入2公斤的测量值-可以任意组合。最主要的是,审核员本人应保持清楚。
计算器界面。
因此,审核员将在1-2个小时内逐步考虑所有原材料,然后完成审核。
动作算法非常简单:
- 审核员可以开始审核;
- 审核员可以在开始的修订中添加度量;
- 审核员可以完成审核。
根据该算法,形成了系统的业务需求。
域的聚合,命令和事件的第一个版本的实现
首先,让我们定义DDD战术模板集中包含的术语。我们将在本文中引用它们。
战术DDD模板
聚合是实体和价值对象的集群。就数据修改而言,集群中的对象是单个实体。每个聚合都有一个根元素,通过它可以访问实体和值。单元不应设计得太大。它们将消耗大量内存,并且成功完成事务的可能性会降低。
聚集边界是一组对象,这些对象必须在单个事务中保持一致:必须遵守该群集中的所有不变量。
不变是不能不一致的业务规则。
命令是对单元的某种动作。作为此操作的结果,可以更改聚合的状态,并可以生成一个或多个域事件。
域事件是维持一致性所需的聚合状态更改的通知。汇总可确保事务的一致性:必须立即更改所有数据。由此产生的一致性保证了长期的一致性-数据将更改,但不会在此时和现在更改,而是会无限期地更改。此间隔取决于许多因素:消息队列的拥塞,外部服务准备处理这些消息,网络。
根元素是具有唯一全局标识符的实体。子元素只能在整个集合中具有本地身份。它们可以互相引用,并且只能引用其根元素。
团队与活动
让我们以团队形式描述业务需求。命令只是具有描述性字段的DTO。
命令“添加测量”具有以下字段:
- 测量值-特定度量单位中的原材料数量,如果已删除度量,则可以为空;
- 版本-可以编辑度量,因此需要版本;
- 原材料标识符;
- 计量单位:公斤/克,升/毫升,件;
- 存储区标识符。
测量添加命令代码
public sealed class AddMeasurementCommand
{
// ctor
public double? Value { get; }
public int Version { get; }
public UUId MaterialTypeId { get; }
public UUId MeasurementId { get; }
public UnitOfMeasure UnitOfMeasure { get; }
public UUId InventoryZoneId { get; }
}
我们还需要一个由这些命令的执行产生的事件。我们使用界面标记事件
IPublicInventoryEvent
-将来将需要它与外部消费者集成。
在事件“度量”中,该字段与命令“添加度量”中的相同,除了事件还存储发生它的单位的标识符及其版本。
事件代码“冻结”
public class MeasurementEvent : IPublicInventoryEvent
{
public UUId MaterialTypeId { get; set; }
public double? Value { get; set; }
public UUId MeasurementId { get; set; }
public int MeasurementVersion { get; set; }
public UUId AggregateId { get; set; }
public int Version { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
当我们描述了命令和事件后,就可以实现聚合了
Inventory
。
实施库存汇总
UML汇总图清单。
方法是这样的:修订的开始会启动聚合的创建
Inventory
,为此,我们使用factory方法,Create
并使用command开始修订StartInventoryCommand
。
每个命令都会改变聚合状态并将事件保存在list中
changes
,该事件将被发送到存储进行记录。而且,基于这些更改,将生成外部事件。创建
聚合后
Inventory
,我们可以为每个后续请求更改其状态而将其还原。
changes
自上次还原本机以来,已存储更改()。
- 通过
Restore
在聚合的当前实例上播放所有先前事件(按版本排序)的方法来恢复状态Inventory
。
这是该想法
Event Sourcing
在单位内的实施。稍后我们Event Sourcing
将讨论如何在存储库的框架中实现该想法。沃恩·弗农(Vaughn Vernon)的书中有一个很好的例证:
通过按事件发生的顺序应用事件,可以恢复单元的状态。
然后,团队进行了几次测量
AddMeasurementCommand
。审核以命令结束FinishInventoryCommand
。集合通过变异方法验证其状态以符合其不变性。
重要的是要注意,该设备
Inventory
以及所有度量均已完全版本化。测量更加困难-您必须解决事件处理方法中的冲突When(MeasurementEvent e)
。在代码中,我只会显示命令的处理AddMeasurementCommand
。
总库存代码
public sealed class Inventory : IEquatable<Inventory>
{
private readonly List<IInventoryEvent> _changes = new List<IInventoryEvent>();
private readonly List<InventoryMeasurement> _inventoryMeasurements = new List<InventoryMeasurement>();
internal Inventory(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc)
: this(id)
{
Version = version;
UnitId = unitId;
InventoryTemplateId = inventoryTemplateId;
StartedBy = startedBy;
State = state;
StartedAtUtc = startedAtUtc;
FinishedAtUtc = finishedAtUtc;
}
private Inventory(UUId id)
{
Id = id;
Version = 0;
State = InventoryState.Unknown;
}
public UUId Id { get; private set; }
public int Version { get; private set; }
public UUId UnitId { get; private set; }
public UUId InventoryTemplateId { get; private set; }
public UUId StartedBy { get; private set; }
public InventoryState State { get; private set; }
public DateTime StartedAtUtc { get; private set; }
public DateTime? FinishedAtUtc { get; private set; }
public ReadOnlyCollection<IInventoryEvent> Changes => _changes.AsReadOnly();
public ReadOnlyCollection<InventoryMeasurement> Measurements => _inventoryMeasurements.AsReadOnly();
public static Inventory Restore(UUId inventoryId, IInventoryEvent[] events)
{
var inventory = new Inventory(inventoryId);
inventory.ReplayEvents(events);
return inventory;
}
public static Inventory Restore(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc,
InventoryMeasurement[] measurements)
{
var inventory = new Inventory(id, version, unitId, inventoryTemplateId,
startedBy, state, startedAtUtc, finishedAtUtc);
inventory._inventoryMeasurements.AddRange(measurements);
return inventory;
}
public static Inventory Create(UUId inventoryId)
{
if (inventoryId == null)
{
throw new ArgumentNullException(nameof(inventoryId));
}
return new Inventory(inventoryId);
}
public void ReplayEvents(params IInventoryEvent[] events)
{
if (events == null)
{
throw new ArgumentNullException(nameof(events));
}
foreach (var @event in events.OrderBy(e => e.Version))
{
Mutate(@event);
}
}
public void AddMeasurement(AddMeasurementCommand command)
{
if (command == null)
{
throw new ArgumentNullException(nameof(command));
}
Apply(new MeasurementEvent
{
AggregateId = Id,
Version = Version + 1,
UnitId = UnitId,
Value = command.Value,
MeasurementVersion = command.Version,
MaterialTypeId = command.MaterialTypeId,
MeasurementId = command.MeasurementId,
UnitOfMeasure = command.UnitOfMeasure,
InventoryZoneId = command.InventoryZoneId
});
}
private void Apply(IInventoryEvent @event)
{
Mutate(@event);
_changes.Add(@event);
}
private void Mutate(IInventoryEvent @event)
{
When((dynamic) @event);
Version = @event.Version;
}
private void When(MeasurementEvent e)
{
var existMeasurement = _inventoryMeasurements.SingleOrDefault(x => x.MeasurementId == e.MeasurementId);
if (existMeasurement is null)
{
_inventoryMeasurements.Add(new InventoryMeasurement
{
Value = e.Value,
MeasurementId = e.MeasurementId,
MeasurementVersion = e.MeasurementVersion,
PreviousValue = e.PreviousValue,
MaterialTypeId = e.MaterialTypeId,
UserId = e.By,
UnitOfMeasure = e.UnitOfMeasure,
InventoryZoneId = e.InventoryZoneId
});
}
else
{
if (!existMeasurement.Value.HasValue)
{
throw new InventoryInvalidStateException("Change removed measurement");
}
if (existMeasurement.MeasurementVersion == e.MeasurementVersion - 1)
{
existMeasurement.Value = e.Value;
existMeasurement.MeasurementVersion = e.MeasurementVersion;
existMeasurement.UnitOfMeasure = e.UnitOfMeasure;
existMeasurement.InventoryZoneId = e.InventoryZoneId;
}
else if (existMeasurement.MeasurementVersion < e.MeasurementVersion)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else if (existMeasurement.MeasurementVersion == e.MeasurementVersion &&
existMeasurement.Value != e.Value)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else
{
throw new NotChangeException();
}
}
}
// Equals
// GetHashCode
}
当“已测量”事件发生时,将检查具有此标识符的现有测量的存在。如果不是这种情况,则添加新的测量。
如果是这样,则需要进行其他检查:
- 您无法编辑远程测量;
- 传入的版本必须大于上一个版本。
如果满足条件,我们可以为现有测量设置新值和新版本。如果版本较小,则存在冲突。为此,我们抛出一个异常
MeasurementConcurrencyException
。如果版本匹配并且值不同,那么这也是一种冲突情况。好吧,如果版本和值都匹配,则不会发生任何更改。通常不会出现这种情况。
“测量”实体包含与“添加测量”命令完全相同的字段。
实体代码“冻结”
public class InventoryMeasurement
{
public UUId MeasurementId { get; set; }
public UUId MaterialTypeId { get; set; }
public UUId UserId { get; set; }
public double? Value { get; set; }
public int MeasurementVersion { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
单元测试很好地证明了公共汇总方法的使用。
单元测试代码“在修订开始后添加测量”
[Fact]
public void WhenAddMeasurementAfterStartInventory_ThenInventoryHaveOneMeasurement()
{
var inventoryId = UUId.NewUUId();
var inventory = Domain.Inventories.Entities.Inventory.Create(inventoryId);
var unitId = UUId.NewUUId();
inventory.StartInventory(Create.StartInventoryCommand()
.WithUnitId(unitId)
.Please());
var materialTypeId = UUId.NewUUId();
var measurementId = UUId.NewUUId();
var measurementVersion = 1;
var value = 500;
var cmd = Create.AddMeasurementCommand()
.WithMaterialTypeId(materialTypeId)
.WithMeasurement(measurementId, measurementVersion)
.WithValue(value)
.Please();
inventory.AddMeasurement(cmd);
inventory.Measurements.Should().BeEquivalentTo(new InventoryMeasurement
{
MaterialTypeId = materialTypeId,
MeasurementId = measurementId,
MeasurementVersion = measurementVersion,
Value = value,
UnitOfMeasure = UnitOfMeasure.Quantity
});
}
放在一起:命令,事件,库存汇总
运行“完成库存”时库存的总生命周期。
该图显示了命令处理的过程
FinishInventoryCommand
。在处理之前,有必要Inventory
在命令执行时恢复单元的状态。为此,我们将在本机上执行的所有事件加载到内存中并进行播放(第1页)。
修订完成时,我们已经发生了以下事件-修订的开始和三个度量的增加。这些事件是作为命令处理
StartInventoryCommand
和的结果出现的AddMeasurementCommand
。在数据库中,表中的每一行都包含修订ID,版本和事件本身。
在这个阶段,我们执行命令
FinishInventoryCommand
(第2页)。此命令将首先检查单元当前状态的有效性-修订版处于state状态InProgress
,然后将生成新的状态更改,向FinishInventoryEvent
列表中添加一个事件changes
(项目3)。
命令完成后,所有更改将保存到数据库。结果,带有事件的新行
FinishInventoryEvent
和单元的最新版本将出现在数据库中(第4页)。
类型
Inventory
(修订)-与其嵌套实体相关的集合和根元素。因此,类型Inventory
定义了单元的边界。聚合边界包括类型Measurement
(度量)的实体列表,以及对聚合执行的所有事件的列表(changes
)。
整个功能的实现
通过功能,我们意味着实现特定的业务需求。在我们的示例中,我们将考虑“添加度量”功能。要实现此功能,我们需要了解“应用程序服务”(
ApplicationService
)的概念。
应用程序服务是域模型的直接客户端。应用程序服务在使用ACID数据库时保证交易,从而确保状态转换是原子保留的。此外,应用程序服务还解决了安全问题。
我们已经有一个单位
Inventory
... 为了实现整个功能,我们将完全使用应用程序服务。在其中,您需要检查所有连接的实体的存在以及用户的访问权限。只有在满足所有条件之后,才可以保存设备的当前状态并将事件发送到外界。为了实现应用程序服务,我们使用MediatR
。
功能代码“添加测量”
public class AddMeasurementChangeHandler
: IRequestHandler<AddMeasurementChangeRequest, AddMeasurementChangeResponse>
{
// dependencies
// ctor
public async Task<AddMeasurementChangeResponse> Handle(
AddMeasurementChangeRequest request,
CancellationToken ct)
{
var inventory =
await _inventoryRepository.GetAsync(request.AddMeasurementChange.InventoryId, ct);
if (inventory == null)
{
throw new NotFoundException($"Inventory {request.AddMeasurementChange.InventoryId} is not found");
}
var user = await _usersRepository.GetAsync(request.UserId, ct);
if (user == null)
{
throw new SecurityException();
}
var hasPermissions =
await _authPermissionService.HasPermissionsAsync(request.CountryId, request.Token, inventory.UnitId, ct);
if (!hasPermissions)
{
throw new SecurityException();
}
var unit = await _unitRepository.GetAsync(inventory.UnitId, ct);
if (unit == null)
{
throw new InvalidRequestDataException($"Unit {inventory.UnitId} is not found");
}
var unitOfMeasure =
Enum.Parse<UnitOfMeasure>(request.AddMeasurementChange.MaterialTypeUnitOfMeasure);
var addMeasurementCommand = new AddMeasurementCommand(
request.AddMeasurementChange.Value,
request.AddMeasurementChange.Version,
request.AddMeasurementChange.MaterialTypeId,
request.AddMeasurementChange.Id,
unitOfMeasure,
request.AddMeasurementChange.InventoryZoneId);
inventory.AddMeasurement(addMeasurementCommand);
await HandleAsync(inventory, ct);
return new AddMeasurementChangeResponse(request.AddMeasurementChange.Id, user.Id, user.GetName());
}
private async Task HandleAsync(Domain.Inventories.Entities.Inventory inventory, CancellationToken ct)
{
await _inventoryRepository.AppendEventsAsync(inventory.Changes, ct);
try
{
await _localQueueDataService.Publish(inventory.Changes, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "error occured while handling action");
}
}
}
活动采购
在实施过程中,出于以下几个原因,我们决定选择ES方法:
- 渡渡鸟有成功使用此方法的示例。
- ES使事件期间的问题更容易理解-所有用户操作都已存储。
- 如果采用传统方法,则将无法使用ES。
实现的想法很简单-我们将由于命令而出现的所有新事件添加到数据库中。为了还原聚合,我们接收所有事件并在实例上播放它们。为了避免每次都获取大量事件,我们每N个事件删除一次状态,并播放此快照的其余部分。
库存汇总商店ID
internal sealed class InventoryRepository : IInventoryRepository
{
// dependencies
// ctor
static InventoryRepository()
{
EventTypes = typeof(IEvent)
.Assembly.GetTypes().Where(x => typeof(IEvent).IsAssignableFrom(x))
.ToDictionary(t => t.FullName, x => x);
}
public async Task AppendAsync(IReadOnlyCollection<IEvent> events, CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
if (events.Count == 0) return;
try
{
foreach (var @event in events)
{
await session.ExecuteAsync(Sql.AppendEvent,
new
{
@event.AggregateId,
@event.Version,
@event.UnitId,
Type = @event.GetType().FullName,
Data = JsonConvert.SerializeObject(@event),
CreatedDateTimeUtc = DateTime.UtcNow
}, cancellationToken: ct);
}
}
catch (MySqlException e)
when (e.Number == (int) MySqlErrorCode.DuplicateKeyEntry)
{
throw new OptimisticConcurrencyException(events.First().AggregateId, "");
}
}
}
public async Task<Domain.Models.Inventory> GetInventoryAsync(
UUId inventoryId,
CancellationToken ct)
{
var events = await GetEventsAsync(inventoryId, 0, ct);
if (events.Any()) return Domain.Models.Inventory.Restore(inventoryId, events);
return null;
}
private async Task<IEvent[]> GetEventsAsync(
UUId id,
int snapshotVersion,
CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
var snapshot = await GetInventorySnapshotAsync(session, inventoryId, ct);
var version = snapshot?.Version ?? 0;
var events = await GetEventsAsync(session, inventoryId, version, ct);
if (snapshot != null)
{
snapshot.ReplayEvents(events);
return snapshot;
}
if (events.Any())
{
return Domain.Inventories.Entities.Inventory.Restore(inventoryId, events);
}
return null;
}
}
private async Task<Inventory> GetInventorySnapshotAsync(
IDbSession session,
UUId id,
CancellationToken ct)
{
var record =
await session.QueryFirstOrDefaultAsync<InventoryRecord>(Sql.GetSnapshot, new {AggregateId = id},
cancellationToken: ct);
return record == null ? null : Map(record);
}
private async Task<IInventoryEvent[]> GetEventsAsync(
IDbSession session,
UUId id,
int snapshotVersion,
CancellationToken ct)
{
var rows = await session.QueryAsync<EventRecord>(Sql.GetEvents,
new
{
AggregateId = id,
Version = snapshotVersion
}, cancellationToken: ct);
return rows.Select(Map).ToArray();
}
private static IEvent Map(EventRecord e)
{
var type = EventTypes[e.Type];
return (IEvent) JsonConvert.DeserializeObject(e.Data, type);
}
}
internal class EventRecord
{
public string Type { get; set; }
public string Data { get; set; }
}
经过几个月的运作,我们意识到我们并不需要在单元实例上存储所有用户操作。企业不会以任何方式使用此信息。话虽这么说,维护这种方法还是有开销的。在评估了所有利弊之后,我们计划从ES转向传统方法-
Events
用Inventories
和替换标牌Measurements
。
与外部有界上下文集成
这就是有限的上下文
Inventory
与外界交互的方式。
修订上下文与其他上下文的交互。该图显示了上下文,服务及其彼此之间的归属。
在的情况下
Auth
,Inventory
并且Datacatalog
,对于每一个服务一个界上下文。整体执行一些功能,但是现在我们只对比萨店的会计功能感兴趣。除修订外,会计还包括比萨店中原材料的流动:收据,转移,注销。
HTTP
该服务通过HTTP
Inventory
与之交互Auth
。首先,用户面对Auth
,提示用户选择对他可用的角色之一。
- 系统具有角色“审计员”,用户可以在审计过程中选择该角色。
- .
- .
在最后阶段,用户拥有来自的令牌
Auth
。修订服务必须验证此令牌,因此它要求Auth
进行验证。Auth
将检查令牌的生存期是否已过期,它是否属于所有者或是否具有必要的访问权限。如果一切正常,则将Inventory
邮票保存在cookie中-用户ID,登录名,比萨店ID并设置cookie的生存期。
注意。
Auth
我们在“授权的精妙之处:OAuth 2.0技术概述”一文中详细描述了该服务的工作方式。
它
Inventory
通过消息队列与其他服务交互。该公司使用RabbitMQ作为消息代理,以及上面的绑定-MassTransit。
RMQ:消费事件
目录服务-
Datacatalog
将提供Inventory
所有必要的实体:会计,国家,部门和比萨店的原材料。
在不详细介绍基础架构的情况下,我将描述使用事件的基本思想。在目录服务方面,一切已经准备就绪,可以发布事件,让我们看一下原始示例。
Datacatalog事件合同代码
namespace Dodo.DataCatalog.Contracts.Products.v1
{
public class MaterialType
{
public UUId Id { get; set; }
public int Version { get; set; }
public int CountryId { get; set; }
public UUId DepartmentId { get; set; }
public string Name { get; set; }
public MaterialCategory Category { get; set; }
public UnitOfMeasure BasicUnitOfMeasure { get; set; }
public bool IsRemoved { get; set; }
}
public enum UnitOfMeasure
{
Quantity = 1,
Gram = 5,
Milliliter = 7,
Meter = 8,
}
public enum MaterialCategory
{
Ingredient = 1,
SemiFinishedProduct = 2,
FinishedProduct = 3,
Inventory = 4,
Packaging = 5,
Consumables = 6
}
}
这篇文章发表在
exchange
。每个服务都可以创建自己的捆绑包exchange-queue
来使用事件。
通过RMQ原语发布事件及其消耗的方案。
最终,服务可以订阅的每个实体都有一个队列。剩下的就是将新版本保存到数据库。
来自Datacatalog的事件使用者代码
public class MaterialTypeConsumer : IConsumer<Dodo.DataCatalog.Contracts.Products.v1.MaterialType>
{
private readonly IMaterialTypeRepository _materialTypeRepository;
public MaterialTypeConsumer(IMaterialTypeRepository materialTypeRepository)
{
_materialTypeRepository = materialTypeRepository;
}
public async Task Consume(ConsumeContext<Dodo.DataCatalog.Contracts.Products.v1.MaterialType> context)
{
var materialType = new AddMaterialType(context.Message.Id,
context.Message.Name,
(int)context.Message.Category,
(int)context.Message.BasicUnitOfMeasure,
context.Message.CountryId,
context.Message.DepartmentId,
context.Message.IsRemoved,
context.Message.Version);
await _materialTypeRepository.SaveAsync(materialType, context.CancellationToken);
}
}
RMQ:发布事件
整体的计费部分使用数据
Inventory
来支持其余功能,而这些功能则需要修订数据。我们要通知其他服务的所有事件,都用interface标记IPublicInventoryEvent
。当此类事件发生时,我们将它们与changelog(changes
)隔离开,并将其发送到调度队列。为此,使用了两个表publicqueue
和publicqueue_archive
。
为了保证消息的传递,我们使用一种通常称为“本地队列”的模式,即
Transactional outbox pattern
。保存聚合状态Inventory
并将事件发送到本地队列是在一个事务中进行的。交易一旦提交,我们将立即尝试将消息发送给经纪人。
如果消息已发送,则将其从队列中删除
publicqueue
。否则,将尝试稍后发送消息。然后,单片和数据管道的订户使用消息。该表publicqueue_archive
永久存储数据,以便在某些时候需要时方便地重新分配事件。
用于将事件发布到消息代理的代码
internal sealed class BusDataService : IBusDataService
{
private readonly IPublisherControl _publisherControl;
private readonly IPublicQueueRepository _repository;
private readonly EventMapper _eventMapper;
public BusDataService(
IPublicQueueRepository repository,
IPublisherControl publisherControl,
EventMapper eventMapper)
{
_repository = repository;
_publisherControl = publisherControl;
_eventMapper = eventMapper;
}
public async Task ConsumePublicQueueAsync(int batchEventSize, CancellationToken cancellationToken)
{
var events = await _repository.GetAsync(batchEventSize, cancellationToken);
await Publish(events, cancellationToken);
}
public async Task Publish(IEnumerable<IPublicInventoryEvent> events, CancellationToken ct)
{
foreach (var @event in events)
{
var publicQueueEvent = _eventMapper.Map((dynamic) @event);
await _publisherControl.Publish(publicQueueEvent, ct);
await _repository.DeleteAsync(@event, ct);
}
}
}
我们将事件发送到整体报告。盈亏报告允许您将任何两个修订版本相互比较。此外,还有一个重要的报告“仓库余额”,前面已经提到过。
为什么将事件发送到数据管道?都是一样的-对于报告,但仅在新的轨道上。以前,所有报告都存储在一个整体中,但现在已被删除。这分担两个责任-生产和分析数据的存储和处理:OLTP和OLAP。这对于基础设施和发展都非常重要。
结论
通过遵循域驱动设计的原则和实践,我们设法构建了一个可靠且灵活的系统,可以满足用户的业务需求。我们不仅提供了不错的产品,而且还提供了易于修改的优质代码。我们希望在您的项目中会有使用域驱动设计的地方。
您可以在我们的DDDevotion社区和DDDevotion Youtube频道上找到有关DDD的更多信息。您可以在Dodo Engineering聊天室的Telegram中讨论该文章。