从 Google App Engine 应用运行 Google Dataflow 管道



我正在使用DataflowPipelineRunner创建一个数据流作业。我尝试了以下方案。

  1. 无需指定任何计算机类型
  2. 带g1小型机器
  3. 含 N1-高记忆-2

在上述所有情况下,输入是来自GCS的文件,该文件非常小(KB大小(,输出是大查询表。

在所有情况下我都遇到内存不足错误

我编译的代码大小为 94mb。我只尝试字数统计示例,它没有读取任何输入(在作业开始之前失败(。请帮助我了解为什么出现此错误。

注意:我正在使用应用程序引擎来启动作业。

注意:相同的代码适用于 beta versoin 0.4.150414

编辑 1

根据答案中的建议尝试了以下内容,

  1. 自动缩放切换到基本缩放
  2. 二手机器类型 B2,提供 256MB 内存

完成这些配置后,Java 堆内存问题就解决了。但是它试图将 jar 上传到超过 10Mb 的交错位置,因此失败了。

它记录以下异常

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

我尝试直接上传jar文件 - appengine-api-1.0-sdk-1.9.20.jar,但它仍然尝试上传这个jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。我不知道这是什么罐子。任何关于这个罐子的想法是值得赞赏的。

请帮我解决这个问题。

简短的回答是,如果您在托管虚拟机上使用 AppEngine,则不会遇到 AppEngine 沙盒限制(使用 F1 或 B1 实例类时的 OOM、执行时间限制问题、列入白名单的 JRE 类(。如果您确实想在 App Engine 沙盒中运行,那么您对数据流 SDK 的使用最符合 AppEngine 沙盒的限制。下面我将解释常见问题以及人们为符合 AppEngine 沙盒限制所做的工作。

数据流开发工具包需要一个具有足够内存的 AppEngine 实例类来执行用户应用程序,以构造管道、暂存任何资源并将作业描述发送到数据流服务。通常,我们看到用户需要使用内存超过 128mb 的实例类才能看到 OOM 错误。

通常,如果

已暂存应用程序所需的资源,则构造管道并将其提交到数据流服务通常需要不到几秒钟的时间。将您的 JAR 和任何其他资源上传到 GCS 可能需要超过 60 秒的时间。这可以通过事先将 JAR 预先暂存到 GCS 来手动解决(如果数据流 SDK 检测到它们已经存在,它将跳过再次暂存它们(或使用任务队列来获取 10 分钟的限制(请注意,对于大型应用程序,10 分钟可能不足以暂存所有资源(。

最后,在 AppEngine 沙盒环境中,您和您的所有依赖项仅限于在 JRE 中使用列入白名单的类,否则将出现如下异常:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

编辑 1

我们对类路径上的 jar 内容执行哈希处理,并使用修改后的文件名将它们上传到 GCS。AppEngine 使用自己的 JAR 运行沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar 是指 appengine-api.jar这是沙盒环境添加的 jar。您可以从我们的 PackageUtil#getUniqueContentName(...( 中看到,我们只是在.jar之前附加 -$HASH

我们正在努力解决为什么您会看到 RequestPayloadToLarge 例外,目前建议您设置 filesToStage 选项并过滤掉执行数据流不需要的 jar 来解决您面临的问题。您可以看到我们如何使用 DataflowPipelineRunner#detectClassPathResourcesToStage(...( 构建要暂存的文件。

我对10MB的限制也有同样的问题。我所做的是过滤掉大于该限制的 JAR 文件(而不是特定文件(,然后在DataflowPipelineOptions中设置重命名文件,setFilesToStage .

所以我只是从数据流SDK中复制了detectClassPathResourcesToStage的方法并对其进行了直观的更改:

private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
        String message = String.format("Unable to use ClassLoader to detect classpath elements. "
                + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
        throw new IllegalArgumentException(message);
    }
    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
        try {
            File file = new File(url.toURI());
            if (file.length() < FILE_BYTES_THRESHOLD) {
                files.add(file.getAbsolutePath());
            }
        } catch (IllegalArgumentException | URISyntaxException e) {
            String message = String.format("Unable to convert url (%s) to file.", url);
            throw new IllegalArgumentException(message, e);
        }
    }
    return files;
}

然后当我创建DataflowPipelineOptions时:

DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));

这是 Helder 的 10MB 过滤解决方案的一个版本,它将适应 DataflowPipelineOptions 的默认文件暂存行为,即使它在 SDK 的未来版本中发生变化。

它不会复制逻辑,而是将DataflowPipelineOptions的一次性副本传递给DataflowPipelineRunner以查看它将暂存的文件,然后删除任何太大的文件。

请注意,此代码假定您已定义一个名为 MyOptions 的自定义PipelineOptions类,以及一个名为 loggerjava.util.Logger字段。

// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;
/**
 * Returns the list of .jar/etc files to stage based on the
 * Options, filtering out any files that are too large for
 * DataflowPipelineRunner.
 *
 * <p>If this accidentally filters out a necessary file, it should
 * be obvious when the pipeline fails with a runtime link error.
 */
private static ImmutableList<String> getFilesToStage(MyOptions options) {
  // Construct a throw-away runner with a copy of the Options to see
  // which files it would have wanted to stage. This could be an
  // explicitly-specified list of files from the MyOptions param, or
  // the default list of files determined by DataflowPipelineRunner.
  List<String> baseFiles;
  {
    DataflowPipelineOptions tmpOptions =
        options.cloneAs(DataflowPipelineOptions.class);
    // Ignore the result; we only care about how fromOptions()
    // modifies its parameter.
    DataflowPipelineRunner.fromOptions(tmpOptions);
    baseFiles = tmpOptions.getFilesToStage();
    // Some value should have been set.
    Preconditions.checkNotNull(baseFiles);
  }
  // Filter out any files that are too large to stage.
  ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
  for (String file : baseFiles) {
    long size = new File(file).length();
    if (size < MAX_STAGED_FILE_SIZE_BYTES) {
      filteredFiles.add(file);
    } else {
      logger.info("Not staging large file " + file + ": length " + size
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
    }
  }
  return filteredFiles.build();
}
/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
    throws IOException, InterruptedException {
  // DataflowPipelineRunner can't stage large files;
  // remove any from the list.
  DataflowPipelineOptions dpOpts =
      options.as(DataflowPipelineOptions.class);
  dpOpts.setFilesToStage(getFilesToStage(options));
  // Run the pipeline as usual using "options".
  // ...
}

最新更新