我在 maven 包中编写了以下方法:
public static void publishMessage(Publisher publisher, String message) throws ExecutionException, InterruptedException, TimeoutException {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get(10L, TimeUnit.SECONDS);
LOG.info("Message Published! ID: {} Message: {}", messageId, message);
}
当我从 maven 包中的 Runner 主类调用此方法时,我能够毫无问题地将消息发布到 Pub/Sub。当我将此 maven 项目作为 SBT 项目中的依赖项加载并尝试调用此方法时,如果我在没有超时配置的情况下使用该语句,执行会在以下行超时:messageIdFuture.get(10L, TimeUnit.SECONDS);
或无限期卡住。
我正在以这种方式构建发布者:
public static Publisher getPublisher(String projectId, String topicId) throws IOException {
TopicName topicName = TopicName.of(projectId, topicId);
return Publisher.newBuilder(topicName).build();
}
我有一种预感,这个问题与执行器和线程池有关,其中控件没有以我想要的方式流动。
其他实验:我尝试通过运行publisher.publish(pubsubMessage);
后publisher.shutdown();
来避免future.get()
,因为关闭会发布所有本地排队的消息。即便如此,消息也是通过 Runner 类发布的,并且 SBT 应用程序执行会无限期地停留在内部使用wait()
publisher.shutdown();
messagesWaiter.waitComplete();
请注意:
- 我在本地通过最终用户凭据使用身份验证,但即使在使用服务帐户的已部署应用程序(SBT、Play)中,代码也会超时。
- 在这些操作之后,我将分别关闭发布者(如文档中所述)(无论如何这不应该成为问题,因为它正在与 Runner 类一起使用)。
正在使用的 JAVA 客户端版本:1.108.1
我主要遵循 JAVA 客户端文档本身。链接: https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-publisher
升级到最新版本的 Java Client Library for Cloud Pub/Sub 工作。
要使用带有凭据的 Java 向 Google Cloud 发布/订阅主题发布消息,请按照以下步骤操作:
1.添加 maven 依赖项
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.123.2</version>
用于创建发布者和发布消息的示例代码。
CredentialsProvider credentialsProvider = null; try { credentialsProvider = FixedCredentialsProvider .create(ServiceAccountCredentials .fromStream(new ByteArrayInputStream( StandardCharsets.UTF_8.encode("AccountCredential").array() ))); } catch (Exception e) {} ProjectTopicName topic = ProjectTopicName.of("ProjectId", "Topic"); Publisher publisher = Publisher.newBuilder(topic) .setCredentialsProvider(credentialsProvider) .build(); ByteString data = ByteString.copyFromUtf8("Message Content"); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); try { ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage); String messageId = messageIdFuture.get(); ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() { public void onSuccess(String messageId) { } public void onFailure(Throwable t) { } }, MoreExecutors.directExecutor()); } catch (Exception e) { } try { publisher.shutdown(); publisher.awaitTermination(1, TimeUnit.MINUTES); } catch (Exception e) { }
将帐户凭据、项目 ID、主题和消息内容替换为您的数据。
检查下面的 GitHub 存储库链接以获取可重用的发布者组件,并在任何地方使用。
可重用的 gCloud 消息发布者自定义库
这个答案可能会帮助一些想要使用 java 客户端在 gcloud 中发布消息的人。 谢谢