哈Ha!我决定离开Scala,Idris和其他FP一段时间,然后谈论一下Event Store(事件存储),该数据库可以将事件保存到事件流中。就像一本好书一样,事实上我们也有《火枪手》第4期,第四个是DDD。首先,我使用事件风暴来选择命令,事件和与其关联的实体。然后,基于它们,我将保存对象的状态并还原它。我将在本文中执行常规的TodoList。欲了解更多信息,欢迎来到猫下。
内容
- 三剑客-事件源,事件风暴和事件存储-进入战斗:第1部分-尝试数据库事件存储
链接
来源
图像docker image
事件存储
事件吸引
事件风暴
实际上,事件存储是一个旨在存储事件的数据库。她还知道如何创建事件的订阅,以便可以以某种方式处理事件。还有一些预测也会对事件做出反应,并在事件的基础上积累一些数据。例如,在TodoCreated事件期间,您可以在投影中增加某种计数计数器。现在,在这一部分中,我将使用事件存储作为读取和写入Db。此外,在以下文章中,我将创建一个单独的数据库,用于读取数据库中保存的事件,并根据这些事件将数据写入其中,以写入事件存储。还有一个示例,说明如何通过将系统回滚到过去的状态来进行“ Time Travel”。
因此,让我们开始Event Stroming。通常,对于其实现,所有感兴趣的人员和专家都会聚集在一起,告诉他们该软件将模拟主题领域中的哪些事件。例如,对于工厂的软件-产品制造。对于游戏-受到的伤害。对于财务软件-钱记入帐户,依此类推。由于我们的主题领域与TodoList一样简单,因此我们几乎没有活动。因此,让我们在黑板上写下主题区域(域)的事件。
现在,让我们添加触发这些事件的命令。
接下来,让我们在实体周围对这些事件和命令进行分组,并更改它们的关联状态。
我的命令将简单地变成服务方法名称。让我们开始实施。
首先,让我们用代码描述事件。
public interface IDomainEvent
{
// . id Event Strore
Guid EventId { get; }
// . Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id Todo
public Guid Id { get; set; }
// Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
现在我们的核心是一个实体:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException(" ");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException(" ");
if (Id == default)
throw new ApplicationException(" ");
}
}
我们连接到事件存储:
services.AddSingleton(sp =>
{
// TCP .
// . .
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
因此,主要部分。从事件存储本身存储和读取事件:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id
e.EventId,
//
e.GetType().Name,
// Json (True|False)
true,
//
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
实体商店看起来非常简单。我们从EventStore获取实体的事件,然后从它们恢复它,或者我们只是保存实体的事件。
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
与存储库和实体一起进行的服务:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
好吧,实际上,到目前为止,还没有什么令人印象深刻的。在下一篇文章中,当我添加一个单独的数据库进行读取时,所有内容都会以不同的颜色闪烁。随着时间的流逝,这将立即使我们失去一致性。基于主从的事件存储和SQL DB。一个白色ES和许多黑色MS SQL从中读取数据。
抒情离题。鉴于最近发生的事件,我不禁开玩笑说奴隶主和黑人。哎呀,时代已经过去了,我们将告诉我们的孙子们,我们生活在复制期间的基础称为主从的时代。
在读取和写入数据很少(大多数)的系统中,这将提高工作速度。实际上,是主从服务器本身的复制,它的目标是您的写入速度变慢(以及使用索引),但是作为回报,通过在多个数据库之间分配负载来加快读取速度。