我正在使用DataflowPipelineRunner创建一个数据流作业。我尝试了以下方案。
- 无需指定任何计算机类型
- 带g1小型机器
- 含 N1-高记忆-2
在上述所有情况下,输入是来自GCS的文件,该文件非常小(KB大小(,输出是大查询表。
在所有情况下我都遇到内存不足错误
我编译的代码大小为 94mb。我只尝试字数统计示例,它没有读取任何输入(在作业开始之前失败(。请帮助我了解为什么出现此错误。
注意:我正在使用应用程序引擎来启动作业。
注意:相同的代码适用于 beta versoin 0.4.150414
编辑 1
根据答案中的建议尝试了以下内容,
- 从自动缩放切换到基本缩放。
- 二手机器类型 B2,提供 256MB 内存
完成这些配置后,Java 堆内存问题就解决了。但是它试图将 jar 上传到超过 10Mb 的交错位置,因此失败了。
它记录以下异常
com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)
我尝试直接上传jar文件 - appengine-api-1.0-sdk-1.9.20.jar,但它仍然尝试上传这个jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。我不知道这是什么罐子。任何关于这个罐子的想法是值得赞赏的。
请帮我解决这个问题。
简短的回答是,如果您在托管虚拟机上使用 AppEngine,则不会遇到 AppEngine 沙盒限制(使用 F1 或 B1 实例类时的 OOM、执行时间限制问题、列入白名单的 JRE 类(。如果您确实想在 App Engine 沙盒中运行,那么您对数据流 SDK 的使用最符合 AppEngine 沙盒的限制。下面我将解释常见问题以及人们为符合 AppEngine 沙盒限制所做的工作。
数据流开发工具包需要一个具有足够内存的 AppEngine 实例类来执行用户应用程序,以构造管道、暂存任何资源并将作业描述发送到数据流服务。通常,我们看到用户需要使用内存超过 128mb 的实例类才能看到 OOM 错误。
通常,如果已暂存应用程序所需的资源,则构造管道并将其提交到数据流服务通常需要不到几秒钟的时间。将您的 JAR 和任何其他资源上传到 GCS 可能需要超过 60 秒的时间。这可以通过事先将 JAR 预先暂存到 GCS 来手动解决(如果数据流 SDK 检测到它们已经存在,它将跳过再次暂存它们(或使用任务队列来获取 10 分钟的限制(请注意,对于大型应用程序,10 分钟可能不足以暂存所有资源(。
最后,在 AppEngine 沙盒环境中,您和您的所有依赖项仅限于在 JRE 中使用列入白名单的类,否则将出现如下异常:
java.lang.SecurityException:
java.lang.IllegalAccessException: YYY is not allowed on ZZZ
...
编辑 1
我们对类路径上的 jar 内容执行哈希处理,并使用修改后的文件名将它们上传到 GCS。AppEngine 使用自己的 JAR 运行沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar 是指 appengine-api.jar这是沙盒环境添加的 jar。您可以从我们的 PackageUtil#getUniqueContentName(...( 中看到,我们只是在.jar之前附加 -$HASH。
我们正在努力解决为什么您会看到 RequestPayloadToLarge 例外,目前建议您设置 filesToStage 选项并过滤掉执行数据流不需要的 jar 来解决您面临的问题。您可以看到我们如何使用 DataflowPipelineRunner#detectClassPathResourcesToStage(...( 构建要暂存的文件。
我对10MB的限制也有同样的问题。我所做的是过滤掉大于该限制的 JAR 文件(而不是特定文件(,然后在DataflowPipelineOptions
中设置重命名文件,setFilesToStage
.
所以我只是从数据流SDK中复制了detectClassPathResourcesToStage
的方法并对其进行了直观的更改:
private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
throw new IllegalArgumentException(message);
}
List<String> files = new ArrayList<>();
for (URL url : ((URLClassLoader) classLoader).getURLs()) {
try {
File file = new File(url.toURI());
if (file.length() < FILE_BYTES_THRESHOLD) {
files.add(file.getAbsolutePath());
}
} catch (IllegalArgumentException | URISyntaxException e) {
String message = String.format("Unable to convert url (%s) to file.", url);
throw new IllegalArgumentException(message, e);
}
}
return files;
}
然后当我创建DataflowPipelineOptions
时:
DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));
这是 Helder 的 10MB 过滤解决方案的一个版本,它将适应 DataflowPipelineOptions
的默认文件暂存行为,即使它在 SDK 的未来版本中发生变化。
它不会复制逻辑,而是将DataflowPipelineOptions
的一次性副本传递给DataflowPipelineRunner
以查看它将暂存的文件,然后删除任何太大的文件。
请注意,此代码假定您已定义一个名为 MyOptions
的自定义PipelineOptions
类,以及一个名为 logger
的java.util.Logger
字段。
// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;
/**
* Returns the list of .jar/etc files to stage based on the
* Options, filtering out any files that are too large for
* DataflowPipelineRunner.
*
* <p>If this accidentally filters out a necessary file, it should
* be obvious when the pipeline fails with a runtime link error.
*/
private static ImmutableList<String> getFilesToStage(MyOptions options) {
// Construct a throw-away runner with a copy of the Options to see
// which files it would have wanted to stage. This could be an
// explicitly-specified list of files from the MyOptions param, or
// the default list of files determined by DataflowPipelineRunner.
List<String> baseFiles;
{
DataflowPipelineOptions tmpOptions =
options.cloneAs(DataflowPipelineOptions.class);
// Ignore the result; we only care about how fromOptions()
// modifies its parameter.
DataflowPipelineRunner.fromOptions(tmpOptions);
baseFiles = tmpOptions.getFilesToStage();
// Some value should have been set.
Preconditions.checkNotNull(baseFiles);
}
// Filter out any files that are too large to stage.
ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
for (String file : baseFiles) {
long size = new File(file).length();
if (size < MAX_STAGED_FILE_SIZE_BYTES) {
filteredFiles.add(file);
} else {
logger.info("Not staging large file " + file + ": length " + size
+ " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
}
}
return filteredFiles.build();
}
/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
throws IOException, InterruptedException {
// DataflowPipelineRunner can't stage large files;
// remove any from the list.
DataflowPipelineOptions dpOpts =
options.as(DataflowPipelineOptions.class);
dpOpts.setFilesToStage(getFilesToStage(options));
// Run the pipeline as usual using "options".
// ...
}