使用QPid和golang包装Electron连接到AMQP 1.0 Azure EventHub



我想使用Qpid proton-c库的Electron golang包装器连接到Azure EventHub。

我正在将以下SASL详细信息组合到构建连接字符串所需的主机/端口/命名空间/路径中,但由于某种原因,我一直收到错误消息:connection reset by peer

package main
import (
"fmt"
"os"
"strings"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
)
var (
eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
eventHubName = "<MY_CUSTOM_NAME>"
eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)
func main() {
sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))
urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
fmt.Printf("The URL connection string: '%v'n", urlStr)
// parse URL
url, err := amqp.ParseURL(urlStr)
if err != nil {
panic(err)
}
fmt.Printf("The AMQP parsed URL: %vn", url)
// TCP dial
amqpHost := url.Host
fmt.Printf("The AMQP host used in the connection is: '%v'n", amqpHost)
c, err := container.Dial(
"tcp", amqpHost, 
electron.SASLEnable(), 
electron.Password([]byte(eventHubSasKey)), 
electron.User(eventHubSasKeyName),
)
if err != nil {
panic(err)
}
defer c.Close(nil)
// AMQP send
addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
if err != nil {
panic(err)
}
m := amqp.NewMessage()
body := fmt.Sprintf("bla bla bla %v", 42)
m.Marshal(body)
fmt.Printf("The AMQP message body: '%v'n", m.Body())
go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
// AMQP ACK receive
fmt.Printf("Waiting for ACKs...n")
for {
fmt.Printf("Waiting for an ACK coming out of the channel...n")
out := <-sentChan // Outcome of async sends.
fmt.Printf("Received something: '%v'n", out)
}   
}

当编译,然后运行代码时,这是输出:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

对我来说,收到的消息说connection reset by peer看起来不是有效的ACK,我不确定连接尝试出了什么问题?

  • proton-c的编译版本是0.18.0,我使用的是go1.7.4 linux/amd64
  • 如果我将electron.SASLAllowedMechs("EXTERNAL")添加到连接选项中,则会收到相同的错误消息
  • 如果我将端口更改为5672,则在尝试通过TCP拨号后会出现connection refused死机错误
  • 如果我用base64.StdEncoding.DecodeString(eventHubSasKey)解码base64密码字段,并将字节传递给连接选项,我会得到相同的错误connection reset by peer
  • 如果我添加这个连接选项electron.SASLAllowedMechs("ANONYMOUS"),那么我仍然会得到相同的错误消息connection reset by peer。这样做的原因是我没有使用任何SSL证书,而微软提供的AMQP的Java包装器似乎使用了这种"匿名"的东西而不是证书(事实上,使用Java连接器连接到EventHub不需要证书)

我不确定如何在这里继续,因为我被困在连接部分,我相信SASL详细信息是按照这里的文档以正确的方式传递的:https://godoc.org/qpid.apache.org/electron#ConnectionOption

我仍然不确定失败的原因不是由于SSL证书,如果是这样的话,我很难知道如何将它们包括在过程中。

编辑:

后来我发现,即使我没有提供任何私钥/公钥对,也必须通过TCP建立TLS连接,同时指定一个"虚拟主机"(否则AMQP会抱怨无法识别主机):

// TLS connection details
tlsConfig := &tls.Config{}
eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
if err != nil {
panic(err)
}
// AMPQ container connection on top of TLS via TCP
eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
amqpConn, err := container.Connection(
tlsConn, 
electron.SASLEnable(),
electron.User(eventHubSasKeyName), 
electron.Password([]byte(eventHubSasKey)),
electron.VirtualHost(eventHubDomain),
// electron.SASLAllowedMechs(<SOME_MECHANISM>),
)
if err != nil {
panic(err)
}
defer amqpConn.Close(nil)
// AMQP sender (a AMQP link with target the name defined on the Azure portal)
s, err := amqpConn.Sender(electron.Target(eventHubName))
if err != nil {
panic(err)
}

然而,当使用环境变量PN_TRACE_FRM=true运行应用程序时(这给了我一些proton-c级别的详细日志记录),现在错误是:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

这个afaik意味着SASL详细信息(用户名/密码)必须是"sender"类型,因为我正在尝试向事件中心发送内容。我在Azure门户网站上仔细检查了这些详细信息(单击"共享访问策略">,然后使用指定为"发送"的"声明"策略),它们是正确的。所以我不知道为什么会出现这个错误。

实际上,我在不同级别(<MY_CUSTOM_NAMESPACE><MY_CUSTOM_NAME>)尝试了Azure门户上定义的这些SASL策略,但总是出现相同的错误消息。

我还尝试包括各种SASL机制,例如,当使用electron.SASLAllowedMechs("PLAIN")时,我得到了这个错误:no mechanism available: No worthy mechs found (Authentication failed [mech=none])

在端口为5671的urlStr中使用"amqps"方案。事件集线器不允许纯tcp连接。您还需要启用SASL PLAIN来发送在命名空间或事件集线器实体上配置的SAS密钥(用户名=密钥名称,密码=密钥)(看起来您已经在这么做了)。我不确定golang,但有了Python绑定,就可以把所有东西都放在这样的Uri中。"amqps://sas-key-name:url-encoded-key@your-namespace.servicebus.windows.net:5671"。端口号是可选的。

如果底层的proton-c引擎看到不同的支持SASL机制,它可能不会使用SASL PLAIN。要强制执行PLAIN,可以在容器上设置允许的机制。在go中,SASLAllowedMechs函数似乎为您提供了一个可以在创建连接时提供的连接选项。

这是与事件中心配合良好的Python代码。

我设法使用AMQP之上的"基于声明的授权"(CBS)建立了连接。这似乎是微软特有的东西。一些详细信息可以在本页底部找到:https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide

基本上这是步骤列表:

  • electron.VirtualHost(eventHubDomain)ANONYMOUSSASL机制electron.SASLAllowedMechs("ANONYMOUS")的TLS连接(无需指定SASL用户名和密码)。请查看我上面问题的编辑部分的详细信息^
  • 特殊$cbs事件中心名称的AMQP链接:cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
  • 准备一条AMQP消息,其中包含Microsoft对CBS握手的要求:

消息属性(检查此C#代码以进行比较https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):

appProps := make(map[string]interface{})
appProps["operation"] = "put-token"
appProps["type"] = "servicebus.windows.net:sastoken"
appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"

SAS令牌按照微软想要的方式格式化,我修改了这段代码:https://github.com/michaelbironneau/asbclient/blob/master/azure.go这边:

aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))

这段代码^基于python SDK:https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py包含很多东西,比如大写/小写URL编码,混合了用于过期的时间戳以及SASL用户名和密码。

构建AMQP消息导入"qpid.apache.org/amqp":

cbsHandshakeMsg := amqp.NewMessage()
cbsHandshakeMsg.SetApplicationProperties(appProps)
cbsHandshakeMsg.Marshal(sasToken)
  • outcome := cbsLink.SendSync(cbsHandshakeMsg)发送这个AMQP消息,然后神奇地,您应该在一段时间内通过事件中心的身份验证
  • 首先将AMQP链接设置为要连接的事件中心名称:msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))

现在您可以通过以下方式使用最后一个AMQP链接发送您想要发送的消息:

m := amqp.NewMessage()
m.Marshal("my message: bla bla bla, foo bar baz!")
outcome := msgSender.SendSync(m)

完成:)

使用环境变量PN_TRACE_FRM=true运行此代码有助于对AMQP进行故障排除,因为proton-c库记录了许多有用的调试消息。

由于某种原因,在连接尝试期间直接传递SASL用户名和密码的AMQPPLAIN机制不适用于事件中心。我不确定这可能是他们或Electron/Qpid库的问题,但现在至少有人能够使用golang和他们提供的CBSMicrosoft协议发送消息。

需要TLS,如azure AMQP协议指南所述。

设置连接和TLS后,服务总线提供两个SASL机制选项:

  1. SASL PLAIN通常用于传递用户名和密码到服务器的凭据。Service Bus没有帐户,但是命名的共享访问安全规则,授予权限与密钥关联。规则的名称用作用户名并且密钥(作为base64编码的文本)被用作密码。这个与所选规则关联的权限管理允许的操作在连接上
  2. SASL ANONYMOUS用于在客户端希望使用基于声明的安全性(CBS)模型稍后描述。使用此选项,客户端连接可以在短时间内匿名建立,在此期间客户可以仅与CBS端点交互,并且CBS握手必须完整

我们可以选择SASL PLAIN或CBS进行身份验证,以PLAIN为例,我对您的代码进行了一些修改,它按预期工作。神奇的部分是下面的连接选项:

amqpConn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.Password([]byte(eventHubSasKey)),
electron.User(eventHubSasKeyName),
electron.VirtualHost(eventHubDomain),
electron.SASLAllowInsecure(true),
electron.SASLAllowedMechs("PLAIN"),
)

SASLAllowInsecure返回ConnectionOption,该ConnectionOption允许或不允许明文SASL身份验证机制,如果我们选择使用SASL PLAIN,则该机制应设置为true。

希望能有所帮助。

最新更新