Why does my DoFn restart processing of an element indefinitely?



Problem:

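For long-running elements (2-3 hours), the web-scraping DoFn restarts execution indefinitely when it returns the new PCollection, instead of moving on to the next step in the pipeline.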

Error:

Main error:

"Commit failed, will be retried at higher level but may not succeed.
computation = 'P51', sharding_key = '12c92e1638acbb09',
status = generic::internal: Windmill failed to commit the work item.
CommitStatus: VALIDATION_FAILED"

Full error:

[
{
"insertId": "7054061089043139090:270578:0:287794",
"jsonPayload": {
"job": "2022-08-19_13_38_49-15282523611410283510",
"work": "2",
"step": "Scrape Products",
"instruction": "process_bundle-2-1",
"thread": "Thread-14",
"logger": "REDACTED PATH/product_scraper.py:69",
"worker": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-l4gg",
"portability_worker_id": "sdk-0-0",
"message": "returning new_products_dataframe and REDACTED_dataframe"
},
"resource": {
"type": "dataflow_step",
"labels": {
"job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"step_id": "Scrape Products",
"project_id": "REDACTED",
"job_id": "2022-08-19_13_38_49-15282523611410283510",
"region": "us-west2"
}
},
"timestamp": "2022-08-19T21:38:39.037309408Z",
"severity": "WARNING",
"labels": {
"dataflow.googleapis.com/region": "us-west2",
"dataflow.googleapis.com/job_id": "2022-08-19_13_38_49-15282523611410283510",
"compute.googleapis.com/resource_name": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-l4gg",
"dataflow.googleapis.com/job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"dataflow.googleapis.com/service_option": "prime",
"compute.googleapis.com/resource_id": "7054061089043139090",
"compute.googleapis.com/resource_type": "instance"
},
"logName": "projects/REDACTED/logs/dataflow.googleapis.com%2Fworker",
"receiveTimestamp": "2022-08-19T21:38:52.093172441Z"
},
{
"insertId": "s=6f21f5e3f31c4dfca9b6c2dc7f2de30a;i=664;b=f6f95ab5a8234fdeac1c3d31e7f63933;m=d31e6263;t=5e69eebe139ed;x=77b6120c808dd2fb",
"jsonPayload": {
"line": "sampler.go:311",
"message": "Successfully sampled resources"
},
"resource": {
"type": "dataflow_step",
"labels": {
"job_id": "2022-08-19_13_38_49-15282523611410283510",
"project_id": "REDACTED",
"region": "us-west2",
"step_id": "",
"job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u"
}
},
"timestamp": "2022-08-19T21:38:40.180661Z",
"severity": "INFO",
"labels": {
"dataflow.googleapis.com/region": "us-west2",
"dataflow.googleapis.com/log_type": "system",
"dataflow.googleapis.com/job_id": "2022-08-19_13_38_49-15282523611410283510",
"compute.googleapis.com/resource_id": "4422558927708899858",
"dataflow.googleapis.com/job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"compute.googleapis.com/resource_name": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-3jp2",
"compute.googleapis.com/resource_type": "instance",
"dataflow.googleapis.com/service_option": "prime"
},
"logName": "projects/REDACTED/logs/dataflow.googleapis.com%2Fresource",
"receiveTimestamp": "2022-08-19T21:38:41.688521247Z"
},
{
"insertId": "7054061089043139090:270576:0:17358",
"jsonPayload": {
"message": "Commit failed, will be retried at higher level but may not succeed. computation = \"P51\", sharding_key = \"62fa5cb5ab454560\", status = generic::internal: Windmill failed to commit the work item. CommitStatus: VALIDATION_FAILED",
"thread": "119",
"line": "streaming_worker_client.cc:514"
},
"resource": {
"type": "dataflow_step",
"labels": {
"region": "us-west2",
"project_id": "REDACTED",
"job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"job_id": "2022-08-19_13_38_49-15282523611410283510",
"step_id": ""
}
},
"timestamp": "2022-08-19T21:38:40.686591Z",
"severity": "ERROR",
"labels": {
"dataflow.googleapis.com/service_option": "prime",
"dataflow.googleapis.com/job_id": "2022-08-19_13_38_49-15282523611410283510",
"dataflow.googleapis.com/log_type": "system",
"dataflow.googleapis.com/job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"dataflow.googleapis.com/region": "us-west2",
"compute.googleapis.com/resource_name": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-l4gg",
"compute.googleapis.com/resource_type": "instance",
"compute.googleapis.com/resource_id": "7054061089043139090"
},
"logName": "projects/REDACTED/logs/dataflow.googleapis.com%2Fharness",
"receiveTimestamp": "2022-08-19T21:38:42.095203701Z"
},
{
"insertId": "7054061089043139090:270575:0:13058",
"jsonPayload": {
"line": "exec.go:66",
"message": "E0819 21:38:40.686591     119 streaming_worker_client.cc:514] Commit failed, will be retried at higher level but may not succeed. computation = \"P51\", sharding_key = \"62fa5cb5ab454560\", status = generic::internal: Windmill failed to commit the work item. CommitStatus: VALIDATION_FAILED"
},
"resource": {
"type": "dataflow_step",
"labels": {
"job_id": "2022-08-19_13_38_49-15282523611410283510",
"step_id": "",
"job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"project_id": "REDACTED",
"region": "us-west2"
}
},
"timestamp": "2022-08-19T21:38:40.686846Z",
"severity": "INFO",
"labels": {
"dataflow.googleapis.com/log_type": "system",
"dataflow.googleapis.com/service_option": "prime",
"compute.googleapis.com/resource_type": "instance",
"dataflow.googleapis.com/region": "us-west2",
"compute.googleapis.com/resource_name": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-l4gg",
"dataflow.googleapis.com/job_id": "2022-08-19_13_38_49-15282523611410283510",
"compute.googleapis.com/resource_id": "7054061089043139090",
"dataflow.googleapis.com/job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u"
},
"logName": "projects/REDACTED/logs/dataflow.googleapis.com%2Fharness-startup",
"receiveTimestamp": "2022-08-19T21:38:52.093608994Z"
},
{
"insertId": "7054061089043139090:270576:0:17641",
"jsonPayload": {
"line": "streaming_worker_client.cc:566",
"thread": "119",
"message": "Error while processing a work item: INTERNAL: Windmill failed to commit the work item. CommitStatus: VALIDATION_FAILED\n=== Source Location Trace: ===\ndist_proc/dax/workflow/worker/streaming/streaming_worker_client.cc:492"
},
"resource": {
"type": "dataflow_step",
"labels": {
"step_id": "",
"job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"job_id": "2022-08-19_13_38_49-15282523611410283510",
"project_id": "REDACTED",
"region": "us-west2"
}
},
"timestamp": "2022-08-19T21:38:40.687126Z",
"severity": "WARNING",
"labels": {
"compute.googleapis.com/resource_type": "instance",
"dataflow.googleapis.com/region": "us-west2",
"compute.googleapis.com/resource_name": "df-hvm-beamapp-vincentye-0819203-08191338-dtgt-harness-l4gg",
"dataflow.googleapis.com/job_name": "beamapp-vincentye-0819203841-963772-8quxtp6u",
"dataflow.googleapis.com/job_id": "2022-08-19_13_38_49-15282523611410283510",
"compute.googleapis.com/resource_id": "7054061089043139090",
"dataflow.googleapis.com/service_option": "prime",
"dataflow.googleapis.com/log_type": "system"
},
"logName": "projects/REDACTED/logs/dataflow.googleapis.com%2Fharness",
"receiveTimestamp": "2022-08-19T21:39:02.095105228Z"
}
]

More information:

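  • The error only occurs for large elements that force the DoFn to run for more than 2 hours.

    • The PCollection result for a long-running element is usually around 100 to 150 MB. (It is a Pandas dataframe containing many images as bytes.)
  • The failing element is retried forever.

  • The problem does not occur when the pipeline runs on the LocalRunner.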

Tried:

  • Turning off Dataflow Prime and setting machine_type to n2-highmem-2
  • Setting number_of_worker_harness_threads to 3

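I recently spoke with Google support and learned that the Windmill VALIDATION_FAILED error is actually caused by the 80 MB maximum size for a single element in Streaming Engine.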

Solution:

Make sure that every single element in a materialized PCollection is smaller than 80 MB.
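
One way to stay under the limit is to emit several smaller elements instead of one large dataframe. The sketch below is only an illustration, not the code from the pipeline in question: the helper names `serialized_size` and `chunk_by_size` and the 50 MB budget are assumptions, and the pickled size is used as a rough stand-in for the size Dataflow sees after encoding an element.

```python
import pickle
from typing import Any, Iterable, Iterator, List

# Assumed budget: leave headroom below the 80 MB per-element limit.
MAX_ELEMENT_BYTES = 50 * 1024 * 1024


def serialized_size(obj: Any) -> int:
    """Approximate an object's encoded size by pickling it."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def chunk_by_size(
    rows: Iterable[Any], max_bytes: int = MAX_ELEMENT_BYTES
) -> Iterator[List[Any]]:
    """Group rows into lists whose summed pickled size stays within max_bytes.

    A single row that is larger than max_bytes is still emitted, alone in its
    own chunk, so the caller can detect and handle it separately.
    """
    chunk: List[Any] = []
    chunk_bytes = 0
    for row in rows:
        row_bytes = serialized_size(row)
        # Close the current chunk before it would exceed the budget.
        if chunk and chunk_bytes + row_bytes > max_bytes:
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(row)
        chunk_bytes += row_bytes
    if chunk:
        yield chunk
```

Inside the DoFn, each chunk could then be yielded as its own element (for example, one small dataframe per chunk) rather than returning the whole 100 to 150 MB result at once.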

Some thoughts:

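If anyone at Google reads this, perhaps the error message could be made more descriptive than a vague Windmill VALIDATION_FAILED error. Without Google support, I don't think I would have found the solution this quickly.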

I would suggest something like: Windmill: VALIDATION_FAILED, QUOTA_ERROR: Element over 80MB

References:

https://cloud.google.com/dataflow/quotas

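My guess is that some internal timeout prevents the Dataflow streaming runner from committing an element that has been processing for a very long time. Could you update the pipeline so that the DoFn outputs elements more frequently (or returns from the ProcessElement call sooner by splitting the work across multiple elements)?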

If not, I would suggest contacting Google Cloud support so they can take a look at your failing job.
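
Splitting the work into multiple elements, as suggested above, might look roughly like this. It is a sketch under assumptions: `split_work`, the `(key, urls)` tuple shape, and the batch size are hypothetical, since the scraper's actual inputs are not shown in the question.

```python
from typing import Iterator, List, Sequence, Tuple


def split_work(
    job_id: str, urls: Sequence[str], batch_size: int = 50
) -> Iterator[Tuple[str, List[str]]]:
    """Split one large scraping job into many small (key, urls) work items.

    Each item is meant to be small enough that a single process call
    finishes quickly and produces a modest amount of output.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(urls), batch_size):
        # Hypothetical key format: the job id plus the batch index.
        key = f"{job_id}-{start // batch_size}"
        yield key, list(urls[start:start + batch_size])
```

In a Beam pipeline, a function like this could be applied with `beam.FlatMap` ahead of the scraping step, possibly followed by `beam.Reshuffle()` so the batches are redistributed across workers; whether that fits depends on how the scraper receives its inputs.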