Mule application receives all messages from Azure Event Hub again after a restart



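We have an Azure Event Hub, and I am using the Mule Azure Service Bus connector 3.2.1 with Anypoint Studio 7.11.1 and Mule 4. We created a Mule application on Mule 4.4.0 (code below). The connection works fine, and the application gets every message for a specific topic from the Event Hub through the Azure Service Bus "Message listener".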

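My concern is that when I restart the application, it receives every message from the Event Hub again, including messages it has already received.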

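I tried adding an Azure Service Bus acknowledgement step (the "Complete" operation in the code below), hoping the application would not get the same messages again next time, but I got the error "AZURE-SERVICE-BUS-MESSAGING:LOCK_TOKEN_NOT_FOUND" because the lockToken is "00000000-0000-0000-0000-000000000000" (the value that came from the message listener in the first place). I read this article, https://help.mulesoft.com/s/article/Azure-Service-Bus-Connector-Abandon-Delivery-not-found-on-the-receive-link-Mule-4, and changed the connector version to 3.1.0, but I still get "AZURE-SERVICE-BUS-MESSAGING:LOCK_TOKEN_NOT_FOUND". Here is the code (with the Azure Service Bus "Complete" operation added):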

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:azure-service-bus-messaging="http://www.mulesoft.org/schema/mule/azure-service-bus-messaging"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/azure-service-bus-messaging http://www.mulesoft.org/schema/mule/azure-service-bus-messaging/current/mule-azure-service-bus-messaging.xsd">
    <azure-service-bus-messaging:config name="Azure_Service_Bus_Messaging_Connector_Config" doc:name="Azure Service Bus Messaging Connector Config" doc:id="e2f8ebc0-a6c8-46ef-a1f4-cea60d5e6448" >
        <azure-service-bus-messaging:sas-connection namespace="${eventhub.name}" sharedAccessKeyName="${eventhub.serviceKey}" sharedAccessKey="${eventhub.accesskey}" />
    </azure-service-bus-messaging:config>
    <configuration-properties doc:name="Configuration properties" doc:id="70ec97a1-f9bd-4178-a15e-89d4d1a37c13" file="${mule.env}.yaml" />
    <global-property doc:name="Global Property" doc:id="5cc462ae-af0d-4702-a72d-e91d21efe2c3" name="mule.env" value="dev" />
    <http:request-config name="HTTP_Request_configuration" doc:name="HTTP Request configuration" doc:id="6e1db046-7950-4242-aae7-47ff60f3834e" >
        <http:request-connection host="localhost" port="8083" />
    </http:request-config>
    <flow name="sys-event-hub-listener-flow" doc:id="ae8e10fc-e1e2-4330-92f8-e05dfcbbe4ea" >
        <azure-service-bus-messaging:message-listener doc:name="Message listener" doc:id="5850d0ae-d3a4-425b-93c9-e3cead3714f6" config-ref="Azure_Service_Bus_Messaging_Connector_Config" ackMode="AUTO" destinationName="user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0" numberOfConsumers="1" outputMimeType="application/json">
            <reconnect />
        </azure-service-bus-messaging:message-listener>
        <ee:transform doc:name="Transform Message" doc:id="52fb5bfc-23ed-4927-a79c-e683ec2d06be" >
            <ee:message >
            </ee:message>
            <ee:variables >
                <ee:set-variable variableName="lockToken" ><![CDATA[%dw 2.0
output application/java
---
attributes.lockToken]]></ee:set-variable>
                <ee:set-variable variableName="onePayload" ><![CDATA[%dw 2.0
output application/json
---
{
    payload: payload,
    hitId: attributes.properties.hitId.array,
    topic: attributes.properties.kafka_topic.array,
    timeStamp: attributes.properties.kafka_timestamp.array
}]]></ee:set-variable>
            </ee:variables>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" doc:id="ba7c30a0-6cde-4acd-9a9d-dafb7763b487" message="#[payload]"/>
        <azure-service-bus-messaging:complete-message doc:name="Complete" doc:id="b3babc6b-2cdb-4109-ba39-08e8f2b287f7" config-ref="Azure_Service_Bus_Messaging_Connector_Config" lockToken="#[vars.lockToken]" />
        <logger level="INFO" doc:name="Logger" doc:id="383f214a-1bed-4ab7-a324-5214f00d1da7" message="Sent Acknowledgement"/>
    </flow>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="7e9718d6-12e3-46f2-a614-7106095c5985" >
        <http:listener-connection host="localhost" port="8082" />
    </http:listener-config>
</mule>

I also tried ackMode AUTO on the listener (and removed the "Complete" step, i.e. azure-service-bus-messaging:complete-message), and got the following error:

DEBUG 2022-08-25 08:13:16,132 [ReactorThread73feee5b-9ec1-40de-ab19-2ff901632186] [processor: ; event: ] com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler: onDelivery: linkName:Receiver_eb2ac7_075c0bf0913e4b858b98217424917420_G10S3, updatedLinkCredit:1, remoteCredit:0, remoteCondition:Error{condition=null, description='null', info=null}, delivery.isPartial:false
DEBUG 2022-08-25 08:13:16,132 [ReactorThread73feee5b-9ec1-40de-ab19-2ff901632186] [processor: ; event: ] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Received a delivery '' from 'user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0'
DEBUG 2022-08-25 08:13:16,132 [ReactorThread73feee5b-9ec1-40de-ab19-2ff901632186] [processor: ; event: ] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Received a message from 'user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0'. Adding to prefecthed messages.
DEBUG 2022-08-25 08:13:16,145 [pool-14-thread-9] [processor: ; event: ] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Returning the message received from 'user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0' to a pending receive request
INFO  2022-08-25 08:13:16,145 [[MuleRuntime].uber.09: [sys-event-hub].Mule Azure Service Bus Messaging Connector - Message Listener Scheduler @54aa8c36] [processor: ; event: ] com.mulesoft.connector.azure.messaging.internal.source.ReceiverTask: null
DEBUG 2022-08-25 08:13:16,145 [[MuleRuntime].uber.09: [sys-event-hub].Mule Azure Service Bus Messaging Connector - Message Listener Scheduler @54aa8c36] [processor: ; event: ] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Receiving maximum of '1' messages from 'user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0'
DEBUG 2022-08-25 08:13:16,145 [ReactorThread73feee5b-9ec1-40de-ab19-2ff901632186] [processor: ; event: ] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Sent flow to the service. receiverPath:user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0, linkname:Receiver_eb2ac7_075c0bf0913e4b858b98217424917420_G10S3, updated-link-credit:1, sentCredits:1
INFO  2022-08-25 08:13:16,150 [[MuleRuntime].uber.07: [sys-event-hub].sys-event-hub-listener-flow.CPU_INTENSIVE @2f7f1931] [processor: sys-event-hub-listener-flow/processors/2; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: TimeStamp=2022-08-22T15:27:57.595 - 1661200077596
INFO  2022-08-25 08:13:16,151 [[MuleRuntime].uber.07: [sys-event-hub].sys-event-hub-listener-flow.CPU_INTENSIVE @2f7f1931] [processor: sys-event-hub-listener-flow/processors/3/route/0/processors/0; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Legit payload
INFO   [processor: sys-event-hub-listener-flow/processors/4; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Sent Acknowledgement
DEBUG [processor: ; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] com.microsoft.azure.servicebus.MessageReceiver: Completing message with lock token '00000000-0000-0000-0000-000000000000'
DEBUG 2022-08-25 08:13:16,154 [[MuleRuntime].uber.05: [sys-event-hub].sys-event-hub-listener-flow.BLOCKING @317bc21] [processor: ; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Updating message state of delivery '????????????????' to 'Accepted{}'
INFO  2022-08-25 08:13:16,154 [[MuleRuntime].uber.05: [sys-event-hub].sys-event-hub-listener-flow.BLOCKING @317bc21] [processor: ; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Delivery not found for delivery tag '????????????????'. Either receive link to 'user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.
ERROR 2022-08-25 08:13:16,154 [[MuleRuntime].uber.05: [sys-event-hub].sys-event-hub-listener-flow.BLOCKING @317bc21] [processor: ; event: ae3f5e10-2477-11ed-a4e4-ac675d320d35] org.mule.runtime.core.internal.exception.OnErrorPropagateHandler: 
********************************************************************************
Message               : java.lang.IllegalArgumentException: Delivery not found on the receive link.
Element               : sys-event-hub-listener-flow/source @ sys-event-hub:sys-event-hub-app.xml:21 (Message listener)
Element DSL           : <azure-service-bus-messaging:message-listener doc:name="Message listener" doc:id="5850d0ae-d3a4-425b-93c9-e3cead3714f6" config-ref="Azure_Service_Bus_Messaging_Connector_Config" ackMode="AUTO" destinationName="user-crm/ConsumerGroups/user-crm-mulesoft-consumer-group/Partitions/0" numberOfConsumers="1" outputMimeType="application/json">
<reconnect></reconnect>
</azure-service-bus-messaging:message-listener>
Error type            : MULE:SOURCE_RESPONSE_SEND
FlowStack             : 
Payload Type          : [B
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.IllegalArgumentException: Delivery not found on the receive link.

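You are using the Service Bus connector to connect to Event Hub, and the two services are very different. With Event Hub you have a stream of data, and you need to commit offsets for it in order to track the position the client has currently read up to. The Event Hub SDK clients handle this by committing the offset to a storage account, which the client can then use to pull data starting from the last committed offset.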

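Since the Mule connector for Service Bus probably does not have this capability, it starts pulling from the beginning of the stream every time you restart the client.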
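To make the difference concrete, here is a minimal sketch of offset checkpointing in plain Python. It does not use any Azure SDK: `CheckpointStore` and `consume` are hypothetical names, and the in-memory dictionary only stands in for the durable storage (such as a storage account) that a real client would write to.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class CheckpointStore:
    """Hypothetical stand-in for durable checkpoint storage."""
    offsets: Dict[str, int] = field(default_factory=dict)

    def load(self, partition: str) -> Optional[int]:
        # Last committed offset for the partition, or None if never committed.
        return self.offsets.get(partition)

    def save(self, partition: str, offset: int) -> None:
        self.offsets[partition] = offset


def consume(stream: List[str], partition: str,
            store: Optional[CheckpointStore]) -> List[str]:
    """Read a partition, resuming after the last committed offset if a store is given."""
    last = store.load(partition) if store else None
    start = 0 if last is None else last + 1
    received = []
    for offset in range(start, len(stream)):
        received.append(stream[offset])
        if store:
            # Commit the offset after handling the event.
            store.save(partition, offset)
    return received


if __name__ == "__main__":
    events = ["e0", "e1", "e2"]

    # Without checkpoints, a "restart" replays the whole partition.
    print(consume(events, "0", None))
    print(consume(events, "0", None))

    # With checkpoints, a restart only sees events after the committed offset.
    store = CheckpointStore()
    print(consume(events, "0", store))
    events.append("e3")
    print(consume(events, "0", store))
```

The first two calls behave like your current flow: nothing records how far the reader got, so every run starts at offset 0. The last two calls show what a committed offset changes.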

I don't see any built-in connector from MuleSoft for Event Hub; however, you can use the Kafka connector instead: https://docs.mulesoft.com/kafka-connector/4.6/
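As a rough guide to what the Kafka route needs, the sketch below builds the standard Kafka client properties for the Kafka-compatible endpoint of Event Hubs (port 9093, SASL_SSL with the PLAIN mechanism, and the literal username `$ConnectionString`). The function name and the example namespace, connection string, and group id are placeholders I made up. How these settings map onto the Mule Kafka connector's configuration fields is not shown here, so check the connector documentation for that.

```python
from typing import Dict


def event_hubs_kafka_properties(namespace: str, connection_string: str,
                                group_id: str) -> Dict[str, str]:
    """Build Kafka consumer properties for an Event Hubs namespace (illustrative helper)."""
    # Event Hubs expects the literal username "$ConnectionString" and the
    # namespace connection string as the password.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
        # Offsets are committed per consumer group, so a restart resumes
        # from the last committed position.
        "group.id": group_id,
        "enable.auto.commit": "true",
        # Only applies when the group has no committed offset yet.
        "auto.offset.reset": "earliest",
    }


if __name__ == "__main__":
    # Placeholder values; substitute your own namespace and connection string.
    props = event_hubs_kafka_properties(
        namespace="my-namespace",
        connection_string="Endpoint=sb://my-namespace.servicebus.windows.net/;...",
        group_id="my-consumer-group",
    )
    for key, value in props.items():
        print(f"{key}={value}")
```

With the Kafka endpoint, the event hub name is used as the Kafka topic name. The important part for your problem is `group.id` together with committed offsets, which is what lets a restarted consumer continue where it left off.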
