我正在尝试使用此示例将Nifi连接到Flink:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8090/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
@Override
public String map(NiFiDataPacket value) throws Exception {
return new String(value.getContent(), Charset.defaultCharset());
}
});
dataStream.print();
env.execute();
我将Nifi作为具有默认属性的独立服务器运行,但以下属性除外:
nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true
每次调用都失败,Nifi 中出现以下登录:
[Site-to-Site Worker Thread-24] o.a.nifi.remote.SocketRemoteSiteListener
Unable to communicate with remote instance null due to
org.apache.nifi.remote.exception.HandshakeException: Handshake
with nifi://localhost:61680 failed because the Magic Header
was not present; closing connection
Nifi 版本: 1.7.1, Flink 版本: 1.7.1
使用nifi-toolkit
后,我删除了nifi.remote.input.socket.port
的自定义值,然后将transportProtocol(SiteToSiteTransportProtocol.HTTP)
添加到我的SiteToSiteClientConfig
中,并http://localhost:8080/nifi
作为 URL。
我首先更改端口的原因是,如果不指定协议HTTP
默认情况下它将使用RAW
。当从 Flink 端使用 RAW
协议时,客户端无法创建Transaction
并打印以下警告:
Unable to refresh Remote Group's peers due to Remote instance of NiFi
is not configured to allow RAW Socket site-to-site communications
这就是为什么我认为这是一个端口问题
所以现在使用 Nifi 的默认配置,这可以按预期工作:
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("portNameAsInNifi")
.transportProtocol(SiteToSiteTransportProtocol.HTTP)
.requestBatchCount(1)
.buildConfig();