如何从启用了计时器的 Azure 函数从事件中心提取事件?



我正在使用Azure事件中心。我的要求是每天使用 azure 函数从 Azure 事件中心提取事件。基本上我的 azure 函数将启用计时器。它应该能够从 Azure 事件中心提取数据。有没有机制? 我知道,只要在事件中心收到事件,我们就可以触发 azure 函数。我不想要这个,因为该函数将执行 n 次。我只想每天获取事件。

您仍然可以创建计时器触发的函数,并在代码中创建使用者客户端以接收事件。请参阅下面的示例代码。如果您有任何问题,请告诉我。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace FunctionApp7
{
public static class Function1
{
const string EventHubsConnectionString = "your connection string";
const string EventHubName = "evethub name";
const string ConsumerGroupName = "cgname";
[FunctionName("Function1")]
public static void Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, ILogger log)
{
log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
// You better dissover partitions by eventhub client. I am just hardcoding them here for now.
var partitions = new List<string> { "0", "1" };
var receiveTasks = new List<Task>();
foreach(var p in partitions)
{
receiveTasks.Add(ReadEventsFromPartition(p));
}
// Wait until all reads complete.
Task.WhenAll(receiveTasks);
}
public static async Task ReadEventsFromPartition(string partitionId)
{
await using (var consumer = new EventHubConsumerClient(ConsumerGroupName, EventHubsConnectionString, EventHubName))
{
EventPosition startingPosition = EventPosition.FromOffset(CheckpointStore.ReadOffsetForPartition(partitionId));
long lastOffset = -1;
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition))
{
// Process received events here.
// Break if no events left.
if (receivedEvent.Data == null)
{
break;
}
lastOffset = receivedEvent.Data.Offset;
}
// Persist last event's offset so we can continue reading from this position next time function is triggered.
if (lastOffset != -1)
{
// Write offset into some durable store.
CheckpointStore.WriteOffsetForPartition(partitionId, lastOffset);
}
}
}
}
}

最新更新