Azure事件中心-通过scala脚本处理数据



我需要将平面文件中的数据从VM unix服务器发送到Azure事件中心,并发布到Azure blob存储。

我可以使用下面的代码来做到这一点

val producer: EventHubProducerClient = new EventHubClientBuilder().connectionString(connectionString, eventHubName).buildProducerClient
val batch: EventDataBatch = producer.createBatch()
Reading the content of my file line by line and sending to tryAdd methos. 
for (line <- fileContent.getLines)
{
batch.tryAdd(new EventData(fileLine)) }
// send the batch of events to the event hub
//producer.send(batch)
// close the producer
producer.close()

我的档案里有大约1000条记录。事件中心已经创建了大约12个请求(看起来这是随机的(。

我只是想了解事件中心是在什么基础上创建请求的,有什么方法可以控制它吗?

任何有关它的信息都将对非常有用

对事件中心服务的每个发布操作都限制在一定数量的字节内,由事件中心命名空间的层管理。每个层的配额可以在Event Hubs文档中看到。

当调用tryAdd时,添加到批次中的每个事件都会根据该限制进行测量。如果事件不能安全地放入批中,tryAdd将返回false。此时,批次可能已完全满,或者可能还有一些容量。任何剩余容量都不足以容纳已通过的特定事件的全部大小。

除了有效负载的大小(在本例中为fileLine(之外,诊断元数据和批处理打包还存在一些大小开销,这些开销会影响事件的最终大小和批处理的容量。根据fileLine在序列化以供传输后大小的一致性,您可能会看到大小一致的批,或者可能会看到单个批中可容纳的事件数量有所变化。

所需的send调用数与保存每个fileLine事件所需的批处理数成正比。每个send调用可以发布一个批,因为该调用的流量受服务强制执行的字节大小限制。

我知道你问题中的片段可能只是为了说明,但我想指出的是,你忽略了tryAdd的返回,我强烈建议不要这样做。如果批处理已满,则tryAdd调用不会失败。如果忽略返回值,则在返回false时,您可能不会意识到某个事件未被接受到批处理中。这通常会导致数据丢失,因为事件不在批中,但应用程序认为它在批中并继续进行

最新更新