Does an Apache Flink AWS S3 sink require Hadoop for local testing?



I am relatively new to Apache Flink and I am trying to create a simple project that produces a file to an AWS S3 bucket. Based on the documentation, it looks like I have to install Hadoop in order to do this.

How do I set up my local environment to allow me to test this? I have installed Apache Flink as well as Hadoop locally. I have added the necessary changes to the core-site.xml configuration for Hadoop and have also added my HADOOP_CONF path to my flink.yaml configuration. When I try to submit a job locally through the Flink UI, I always get the following error:

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

I am assuming that I am missing something in how my environment is set up. Is it possible to do this locally? Any help would be appreciated.

While you need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I just happened to try this out by writing Parquet output, based on an Avro schema, of generated SpecificRecords to S3. I am running a version of the following code locally through SBT and IntelliJ IDEA. Needed parts:

1) Have the following file specifying the required Hadoop properties (note: hard-coding the AWS access key/secret key is not recommended; it is better to run on an EC2 instance that has the proper IAM role to read/write to your S3 bucket, but it is needed for local testing):

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>
    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Imports:

import com.uebercomputing.eventrecord.EventOnlyRecord
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink code using HadoopOutputFormat with the configuration above:

    val events: DataSet[(Void, EventOnlyRecord)] = ...
    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)
    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance
    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )
    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)
    events.output(hadoopOutputFormat)
    env.execute
    ...
    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoodConfigPath))
      hadoopConfig
    }
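
The snippet above elides how env and the events DataSet are created. As a rough sketch, assuming your Avro records already arrive as a DataSet[EventOnlyRecord] from some source, the Void keys expected by HadoopOutputFormat[Void, T] can be attached with a simple map:

    // Rough sketch (assumption: a DataSet[EventOnlyRecord] already exists from
    // some source). HadoopOutputFormat[Void, T] consumes (key, value) pairs, so
    // each record is paired with a null key of type Void.
    val env = ExecutionEnvironment.getExecutionEnvironment
    def withVoidKeys(records: DataSet[EventOnlyRecord]): DataSet[(Void, EventOnlyRecord)] =
      records.map(r => (null.asInstanceOf[Void], r))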

4) Build dependencies and versions used:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"
    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )
    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")
    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )
    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )
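
For completeness, a minimal build.sbt sketch tying those sequences together might look like the following (the Scala version is an assumption; pick one supported by your Flink version):

    // Hypothetical build.sbt sketch assembling the dependency sequences above.
    scalaVersion := "2.11.8"  // assumption: a 2.11.x release, matching Flink 1.1.4 artifacts

    libraryDependencies ++=
      providedFlinkDependencies ++ serializationDependencies ++ s3Dependencies

Because the Flink dependencies are marked provided, you may also need the usual sbt trick of adding them back to the run classpath when launching the job from the IDE.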

Edit to use writeAsText to S3:

1) Create a Hadoop configuration directory (referred to below as hadoop-conf-dir) containing the file core-site.xml.

For example:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml
#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>
    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Create a directory (referred to below as flink-conf-dir) containing the file flink-conf.yaml.

For example:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml
//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) Edit the IntelliJ Run configuration used to run your S3 Flink job - Run - Edit configurations - and add the following environment variable:

FLINK_CONF_DIR and set it to your flink-conf-dir
Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) Run the code with that environment variable set:

events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute
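
Putting steps 1-4 together, a minimal self-contained sketch (the object name and sample data are invented for illustration) could look like this, run with FLINK_CONF_DIR pointing at your flink-conf-dir:

    // Minimal sketch (hypothetical object name and sample data) of the
    // writeAsText-to-S3 variant. Run it with FLINK_CONF_DIR set as in step 3.
    import org.apache.flink.api.scala._

    object WriteToS3Example {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val events: DataSet[String] = env.fromElements("event-1", "event-2", "event-3")
        // Replace the placeholder bucket/prefix with your own values.
        events.writeAsText("s3://<bucket>/<prefix-dir>")
        env.execute("write-to-s3-example")
      }
    }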

I had to do the following to run my Flink job locally and sink to S3:

1- Added flink-s3-fs-hadoop-1.9.1.

2- Modified flink/conf/flink-conf.yaml to include:

s3.access-key: aws_access_key
s3.secret-key: aws_secret_key
fs.hdfs.hadoopconf: /etc/hadoop-config

I have a core-site.xml file in the hadoop-config folder, but it does not include any configuration, so fs.hdfs.hadoopconf may not be needed.

In SBT, I only had to add the S3 library dependency to use it like a local file system.

SBT file:

"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion.value

Read example:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.readTextFile("s3://etl-data-ia/test/fileStreamTest.csv");
        text.print();
        env.execute("test");
    }

Based on this link: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

To use the flink-s3-fs-hadoop plugin you should copy the respective JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink.

Another way I know of is to enable it via the environment variable ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-[flink-version].jar"

For example: flink-s3-fs-hadoop-1.12.2.jar

With either approach, the S3 configuration must be defined in the flink-conf.yaml file.

Flink internally translates the s3.* keys back to the corresponding fs.s3a.* options (for example, s3.connection.maximum becomes fs.s3a.connection.maximum), so there is no need to pass configuration parameters via Hadoop's XML configuration files. For example:

s3.endpoint: <end-point>
s3.path.style.access : true

As for the AWS credentials, they must be provided either via environment variables or configured in flink-conf.yaml:

s3.endpoint: <end-point>
s3.path.style.access : true
s3.access-key: <key>
s3.secret-key: <value>
s3.region: <region>

Once it is all set up, you should be able to read from S3 as @eyalp mentioned, or write to S3 (i.e. with a DataSet):

dataset.map(new MapToJsonString())
                .writeAsText("s3://....",
                        FileSystem.WriteMode.OVERWRITE);
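
MapToJsonString above is the answerer's own mapper and is not shown; a hypothetical Scala sketch of such a function (the record type and hand-rolled JSON layout are invented for illustration) could be:

    // Hypothetical sketch of a MapToJsonString-style mapper; the record type and
    // JSON layout are assumptions, since the original class is not shown.
    import org.apache.flink.api.common.functions.MapFunction

    case class Event(id: String, value: Long)

    class MapToJsonString extends MapFunction[Event, String] {
      override def map(e: Event): String =
        s"""{"id":"${e.id}","value":${e.value}}"""
    }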

If you would like to test this locally (without a real AWS account), I suggest checking out LocalStack. It fully supports various AWS services, including S3. If you use it, no real AWS credentials are needed (you can provide empty/dummy ones), and the endpoint will be LocalStack itself.
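
For example, the relevant flink-conf.yaml entries for LocalStack might look like this (the edge endpoint http://localhost:4566 and the dummy test credentials are assumptions based on common LocalStack defaults; adjust them to your setup):

s3.endpoint: http://localhost:4566
s3.path.style.access: true
s3.access-key: test
s3.secret-key: test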
