区分进入我的服务总线队列的数据



下面是一个Azure函数,它将数据从我的cosmos数据库推送到我的服务总线中。这是已经写好的代码,它们在将Json返回到服务总线之前检查不同的元数据。现在我必须编写另一个函数,其中我从服务总线中提取这些数据,并根据来自上述函数或其他来源的数据进一步处理它,因为需求正在变化,我可能会从其他来源获得服务总线中的数据。我想知道如何区分这些数据,以识别哪些数据来自何处。

[FunctionName( "Push Data to Service Bus" )]
[return: ServiceBus( "topicname", Connection = "ServiceBusConnection" )]
public static string Notify(
[CosmosDBTrigger(
databaseName: "test",
collectionName: "testcontay",
ConnectionStringSetting = "CosmosDBConnection",
LeaseCollectionName = "leases",
CreateLeaseCollectionIfNotExists = true)]
IReadOnlyList<Document> documents,
ILogger log )
{
if( documents == null || documents.Count == 0 )
{
log.LogWarning( "No documents received" );
return null;
}

var triggerDocs =
(from d in documents
let trigger = d.GetPropertyValue<bool?>( "Trigger" )
where !trigger.HasValue || trigger == true
select new
{
Id = d.GetPropertyValue<string>( "id" ),
Project = d.GetPropertyValue<string>( "Project" ),
ProjectId = d.GetPropertyValue<string>( "ProjectId" ),
Tags = d.GetPropertyValue<string[]>( "Tags" ),
Properties = d.GetPropertyValue<Dictionary<string, object>>( "Properties" ),
Categories = d.GetPropertyValue<string[]>( "Categories" ),
Trigger = d.GetPropertyValue<bool?>( "Trigger" ),
Received = DateTime.UtcNow
}).ToList();

log.LogInformation( $"Documents triggered: {triggerDocs.Count}" );
if( triggerDocs.Count() == 0 )
return string.Empty;
var json = JsonSerializer.Serialize( triggerDocs, triggerDocs.GetType() );
return json;
}

[FunctionName( "GettingDataFromServiceBus" )]
public async void Run([ServiceBusTrigger("topicname", "subscriptionname",
Connection = "AzureServiceBusString")] 
string SbMsg, ExecutionContext context,
ILogger log)
{

if (!string.IsNullOrEmpty(SbMsg))
{
log.LogInformation($"C# ServiceBus topic trigger function processed message: {SbMsg}");
}
}

更新:

//Such as this is your message.
string str = "{"projectID":"111"}";
try
{
JObject obj = JObject.Parse(str);
if (obj["projectID"] != null)
{
//Message have projectID, so it is from xxx.
}
else
{
//Message don't have projectID, so it is from xxx.
}
}
catch (Exception ex) { 
//Message format is not json, so it is comes from xxx.
}

原始答:

您可以使用自定义属性来实现您的需求。将ApplicationProperties添加到消息中,然后将其发布到azure服务总线队列。

看看这个(基于。net):

https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage?view=azure-dotnet属性c#示例:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace ConsoleApp3
{
class Program
{
public static async Task Main(string[] args)
{
string connectionString = "Endpoint=sb://bowman1012.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=X/NHgQ4AQxul6YlMsUofD+JNE20Tovnzx3g2gDt8qyY=";
string queueName = "queuename";
string source = "function";
await SendMessageAsync(connectionString, queueName, source);
}
static async Task SendMessageAsync(string connectionString,string queueName,string source)
{
// create a Service Bus client 
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the queue 
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
message.ApplicationProperties.Add("message_source",source);
// send the message
await sender.SendMessageAsync(message);
Console.WriteLine($"Sent a single message to the queue: {queueName}");
}
}
}
}

相关内容

最新更新