Upgrading Flink 1.3.2 to 1.4.0: Hadoop FileSystem and Path issue



I recently tried to upgrade from Flink 1.3.2 to 1.4.0 and ran into a problem: I can no longer import org.apache.hadoop.fs.{FileSystem, Path}. The issue shows up in two places:

Parquet writer:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
class AvroWriter[T <: GenericRecord]() extends Writer[T] {
  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _
  override def write(element: T): Unit = {
    schema = element.getSchema
    writer.write(element)
  }
  override def duplicate(): AvroWriter[T] = new AvroWriter[T]()
  override def close(): Unit = writer.close()
  override def getPos: Long = writer.getDataSize
  override def flush(): Long = writer.getDataSize
  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }
}

CustomBucketer:

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.ObjectInputStream
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag
class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {
  @transient var dateFormatter: SimpleDateFormat = _
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    if (dateField != null && dateFieldFormat != null) {
      dateFormatter = new SimpleDateFormat(dateFieldFormat)
    }
  }
  override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
    val partitions = bucketOrder.map(field => {
      if (field == dateField) {
        field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
      } else {
        field + "=" + element.get(field)
      }
    }).mkString("/")
    new Path(basePath + "/" + partitions)
  }
}

I noticed that Flink now has:

import org.apache.flink.core.fs.{FileSystem, Path}

However, the new Path does not seem to work with AvroParquetWriter or with the getBucketPath method. I know that Flink's FileSystem and Hadoop dependencies changed in this release; I'm just not sure what I need to import to make my code work again.

Do I even need the Hadoop dependencies anymore, or is there now a different way to write Parquet files to S3?

build.sbt:

val flinkVersion = "1.4.0"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2"
)

构建" hadoop fre-flink"是1.4版本的主要特征。您要做的就是将Hadoop依赖项包括在您的类路径或引用Changelogs:

...this also means that in cases where you use connectors to HDFS (such as the BucketingSink or RollingSink), you now have to make sure that the Hadoop dependencies are included when building the JAR file for your application.

The org.apache.hadoop.fs.{FileSystem, Path} classes you need are found in the hadoop-common artifact.
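
For example, you could add it to the build.sbt shown above; a minimal sketch (the hadoop-common version here is an assumption, pick one matching your cluster):

libraryDependencies ++= Seq(
  // org.apache.hadoop.fs.{FileSystem, Path} come from hadoop-common;
  // the version below is only an example, match your Hadoop/EMR version
  "org.apache.hadoop" % "hadoop-common" % "2.8.3"
  // mark it % Provided instead if the Flink distribution on your cluster still bundles Hadoop
)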
