从Java使用Azure Service Bus

大家好!碰巧我们的应用程序是用Java堆栈编写的,但是托管在Azure中。我们正在努力充分利用云提供商的管理服务。



其中之一就是Azure Service Bus,今天我想谈谈在常规Spring Boot应用程序中使用它的功能。



如果要阅读有关rake功能的信息-滚动到文章末尾



什么是Azure Service Bus



关于Azure Service Bus的一些话是云消息代理(RabbitMQ,ActiveMQ的云替代)。支持队列(消息传递给一个收件人)和主题(发布/订阅机制)-在此处更详细地



声明支持:



  1. 有序消息-文档说这是一个FIFO,但它是使用消息会话的概念实现的-一组消息,而不是整个队列。如果需要保证消息的顺序,则可以将消息合并为一个组,现在该组中的消息将作为FIFO传递。因此,Azure服务总线队列不是FIFO-它会根据需要随机发送消息
  2. 死信队列-此处的一切都很简单,经过N次尝试或一段时间后,它们无法成功传递消息-已移至DLQ
  3. 预定的交货-您可以设置交货前的延迟
  4. 邮件延迟-将邮件隐藏在队列中,该邮件不会自动传递,但可以通过ID进行检索。我们需要将此ID存储在某个地方


如何与Azure Service Bus集成



Azure Service Bus支持AMQP 1.0,这意味着它与RabbitMQ客户端不兼容。兔子使用AMQP 0.9.1



唯一可以与服务总线一起使用的“标准”客户端是Apache Qpid



有3种方法可以将Spring Boot应用程序与Service Bus配对:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



我将更详细地介绍此方法,并向您介绍

在官方存储库中使用示例应用程序的功能,因此复制代码没有意义-此处有示例存储库



因为 它是Spring Integration Messaging,全部归结为Channel,MessageHandler,MessagingGateway,ServiceActivator。



然后是ServiceBusQueueTemplate



传送讯息



我们必须具有一个Channel,用于在其中写入要发送的消息,另一方面,有一个MessageHandler将其发送到服务总线。



MessagHandlercom.microsoft.azure.spring.integration.core.DefaultMessageHandler -这是连接到外部服务。



如何将其绑定到频道? -添加注释- @ServiceActivator(inputChannel = OUTPUT_CHANNEL),现在我们的MessagHandler正在侦听OUTPUT_CHANNEL通道



接下来,我们需要以某种方式将消息写入通道-这又是春天的魔力-我们宣布MessagingGateway并按名称将其绑定到通道。



示例的摘录



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


就是这样:网关->频道-> MessagHandler- > ServiceBusQueueTemplate- > ServiceBusMessageConverter



在代码中,仍然需要注入我们的网关并调用send方法



在调用链中提到ServiceBusMessageConverter是有原因的-如果要向消息中添加自定义标头(例如CORRELATION_ID),则需要在此处将它们从org.springframework.messaging.MessageHeaders移至天蓝色消息。

特殊方法setCustomHeaders



在这种情况下,您的网关将如下所示:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


接收讯息



好的,我们知道如何发送消息,如何立即获取消息?



这里的一切是相同的-的MessageProducer - >频道- >处理程序



的MessageProducercom.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter -这是我们连接到外部服务。内部,带有ServiceBusMessageConverter的相同ServiceBusQueueTemplate,您可以在其中读取自定义标头并将其放入spring集成消息中。 该通道已手动安装在其中:







@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


但是Handler本身通过@ServiceActivator附加到通道



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


您可以立即获得该行:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


您可能已经注意到奇怪的Checkpointer checkpointer参数,参数用于手动确认消息处理。

如果在创建ServiceBusQueueInboundChannelAdapter设置CheckpointMode.MANUAL,则必须自己发送对消息的确认。



如果使用CheckpointMode.RECORD,则将自动发送确认-ServiceBusQueueTemplate代码中的详细信息



使用特征



因此,我们已经了解的“耙子”和“筹码”列表。



ReceiveMode.PEEKLOCK



Azure Service Bus支持PEEKLOCK模式-使用者接收一条消息,将其锁定到服务总线中,在一定时间内(锁定持续时间)任何人都无法访问,但不会从中删除该消息。如果在指定的时间内消费者没有发送确认确认-成功/放弃或没有延长锁定-该消息将被视为再次可用,并且将尝试进行新的发送。



有趣的是,放弃只是简单地重置了锁,该消息立即变为可重新发送。



ServiceBusQueueTemplate默认创建QueueClient模式ReceiveMode.PEEKLOCK



如果未处理的异常在我们的处理程序中发生-没有确认将发送到服务器,并且消息将保持锁定状态,并将在超时后重新发送。

在这种情况下,传递计数器将增加,这是合乎逻辑的。



我不知道这是错误还是功能-但是在必要时重试两次之间的延迟非常方便。



如果即使重试也无法处理该消息,则必须捕获异常并将消息标记为已处理并向应用程序添加其他逻辑,否则它将一次又一次地传递,直到达到重新传递的数量限制(在服务总线中创建队列时配置) )



并发和预取消息数



您可能已经猜到了,并发设置负责并行消息处理程序的数量,而预取消息计数是我们将从服务器进入缓冲区的消息数量



默认情况下,ServiceBusQueueTemplate是自动配置的(AzureServiceBusQueueAutoConfiguration),两个参数的值均为1,即默认情况下,每个队列将具有一个处理线程,尽管对每个单独消息都具有确认的服务总线的概念暗示了许多并发处理程序。如果您的请求处理时间很长,那么这尤其重要。



不幸的是,这些设置无法通过应用程序配置(application.yml / application.properties)进行设置,只能在代码中进行设置。但是,即使通过代码,也无法为不同的队列设置不同的设置。



因此,如果需要进行其他设置,则必须为每个ServiceBusQueueInboundChannelAdapter创建多个ServiceBusQueueTemplate Bean



Azure Service Bus Java SDK内的CompletableFuture



湛蓝的服务总线Java SDK的本身是围绕实现CompletableFutureCachedThreadPool执行人- MessagingFactory.INTERNAL_THREAD_POOL所以要小心各种线程本地豆



有序消息



我们将服务总线用作作业队列-一些作业相互依赖,因此必须按照创建顺序执行。



正如我上面提到的,T恤衫使用消息会话的概念-当消息通过密钥分组到一个会话中(在标头中传输)时,只要存在至少一个带有该会话密钥的消息,该会话就存在-文档中的详细信息

服务总线保证了该组中消息的发送顺序是添加到服务器(即服务总线服务器将其写入存储库的顺序)。



还值得一提的是您是否创建了启用会话的队列-这意味着所有消息都必须具有带有会话密钥的标头。



立刻,我们对服务总线在FIFO队列中排队消息的可能性感到非常满意-尽管是一组消息。



但是过了一会儿,我们开始注意到问题:



  • 一些消息开始无数次到达
  • 队列处理变慢
  • 在服务总线统计信息中,一半的请求被标记为失败,并且失败的请求即使在空闲时也出现在空队列中


通过查看sdk代码,我们发现了使用会话的特殊性:



  1. 使用者捕获会话并开始读取会话中的所有可用消息
  2. 同时处理的会话数等于并发参数
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


结果,他们放弃了此服务总线功能并编写了一辆自行车,而服务总线充当了触发器。



一旦启用会话的队列被取消,统计信息中的错误即消失;对服务总线的请求。



在JMS + Qpid捆绑软件中-此功能不可用。



队列大小大于1G的潜在问题



我们还没有见面,但是我听说如果队列大小超过1G,它将开始不稳定。



如果您遇到此问题,反之亦然,则一切正常-在评论中写。



跟踪请求的问题



标准的azure应用程序见解代理无法跟踪作为依赖项发送的消息和作为请求跟踪传入的消息。



我必须添加一些代码。



结果



如果您需要一个消息处理时间较长的作业队列,而又不需要队列,则可以使用。



如果消息处理很快-使用Azure事件中心-常规Kafka,标准客户端可以正常工作。



All Articles