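I'm following an example of using Flink with Kafka. The only results I can find are like this page: they don't compile correctly and produce convoluted error messages that are hard to search for.
Basically, I get an error when I try to compile this snippet: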
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public final class Main {
    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup ) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id",kafkaGroup);
        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
        return consumer;
    }
}
Here are my dependencies and other settings in the build.gradle file:
group 'myapp'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
    jcenter()
}
dependencies {
    ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.2.0'
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.5.0'
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}
This is the error I get when I run the build:
$ gradle build
> Task :compileJava FAILED
/Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/Main.java:55: error: cannot infer type arguments for FlinkKafkaConsumer011<>
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
^
Note: /Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/EnvironmentConfig.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
* Get more help at https://help.gradle.org
BUILD FAILED in 3s
1 actionable task: 1 executed
Here is a link to the source code.
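One problem: all the Flink libraries you use should have the same version number. You appear to be mixing versions 1.2.0, 1.5.0, and 1.8.0. Below are updated dependencies and source code that compile correctly.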
build.gradle:
group 'myapp'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
    jcenter()
}
dependencies {
    // Every Flink artifact uses the same version (1.8.0) and the same Scala suffix (_2.11)
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.8.0'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}
Working Java code:
// SimpleStringSchema from flink-core; the streaming.util.serialization variant is deprecated
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public final class Main {
    public static FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        // The declared type must match the connector class that is instantiated
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        return consumer;
    }
}
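Also, unrelated to the compile error: since you are using Kafka 1.1, you might as well use the newer version of Flink's Kafka connector rather than the one for Kafka 0.11. FlinkKafkaConsumer (the class with no version number in its name) is the appropriate connector for Kafka 1.0.0 and later.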
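As a side note, the two Kafka settings used in the code above (bootstrap.servers and group.id) can be built and checked in a small helper that needs nothing but the JDK, so a missing value shows up before Flink is involved at all. This is only a sketch: KafkaConsumerProps and its build method are hypothetical names, not part of Flink or Kafka, and rejecting empty values is my own assumption about what is worth validating.

```java
import java.util.Properties;

// Hypothetical helper (not a Flink or Kafka class): builds the consumer
// settings used in the answer and rejects null or blank values early.
final class KafkaConsumerProps {
    private KafkaConsumerProps() {}

    static Properties build(String kafkaAddress, String kafkaGroup) {
        if (kafkaAddress == null || kafkaAddress.trim().isEmpty()) {
            throw new IllegalArgumentException("kafkaAddress must not be empty");
        }
        if (kafkaGroup == null || kafkaGroup.trim().isEmpty()) {
            throw new IllegalArgumentException("kafkaGroup must not be empty");
        }
        Properties props = new Properties();
        // Same keys as in createStringConsumerForTopic
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        return props;
    }
}
```

The returned Properties object could then be passed as the third constructor argument of FlinkKafkaConsumer in place of the inline props.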