假设我们有这样的任务。
股市中有交易来源。该来源通过Rest接口向我们发送交易。
我们需要获取这些事务,将它们保存到数据库中并进行方便的内存存储。
该存储库应执行以下功能:
- 返回交易清单;
- 返回完整头寸,即 表“工具”-“当前证券数量”;
- 给定工具的返回位置。
我们如何完成这项任务?
根据微服务方式的规范,我们需要将任务分为微服务组件:
- 休息收到的交易;
- 将交易保存到数据库;
- 内存中用于显示位置数据的存储区。
让我们在本教程的框架内制作第一个和第三个服务,将第二个服务留给第二部分(如果有兴趣,请在注释中写)。
因此,我们有两个微服务。
第一个从外部接收数据。
第二个处理此数据并响应传入的请求。
当然,我们希望获得水平扩展,不间断更新以及微服务的其他好处。
我们面前有什么艰巨的任务?
实际上有很多,但是现在让我们谈谈这些微服务之间数据将如何流动。您还可以在他们之间进行休息,可以安排一些队列,可以提出很多利弊的建议。
让我们看看一种可能的方法-通过Axon框架进行异步通信。
该解决方案的优点是什么?
首先,异步通信增加了灵活性(是的,这里有一个减号,但是到目前为止我们只在谈论优点)。
其次,我们可以直接使用Event Sourcing和CQRS。
第三,Axon提供了现成的基础架构,我们只需要专注于开发业务逻辑。
让我们开始吧。
我们将把这个项目搁浅。它将具有三个模块:
- 共同。具有通用数据结构的模块(我们不喜欢复制粘贴);
- tradeCreator。具有微服务的模块,用于接受Rest上的交易;
- tradeQueries。带有微服务的模块,用于显示位置。
让我们以Spring Boot为基础并连接Axon启动器。
没有Spring,Axon可以正常工作,但是我们将一起使用它们。
在这里,我们需要停下来,向您介绍有关Axon的一些话。
它是一个客户端-服务器系统。有一个服务器-这是一个单独的应用程序,我们将在docker中运行它。
还有一些将自己嵌入微服务的客户。
这是图片。首先,启动Axon服务器(在docker中),然后启动我们的微服务。
在启动时,微服务会寻找服务器并开始与其进行交互。交互可以有条件地分为两种类型:技术和业务。
技术上的一种是交换消息“我还活着”(可以在调试日志记录模式下看到此类消息)。
诸如“新交易”之类的消息遮盖了业务。
一个重要的功能是,在启动微服务后,它可以询问Axon服务器“发生了什么”,然后服务器将累积的事件发送到微服务。因此,微服务可以相对安全地重新启动而不会丢失数据。
通过这种交换方案,我们可以非常轻松地
在不同主机上运行许多微服务实例。
是的,到目前为止,Axon Server的一个实例并不可靠。
我们从事事件采购和CQRS范例。这意味着我们必须有“团队”,“事件”和“样本”。
我们将有一个命令:“创建交易”,一个事件“创建交易”和三个选择:“显示所有交易”,“显示头寸”,“显示工具的头寸”。
工作方案如下:
- TradeCreator微服务接受Rest交易。
- tradeCreator微服务创建一个“创建交易”命令,并将其发送到Axon服务器。
- Axon服务器接收命令并将命令发送给感兴趣的收件人,在本例中为tradeCreator微服务。
- tradeCreator微服务接收命令,生成一个“交易完成”事件,并将其发送到Axon服务器。
- Axon服务器接收事件并将其转发给感兴趣的订阅者。
- 现在,我们只有一个感兴趣的收件人-tradeQueries微服务。
- tradeQueries微服务接收事件并更新内部数据。
(重要的是,在事件形成的那一刻,tradeQueries微服务可能不可用,但是一旦开始,它将立即接收事件)。
是的,轴突服务器在通信中心中,所有消息都通过它。
让我们继续进行编码。
为了不使代码混乱,下面仅给出片段,下面是整个示例的链接。
让我们从通用模块开始。
其中,公共部分是事件(类CreatedTradeEvent)。注意名称,实际上,这是生成此事件的团队的名称,但使用过去时。过去,因为首先,出现命令,这将导致事件的创建。
其他常见的结构包括用于描述头寸的类别(Position类别),交易(Trade类)和交易面(枚举面)。买卖。
让我们继续到tradeCreator模块。
此模块具有用于接受交易的Rest接口(TradeController类)。
从收到的交易中形成命令“创建交易”,并将其发送到轴突服务器。
@PostMapping("/trade")
public ResponseEntity<String> create(@RequestBody Trade trade) {
var createTradeCommand = CreateTradeCommand.builder()
.tradeId(trade.getTradeId())
...
.build();
var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
return ResponseEntity.ok(result.get().toString());
}
为了处理该命令,使用了TradeAggregate类。
为了使Axon能够找到它,我们添加了@Aggregate批注。
处理命令的方法如下所示(缩写):
@CommandHandler
public TradeAggregate(CreateTradeCommand command) {
log.info("command: {}", command);
var event = CreatedTradeEvent.builder()
.tradeId(command.tradeId())
....
.build();
AggregateLifecycle.apply(event);
}
从命令生成一个事件,并将其发送到服务器。
该命令位于CreateTradeCommand类中。
现在,让我们看一下最后一个tradeQueries模块。
选择在查询包中描述。
该模块还具有
公共类TradeController Rest接口。
例如,让我们看一下请求的处理:“显示所有交易”。
@GetMapping("/trade/all")
public List<Trade> findAllTrades() {
return queryGateway.query(new FindAllTradesQuery(),
ResponseTypes.multipleInstancesOf(Trade.class)).join();
}
创建获取请求并将其发送到服务器。
TradesEventHandler类用于处理获取请求。
它有一个注释的方法
@QueryHandler
public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)
正是他负责从内存中获取数据。
出现有关该商店中信息如何更新的问题。
首先,这只是针对特定选择量身定制的ConcurrentHashMap的集合。
要更新它们,将应用该方法:
@EventHandler
public void on(CreatedTradeEvent event) {
log.info("event:{}", event);
var trade = Trade.builder()
...
.build();
trades.put(event.tradeId(), trade);
position.merge(event.shortName(), event.size(),
(oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
}
它接收“创建交易”事件并更新地图。
这些是微服务开发的亮点。
轴突的缺点呢?
首先,这是基础架构的复杂性,出现了一个故障点-Axon服务器,所有通信都通过它。
其次,这种分布式系统的缺点非常明显,即临时数据不一致。在我们的情况下,在接收新交易和更新样本数据之间可能会花费不可接受的长时间。
幕后还剩下什么?
关于事件源和CQRS,它是什么以及它的用途,一无所知。
在不公开这些概念的情况下,有些要点可能不清楚。
也许某些代码片段也需要澄清。
我们在一个公开的网络研讨会上谈到了这一点。
完整的例子。