来自应用程序引擎的管道提交



我需要将数据存储实体发送到BigQuery表,同时进行数据转换。到目前为止,我的设计如下:
AppEngine Java应用程序将数据发布到PUB/SUB服务中的Topic,使其正常工作。然后让DataflowPipeline订阅该主题并读取消息。然后完成转换并将结果写入BigQuery。我运行了一些示例代码来测试这一点。

我有一个粗糙的管道在我的本地开发机器上运行,我可以运行它——所有这些都作为演示代码运行。这是通过本地运行的mvn appengine:devserver

现在的问题是:你如何从谷歌应用引擎部署数据流管道?开发机器无法访问生产环境,因此我无法在谷歌管道服务上运行我的管道。我试图从谷歌应用引擎提交这个,但收到内存不足的错误。这似乎与某些身份验证问题有关。从StackOverflow上的其他帖子来看,似乎"官方"不支持这种来自应用引擎的"部署"。

那么在生产环境中如何做到这一点呢?

迄今为止的环境依赖性:
maven 3.3.0
谷歌应用引擎1.9.28
Google API客户端1.20.0
Java 1.7.0_79
工作站-Windows 7
谷歌开发环境:金牌套餐
这是我运行管道进程的示例代码。。。。

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setNumWorkers(2);
        options.setRunner(DataflowPipelineRunner.class);
        options.setStagingLocation("gs://pipeline_bucket2");
        options.setProject("projectname");
        options.setJobName("starterpipeline");
        options.setUpdate(true);
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("Hello", "World")).apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
            @Override
            public void processElement(ProcessContext c) {
                c.output(c.element().toUpperCase());
            }
        })).apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
            @Override
            public void processElement(ProcessContext c) {
                LOG.info(c.element());
            }
        }));
        p.run();

这是我在尝试运行上面的代码时对错误的堆栈跟踪:

Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
    at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
    at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
    at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    at java.util.concurrent.FutureTask.run(FutureTask.java:260)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

Dataflow在上传应用程序工件时与Google Cloud Storage通信时使用64mb缓冲区。如果您使用的实例没有足够的内存,例如,如果您使用具有128MB内存的AppEngine实例,则可能会导致OOM。

还要注意,每当您更新模块或AppEngine进行内部更新时,第一次执行Dataflow管道时,Dataflow SDK需要上传所有更改为Google Cloud Storage的应用程序工件。根据应用程序的大小,这可能需要超过60秒的时间,这是前端实例请求的限制,并可能导致超过截止日期的错误。

最新更新