三剑客-事件源,事件风暴和事件存储-进入战斗:第1部分-尝试数据库事件存储





哈Ha!我决定离开Scala,Idris和其他FP一段时间,然后谈论一下Event Store(事件存储),该数据库可以将事件保存到事件流中。就像一本好书一样,事实上我们也有《火枪手》第4期,第四个是DDD。首先,我使用事件风暴来选择命令,事件和与其关联的实体。然后,基于它们,我将保存对象的状态并还原它。我将在本文中执行常规的TodoList。欲了解更多信息,欢迎来到猫下。



内容



  • 三剑客-事件源,事件风暴和事件存储-进入战斗:第1部分-尝试数据库事件存储


链接



来源

图像docker image

事件存储

事件吸引

事件风暴



实际上,事件存储是一个旨在存储事件的数据库。她还知道如何创建事件的订阅,以便可以以某种方式处理事件。还有一些预测也会对事件做出反应,并在事件的基础上积累一些数据。例如,在TodoCreated事件期间,您可以在投影中增加某种计数计数器。现在,在这一部分中,我将使用事件存储作为读取和写入Db。此外,在以下文章中,我将创建一个单独的数据库,用于读取数据库中保存的事件,并根据这些事件将数据写入其中,以写入事件存储。还有一个示例,说明如何通过将系统回滚到过去的状态来进行“ Time Travel”。

因此,让我们开始Event Stroming。通常,对于其实现,所有感兴趣的人员和专家都会聚集在一起,告诉他们该软件将模拟主题领域中的哪些事件。例如,对于工厂的软件-产品制造。对于游戏-受到的伤害。对于财务软件-钱记入帐户,依此类推。由于我们的主题领域与TodoList一样简单,因此我们几乎没有活动。因此,让我们在黑板上写下主题区域(域)的事件。







现在,让我们添加触发这些事件的命令。







接下来,让我们在实体周围对这些事件和命令进行分组,并更改它们的关联状态。







我的命令将简单地变成服务方法名称。让我们开始实施。



首先,让我们用代码描述事件。



    public interface IDomainEvent
    {
      // .   id   Event Strore
        Guid EventId { get; }
       // .        Event Store
        long EventNumber { get; set; }
    }

    public sealed class TodoCreated : IDomainEvent
    {
       //Id  Todo
        public Guid Id { get; set; }
       //  Todo
        public string Name { get; set; }
        public Guid EventId => Id;
        public long EventNumber { get; set; }
    }

    public sealed class TodoRemoved : IDomainEvent
    {
        public Guid EventId { get; set; }
        public long EventNumber { get; set; }
    }

    public sealed class TodoCompleted: IDomainEvent
    {
        public Guid EventId { get; set; }
        public long EventNumber { get; set; }
    }


现在我们的核心是一个实体:



    public sealed class Todo : IEntity<TodoId>
    {
        private readonly List<IDomainEvent> _events;

        public static Todo CreateFrom(string name)
        {
            var id = Guid.NewGuid();
            var e = new List<IDomainEvent>(){new TodoCreated()
                {
                    Id = id,
                    Name = name
                }};
            return new Todo(new TodoId(id), e, name, false);
        }

        public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
        {
            var id = Guid.Empty;
            var name = String.Empty;
            var completed = false;
            var ordered = events.OrderBy(e => e.EventNumber).ToList();
            if (ordered.Count == 0)
                return null;
            foreach (var @event in ordered)
            {
                switch (@event)
                {
                    case TodoRemoved _:
                        return null;
                    case TodoCreated created:
                        name = created.Name;
                        id = created.Id;
                        break;
                    case TodoCompleted _:
                        completed = true;
                        break;
                    default: break;
                }
            }
            if (id == default)
                return null;
            return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
        }

        private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
        {
            Id = id;
            _events = events;
            Name = name;
            IsCompleted = isCompleted;
            Validate();
        }

        public TodoId Id { get; }
        public IReadOnlyList<IDomainEvent> Events => _events;
        public string Name { get; }
        public bool IsCompleted { get; private set; }

        public void Complete()
        {
            if (!IsCompleted)
            {
                IsCompleted = true;
                _events.Add(new TodoCompleted()
                {
                    EventId = Guid.NewGuid()
                });
            }
        }

        public void Delete()
        {
            _events.Add(new TodoRemoved()
            {
                EventId = Guid.NewGuid()
            });
        }

        private void Validate()
        {
            if (Events == null)
                throw new ApplicationException("  ");
            if (string.IsNullOrWhiteSpace(Name))
                throw new ApplicationException("  ");
            if (Id == default)
                throw new ApplicationException("  ");
        }
    }


我们连接到事件存储:



            services.AddSingleton(sp =>
            {
//  TCP        . 
//       .        .
                var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
                con.ConnectAsync().Wait();
                return con;
            });


因此,主要部分。从事件存储本身存储和读取事件:



    public sealed class EventsRepository : IEventsRepository
    {
        private readonly IEventStoreConnection _connection;

        public EventsRepository(IEventStoreConnection connection)
        {
            _connection = connection;
        }

        public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
        {
            var eventPayload = events.Select(e => new EventData(
//Id 
                e.EventId,
// 
                e.GetType().Name,
//  Json (True|False)
                true,
// 
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
// 
                Encoding.UTF8.GetBytes((string)e.GetType().FullName)
            ));
//      
            var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
            return res.NextExpectedVersion;
        }

        public async Task<List<IDomainEvent>> Get(Guid collectionId)
        {
            var results = new List<IDomainEvent>();
            long start = 0L;
            while (true)
            {
                var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
                if (events.Status != SliceReadStatus.Success)
                    return results;
                results.AddRange(Deserialize(events.Events));
                if (events.IsEndOfStream)
                    return results;
                start = events.NextEventNumber;
            }
        }

        public async Task<List<T>> GetAll<T>() where T : IDomainEvent
        {
            var results = new List<IDomainEvent>();
            Position start = Position.Start;
            while (true)
            {
                var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
                results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
                if (events.IsEndOfStream)
                    return results.OfType<T>().ToList();
                start = events.NextPosition;
            }
        }

        private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
            events
                .Where(e => IsEvent(e.Event.EventType))
                .Select(e =>
                {
                    var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
                    result.EventNumber = e.Event.EventNumber;
                    return result;
                })
                .ToList();

        private static bool IsEvent(string eventName)
        {
            return eventName switch
            {
                nameof(TodoCreated) => true,
                nameof(TodoCompleted) => true,
                nameof(TodoRemoved) => true,
                _ => false
            };
        }
        private static Type ToType(string eventName)
        {
            return eventName switch
            {
                nameof(TodoCreated) => typeof(TodoCreated),
                nameof(TodoCompleted) => typeof(TodoCompleted),
                nameof(TodoRemoved) => typeof(TodoRemoved),
                _ => throw new NotImplementedException(eventName)
            };
        }
    }


实体商店看起来非常简单。我们从EventStore获取实体的事件,然后从它们恢复它,或者我们只是保存实体的事件。



    public sealed class TodoRepository : ITodoRepository
    {
        private readonly IEventsRepository _eventsRepository;

        public TodoRepository(IEventsRepository eventsRepository)
        {
            _eventsRepository = eventsRepository;
        }

        public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);

        public async Task<Todo> GetAsync(TodoId id)
        {
            var events = await _eventsRepository.Get(id.Value);
            return Todo.CreateFrom(events);
        }

        public async Task<List<Todo>> GetAllAsync()
        {
            var events = await _eventsRepository.GetAll<TodoCreated>();
            var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
            return res.Where(t => t != null).ToList();
        }
    }


与存储库和实体一起进行的服务:



    public sealed class TodoService : ITodoService
    {
        private readonly ITodoRepository _repository;

        public TodoService(ITodoRepository repository)
        {
            _repository = repository;
        }

        public async Task<TodoId> Create(TodoCreateDto dto)
        {
            var todo = Todo.CreateFrom(dto.Name);
            await _repository.SaveAsync(todo);
            return todo.Id;
        }

        public async Task Complete(TodoId id)
        {
            var todo = await _repository.GetAsync(id);
            todo.Complete();
            await _repository.SaveAsync(todo);
        }

        public async Task Remove(TodoId id)
        {
            var todo = await _repository.GetAsync(id);
            todo.Delete();
            await _repository.SaveAsync(todo);
        }

        public async Task<List<TodoReadDto>> GetAll()
        {
            var todos = await _repository.GetAllAsync();
            return todos.Select(t => new TodoReadDto()
            {
                Id = t.Id.Value,
                Name = t.Name,
                IsComplete = t.IsCompleted
            }).ToList();
        }

        public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
        {
            var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
            return todos.Where(t => t != null).Select(t => new TodoReadDto()
            {
                Id = t.Id.Value,
                Name = t.Name,
                IsComplete = t.IsCompleted
            }).ToList();
        }
    }


好吧,实际上,到目前为止,还没有什么令人印象深刻的。在下一篇文章中,当我添加一个单独的数据库进行读取时,所有内容都会以不同的颜色闪烁。随着时间的流逝,这将立即使我们失去一致性。基于主从的事件存储和SQL DB。一个白色ES和许多黑色MS SQL从中读取数据。



抒情离题。鉴于最近发生的事件,我不禁开玩笑说奴隶主和黑人。哎呀,时代已经过去了,我们将告诉我们的孙子们,我们生活在复制期间的基础称为主从的时代。



在读取和写入数据很少(大多数)的系统中,这将提高工作速度。实际上,是主从服务器本身的复制,它的目标是您的写入速度变慢(以及使用索引),但是作为回报,通过在多个数据库之间分配负载来加快读取速度。



All Articles