编译器错误使用Flink文档中提供的示例时。Flink文档提供了样本Scala代码,以设置与Elasticsearch交谈时设置REST客户端出厂参数,https://ci.apache.org/projects/flink/flink/flink/flink-docs-stable/dev/dev/connectors/elasticsearch.html。尝试此代码时,我会在Intellij中获得一个编译器错误,该错误说"无法解析符号RESTCLIENTBUILDER"。
我找到了以下内容,所以这正是我的问题,除了它在Java中,我在Scala中这样做。Apache Flink(v1.6.0(身份验证Elasticsearch水槽(v6.4(
我尝试复制上述提供的解决方案代码,因此将自动转换代码的编译器错误粘贴到Intellij中。
// provide a RestClientFactory for custom configuration on the internally created REST client
// i only show the setMaxRetryTimeoutMillis for illustration purposes, the actual code will use HTTP cutom callback
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setMaxRetryTimeoutMillis(10)
}
)
然后我尝试了(自动生成的Java到Intellij的Scala代码(
// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
import org.apache.http.auth.AuthScope
import org.apache.http.auth.UsernamePasswordCredentials
import org.apache.http.client.CredentialsProvider
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder
// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory((restClientBuilder) => {
def foo(restClientBuilder) = restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { // elasticsearch username and password
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
}
})
foo(restClientBuilder)
})
原始代码段产生错误"无法解析RESTCLIENTFACTORY",然后Java到Scala显示了其他几个错误。
因此,基本上我需要找到Apache Flink(v1.6.0(中描述的解决方案的Scala版本,请验证Elasticsearch sink(v6.4(
更新1 :我能够在Intellij的一些帮助下取得了一些进步。以下代码编译和运行,但还有另一个问题。
esSinkBuilder.setRestClientFactory(
new RestClientFactory {
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
// elasticsearch username and password
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
httpClientBuilder.setSSLContext(trustfulSslContext)
}
})
}
}
问题是我不确定是否应该做一个新的RESTCLIENTFACTORY对象。发生的事情是该应用程序连接到Elasticsearch群集,但随后发现SSL证书无效,因此我不得不放置TrustfullsslContext(如下所述我已经过去了SSL问题,但是现在ES REST客户端进行了PING测试,并且PING失败了,它会引发异常和应用程序关闭。我怀疑ping由于SSL错误而失败,也许它不是使用TrustfulsslContext I设置为新RESTCLIENTFACTORY的一部分,这使我怀疑我不应该做新的,应该有一种简单的方法来更新该方法现有的RESTCLIENTFACTORY对象,基本上这一切都在发生,因为我缺乏Scala知识。
很高兴地报告解决此问题。我在更新1 中发布的代码是正确的。ECE的ping没有工作有两个原因:
-
证书需要包括完整的链条,包括根CA,中级CA和ECE证书。这有助于摆脱整个TrustfulsslContext的东西。
-
ECE坐在HA-Proxy后面,代理在HTTP请求中对主机名进行了映射到ECE中实际部署群集名称。此映射逻辑没有考虑到Java REST高级别客户端使用org.apache.httphost类,该类别将主机名创建为主机名:port_number即使端口号为443。由于它由于443而没有找到映射因此,ECE返回了404个错误,而不是200个错误(找到此问题的唯一方法是在HA-Proxy上查看未加密的数据包(。一旦固定了ha-proxy中的映射逻辑,就找到了映射,并且pings已成功。