使用Apache Fink(Scala Code)的ECE Elasticsearch汇总验证



编译器错误使用Flink文档中提供的示例时。Flink文档提供了样本Scala代码,以设置与Elasticsearch交谈时设置REST客户端出厂参数,https://ci.apache.org/projects/flink/flink/flink/flink-docs-stable/dev/dev/connectors/elasticsearch.html。尝试此代码时,我会在Intellij中获得一个编译器错误,该错误说"无法解析符号RESTCLIENTBUILDER"。

我找到了以下内容,所以这正是我的问题,除了它在Java中,我在Scala中这样做。Apache Flink(v1.6.0(身份验证Elasticsearch水槽(v6.4(

我尝试复制上述提供的解决方案代码,因此将自动转换代码的编译器错误粘贴到Intellij中。

      // provide a RestClientFactory for custom configuration on the internally created REST client
      // i only show the setMaxRetryTimeoutMillis for illustration purposes, the actual code will use HTTP cutom callback
      esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
          restClientBuilder.setMaxRetryTimeoutMillis(10)
        }
      )

然后我尝试了(自动生成的Java到Intellij的Scala代码(

// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
      import org.apache.http.auth.AuthScope
      import org.apache.http.auth.UsernamePasswordCredentials
      import org.apache.http.client.CredentialsProvider
      import org.apache.http.impl.client.BasicCredentialsProvider
      import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
      import org.elasticsearch.client.RestClientBuilder
      // provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
      esSinkBuilder.setRestClientFactory((restClientBuilder) => {
        def foo(restClientBuilder) = restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
          override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { // elasticsearch username and password
            val credentialsProvider = new BasicCredentialsProvider
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
          }
        })
        foo(restClientBuilder)
      })

原始代码段产生错误"无法解析RESTCLIENTFACTORY",然后Java到Scala显示了其他几个错误。

因此,基本上我需要找到Apache Flink(v1.6.0(中描述的解决方案的Scala版本,请验证Elasticsearch sink(v6.4(


更新1 :我能够在Intellij的一些帮助下取得了一些进步。以下代码编译和运行,但还有另一个问题。

esSinkBuilder.setRestClientFactory(
          new RestClientFactory {
            override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
              restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
                  // elasticsearch username and password
                  val credentialsProvider = new BasicCredentialsProvider
                  credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
                  httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                  httpClientBuilder.setSSLContext(trustfulSslContext)
                }
              })
            }
          }

问题是我不确定是否应该做一个新的RESTCLIENTFACTORY对象。发生的事情是该应用程序连接到Elasticsearch群集,但随后发现SSL证书无效,因此我不得不放置TrustfullsslContext(如下所述我已经过去了SSL问题,但是现在ES REST客户端进行了PING测试,并且PING失败了,它会引发异常和应用程序关闭。我怀疑ping由于SSL错误而失败,也许它不是使用TrustfulsslContext I设置为新RESTCLIENTFACTORY的一部分,这使我怀疑我不应该做新的,应该有一种简单的方法来更新该方法现有的RESTCLIENTFACTORY对象,基本上这一切都在发生,因为我缺乏Scala知识。

很高兴地报告解决此问题。我在更新1 中发布的代码是正确的。ECE的ping没有工作有两个原因:

  1. 证书需要包括完整的链条,包括根CA,中级CA和ECE证书。这有助于摆脱整个TrustfulsslContext的东西。

  2. ECE坐在HA-Proxy后面,代理在HTTP请求中对主机名进行了映射到ECE中实际部署群集名称。此映射逻辑没有考虑到Java REST高级别客户端使用org.apache.httphost类,该类别将主机名创建为主机名:port_number即使端口号为443。由于它由于443而没有找到映射因此,ECE返回了404个错误,而不是200个错误(找到此问题的唯一方法是在HA-Proxy上查看未加密的数据包(。一旦固定了ha-proxy中的映射逻辑,就找到了映射,并且pings已成功。

最新更新