RabbitMQ RPC异步方式



我正在为一个客户做项目,该客户使用了RabbitMQ和RPC。我对RabbitMQ不太了解,在互联网上也很难找到像样的例子。我需要实现一些异步操作,下面我会详细说明。

目前,我有一个生产者发送RPC请求并等待消费者的应答,到这里一切正常。我的问题是:我仍然需要这个应答,但不想让生产者阻塞等待它。下面贴出我的生产者和消费者代码。

生产者

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitProducer
{
/// <summary>
/// Blocking RPC client: publishes a request to the "Ciccio" queue and waits
/// for the matching reply on a private reply queue.
/// </summary>
public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    /// <summary>
    /// Connects to the broker, declares the reply queue and subscribes to it.
    /// </summary>
    public RpcClient()
    {
        ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ConfirmSelect();

        // Declare a queue with default arguments and use the name the broker reports.
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        // One set of properties (and one correlation id) is shared by every Call.
        props = channel.CreateBasicProperties();
        string correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            string response = Encoding.UTF8.GetString(body);

            // Ignore replies that do not belong to this client.
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };

        // Publisher-confirm callbacks; intentionally empty placeholders.
        channel.BasicAcks += (sender, ea) =>
        {
        };
        channel.BasicNacks += (sender, ea) =>
        {
        };

        // Subscribe exactly once. Doing this inside Call() subscribed the same
        // consumer to the reply queue again on every call.
        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as a UTF-8 request and blocks until a reply arrives.
    /// </summary>
    /// <param name="message">Request payload.</param>
    /// <returns>The reply body decoded as UTF-8.</returns>
    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "Ciccio",
            basicProperties: props,
            body: messageBytes);

        // Blocks the calling thread until the Received handler queues a reply.
        return respQueue.Take();
    }

    /// <summary>
    /// Closes the channel and then the connection.
    /// </summary>
    public void Close()
    {
        try
        {
            channel.Close();
        }
        finally
        {
            // Always release the connection, even if closing the channel failed.
            connection.Close();
        }
    }
}
/// <summary>
/// Console entry point: sends one random number through <see cref="RpcClient"/>
/// and prints the reply.
/// </summary>
class Program
{
    /// <summary>
    /// Picks a random value in [10, 50), performs one blocking RPC call with it,
    /// prints the answer, closes the client and waits for a key press.
    /// </summary>
    public static void Main()
    {
        var client = new RpcClient();
        var rng = new Random();

        int number = rng.Next(10, 50);
        Console.WriteLine("Ciccio");
        Console.WriteLine(number.ToString());

        string reply = client.Call(number.ToString());
        Console.WriteLine(" [.] Got '{0}'", reply);

        client.Close();
        Console.ReadLine();
    }
}
}

消费者

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
namespace RabbitConsumer
{
/// <summary>
/// RPC server: consumes integer requests from the "Ciccio" queue and replies
/// with the corresponding Fibonacci number.
/// </summary>
class Program
{
    /// <summary>
    /// Connects to the broker, starts consuming requests and runs until the
    /// user presses enter.
    /// </summary>
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "192.168.68.17" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "Ciccio", durable: false,
                exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);

            // Attach the handler BEFORE BasicConsume so that no delivery can
            // arrive while the consumer has no handler.
            consumer.Received += (model, ea) =>
            {
                string response = null;

                // Fixed 5 second pause before handling each request
                // (presumably simulates slow work).
                System.Threading.Thread.Sleep(5000);

                var body = ea.Body.ToArray();
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;
                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    // Bad input (not an integer, negative, too large) yields an empty reply.
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    try
                    {
                        // A request without ReplyTo has nowhere to send the answer;
                        // skip the publish instead of failing on it.
                        if (!string.IsNullOrEmpty(props.ReplyTo))
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response ?? "");
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                                basicProperties: replyProps, body: responseBytes);
                        }
                    }
                    finally
                    {
                        // Always ack so the request is not left unacknowledged.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                            multiple: false);
                    }
                }
            };

            channel.BasicConsume(queue: "Ciccio",
                autoAck: false, consumer: consumer);
            Console.WriteLine("Ciccio");
            Console.WriteLine(" [x] Awaiting RPC requests");
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Computes the n-th Fibonacci number iteratively (fib(0) = 0, fib(1) = 1).
    /// </summary>
    /// <param name="n">Zero-based index; must be non-negative.</param>
    /// <returns>The n-th Fibonacci number.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="n"/> is negative.</exception>
    /// <exception cref="OverflowException">The result does not fit in a <see cref="long"/>.</exception>
    private static long fib(int n)
    {
        if (n < 0)
        {
            // The recursive version never terminated for negative input.
            throw new ArgumentOutOfRangeException(nameof(n), "n must be non-negative.");
        }
        if (n < 2)
        {
            return n;
        }

        long previous = 0;
        long current = 1;
        for (int i = 2; i <= n; i++)
        {
            // checked: fail loudly instead of silently wrapping around.
            long next = checked(previous + current);
            previous = current;
            current = next;
        }
        return current;
    }
}
}

好吧,显然我最近工作太多了,竟然忘了如何正确使用Task……下面就是实现我想要的效果的方法。

异步生产者

using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitProducer
{
/// <summary>
/// Blocking RPC client: publishes a request to the "Ciccio" queue and waits
/// for the matching reply on a private reply queue.
/// </summary>
public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    /// <summary>
    /// Connects to the broker, declares the reply queue and subscribes to it.
    /// </summary>
    public RpcClient()
    {
        ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.68.17" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ConfirmSelect();

        // Declare a queue with default arguments and use the name the broker reports.
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        // One set of properties (and one correlation id) is shared by every Call.
        props = channel.CreateBasicProperties();
        string correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            string response = Encoding.UTF8.GetString(body);

            // Ignore replies that do not belong to this client.
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };

        // Publisher-confirm callbacks; intentionally empty placeholders.
        channel.BasicAcks += (sender, ea) =>
        {
        };
        channel.BasicNacks += (sender, ea) =>
        {
        };

        // Subscribe exactly once. Doing this inside Call() subscribed the same
        // consumer to the reply queue again on every call.
        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as a UTF-8 request and blocks until a reply arrives.
    /// </summary>
    /// <param name="message">Request payload.</param>
    /// <returns>The reply body decoded as UTF-8.</returns>
    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "Ciccio",
            basicProperties: props,
            body: messageBytes);

        // Blocks the calling thread until the Received handler queues a reply.
        return respQueue.Take();
    }

    /// <summary>
    /// Closes the channel and then the connection.
    /// </summary>
    public void Close()
    {
        try
        {
            channel.Close();
        }
        finally
        {
            // Always release the connection, even if closing the channel failed.
            connection.Close();
        }
    }
}
/// <summary>
/// Console entry point that fires three RPC requests without waiting for
/// each reply before sending the next.
/// </summary>
class Program
{
    /// <summary>
    /// Starts a background task that opens its own <see cref="RpcClient"/>,
    /// sends <paramref name="n"/>, prints the reply and closes the client.
    /// Returns immediately; the reply is printed whenever it arrives.
    /// </summary>
    /// <param name="n">Number to send as the request payload.</param>
    private static void Send(int n)
    {
        Console.WriteLine("SEND " + n);
        Task.Run(() =>
        {
            try
            {
                RpcClient rpcClient = new RpcClient();
                try
                {
                    string resp = rpcClient.Call(n.ToString());
                    Console.WriteLine(n + " [.] Got '{0}'", resp);
                }
                finally
                {
                    // Release the connection even when Call throws.
                    rpcClient.Close();
                }
            }
            catch (Exception e)
            {
                // Nobody awaits this task, so report the failure here
                // instead of losing it silently.
                Console.WriteLine(n + " [!] " + e.Message);
            }
        });
    }

    /// <summary>
    /// Sends three random values in [10, 50) and keeps the process alive
    /// until a key is pressed so the background replies can be printed.
    /// </summary>
    public static void Main()
    {
        Random random = new Random();
        int a = random.Next(10, 50);
        Send(a);
        int b = random.Next(10, 50);
        Send(b);
        int c = random.Next(10, 50);
        Send(c);
        Console.ReadLine();
    }
}
}

完全异步的做法是使用TaskCompletionSource。例如,参见 https://gigi.nullneuron.net/gigilabs/abstracting-rabbitmq-rpc-with-taskcompletionsource/ 。

通过这种方式,可以将逻辑抽象到一个中心位置,并在整个应用程序中使用,就像使用HttpClient.GetAsync()一样。

类似于:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
namespace RabbitProducer
{
/// <summary>
/// Asynchronous RPC client. Each request gets its own correlation id and a
/// <see cref="TaskCompletionSource{TResult}"/> that is completed when the
/// reply with the same correlation id arrives on the response queue.
/// </summary>
public class RpcClient : IDisposable
{
    private const string exchangeName = ""; // default exchange

    // Must match the queue the RPC server consumes from ("Ciccio");
    // the previous value "Cissio" routed requests to a different queue name.
    private const string requestQueueName = "Ciccio";

    private bool disposed = false;
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly EventingBasicConsumer consumer;
    private readonly string responseQueueName;

    // Initialized before the consumer starts so the Received handler can
    // never observe a null dictionary.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> pendingMessages =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    /// <summary>
    /// Connects to the broker on localhost, declares the response queue and
    /// starts consuming replies from it.
    /// </summary>
    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();
        this.responseQueueName = this.channel.QueueDeclare().QueueName;
        this.consumer = new EventingBasicConsumer(this.channel);
        this.consumer.Received += Consumer_Received;
        this.channel.BasicConsume(responseQueueName, true, consumer);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> and returns a task that completes
    /// with the reply body.
    /// </summary>
    /// <param name="message">Request payload, sent as UTF-8.</param>
    /// <returns>
    /// A task yielding the reply text; it is cancelled if the client is
    /// disposed before a reply arrives.
    /// </returns>
    public Task<string> SendAsync(string message)
    {
        // Run continuations off the thread that raises Received, so awaiting
        // code does not execute inside the consumer callback.
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var correlationId = Guid.NewGuid().ToString();

        // Register before publishing so a fast reply always finds its entry.
        this.pendingMessages[correlationId] = tcs;
        try
        {
            this.Publish(message, correlationId);
        }
        catch
        {
            // The request never left; do not keep a stale pending entry.
            this.pendingMessages.TryRemove(correlationId, out _);
            throw;
        }
        return tcs.Task;
    }

    /// <summary>
    /// Publishes one request with the given correlation id and this client's
    /// response queue as the reply address.
    /// </summary>
    private void Publish(string message, string correlationId)
    {
        // NOTE(review): the channel is used without synchronization here;
        // confirm callers do not invoke SendAsync concurrently.
        var props = this.channel.CreateBasicProperties();
        props.CorrelationId = correlationId;
        props.ReplyTo = responseQueueName;
        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
        this.channel.BasicPublish(exchangeName, requestQueueName, props, messageBytes);
        Console.WriteLine($"Sent: {message} with CorrelationId {correlationId}");
    }

    /// <summary>
    /// Handles a reply: completes the pending task registered under the
    /// reply's correlation id, if any.
    /// </summary>
    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        var correlationId = e.BasicProperties?.CorrelationId;
        var message = Encoding.UTF8.GetString(e.Body.ToArray());
        Console.WriteLine($"Received: {message} with CorrelationId {correlationId}");

        // A null key would make TryRemove throw; unknown ids are ignored.
        if (correlationId != null
            && this.pendingMessages.TryRemove(correlationId, out var tcs))
        {
            tcs.TrySetResult(message);
        }
    }

    /// <summary>
    /// Releases the channel and connection and cancels outstanding requests.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the channel and connection once when <paramref name="disposing"/>
    /// is true; marks the instance as disposed either way.
    /// </summary>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed && disposing)
        {
            this.channel?.Dispose();
            this.connection?.Dispose();

            // No reply can arrive any more; release anyone still awaiting.
            foreach (var key in this.pendingMessages.Keys)
            {
                if (this.pendingMessages.TryRemove(key, out var tcs))
                {
                    tcs.TrySetCanceled();
                }
            }
        }
        this.disposed = true;
    }

    /// <summary>
    /// Demo entry point: sends three random values in [10, 50) one after
    /// another, awaiting and printing each reply.
    /// </summary>
    public static async Task Main()
    {
        using var rpcClient = new RpcClient();
        Random random = new Random();
        var n = random.Next(10, 50);
        var response = await rpcClient.SendAsync(n.ToString());
        Console.WriteLine(n + " [.] Got '{0}'", response);
        n = random.Next(10, 50);
        response = await rpcClient.SendAsync(n.ToString());
        Console.WriteLine(n + " [.] Got '{0}'", response);
        n = random.Next(10, 50);
        response = await rpcClient.SendAsync(n.ToString());
        Console.WriteLine(n + " [.] Got '{0}'", response);
        Console.ReadLine();
    }
}
}

最新更新