在本地运行数据流会导致 JVM 崩溃 (OOM)



使用DirectPipelineRunner,我想在本地运行我的管道以进行调试。我正在使用 SDK 1.9.0 和 Java 8。

我的管道从 BigQuery 读取一个表,转换一些字段,然后写回 BigQuery。

在 GCP 上运行,即使用 DataflowPipelineRunner 运行器工作绝对没问题。但是,当我使用DirectPipelineRunner时,它只是不断吐出以下日志信息,而不做其他事情:

19:45:05,470 21866 [main] INFO  com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner - Executing pipeline using the DirectPipelineRunner.
19:45:18,594 34990 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_c88ee6741e434aabbf50e73d4e6733d1-extract found.
19:45:27,344 43740 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_012dca76d75e461480fe75897b5fa7ba-extract found.
19:45:38,150 54546 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_3548a0ee373a417e8e7570ae90aef78d-extract found.
19:45:47,912 64308 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_db0b957250ef41279a639bdc113c5493-extract found.
19:45:56,685 73081 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_3773e0643ec14475aaa140bcf46ea7af-extract found.
19:46:45,958 122354 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_27af9a1163944cb19e520242de98d899-extract found.
19:46:55,766 132162 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_5473e6702b3544118c7da8877c900f7a-extract found.
19:47:04,015 140411 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_40f47d35aa154708a6fc684c8ffb0ba4-extract found.
19:47:11,913 148309 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_6dce34301c97498884d7344b85a1b07e-extract found.
19:47:35,809 172205 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_4f7c26d372974095a24ac58b547c13d6-extract found.
19:47:45,136 181532 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_a7c33e75bfdb41a6990dd66810a0d44a-extract found.
19:47:55,802 192198 [main] INFO  com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl - No BigQuery job with job id beam_job_a1d7422ca42a4b1d96205bf8c6dada9d-extract found.

日志消息来自此处:

@VisibleForTesting
    public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
        throws IOException, InterruptedException {
      String jobId = jobRef.getJobId();
      Exception lastException;
      do {
        try {
          return client.jobs().get(jobRef.getProjectId(), jobId).execute();
        } catch (GoogleJsonResponseException e) {
          if (errorExtractor.itemNotFound(e)) {
            LOG.info("No BigQuery job with job id {} found.", jobId);
            return null;
          }....

最终,JVM 内存不足:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.String.toLowerCase(String.java:2647)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:847)
at com.google.api.client.json.JsonParser.parse(JsonParser.java:472)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781)
at com.google.api.client.json.JsonParser.parseArray(JsonParser.java:648)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:740)
at com.google.api.client.json.JsonParser.parse(JsonParser.java:472)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781)
at com.google.api.client.json.JsonParser.parseArray(JsonParser.java:648)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:740)
at com.google.api.client.json.JsonParser.parse(JsonParser.java:472)
at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781)
at com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
at com.google.api.client.json.JsonParser.parse(JsonParser.java:355)
at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:87)
at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:81)
at com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:459)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator.executeWithBackOff(BigQueryTableRowIterator.java:497)
at com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator.advance(BigQueryTableRowIterator.java:180)
at com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl$BigQueryJsonReaderImpl.advance(BigQueryServicesImpl.java:555)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$BigQuerySourceBase$BigQueryReader.advance(BigQueryIO.java:1331)
at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluateReadHelper(Read.java:180)
at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluate(Read.java:168)
at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluate(Read.java:164)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)

BigQuery 中的表只有 100 行(仅用于调试)。

这里有什么问题?

我相信 BigQuery 消息是一个红鲱鱼; OOM 的堆栈跟踪表明数据是直接从表中读取的,而不是通过导出作业读取的。

DirectPipelineRunner 完全没有针对内存利用率进行优化;请尝试使用较新的 InProcessPipelineRunner。 此外,可能值得使用标准 Java 堆分析工具来查看内存的使用位置。

最新更新