通过Axon进行通信的微服务

在这个简单的教程中,我们将在Spring Boot上创建几个微服务,并通过Axon框架组织它们之间的通信。










假设我们有这样的任务。



股市中有交易来源。该来源通过Rest接口向我们发送交易。



我们需要获取这些事务,将它们保存到数据库中并进行方便的内存存储。



该存储库应执行以下功能:



  • 返回交易清单;
  • 返回完整头寸,即 表“工具”-“当前证券数量”;
  • 给定工具的返回位置。


我们如何完成这项任务?



根据微服务方式的规范,我们需要将任务分为微服务组件:



  • 休息收到的交易;
  • 将交易保存到数据库;
  • 内存中用于显示位置数据的存储区。


让我们在本教程的框架内制作第一个和第三个服务,将第二个服务留给第二部分(如果有兴趣,请在注释中写)。



因此,我们有两个微服务。



第一个从外部接收数据。



第二个处理此数据并响应传入的请求。



当然,我们希望获得水平扩展,不间断更新以及微服务的其他好处。



我们面前有什么艰巨的任务?



实际上有很多,但是现在让我们谈谈这些微服务之间数据将如何流动。您还可以在他们之间进行休息,可以安排一些队列,可以提出很多利弊的建议。



让我们看看一种可能的方法-通过Axon框架进行异步通信



该解决方案的优点是什么?



首先,异步通信增加了灵活性(是的,这里有一个减号,但是到目前为止我们只在谈论优点)。



其次,我们可以直接使用Event SourcingCQRS

第三,Axon提供了现成的基础架构,我们只需要专注于开发业务逻辑。



让我们开始吧。



我们将把这个项目搁浅。它将具有三个模块:



  • 共同。具有通用数据结构的模块(我们不喜欢复制粘贴);
  • tradeCreator。具有微服务的模块,用于接受Rest上的交易;
  • tradeQueries。带有微服务的模块,用于显示位置。


让我们以Spring Boot为基础并连接Axon启动器。



没有Spring,Axon可以正常工作,但是我们将一起使用它们。



在这里,我们需要停下来,向您介绍有关Axon的一些话。



它是一个客户端-服务器系统。有一个服务器-这是一个单独的应用程序,我们将在docker中运行它。



还有一些将自己嵌入微服务的客户。

这是图片。首先,启动Axon服务器(在docker中),然后启动我们的微服务。



在启动时,微服务会寻找服务器并开始与其进行交互。交互可以有条件地分为两种类型:技术和业务。



技术上的一种是交换消息“我还活着”(可以在调试日志记录模式下看到此类消息)。



诸如“新交易”之类的消息遮盖了业务。



一个重要的功能是,在启动微服务后,它可以询问Axon服务器“发生了什么”,然后服务器将累积的事件发送到微服务。因此,微服务可以相对安全地重新启动而不会丢失数据。

通过这种交换方案,我们可以非常轻松地

在不同主机上运行许多微服务实例



是的,到目前为止,Axon Server的一个实例并不可靠。



我们从事事件采购和CQRS范例。这意味着我们必须有“团队”,“事件”和“样本”。



我们将有一个命令:“创建交易”,一个事件“创建交易”和三个选择:“显示所有交易”,“显示头寸”,“显示工具的头寸”。



工作方案如下:



  1. TradeCreator微服务接受Rest交易。
  2. tradeCreator微服务创建一个“创建交易”命令,并将其发送到Axon服务器。
  3. Axon服务器接收命令并将命令发送给感兴趣的收件人,在本例中为tradeCreator微服务。
  4. tradeCreator微服务接收命令,生成一个“交易完成”事件,并将其发送到Axon服务器。
  5. Axon服务器接收事件并将其转发给感兴趣的订阅者。
  6. 现在,我们只有一个感兴趣的收件人-tradeQueries微服务。
  7. tradeQueries微服务接收事件并更新内部数据。


(重要的是,在事件形成的那一刻,tradeQueries微服务可能不可用,但是一旦开始,它将立即接收事件)。



是的,轴突服务器在通信中心中,所有消息都通过它。



让我们继续进行编码。



为了不使代码混乱,下面仅给出片段,下面是整个示例的链接。



让我们从通用模块开始。



其中,公共部分是事件(类CreatedTradeEvent)。注意名称,实际上,这是生成此事件的团队的名称,但使用过去时。过去,因为首先,出现命令,这将导致事件的创建。



其他常见的结构包括用于描述头寸的类别(Position类别),交易(Trade类)和交易面(枚举面)。买卖。



让我们继续到tradeCreator模块。



此模块具有用于接受交易的Rest接口(TradeController类)。

从收到的交易中形成命令“创建交易”,并将其发送到轴突服务器。



    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }


为了处理该命令,使用了TradeAggregate类。

为了使Axon能够找到它,我们添加了@Aggregate批注。

处理命令的方法如下所示(缩写):



    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }


从命令生成一个事件,并将其发送到服务器。

该命令位于CreateTradeCommand类中。



现在,让我们看一下最后一个tradeQueries模块。



选择在查询包中描述。

该模块还具有

公共类TradeController Rest接口



例如,让我们看一下请求的处理:“显示所有交易”。



    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }


创建获取请求并将其发送到服务器。



TradesEventHandler类用于处理获取请求。

它有一个注释的方法



   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)


正是他负责从内存中获取数据。



出现有关该商店中信息如何更新的问题。



首先,这只是针对特定选择量身定制的ConcurrentHashMap的集合。

要更新它们,将应用该方法:



    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }


它接收“创建交易”事件并更新地图。



这些是微服务开发的亮点。



轴突的缺点呢?



首先,这是基础架构的复杂性,出现了一个故障点-Axon服务器,所有通信都通过它。



其次,这种分布式系统的缺点非常明显,即临时数据不一致。在我们的情况下,在接收新交易和更新样本数据之间可能会花费不可接受的长时间。



幕后还剩下什么?



关于事件源和CQRS,它是什么以及它的用途,一无所知。

在不公开这些概念的情况下,有些要点可能不清楚。



也许某些代码片段也需要澄清。



我们在一个公开的网络研讨会上谈到了这一点



完整的例子



All Articles