为什么一次断开连接会多次触发stomp listener_on_disconnect方法



我有一个STOMP 1.2连接的侦听器(继承自stomp.ConnectionListener(。它目前订阅了3个队列。以下是连接和订阅代码。

当ActiveMQ 5.16.2服务器关闭时,侦听器的on_disconnected函数似乎被多次触发,尽管其间没有进行重新连接尝试。

知道为什么会这样吗?这与订阅数量有任何相关性吗?

更新:根据要求添加一个可重复的最小示例。此处不能包含主机、端口或任何凭据。

server.py:

import stomp
from time import sleep
if __name__ == '__main__':
conn = stomp.Connection12([(__host, __port)])
conn.connect(__user, __password, wait=True)
while(True):
now = datetime.now().strftime("%Y%m%d-%H%M%S")
conn.send(destination=__topic_name1, body='topic1: test information 1 '+ now)
print("sent topic 1")
time.sleep(SLEEP_TIME)
conn.send(destination=__topic_name2, body='topic2: test information 2 '+ now)
print("sent topic 2")
time.sleep(SLEEP_TIME)
conn.send(destination=__topic_name3, body='topic3: test information 3 '+ now)
print("sent topic 3")
time.sleep(SLEEP_TIME)
time.sleep(10)
conn.disconnect()

receiver.py:

import stomp
from time import sleep
HEARTBEATS = 30000
class MyListener(stomp.ConnectionListener):
def set_conn(self, conn):
self._conn = conn
def on_connected(self, frame):
print(f"On connected: {frame}")
def on_disconnected(self):
print("disconnected")
def on_message(self, frame):
print(f"on message = {frame}")
class Connection():
def __init__(self, listener):
self._conn = stomp.Connection12([(HOST,PORT)], heartbeats=(HEARTBEATS, HEARTBEATS))
self._conn.set_listener("My listener", listener)
def connect_and_subscribe(self, destinations):
for _id, dest in enumerate(destinations, 1):
client_id = dest.strip(".01")
self._conn.connect(USER, PASSWORD, wait=True,headers={"client-id": client_id})
self._conn.subscribe(
destination=dest, id=_id, ack="auto"
)
print(f"Subscribed to {dest}")
def main():
listener = MyListener()
conn = Connection(listener)
listener.set_conn(conn)
destinations = ["FirstTopic.01", "SecondTopic.01", "ThirdTopic.01", "FourthTopic.01"]
conn.connect_and_subscribe(destinations)
while True:
sleep(5)
if __name__ == "__main__":
main()

我只控制ActiveMQ服务器一次。这是输出:

On connected: {cmd=CONNECTED,headers=[{'server': 'ActiveMQ/5.16.2', 'heart-beat': '30000,30000', 'session': 'FirstTopic', 'version': '1.2'}],body=}
Subsribed to FirstTopic.01
Subsribed to SecondTopic.01
Subsribed to ThirdTopic.01
Subsribed to FourthTopic.01
disconnected
disconnected
^CTraceback (most recent call last):
File "receiver.py", line 53, in <module>
main()
File "receiver.py", line 50, in main
sleep(5)
KeyboardInterrupt

正如您所看到的,on_disconnected被触发的次数与目的地的数量不匹配。我也在触发器中看到过4次,有几次。

AMQ调试日志。

2021-07-26 16:04:22,776 [0.1:60474@61613] TRACE StompIO                        - Received:
STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:FirstTopic
passcode:*****

2021-07-26 16:04:22,778 [0.1:60474@61613] DEBUG StompInactivityMonitor         - Stomp Inactivity Monitor read check interval: 30000ms, write check interval: 30000ms
2021-07-26 16:04:22,781 [0.1:60474@61613] DEBUG ProtocolConverter              - Stomp Connect heartbeat conf RW[30000,30000]
2021-07-26 16:04:22,791 [0.1:60474@61613] TRACE StompIO                        - Sending:
CONNECTED
server:ActiveMQ/5.16.2
heart-beat:30000,30000
session:FirstTopic
version:1.2

2021-07-26 16:04:22,793 [0.1:60474@61613] TRACE StompIO                        - Received:
SUBSCRIBE
ack:auto
destination:FirstTopic.01
id:1

2021-07-26 16:04:22,793 [0.1:60474@61613] TRACE LegacyFrameTranslator          - New Composite Destination name: queue://FirstTopic.01
2021-07-26 16:04:22,806 [0.1:60474@61613] TRACE StompIO                        - Received:
STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:SecondTopic
passcode:*****

2021-07-26 16:04:22,806 [0.1:60474@61613] WARN  ProtocolConverter              - Exception occurred for client FirstTopic (tcp://127.0.0.1:60474) processing: STOMP -> org.apache.activemq.transport.stomp.ProtocolException: Already c
onnected.
2021-07-26 16:04:22,807 [0.1:60474@61613] DEBUG ProtocolConverter              - Exception detail
org.apache.activemq.transport.stomp.ProtocolException: Already connected.
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:745)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:254)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-07-26 16:04:22,809 [0.1:60474@61613] TRACE ProtocolConverter              - Command that caused the error: STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:SecondTopic
passcode:*****
2021-07-26 16:04:22,809 [0.1:60474@61613] TRACE StompIO                        - Sending:
ERROR
content-type:text/plain
message:Already connected.
org.apache.activemq.transport.stomp.ProtocolE...d.java:748)
2021-07-26 16:04:22,810 [0.1:60474@61613] TRACE StompIO                        - Received:
SUBSCRIBE
ack:auto
destination:SecondTopic.01
id:2

2021-07-26 16:04:22,810 [0.1:60474@61613] TRACE LegacyFrameTranslator          - New Composite Destination name: queue://SecondTopic.01
2021-07-26 16:04:22,814 [0.1:60474@61613] TRACE StompIO                        - Received:
STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:ThirdTopic
passcode:*****

2021-07-26 16:04:22,814 [0.1:60474@61613] WARN  ProtocolConverter              - Exception occurred for client FirstTopic (tcp://127.0.0.1:60474) processing: STOMP -> org.apache.activemq.transport.stomp.ProtocolException: Already c
onnected.
2021-07-26 16:04:22,814 [0.1:60474@61613] DEBUG ProtocolConverter              - Exception detail
org.apache.activemq.transport.stomp.ProtocolException: Already connected.
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:745)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:254)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-07-26 16:04:22,814 [0.1:60474@61613] TRACE ProtocolConverter              - Command that caused the error: STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:ThirdTopic
passcode:*****

2021-07-26 16:04:22,815 [0.1:60474@61613] TRACE StompIO                        - Sending:
ERROR
content-type:text/plain
message:Already connected.
org.apache.activemq.transport.stomp.ProtocolE...d.java:748)
2021-07-26 16:04:22,815 [0.1:60474@61613] TRACE StompIO                        - Received:
SUBSCRIBE
ack:auto
destination:ThirdTopic.01
id:3

2021-07-26 16:04:22,815 [0.1:60474@61613] TRACE LegacyFrameTranslator          - New Composite Destination name: queue://ThirdTopic.01
2021-07-26 16:04:22,818 [0.1:60474@61613] TRACE StompIO                        - Received:
STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:FourthTopic
passcode:*****

2021-07-26 16:04:22,818 [0.1:60474@61613] WARN  ProtocolConverter              - Exception occurred for client FirstTopic (tcp://127.0.0.1:60474) processing: STOMP -> org.apache.activemq.transport.stomp.ProtocolException: Already c
onnected.
2021-07-26 16:04:22,818 [0.1:60474@61613] DEBUG ProtocolConverter              - Exception detail
org.apache.activemq.transport.stomp.ProtocolException: Already connected.
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:745)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:254)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-07-26 16:04:22,819 [0.1:60474@61613] TRACE ProtocolConverter              - Command that caused the error: STOMP
accept-version:1.2
login:admin
heart-beat:30000,30000
host:127.0.0.1
client-id:FourthTopic
passcode:*****

2021-07-26 16:04:22,819 [0.1:60474@61613] TRACE StompIO                        - Sending:
ERROR
content-type:text/plain
message:Already connected.
org.apache.activemq.transport.stomp.ProtocolE...d.java:748)
2021-07-26 16:04:22,819 [0.1:60474@61613] TRACE StompIO                        - Received:
SUBSCRIBE
ack:auto
destination:FourthTopic.01
id:4

2021-07-26 16:04:22,819 [0.1:60474@61613] TRACE LegacyFrameTranslator          - New Composite Destination name: queue://FourthTopic.01

好的,所以问题是为它订阅的每个目的地调用self._conn.connect。所以我不得不将connect_and_subscribe修改为这个。

def connect_and_subscribe(self, destinations):
**self._conn.connect(USER, PASSWORD, wait=True,headers={"client-id": "client-id"})**
for _id, dest in enumerate(destinations, 1):
self._conn.subscribe(
destination=dest, id=_id, ack="auto"
)
print(f"Subscribed to {dest}")

现在,它只断开一次连接。

Subscribed to FirstTopic.01
Subscribed to SecondTopic.01
Subscribed to ThirdTopic.01
Subscribed to FourthTopic.01
disconnected
^CTraceback (most recent call last):
File "receiver-correct.py", line 54, in <module>
main()
File "receiver-correct.py", line 51, in main
sleep(5)
KeyboardInterrupt

相关内容

最新更新