Moving data from Cloud SQL to Elasticsearch with Beam and Dataflow



I am new to Beam and Google Dataflow. I created a simple class to move data from Cloud SQL to Elasticsearch as a batch job by writing the following:

package com.abc;

class DataFlowTest {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("staging");
        options.setTempLocation("gs://csv_to_sql_staging/temp");
        options.setStagingLocation("gs://csv_to_sql_staging/staging");
        options.setRunner(DataflowRunner.class);
        options.setGcpTempLocation("gs://csv_to_sql_staging/temp");

        Pipeline p = Pipeline.create(options);

        p.begin();
        p.apply(JdbcIO.read()
                .withQuery("select * from user_table")
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.jdbc.Driver",
                        "jdbc:mysql://" + EnvironmentVariable.getDatabaseIp() + "/" + EnvironmentVariable.getDatabaseName()
                                + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory"
                                + "&user=" + Credentials.getDatabaseUsername()
                                + "&password=" + Credentials.getDatabasePassword()
                                + "&useSSL=false")));

        Write w = ElasticsearchIO.write().withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(new String[] {"host"}, "user-temp", "String")
                        .withUsername("elastic")
                        .withPassword("password"));
        p.apply(w);

        p.run().waitUntilFinish();
    }
}

And below are my dependencies in pom.xml:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.19.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.api-client</groupId>
            <artifactId>google-api-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.http-client</groupId>
            <artifactId>google-http-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.http-client</groupId>
            <artifactId>google-http-client-jackson2</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-jdbc</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.19.0</version>
    <exclusions>
        <exclusion>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The problem now is a compile error that says:

The method apply(PTransform) in the type Pipeline is not applicable for the arguments (ElasticsearchIO.Write)

at this line: p.apply(w);

Can anyone help? I added some exclusions in the pom file to fix a few dependency conflicts.

You cannot apply ElasticsearchIO.write directly to the pipeline object: ElasticsearchIO.Write is a PTransform<PCollection<String>, PDone>, while Pipeline.apply only accepts transforms rooted at PBegin. First create a PCollection<String>, then apply ElasticsearchIO to that PCollection. Refer to the code below.

PCollection<String> sqlResult1 = p.apply(
        JdbcIO.<String>read().withDataSourceConfiguration(config).withQuery("select * from test_table")
                .withCoder(StringUtf8Coder.of()).withRowMapper(new JdbcIO.RowMapper<String>() {
                    private static final long serialVersionUID = 1L;

                    public String mapRow(ResultSet resultSet) throws Exception {
                        // ResultSet columns are 1-indexed (getString(0) would throw), and
                        // ElasticsearchIO.write() expects each element to be a valid JSON
                        // document; "field1"/"field2" are placeholder names.
                        return String.format("{\"field1\":\"%s\",\"field2\":\"%s\"}",
                                resultSet.getString(1), resultSet.getString(2));
                        // return KV.of(resultSet.getString(1), resultSet.getString(2));
                    }
                }));

sqlResult1.apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration
        .create(new String[] { "https://host:9243" }, "user-temp", "String").withUsername("").withPassword("")));

I think the above code should work for your use case.
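
For completeness, here is a minimal end-to-end sketch of the same fix applied to your original pipeline. It reuses the EnvironmentVariable and Credentials helpers from your question; the columns id and name are placeholders for whatever user_table actually contains, and the first JDBC URL parameter is introduced with ? rather than the & used in your snippet:

package com.abc;

import java.sql.ResultSet;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

class DataFlowTest {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("staging");
        options.setTempLocation("gs://csv_to_sql_staging/temp");
        options.setStagingLocation("gs://csv_to_sql_staging/staging");
        options.setGcpTempLocation("gs://csv_to_sql_staging/temp");
        options.setRunner(DataflowRunner.class);

        Pipeline p = Pipeline.create(options);

        // Read from Cloud SQL and map every row to a JSON document.
        PCollection<String> rows = p.apply("ReadFromCloudSQL",
                JdbcIO.<String>read()
                        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                "com.mysql.jdbc.Driver",
                                "jdbc:mysql://" + EnvironmentVariable.getDatabaseIp() + "/"
                                        + EnvironmentVariable.getDatabaseName()
                                        + "?socketFactory=com.google.cloud.sql.mysql.SocketFactory"
                                        + "&user=" + Credentials.getDatabaseUsername()
                                        + "&password=" + Credentials.getDatabasePassword()
                                        + "&useSSL=false"))
                        .withQuery("select * from user_table")
                        .withCoder(StringUtf8Coder.of())
                        .withRowMapper(new JdbcIO.RowMapper<String>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public String mapRow(ResultSet resultSet) throws Exception {
                                // Placeholder columns; ResultSet indices are 1-based.
                                return String.format("{\"id\":\"%s\",\"name\":\"%s\"}",
                                        resultSet.getString(1), resultSet.getString(2));
                            }
                        }));

        // Write the JSON documents to the user-temp index.
        rows.apply("WriteToElasticsearch",
                ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration
                                .create(new String[] {"https://host:9243"}, "user-temp", "String")
                                .withUsername("elastic")
                                .withPassword("password")));

        p.run().waitUntilFinish();
    }
}

Note also that the JDBC URL relies on com.google.cloud.sql.mysql.SocketFactory, so the MySQL driver (mysql:mysql-connector-java) and the Cloud SQL socket factory (com.google.cloud.sql:mysql-socket-factory) need to be on the classpath as well; neither appears in the pom.xml excerpt above.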
