需要在Azure服务结构上实现iCommunicationListener的Zeromq实现



我正在寻找ICommunicationListener的Zeromq实现,我可以将其与服务结构一起使用,以在Azure上运行Zeromq端点。

我看了几个小时,找不到。有人知道解决方案吗?我目前使用"服务应用程序面料/.NET Core 2.0无状态服务"模板,
这使我能够覆盖
IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
当我拥有Zeromq的ICommunicationListener实现时,
或覆盖Task RunAsync(CancellationToken cancellationToken)
当我想自己设置插座时。

我的第一次尝试将无法使用:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    using (var server = new ResponseSocket("tcp://xx.xx.xx.xx:xxxxx"))
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = server.ReceiveFrameBytes();
            ServiceEventSource.Current.ServiceMessage(this.Context, "Message {0}",
                System.Text.Encoding.UTF8.GetString(message));
        }
    }
}

上述结果是无法启动的服务。除此之外找不到太多记录:

"在CodePackage激活过程中存在错误。服务主机用退出代码终止:255"

如果不存在,则可以通过创建ICommunicationListener的实现并从CreateServiceInstanceListeners返回它来创建自己的。使用OpenAsync打开通道并开始侦听。使用CloseAsync停止聆听。

请查看此服务总线的实现,以获取灵感。

这是Zeromq实现ICommunicationListener实现的粗略示例。该实现将充当Zeromq ResponseSocket,但可以轻松地更改为RequestSocketSubscriberSocket或您喜欢的任何类型的NetMQ.Sockets.*套接字实现。当然,在实现过程中,它将需要更多的细节,例如没有在检索消息时引发例外,但是应该清楚地了解其完成方式。它极大地受到ICommunicationListener接口的现有dotnetcore实现的启发。

public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable
{
    private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
    private readonly ResponseSocket _responseSocket = new ResponseSocket();
    private readonly ServiceContext _serviceContext;
    private readonly string _endpointName;
    public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
    {
        if (string.IsNullOrEmpty(endpointName))
            throw new ArgumentException("endpointName cannot be null or empty string.");
        _serviceContext = serviceContext;
        _endpointName = endpointName;
    }
    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var address = GetListenerUrl();
        if (address == null)
            throw new InvalidOperationException("No Url returned from ZeroMqResponseSocketCommunicationListener.GetListenerUrl");

        _responseSocket.Bind(address);
        ThreadPool.QueueUserWorkItem(state => MessageHandler(_cancellationToken.Token));
        return Task.FromResult(address);
    }
    public Task CloseAsync(CancellationToken cancellationToken)
    {
        _responseSocket.Close();
        return Task.FromResult(true);
    }
    public void Abort()
    {
        _responseSocket.Close();
    }
    private void MessageHandler(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = _responseSocket.ReceiveFrameBytes();
            if (message != null)
                throw new Exception($"Message {Encoding.UTF8.GetString(message)}");
        }
    }
    private string GetListenerUrl()
    {
        var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();
        if (!endpoints.Contains(_endpointName))
            throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");
        var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);
        if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
            throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");
        if (serviceEndpoint.Port <= 0)
            throw new InvalidOperationException("Port not set on endpoint");
        var listenUrl = $"{serviceEndpoint.Protocol.ToString().ToLower()}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";
        return listenUrl;
    }
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing || _responseSocket == null) return;
        try
        {
            _responseSocket.Close();
            _responseSocket.Dispose();
        }
        catch (Exception ex)
        {
            ServiceEventSource.Current.Message(ex.Message);
        }
    }
}

并返回您的应用程序织物中的ZeromqresponsocketCommunicationListener:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    yield return new ServiceInstanceListener(listener => new ZeroMqResponseSocketCommunicationListener(listener, "EndpointName"));
}

确保您在服务的serviceManifest.xml中指定了一个端点:

<Resources>
  <Endpoints>
    <Endpoint Name="EndpointName" Port="80" Protocol="tcp" />
  </Endpoints>
</Resources>

最新更新