我创建了一个数据流,该数据流从数据存储中获取输入并执行转换以将其转换为BigQuery Tablerow。我将时间戳与每个元素相连。然后将一天的窗口应用于PCollection。使用Apache Beam的BigQueryio将窗口输出写入BigQuery表中的分区。
在写入BigQuery之前,它使用通过随机键作为避免融合的中间步骤。
管道行为是:
1.对于输入中的280万实体:总VCPU时间-5.148 VCPU HR是时候完成工作 - 53分钟9秒目前的工人-27目标工人27作业ID:2018-04-04_04_20_34-19514739017698141392.对于700万个输入中的entutes:总VCPU时间-247.772 VCPU HR是时候完成工作-3小时45分钟目前的工人-69目标工人-1000工作ID:2018-04-02_21_59_47-8636729278179820259
我不明白为什么要完成第二种情况需要这么多时间完成工作和CPU小时。
高级别的数据流管线为:
// Read from datastore
PCollection<Entity> entities =
pipeline.apply("ReadFromDatastore",
DatastoreIO.v1().read().withProjectId(options.getProject())
.withQuery(query).withNamespace(options.getNamespace()));
// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));
// Apply timestamp to TableRow element, and then apply windowing of one day on that
PCollection<TableRow> tableRowWindowTemp =
tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
"tableApplyWindow",
Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
DateTimeZone.forID(options.getTimeZone()))));
//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
tableRowWindow.apply("ReshuffleViaRandomKey",
Reshuffle.<TableRow> viaRandomKey());
// Write windowed output to BigQuery partitions
tableRowWindow.apply(
"WriteTableToBQ",
BigQueryIO
.writeTableRows()
.withSchema(BigqueryHelper.getSchema())
.to(TableRefPartition.perDay(options.getProject(),
options.getBigQueryDataset(), options.getTableName()))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
我在这里看到您在这里发布了一个类似的问题,现在您已经在代码中添加了步骤:
//Apply reshuffle with random key for avoiding fusion
...
正如某人在另一个问题中已经告诉您的那样:
" OOM可能是热键的症状"
因此,在这种情况下,它看起来仍在发生类似的事情(您在这里有有关热门问题的更多信息:
如果是这种情况,并且有一些工人陷入困境,那么实体的数量与完成工作的时间不必遵循任何线性。VCPU消耗应该更多的是优化代码以避免热键问题。