几年前,在我们的一个项目中,我们面临将动作的执行推迟一定时间的需求。例如,您可以在三个小时内找出付款状态,或者在45分钟后重新发送通知。但是,那时我们没有找到合适的库来“推迟”并且不需要额外的时间进行配置和操作。我们分析了可能的选项,并使用Redis作为存储库用Java编写了自己的延迟队列库。在本文中,我将讨论该库的功能,其替代方案以及我们在过程中偶然发现的那些“ rakes”。
功能性
那么延迟队列有什么作用?在指定的时间量后,添加到暂挂队列中的事件将传递到处理程序。如果处理失败,则事件将在以后再次传递。而且,最大尝试次数是有限的。Redis不能保证安全,因此您需要为事件的丢失做好准备。但是,在集群版本中,Redis表现出相当高的可靠性,并且我们在一年半的运行中从未遇到过这种情况。
API
将事件添加到队列
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();
请注意,该方法返回Mono
,因此要运行,您需要执行以下操作之一:
subscribe(...)
block()
的文档中提供了更详细的说明Project Reactor
。上下文将添加到事件,如下所示:
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();
注册事件处理程序
eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);
, :
eventService.addHandler(
DummyEvent.class,
e -> Mono
.subscriberContext()
.doOnNext(ctx -> {
Map<String, String> eventContext = ctx.get("eventContext");
log.info("context key {}", eventContext.get("key"));
})
.thenReturn(true),
1
);
eventService.removeHandler(DummyEvent.class);
"-":
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
var eventService = delayedEventService().client(redisClient).build();
:
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
var eventService = delayedEventService()
.client(redisClient)
.mapper(objectMapper)
.handlerScheduler(Schedulers.fromExecutorService(executor))
.schedulingInterval(Duration.ofSeconds(1))
.schedulingBatchSize(SCHEDULING_BATCH_SIZE)
.enableScheduling(false)
.pollingTimeout(POLLING_TIMEOUT)
.eventContextHandler(new DefaultEventContextHandler())
.dataSetPrefix("")
.retryAttempts(10)
.metrics(new NoopMetrics())
.refreshSubscriptionsInterval(Duration.ofMinutes(5))
.build();
( Redis) eventService.close()
, @javax.annotation.PreDestroy
.
- , . :
- , Redis;
- , ( "delayed.queue.ready.for.handling.count" )
, delayed queue. 2018
Amazon Web Services.
, . : " , Amazon-, ".
:
- , JMS . SQS , 15 .
" " . , Redis :
- sorted sets,
- "sorted_set" "list" ( )
, Netflix dyno-queues
. , , .
, " " sorted set list, ( ):
var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
var payload = extractPayload(key);
var listName = extractType(key);
redis.lpush(listName, payload);
redis.zrem("delayed_events", key);
});
redis.brpop(listName)
.
"list" (, ), list . Redis , 2 .
events.forEach(key -> {
...
redis.multi();
redis.zrem("delayed_events", key);
redis.lpush(listName, payload);
redis.exec();
});
list-a . , . "sorted_set" .
events.forEach(key -> {
...
redis.multi();
redis.zadd("delayed_events", nextAttempt(key))
redis.zrem("delayed_events", key);
redis.lpush(listName, payload);
redis.exec();
});
, , " " "delayed queue" . "sorted set"
metadata;payload
, payload , metadata - . . , metadata payload Redis hset
"sorted set" .
var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);
var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;
redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();
, . , list . TTL :
redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());
Spring, . " " :
Lettuce , . Project Reactor , " ".
, Subscriber
redis
.reactive()
.brpop(timeout, queue)
.map(e -> deserialize(e))
.subscribe(new InnerSubscriber<>(handler, ... params ..))
class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {
@Override
protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
Mono<Boolean> promise = handler.apply(envelope.getPayload());
promise.subscribe(r -> request(1));
}
}
, ( Netflix dyno queue, poll- ).
?
- Kotlin DSL. Kotlin
suspend fun
APIProject Reactor