Flink error: could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in the classpath



I'm new to Apache Flink and I'm trying to read an Avro file as follows:

val schema = new Schema()
  .field("tconst", "string")
  .field("titleType", "string")
  .field("primaryTitle", "string")
  .field("originalTitle", "string")
  .field("isAdult", "int")
  .field("startYear", "string")
  .field("endYear", "string")
  .field("runtimeMinutes", "int")
  .field("genres", "string")

val avroFormat: Avro = new Avro()
  .avroSchema(
    "{" +
    "  \"type\": \"record\"," +
    "  \"name\": \"test\"," +
    "  \"fields\": [" +
    "    {\"name\": \"tconst\", \"type\": \"string\"}," +
    "    {\"name\": \"titleType\", \"type\": \"string\"}," +
    "    {\"name\": \"primaryTitle\", \"type\": \"string\"}," +
    "    {\"name\": \"originalTitle\", \"type\": \"string\"}," +
    "    {\"name\": \"isAdult\", \"type\": \"int\"}," +
    "    {\"name\": \"startYear\", \"type\": \"string\"}," +
    "    {\"name\": \"endYear\", \"type\": \"string\"}," +
    "    {\"name\": \"runtimeMinutes\", \"type\": \"int\"}," +
    "    {\"name\": \"genres\", \"type\": \"string\"}" +
    "  ]" +
    "}"
  )

tableEnv.connect(new FileSystem().path("/Users/x/Documents/test_1.avro"))
  .withSchema(schema)
  .withFormat(avroFormat)
  .registerTableSource("sink")

But when I run this, I get the following error:

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.path=/Users/x/Documents/test_1.avro
connector.property-version=1
connector.type=filesystem
format.avro-schema=.... // above schema
format.property-version=1
format.type=avro
schema.0.name=tconst
schema.0.type=string
schema.1.name=titleType
schema.1.type=string
schema.2.name=primaryTitle
schema.2.type=string
schema.3.name=originalTitle
schema.3.type=string
schema.4.name=isAdult
schema.4.type=int
schema.5.name=startYear
schema.5.type=string
schema.6.name=endYear
schema.6.type=string
schema.7.name=runtimeMinutes
schema.7.type=int
schema.8.name=genres
schema.8.type=string
The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

The Avro file itself was written from a Flink DataSet using AvroOutputFormat:

val avroOutputFormat = new AvroOutputFormat[Row](classOf[Row])
flinkDataset.write(avroOutputFormat, "/Users/x/Documents/test_1.avro").setParallelism(1)

I'm wondering whether writing the wrong kind of data could be causing the error above. Is there a way to pin down the exact problem here?

Sorry for the confusion. Unfortunately, the filesystem connector does not support Avro so far.

So there is no way around using the DataSet API. I'd suggest using avrohugger to generate a suitable Scala class for your Avro schema, roughly as sketched below.
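For illustration only, such a generated class would correspond to something like the following hand-written sketch. The name User is a placeholder chosen to match the snippet further down; the class avrohugger actually emits also implements Avro's SpecificRecord machinery and carries the SCHEMA$ field used there.

// Hypothetical sketch of an avrohugger-style class for the schema in the question;
// the real generated code additionally extends org.apache.avro.specific.SpecificRecordBase.
case class User(
  tconst: String,
  titleType: String,
  primaryTitle: String,
  originalTitle: String,
  isAdult: Int,
  startYear: String,
  endYear: String,
  runtimeMinutes: Int,
  genres: String
)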

// convert the Table back to a DataSet of your generated scala class
val dsTuple: DataSet[User] = tableEnv.toDataSet[User](table)
// write it out with AvroOutputFormat
val avroOutputFormat = new AvroOutputFormat[User](classOf[User])
avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY)
avroOutputFormat.setSchema(User.SCHEMA$)
dsTuple.write(avroOutputFormat, outputPath1)
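Reading the file back also goes through the DataSet API. A minimal sketch using Flink's AvroInputFormat, assuming the generated User class above and the path from the question (variable names are placeholders):

// Read the Avro file into a DataSet[User] with the DataSet API and AvroInputFormat.
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment
val avroInputFormat = new AvroInputFormat[User](
  new Path("/Users/x/Documents/test_1.avro"), classOf[User])
val titles: DataSet[User] = env.createInput(avroInputFormat)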
