如何过滤大型查询的重复表行数据以删除重复的行



我是数据流的新手,所以如果我的问题很有趣,请原谅我,我有一个正在阅读的 csv 文件并且它有重复的行,我正在读取这些数据并写入大查询,但是我不想将重复数据写入我的 BQ 表。

我想到了 1 种方法,但我不知道如何实现它,它涉及向架构添加某种标志以将其标记为唯一,但我不知道如何

Lists.newArrayList(
  new TableFieldSchema()
         .setName("person_id")
         .setMode("NULLABLE").setType("STRING"),
  new TableFieldSchema()
         .setName("person_name")
         .setMode("NULLABLE")
         .setType("STRING") // Cant I add another unique property here?
) 

不知道该方法是否有效,但我所需要的只是过滤从转换中检索到的行,例如

PCollection<TableRow> peopleRows = 
  pipeline
     .apply(
        "Convert to BiqQuery Table Row",
        ParDo.of(new FormatForBigquery())
    // Next step to filter duplicates

如果我们将您读取 CSV 的输出视为 PCollection,那么我们可以通过 Distinct 转换传递 PCollection 来消除重复项。 此转换的目的是获取输入的 PCollection 并生成一个新的 PCollection,该 PCollection 是没有重复项的原始 PCollection。 作为 Distinct 预制转换的一部分,有机会指定自己的函数,该函数将被调用以确定将两个 PCollection 对象分类为相等的内容,从而删除哪些对象。

您可以直接在 bigquery 中执行此操作,加载整个文件并使用源表运行如下所示的查询。无需数据流。

WITH cte as (
  SELECT
    ROW_NUMBER() over (PARTITION BY column1,column2,column3,...) as idx,
    *
  FROM my_table
)
SELECT
*
FROM cte
WHERE idx = 1

最新更新