I am relatively new to Apache Flink and I am trying to create a simple project that produces a file to an AWS S3 bucket. Based on the documentation it looks like I have to have Hadoop installed in order to do this.
How do I set up my local environment to allow me to test this capability? I have installed Apache Flink and Hadoop locally. I have added the necessary changes to the core-site.xml configuration for Hadoop and also added my HADOOP_CONF path to my flink.yaml configuration. When I try to submit my job locally through the Flink UI I always get an error:
2016-12-29 16:03:49,861 INFO org.apache.flink.util.NetUtils - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
I assume I am missing something in how my environment is set up. Is it possible to do this locally? Any help would be appreciated.
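(As for the "Address already in use" error itself: port 6123 is the default JobManager RPC port, so this usually means a JobManager from an earlier local start is still running. A quick check, assuming a Unix-like shell and a standalone Flink installation, which is not spelled out in the question, would be:
lsof -i :6123
./bin/stop-cluster.sh   # or stop-local.sh in older Flink 1.1.x distributions
)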
While you do need the Hadoop libraries, you do not have to install Hadoop to run locally and write to S3. I just tried this out by writing Parquet output based on an Avro schema and generated SpecificRecords to S3. I am running a version of the following code locally through SBT and IntelliJ IDEA. The parts needed:
1) Have the following file that specifies the required Hadoop properties (note: using an AWS access key/secret key is not recommended; it is better to run on an EC2 instance with the appropriate IAM role to read/write to your S3 bucket, but it is needed for local testing):
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>
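(Aside, not from the original setup: the fs.s3.impl entry above maps plain s3:// URIs onto the S3A implementation. If you prefer to keep the s3a:// scheme explicit in your paths, the equivalent property would be fs.s3a.impl:)
<property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>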
2) Imports:
import com.uebercomputing.eventrecord.EventOnlyRecord
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroParquetOutputFormat
3) Flink code that uses a HadoopOutputFormat with the above configuration:
val events: DataSet[(Void, EventOnlyRecord)] = ...

val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance

//Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
//so key is Void, value of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
  outputFormat,
  outputJob
)

val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://<bucket>/<dir-prefix>")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

events.output(hadoopOutputFormat)

env.execute

...

def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
  val hadoopConfig = new HadoopConfiguration()
  hadoopConfig.addResource(new Path(hadoopConfigPath))
  hadoopConfig
}
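The snippet above leaves env, hadoopConfigFile and the events DataSet undefined. A rough sketch of that surrounding setup might look like the following (the config path, input file and the toEventOnlyRecord parsing helper are hypothetical placeholders, not part of the original answer):

val env = ExecutionEnvironment.getExecutionEnvironment
// path to the core-site.xml from step 1
val hadoopConfigFile = "/home/<user>/hadoop-config/core-site.xml"
// build (key, value) pairs: the Void key stays null, only the value is written
val events: DataSet[(Void, EventOnlyRecord)] =
  env.readTextFile("file:///tmp/events.txt")
    .map(line => (null.asInstanceOf[Void], toEventOnlyRecord(line)))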
4) Build dependencies and versions used:
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkVersion = "1.1.4"

val flinkDependencies = Seq(
  ("org.apache.flink" %% "flink-scala" % flinkVersion),
  ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)

val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1")
)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
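A sketch of how these value definitions might be wired together in a build.sbt (the project name and Scala version here are placeholders, not from the original answer):

lazy val root = (project in file("."))
  .settings(
    name := "flink-s3-parquet-example",   // placeholder
    scalaVersion := "2.11.7",             // placeholder matching Flink 1.1.x-era Scala
    libraryDependencies ++= providedFlinkDependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies ++= s3Dependencies
  )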
Edit: to use writeAsText to S3:
1) Create a Hadoop configuration directory (we will refer to it as hadoop-conf-dir) with the file core-site.xml in it.
For example:
mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml
#content of core-site.xml
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>
2) Create a directory (we will refer to it as flink-conf-dir) with the file flink-conf.yaml in it.
For example:
mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml
//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config
3) Edit the IntelliJ run configuration used to run your S3 Flink job - Run - Edit Configurations - and add the following environment variable:
FLINK_CONF_DIR and set it to your flink-conf-dir
Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config
4) Run the code with that environment variable set:
events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute
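Putting the four steps together, a minimal sketch of a job that relies only on FLINK_CONF_DIR / fs.hdfs.hadoopconf for its S3 wiring (the bucket and the sample data are placeholders):

val env = ExecutionEnvironment.getExecutionEnvironment
// no explicit Hadoop configuration in code: it is picked up via
// FLINK_CONF_DIR -> flink-conf.yaml -> fs.hdfs.hadoopconf -> core-site.xml
val events: DataSet[String] = env.fromElements("event-1", "event-2", "event-3")
events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute("write-text-to-s3")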
I had to do the following to run my Flink job locally and sink to S3:
1- Added flink-s3-fs-hadoop-1.9.1.
2- Modified flink/conf/flink-conf.yaml to include s3.access-key: aws_access_key, s3.secret-key: aws_secret_key and fs.hdfs.hadoopconf: /etc/hadoop-config (laid out below).
I have a core-site.xml file in the hadoop-config folder, but it does not include any configuration, so fs.hdfs.hadoopconf may not be needed.
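Laid out as it appears in flink-conf.yaml (values are placeholders):

s3.access-key: aws_access_key
s3.secret-key: aws_secret_key
fs.hdfs.hadoopconf: /etc/hadoop-config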
In SBT, I only needed to add the S3 library dependency to use S3 like the local file system.
SBT file:
"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion.value
Read example:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.readTextFile("s3://etl-data-ia/test/fileStreamTest.csv");
    text.print();
    env.execute("test");
}
Based on this link: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
To use the flink-s3-fs-hadoop plugin you should copy the respective JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink.
Another way I know of is to enable it via the environment variable ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-[flink-version].jar"
For example: flink-s3-fs-hadoop-1.12.2.jar
In both cases the S3 configuration must be defined in the flink-conf.yaml file. Flink internally forwards keys with the s3. prefix to the Hadoop S3A configuration (for example, s3.connection.maximum becomes fs.s3a.connection.maximum), so there is no need to pass configuration parameters using Hadoop's XML configuration files.
s3.endpoint: <end-point>
s3.path.style.access: true
As for the AWS credentials, they must be provided either in environment variables or configured in flink-conf.yaml:
s3.endpoint: <end-point>
s3.path.style.access: true
s3.access-key: <key>
s3.secret-key: <value>
s3.region: <region>
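The environment-variable route mentioned above would use the standard AWS SDK variables, for example (values are placeholders):

export AWS_ACCESS_KEY_ID=<key>
export AWS_SECRET_ACCESS_KEY=<value>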
Once this is all set, you can read from S3 as mentioned by @eyalp, or write to S3 (i.e. using a DataSet):
dataset.map(new MapToJsonString())
.writeAsText("s3://....",
FileSystem.WriteMode.OVERWRITE);
If you would like to test this locally (without a real AWS account), I suggest you check out LocalStack. It fully supports various AWS services (including S3). If you use it, no AWS credentials are needed (empty ones can be provided), and the endpoint will be LocalStack itself.
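For example, the flink-conf.yaml from the previous answer might point at LocalStack roughly like this (the port assumes LocalStack's default edge endpoint and the dummy credentials are placeholders; adjust to your setup):

s3.endpoint: http://localhost:4566
s3.path.style.access: true
s3.access-key: test
s3.secret-key: test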