Apache Flink java.lang.IllegalArgumentException when trying to write AvroParquet



I have the following Flink program:

object StreamToHive {
  def main(args: Array[String]) {
    val builder = KafkaSource.builder[MyEvent]
    builder.setBootstrapServers("localhost:29092")
    builder.setProperty("partition.discovery.interval.ms", "10000")
    builder.setTopics("myevent")
    builder.setBounded(OffsetsInitializer.latest)
    builder.setStartingOffsets(OffsetsInitializer.earliest)
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(new MyEventSchema))
    val source = builder.build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSource = env
      .fromSource[MyEvent](source, WatermarkStrategy.noWatermarks[MyEvent](), "Kafka Source")

    val sink: StreamingFileSink[MyEvent] = StreamingFileSink
      .forBulkFormat(new Path("hdfs://localhost:50070/mydata"),
        AvroParquetWriters.forReflectRecord[MyEvent](classOf[MyEvent])
      )
      .build()

    streamSource.addSink(sink)
    env.execute()
  }
}

But executing this fails with java.lang.IllegalArgumentException: Invalid lambda deserialization. I suppose I am doing something completely wrong here, but what? What must be done to write POJO instances to HDFS? Reading from Kafka works fine.

The class MyEvent is defined as follows:

class MyEvent() extends Serializable {
  @JsonProperty("id")
  var id: String = null
  @JsonProperty("timestamp")
  var timestamp: Date = null
}

The namenode is run with the following docker-compose services:

namenode:
  image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
  container_name: namenode
  volumes:
    - ./hdfs/namenode:/hadoop/dfs/name
  environment:
    - CLUSTER_NAME=hive
  env_file:
    - ./hive/hadoop-hive.env
  ports:
    - "50070:50070"
  networks:
    - shared
datanode:
  image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
  container_name: datanode
  volumes:
    - ./hdfs/datanode:/hadoop/dfs/data
  env_file:
    - ./hive/hadoop-hive.env
  environment:
    SERVICE_PRECONDITION: "namenode:50070"
  depends_on:
    - namenode
  ports:
    - "50075:50075"
  networks:
    - shared
hive-server:
  image: bde2020/hive:2.3.2-postgresql-metastore
  container_name: hive-server
  env_file:
    - ./hive/hadoop-hive.env
  environment:
    HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
    SERVICE_PRECONDITION: "hive-metastore:9083"
  depends_on:
    - hive-metastore
  ports:
    - "10000:10000"
  networks:
    - shared
hive-metastore:
  image: bde2020/hive:2.3.2-postgresql-metastore
  container_name: hive-metastore
  env_file:
    - ./hive/hadoop-hive.env
  command: /opt/hive/bin/hive --service metastore
  environment:
    SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
  depends_on:
    - hive-metastore-postgresql
  ports:
    - "9083:9083"
  networks:
    - shared
hive-metastore-postgresql:
  image: bde2020/hive-metastore-postgresql:2.3.0
  container_name: hive-metastore-postgresql
  volumes:
    - ./metastore-postgresql/postgresql/data:/var/lib/postgresql/data
  depends_on:
    - datanode
  networks:
    - shared

So, you probably have this dependency in your Maven pom:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>

This dependency pulls in org.apache.flink.hive.shaded.parquet.avro.AvroParquetWriter in place of the real AvroParquetWriter class you actually want to use.
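If the job does not really need the Hive connector, one way out is to drop or exclude it and pull the Parquet writer from flink-parquet plus the unshaded parquet-avro instead. The snippet below is only a sketch; the artifact versions are assumptions and should be aligned with your Flink/Hadoop setup, they are not taken from the question:

<!-- Sketch: dependencies that provide the unshaded AvroParquetWriters/AvroParquetWriter.
     Versions are assumptions; match them to your Flink distribution. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.2</version>
</dependency>

Running mvn dependency:tree afterwards helps confirm which parquet artifacts actually end up on the classpath.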
