Writing the results of a SQL query to a file with Apache Flink



I have the following task:

  1. Create a job with a SQL query against a Hive table
  2. Run this job on a remote Flink cluster
  3. Collect the results of this job in a file (preferably HDFS)

Note

Because this job has to run on a remote Flink cluster, I cannot use a TableEnvironment in the straightforward way. This problem is mentioned in this ticket: https://issues.apache.org/jira/browse/FLINK-18095. As a workaround for now, I use the advice from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html.

Code

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();

// create remote env
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");

// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
        .classLoader(classLoader)
        .config(tableConfig.getConfiguration())
        .defaultCatalog(
                batchSettings.getBuiltInCatalogName(),
                new GenericInMemoryCatalog(
                        batchSettings.getBuiltInCatalogName(),
                        batchSettings.getBuiltInDatabaseName()))
        .executionConfig(streamExecutionEnvironment.getConfig())
        .build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
        catalogManager,
        moduleManager,
        functionCatalog,
        tableConfig,
        streamExecutionEnvironment,
        new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
        batchExecutor,
        false);

// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

// query Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");

Question

At this point I can call the table.execute() method and then obtain a CloseableIterator via the collect() method. But in my case the query can return a huge number of rows, and collecting them straight into a file (ORC in HDFS) would be perfect.
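For reference, the interactive path mentioned above would look roughly like this (a sketch only; every row is streamed back through the client JVM, which is exactly why it does not scale to large result sets):

```java
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Interactive retrieval: each row travels over the network to the
// client, so this is only suitable for small result sets.
try (CloseableIterator<Row> rows = table.execute().collect()) {
    while (rows.hasNext()) {
        Row row = rows.next();
        System.out.println(row); // or any per-row processing
    }
}
```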

How can I reach this goal?

Table.execute().collect() returns the result of the view to the client, which is intended for interactive purposes. In your case, you can use the filesystem connector and write the view into a file with INSERT INTO. For example:

// create a filesystem table
// create a filesystem table
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
        "  column_name1 INT,\n" +
        "  column_name2 STRING,\n" +
        "  ...\n" +
        ") WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 'hdfs://path/to/your/file',\n" +
        "  'format' = 'orc'\n" +
        ")");
// submit the job
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");
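One caveat: executeSql submits the INSERT job asynchronously and returns as soon as the job is submitted, so the files may not exist yet when the call returns. A sketch of how you could block until the job finishes, assuming the Flink 1.11 API (in later versions `TableResult.await()` simplifies this):

```java
import java.util.concurrent.ExecutionException;
import org.apache.flink.table.api.TableResult;

// executeSql submits the INSERT job and returns immediately;
// wait on the JobClient before reading the output files.
TableResult result = tableEnvironment.executeSql(
        "insert into MyUserTable select * from myhive.`default`.test");
result.getJobClient().ifPresent(jobClient -> {
    try {
        // completes when the job reaches a terminal state
        jobClient.getJobExecutionResult(
                Thread.currentThread().getContextClassLoader()).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException("INSERT job failed or was interrupted", e);
    }
});
```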

See the documentation for more information about the filesystem connector: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
