如何使用依赖项通过 JAVA 客户端将消息发布到 Cloud Pub/Sub?



我在 maven 包中编写了以下方法:

public static void publishMessage(Publisher publisher, String message) throws ExecutionException, InterruptedException, TimeoutException {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get(10L, TimeUnit.SECONDS);
LOG.info("Message Published! ID: {} Message: {}", messageId, message);
}

当我从 maven 包中的 Runner 主类调用此方法时,我能够毫无问题地将消息发布到 Pub/Sub。当我将此 maven 项目作为 SBT 项目中的依赖项加载并尝试调用此方法时,如果我在没有超时配置的情况下使用该语句,执行会在以下行超时:messageIdFuture.get(10L, TimeUnit.SECONDS);或无限期卡住。

我正在以这种方式构建发布者:

public static Publisher getPublisher(String projectId, String topicId) throws IOException {
TopicName topicName = TopicName.of(projectId, topicId);
return Publisher.newBuilder(topicName).build();
}

我有一种预感,这个问题与执行器和线程池有关,其中控件没有以我想要的方式流动。

其他实验:我尝试通过运行publisher.publish(pubsubMessage);publisher.shutdown();来避免future.get(),因为关闭会发布所有本地排队的消息。即便如此,消息也是通过 Runner 类发布的,并且 SBT 应用程序执行会无限期地停留在内部使用wait()publisher.shutdown();messagesWaiter.waitComplete();

请注意:

  • 我在本地通过最终用户凭据使用身份验证,但即使在使用服务帐户的已部署应用程序(SBT、Play)中,代码也会超时。
  • 在这些操作之后,我将分别关闭发布者(如文档中所述)(无论如何这不应该成为问题,因为它正在与 Runner 类一起使用)。

正在使用的 JAVA 客户端版本:1.108.1

我主要遵循 JAVA 客户端文档本身。链接: https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-publisher

升级到最新版本的 Java Client Library for Cloud Pub/Sub 工作。

要使用带有凭据的 Java 向 Google Cloud 发布/订阅主题发布消息,请按照以下步骤操作:

1.添加 maven 依赖项

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.123.2</version>
  1. 用于创建发布者和发布消息的示例代码。

    CredentialsProvider credentialsProvider = null;
    try {
    credentialsProvider = FixedCredentialsProvider
    .create(ServiceAccountCredentials
    .fromStream(new ByteArrayInputStream(
    StandardCharsets.UTF_8.encode("AccountCredential").array()
    )));
    } catch (Exception e) {}
    ProjectTopicName topic = ProjectTopicName.of("ProjectId", "Topic"); 
    Publisher publisher = Publisher.newBuilder(topic)
    .setCredentialsProvider(credentialsProvider)
    .build();
    ByteString data = ByteString.copyFromUtf8("Message Content");
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
    try {
    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
    String messageId = messageIdFuture.get();
    ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
    public void onSuccess(String messageId) {
    }
    public void onFailure(Throwable t) {
    }
    }, MoreExecutors.directExecutor());
    } catch (Exception e) {
    } 
    try {
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
    } catch (Exception e) {
    } 
    

将帐户凭据、项目 ID、主题和消息内容替换为您的数据。

检查下面的 GitHub 存储库链接以获取可重用的发布者组件,并在任何地方使用。

可重用的 gCloud 消息发布者自定义库

这个答案可能会帮助一些想要使用 java 客户端在 gcloud 中发布消息的人。 谢谢