如何合并来自2个事件源系统的事件



我需要合并来自Akka.Net Persistence模块处理的两个不同事件源系统的事件。合并必须根据时间戳对事件进行排序,我在Akka.Stream中找到了MergeSorted运算符,它正是我所需要的(尝试了2个数字列表-对于事件,我编写了一个自定义EventEnvelopComparer(。

在我的解决方案中,我有一个从db1读取的actor系统(readsystem1(和另一个从db2读取的actors系统(readysystem2(,这两个系统都是通过将正确的连接字符串传递到数据库(PostGres数据库(创建的。

问题是:当我使用MergeSorted运算符时,我需要传递ActorMaterializer的一个实例,如果actor实体化器是在readsystem1 actor系统中创建的,那么只有来自db1的事件被加载(并与它们自己合并(;相反,如果我在readsystem2中创建actor实体化器。我需要把它们都装进去。

下面是一个代码示例(将时间戳写入文件,只是为了测试它们(:

var actorMaterializer1 = ActorMaterializer.Create(
readSystem1,
ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal1 = PersistenceQuery.Get(readSystem1)
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source1 = readJournal1.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source1
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\tmp\timestamps1.txt")), actorMaterializer1);
// just creating the materializer changes the events loaded by the source!!!
var actorMaterializer2 = ActorMaterializer.Create(
readSystem2, 
ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal2 = PersistenceQuery.Get(readSystem2)
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source2 = readJournal2.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source2
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\tmp\timestamps2.txt")), actorMaterializer2);
// RunWith receives actorMaterializer1, so only events coming from db1
// will be loaded and merged with themselves
var source = source1.MergeSorted(source2, new EventEnvelopComparer());
await source
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\tmp\timestamps.txt")), actorMaterializer1);

我怎样才能做到这一点?是否可以在同一个或不同的数据库中从同一个参与者系统读取两个不同的事件源表?ActorMaterializer有什么可以解决我的问题吗?我的做法完全错了吗?

要使用来自两个不同ActorSystems的事件,我认为您需要使用StreamRef。但是您可以在这里配置两个ReadJournalId,每个ReadJournalId指向不同的*.db文件。这样就可以使用一个ActorSystem和物化器。

var source1 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-1")
.CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
var source2 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-2")
.CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
var source = source1.MergeSorted(source2, new EventEnvelopComparer())
.RunForeach(x => System.Console.WriteLine($"EVENT: {x.Timestamp}"), actorSystem.Materializer());

我想我明白这里发生了什么。。。。

这就是为什么您选择Materializer会给您带来问题的原因——Materializer将编译您的Akka.Persistence.Query+Akka.Streams图。当您使用ActorSystemA中的Materializer时,它将把参与者具体化为ActorSystem,并将其Journal实现用于Akka.Persistence-这就是为什么您只从1个ActorSystem中获得1个事件。

然而,看起来你正在做我想做的事情——在将每个源合并在一起之前预先实现它们。。。有可能在GitHub上使用伪SQLite数据库或类似的数据库来创建一个复制品吗?如果是的话,我很乐意调试它。

最新更新