在 C# 控制台应用程序中保持 ActiveMQ 侦听器处于活动状态并在关闭之前优雅地释放资源的正确方法是什么?



我在这里找到了我想要的大部分内容 ActiveMQ - 监听器事件触发后我是否需要重新订阅队列?,但我无法弄清楚如何保持侦听器运行,除了使用 while(true( 循环,我认为必须有更好的方法来保持侦听器处于活动状态,同时如果需要,我有能力优雅地处理所有进程以停止应用程序。用户 Tim Bish 肯定地回答了 reckface 的陈述"这是否意味着 Listener 事件将在没有 while 循环的情况下为每条消息触发?",但对于我的生活,我无法弄清楚如何在没有 while(true( 循环的情况下实现它。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Web;

namespace ActiveMQConnectionTest
{
class Program : IDisposable
{
private static IConnection connection;
private static ISession session;
private static SqlConnection sqlConn;
private static ActiveMQMessage msg;
private static MessageConsumer consumer;
private static DateTime timeStamp;
private static AutoResetEvent semaphore = new AutoResetEvent(false);
private static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
static string un = ConfigurationManager.AppSettings["AMQUserName"];
static string pwd = ConfigurationManager.AppSettings["AMQPassword"];
static string url = ConfigurationManager.AppSettings["url"];
static string queue = ConfigurationManager.AppSettings["queue"];
private static string oldMsgId;

Program() 
{
AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"].ToString());
System.Uri uri = new Uri(url);
IConnectionFactory factory = new ConnectionFactory(uri);

try
{
connection = factory.CreateConnection(un, pwd);
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
ActiveMQDestination dest = session.GetDestination(queue) as ActiveMQDestination;
consumer = session.CreateConsumer(dest) as MessageConsumer;
}
catch (NMSConnectionException ex)
{
Console.Write(ex.Message);
connection.Dispose();
}
try
{
connection.Start();
Console.WriteLine("Connection Started...");
Console.WriteLine("Session Created....");
}
catch (ConnectionFailedException ex)
{
connection.Close();
Console.Write(ex.Message);
}
}
~Program()
{
Dispose(false);
}
protected void Dispose(Boolean itIsSafeToAlsoFreeManagedObjects)
{
if (itIsSafeToAlsoFreeManagedObjects)
{
if (connection != null)
{
connection.Dispose();
}
if (session != null)
{
session.Dispose();
}
if (consumer != null)
{
consumer.Dispose();
}
}
}
public void Dispose()
{
Dispose(true); 
}       
static void ShutDown()
{
session.Close();
if (connection.IsStarted)
{
connection.Stop();
connection.Close();
connection.Dispose();
}
}
protected static void consumer_Listener(IMessage messasge)
{
messasge.Acknowledge();
msg = (ActiveMQMessage)messasge;
if (msg.MessageId.ToString() != oldMsgId)
{
oldMsgId = msg.MessageId.ToString();
msg.Acknowledge();
if (msg == null)
{
Console.WriteLine("No message received!");
}
else
{
Console.WriteLine("Received message with ID: " + msg.NMSMessageId);
Console.WriteLine("Received message with conetent: " + msg.ToString());
try
{
string s = ASCIIEncoding.ASCII.GetString(msg.Content);
timeStamp = DateTime.Now;
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(I280Message));
var ms = new MemoryStream(msg.Content);
I280Message rows = (I280Message)deserializer.ReadObject(ms);
int MessageId = InsertPerson(rows.Person);
semaphore.Set();
}
catch (NMSException ex)
{
ShutDown();
Console.WriteLine(ex.Message);
}
}
}
else {
Console.WriteLine("Same old message....");
}

}

private static int InsertPerson(Person person)
{
using (SqlConnection sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"]))
{
using (SqlCommand sqlCmd = new SqlCommand("I280MessagePerson_tbl_isp", sqlConn))
{
sqlCmd.CommandType = CommandType.StoredProcedure;
sqlCmd.Parameters.AddWithValue("@BirthDate", person.BirthDate);
sqlCmd.Parameters.AddWithValue("@Gender", person.Gender);
sqlCmd.Parameters.AddWithValue("@VisaPermitType", person.VisaPermitType, null);
sqlCmd.Parameters.AddWithValue("@CitizenshipStatus", person.CitizenshipStatus, null);
sqlCmd.Parameters.AddWithValue("@ConfidentialFlag", person.ConfidentialFlag);
sqlCmd.Parameters.AddWithValue("@DeceasedFlag", person.DeceasedFlag, null);
sqlCmd.Parameters.AddWithValue("@TimeStamp", timeStamp);
SqlParameter paramPersonId = new SqlParameter("@MessageId", SqlDbType.Int);
paramPersonId.Direction = ParameterDirection.Output;
sqlCmd.Parameters.Add(paramPersonId);


sqlConn.Open();
try
{
sqlCmd.ExecuteNonQuery();
return (int)(sqlCmd.Parameters["@MessageId"].Value);
}
catch (SqlException ex)
{
Console.WriteLine(ex.Message);
if (sqlConn.State == ConnectionState.Open) sqlConn.Close();
return -1;
}
}
}

}
static void Main(string[] args)
{           
using (Program pr = new Program())
{
consumer.Listener += new MessageListener(consumer_Listener);
}

//while (true)
//{
//    consumer.Listener += new MessageListener(consumer_Listener);
//    semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
//}
//catch (NMSException ex)
//{
//    ShutDown();
//    Console.WriteLine(ex.Message);
//}
// Console.ReadLine();

}

}
public static class SqlParameterCollectionExtensions
{
public static SqlParameter AddWithValue(this SqlParameterCollection target, string parameterName, object value, object nullValue)
{
if (value == null || (string)value == "")
{
return target.AddWithValue(parameterName, nullValue ?? DBNull.Value);
}
return target.AddWithValue(parameterName, value);
}
}

}

所以基本上你的问题是你允许你的程序退出。 发生这种情况时,操作系统将回收在程序运行期间分配的任何内存,包括程序对象和使用者侦听器委托。

您在上一个问题中的朋友指出,如果您将 ActiveMQ 侦听器存储为成员变量并将其保留在范围内,您应该能够接收任意数量的消息,只要您想要,而无需在每次收到消息时添加新侦听器。

所以现在剩下要做的就是防止程序退出。有许多方法可以做到这一点:

  1. 您可以将程序更改为Winforms应用程序,如其他Stack Overflow帖子中所述。 这将导致创建消息循环。

  2. 您可以从控制台读取字符。 这是一个阻止调用;程序将等到用户按下一个键。 同时,您的 ActiveMQ 应该仍然能够接收事件。

  3. 您可以使用while (iStillWantToReceiveMessages) { }

这不是问题的答案。但我写这篇文章是为了让其他遇到与我类似的问题的人在遇到问题时可以参考答案。

但是我遇到了一个问题,即与 ActiveMQ(使用 C#(的连接处于活动状态,没有异常,没有错误,但客户端仍然没有收到服务器发布的任何消息。

在 Apache Site 学习了一段时间后,我可以发现这是由于超时而发生的。我使用以下代码行修复了它:-

brokerUri += "?transport.useInactivityMonitor=false&transport.useKeepAlive=true";

其中 brokerUri 是我的 activeMq uri。

最新更新