如何集成测试写入 Bigtable 的数据流管道?



根据Beam网站,

通常,对 管道代码,而不是调试管道的远程执行。

出于这个原因,我想对写入 Bigtable 的 Beam/Dataflow 应用程序使用测试驱动开发。

但是,在Beam测试文档之后,我陷入了僵局 - PAssert 没有用,因为输出 PCollection 包含 org.apache.hadoop.hbase.client.Put 对象,这些对象不会覆盖 equals 方法。

我也无法让 PCollection 的内容对它们进行验证,因为

无法

直接获取 PCollection 的内容 - Apache Beam 或 Dataflow 管道更像是一个查询计划。 应该完成处理,PCollection是合乎逻辑的 计划中中间节点,而不是包含数据。

那么,除了手动运行它之外,如何测试此管道呢?我正在使用Maven和JUnit(在Java中,因为这是Dataflow Bigtable Connector似乎支持的所有内容(。

Bigtable Emulator Maven插件可用于为此编写集成测试:

  • 配置 Maven 故障安全插件,并将测试用例的结尾从 *Test 更改为 *IT,以作为集成测试运行。
  • 在命令行的 gcloud sdk 中安装 Bigtable 模拟器:

    gcloud components install bigtable   
    

    请注意,此必需步骤将降低代码可移植性(例如,它会在您的构建系统上运行吗?在其他开发人员的机器上?所以我将在部署到构建系统之前使用 Docker 将其容器化。

  • 根据自述文件将模拟器插件添加到 pom 中

  • 使用 HBase 客户端 API 并查看示例 Bigtable 模拟器集成测试来设置会话和表。

  • 根据 Beam 文档正常编写测试,除了不使用 PAssert 实际上调用 CloudBigtableIO.writeToTable,然后使用 HBase 客户端从表中读取数据进行验证。

下面是一个集成测试示例:

package adair.example;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Assert;
import org.junit.Test;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
/**
*  A simple integration test example for use with the Bigtable Emulator maven plugin.
*/
public class DataflowWriteExampleIT {
private static final String PROJECT_ID = "fake";
private static final String INSTANCE_ID = "fakeinstance";
private static final String TABLE_ID = "example_table";
private static final String COLUMN_FAMILY = "cf";
private static final String COLUMN_QUALIFIER = "cq";
private static final CloudBigtableTableConfiguration TABLE_CONFIG =
new CloudBigtableTableConfiguration.Builder()
.withProjectId(PROJECT_ID)
.withInstanceId(INSTANCE_ID)
.withTableId(TABLE_ID)
.build();
public static final List<String> VALUES_TO_PUT = Arrays
.asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");
@Test
public void testPipelineWrite() throws IOException {
try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
Admin admin = connection.getAdmin();
createTable(admin);
List<Mutation> puts = createTestPuts();
//Use Dataflow to write the data--this is where you'd call the pipeline you want to test.
Pipeline p = Pipeline.create();
p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
p.run().waitUntilFinish();
//Read the data from the table using the regular hbase api for validation
ResultScanner scanner = getTableScanner(connection);
List<String> resultValues = new ArrayList<>();
for (Result row : scanner) {
String cellValue = getRowValue(row);
System.out.println("Found value in table: " + cellValue);
resultValues.add(cellValue);
}
Assert.assertThat(resultValues,
IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
}
}
private void createTable(Admin admin) throws IOException {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
admin.createTable(tableDesc);
}
private ResultScanner getTableScanner(Connection connection) throws IOException {
Scan scan = new Scan();
Table table = connection.getTable(TableName.valueOf(TABLE_ID));
return table.getScanner(scan);
}
private String getRowValue(Result row) {
return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
}
private List<Mutation> createTestPuts() {
return VALUES_TO_PUT
.stream()
.map(this::stringToPut)
.collect(Collectors.toList());
}
private Mutation stringToPut(String cellValue){
String key = UUID.randomUUID().toString();
Put put = new Put(toBytes(key));
put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
return put;
}
}

在 Google Cloud 中,您可以使用真正的云资源(如 Pub/Sub topic 和 BigQuery 表(轻松地对数据流管道进行 e2e 测试。

通过使用 Junit5 扩展模型 (https://junit.org/junit5/docs/current/user-guide/#extensions(,您可以创建自定义类,这些类将处理管道所需资源的创建和删除。

您可以在此处找到演示/种子项目 https://github.com/gabihodoroaga/dataflow-e2e-demo,https://hodo.dev/posts/post-31-gcp-dataflow-e2e-tests/在这里找到博客文章。

最新更新