Apache Beam:无法通过Docker-Compose访问酒吧/子模拟器



我已经构建了一个软件,该软件使用GCP Pub/sub作为消息队列,Apache Beam构建管道和烧瓶以构建Web服务器。它在生产中运行顺利,但我很难将所有零件与码头组合,尤其是Apache Beam管道连接在一起。

我遵循了数据流管线和pubsub仿真器,通过从我的 docker-compose.yaml中定义的服务的名称中替换so答案的 localhost来使管道收听GCP Pub/sub Mimulator。

  pubsub_emulator:
    build: docker_images/message_queue
    ports:
      - 8085:8085
  webserver:
    build: docker_images/webserver
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    ports:
      - 8899:8080
    depends_on:
      - pubsub_emulator
   pipeline:
    build: docker_images/pipeline
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    depends_on:
      - pubsub_emulator

Web服务器能够访问酒吧/子模拟器并生成主题。

但是,使用 MalformedURLException启动时管道失败:

Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166

管道的选项似乎很好,我用:

定义了它们
final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST"); 
BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                                .as(BasePipeline.PipeOptions.class);
options.as(DataflowPipelineOptions.class).setStreaming(true);
options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);
Pipeline pipeline = Pipeline.create(options);

有人暗示正在发生的事情以及如何解决吗?唯一的解决方案是否暗示将模拟器和管道设置在同一码头中?

您可以尝试将值更改为以下:

http://pubsub_emulator:8085

作为丢失protocol的错误,该错误预计在您的情况下为http

根据Apache Beam SDK,预期为完全合格的URL的值:

// getPubsubRootUrl
@Default.String(value="https://pubsub.googleapis.com")
 @Hidden
java.lang.String getPubsubRootUrl()
// Root URL for use with the Google Cloud Pub/Sub API.

但是,如果您来自Python背景,您会注意到使用GRPC Python的Python SDK在此处显示仅期望服务器地址由地址和端口组成

# A snippet from google-cloud-python library.
if os.environ.get("PUBSUB_EMULATOR_HOST"):
    kwargs["channel"] = grpc.insecure_channel(
        target=os.environ.get("PUBSUB_EMULATOR_HOST")
    )
grpc.insecure_channel(target, options=None)
Creates an insecure Channel to a server.
The returned Channel is thread-safe.
Parameters: 
target – The server address

最新更新