我正在尝试运行一个简单的程序,该程序从一个 kinesis 流中读取,执行微不足道的转换,并将结果写入另一个 kinesis 流。
在 Flink 1.4.0 上本地运行(这是目前 EMR 支持的版本,因此无法升级(。
这是代码:
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumerConfig = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val kinesisMaps = env.addSource(new FlinkKinesisConsumer[String](
"source-stream", new SimpleStringSchema, consumerConfig))
val jsonMaps = kinesisMaps.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, String]] }
val values = jsonMaps.map(jsonMap => jsonMap("field_name"))
values.print()
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
val kinesisProducer = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
kinesisProducer.setFailOnError(true)
kinesisProducer.setDefaultStream("target-stream")
kinesisProducer.setDefaultPartition("0")
values.addSink(kinesisProducer)
// execute program
env.execute("Flink Kinesis")
}
如果我注释掉生产代码,程序将按预期运行并打印正确的值。
一旦我添加了生产者代码,我就会得到以下异常:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:608)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:569)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:264)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
知道这是什么原因吗?
显然,这是 Flink 1.4 中使用的旧版 Amazon KPL 的问题。
对此,至少有两种可能的解决方案:
-
升级到 Flink 1.5 版本。 如果您按照此处所述在自定义 EMR 安装部分下安装它,您仍然可以在 EMR 上使用它: https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html
-
在为 Flink 1.4 构建 Kinesis 连接器时,您可以使用较新的 AWS 依赖项构建它:我从 1.5 中挑选了连接器
pom.xml
aws
依赖项更改,并用它们构建了连接器。看起来它按预期工作。