Unable to receive data from AWS Kinesis



I built a Flink Kinesis consumer with the following code:

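import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DemoKinesisCA {
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        // Region, credentials and start position for the Kinesis consumer
        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "cn-north-1");
        consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "${my_key}");
        consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "${my_secret_key}");
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        // Deserialize each record as a string and print it to stdout
        DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                "${my_stream}",
                new SimpleStringSchema(),
                consumerConfig));
        kinesis.print();
        see.execute();
    }
}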

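The code runs without errors and I can't find any exceptions in the log. I set the initial position to TRIM_HORIZON, which means consuming from the earliest available record, yet I receive 0 bytes from Kinesis. I've been told the stream is not empty, so what's wrong?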

Any help is appreciated.

The log looks like this:

2019-12-20 09:21:22,814 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Receive slot request 00b96cfdd8c98b09635926e3643210f3 for job c5449594afa59fde3826b82d6034a71b from resource manager with leader id 00000000000000000000000000000000.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated slot for 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Add job c5449594afa59fde3826b82d6034a71b for job leader monitoring.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Try to register at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 with leader id 00000000-0000-0000-0000-000000000000.
2019-12-20 09:21:22,819 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Resolved JobManager address, beginning registration
2019-12-20 09:21:22,819 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Registration at JobManager attempt 1 (timeout=100ms)
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Successful registration at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish JobManager connection for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved slots to the leader of job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,830 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Activate slot 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1).
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from CREATED to DEPLOYING.
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING]
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:22,838 INFO  org.apache.flink.runtime.blob.BlobClient                      - Downloading c5449594afa59fde3826b82d6034a71b/p-c6e596cf391265b05e5e166e9448a1b36bc8fb74-756c152250698cb8c1c63a809f05d4b3 from localhost/127.0.0.1:44568
2019-12-20 09:21:23,011 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:23,012 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from DEPLOYING to RUNNING.
2019-12-20 09:21:23,013 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2019-12-20 09:21:23,037 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber does not contain a setter for field sequenceNumber
2019-12-20 09:21:23,037 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2019-12-20 09:21:23,038 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - No restore state for FlinkKinesisConsumer.
2019-12-20 09:21:23,779 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
2019-12-20 09:21:23,780 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0

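Welcome to the Java world, where "${my_stream}" is not evaluated: Java does not expand ${...} placeholders inside string literals, so the consumer receives the literal name ${my_stream}, which probably points to a stream that doesn't exist.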
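To make this concrete, here is a small stdlib-only sketch. It prints the literal and runs it through requireResolved, a hypothetical helper (not part of Flink or the AWS SDK) that fails fast when a config value still contains an unexpanded ${...}. The same check would also catch the ${my_key} and ${my_secret_key} credentials if they were left as literals.

```java
import java.util.regex.Pattern;

public class PlaceholderCheck {
    // Matches an unexpanded placeholder such as ${my_stream}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{[^}]*\\}");

    // Hypothetical helper: reject missing values and values that still contain ${...}
    static String requireResolved(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " is missing");
        }
        if (PLACEHOLDER.matcher(value).find()) {
            throw new IllegalArgumentException(name + " was never substituted: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        // No interpolation happens here: this is the 12-character text ${my_stream}
        String streamName = "${my_stream}";
        System.out.println(streamName); // prints ${my_stream}
        try {
            requireResolved("stream name", streamName);
        } catch (IllegalArgumentException e) {
            // prints: stream name was never substituted: ${my_stream}
            System.out.println(e.getMessage());
        }
    }
}
```

Calling such a check on the stream name and credentials before building the FlinkKinesisConsumer turns a silent "0 bytes received" into an immediate, explicit error.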

If my_stream is a program argument, you need to read it with pt.get("my_stream") instead.
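The sketch below imitates, using only the JDK, the "--key value" convention that Flink's ParameterTool.fromArgs accepts, so the idea can be run without Flink on the classpath. parseArgs and getRequired here are simplified stand-ins, not Flink's implementation, and "orders-stream" is a made-up stream name. In the real job the equivalent is pt.get("my_stream") as above, or pt.getRequired("my_stream"), which throws instead of returning null when the argument is missing.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsDemo {
    // Simplified imitation of "--key value" / "-key value" parsing.
    // Values that themselves start with '-' are not supported in this sketch.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("-")) {
                throw new IllegalArgumentException("Expected --key, got: " + arg);
            }
            String key = arg.startsWith("--") ? arg.substring(2) : arg.substring(1);
            // A key followed by another key (or by nothing) is stored with an empty value
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("-");
            params.put(key, hasValue ? args[++i] : "");
        }
        return params;
    }

    // Fail loudly instead of handing a bogus stream name to the Kinesis consumer
    static String getRequired(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("No value for required key '" + key + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        // Hypothetical arguments, as they might be passed after the JAR path
        String[] demoArgs = {"--my_stream", "orders-stream", "--region", "cn-north-1"};
        Map<String, String> params = parseArgs(demoArgs);
        // In the Flink job this corresponds to pt.getRequired("my_stream")
        System.out.println(getRequired(params, "my_stream")); // prints orders-stream
    }
}
```

With flink run, program arguments such as --my_stream <name> go after the JAR path, and the resolved value is what should be passed as the first argument of new FlinkKinesisConsumer<>(...).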
