Apache Pulsar: Accessing the state store in LocalRunner does not work



I am trying to implement a simple Apache Pulsar function that accesses the State API in LocalRunner mode, but it does not work.

pom.xml snippet

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-original</artifactId>
        <version>2.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-functions-local-runner-original</artifactId>
        <version>2.9.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.13.1</version>
    </dependency>
</dependencies>

Function class

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class TestFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        System.out.println(">>> GOT input " + input);
        context.incrCounter("counter", 1); // --> try to access state
        return input;
    }
}

import java.util.Collections;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;

public class Main {
    public static final String BROKER_URL = "pulsar://localhost:6650";
    public static final String TOPIC_IN = "test-topic-input";
    public static final String TOPIC_OUT = "test-topic-output";

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = FunctionConfig
                .builder()
                .className(TestFunction.class.getName())
                .inputs(Collections.singleton(TOPIC_IN))
                .name("Test Function")
                .runtime(FunctionConfig.Runtime.JAVA)
                .subName("Test Function Sub")
                .build();
        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_URL)
                .stateStorageServiceUrl("bk://127.0.0.1:4181")
                .functionConfig(functionConfig)
                .build();
        // Start the function without blocking the main thread
        localRunner.start(false);
        PulsarClient client = PulsarClient.builder().serviceUrl(BROKER_URL).build();
        Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC_IN).create();
        producer.send("Hello World!");
        System.out.println(">>> PRODUCER SENT");
    }
}

I am running Pulsar in a local Docker container (Docker Desktop on Win10), started like this:

Docker container

# Port 4181 is published to expose BookKeeper; otherwise the function hangs and does nothing
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -p 4181:4181 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.9.1 bin/pulsar standalone

When I start the application, the console shows the following log:

2022-01-07T12:39:48,757+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,863+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,970+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:49,076+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
... and it goes on and on ...

The Pulsar log shows:

2022-01-07T12:13:48,882+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:48,989+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,097+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,207+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
... goes on and on ...

What am I doing wrong?

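The problem is the name you chose for the function, "Test Function". Because it contains a space, it causes problems in Pulsar's state store when that name is used for the internal storage stream.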

If you remove the space and use "TestFunction" instead, it works fine. I have just verified this myself.

2022-02-07T11:09:04,916-0800 [main] WARN  com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
>>> PRODUCER SENT
2022-02-07T11:09:05,267-0800 [public/default/TestFunction-0] INFO  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Opening state table for function public/default/TestFunction
2022-02-07T11:09:05,279-0800 [client-scheduler-OrderedScheduler-7-0] INFO  org.apache.bookkeeper.clients.SimpleStorageClientImpl - Retrieved table properties for table public_default/TestFunction : stream_id: 1024
2022-02-07T11:09:05,527-0800 [pulsar-client-io-1-2] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-topic-input][Test Function Sub] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
>>> GOT input Hello World!
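
If function names come from configuration or user input, a small guard can catch this before the function starts. The following is only a sketch: `FunctionNames`, `hasNoWhitespace`, and `stripWhitespace` are hypothetical helpers, not part of the Pulsar API, and they handle only whitespace, which is the one case identified above. The full set of characters BookKeeper rejects in stream names is not covered here.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Pulsar: guards against whitespace in function names.
public class FunctionNames {
    // Matches any run of whitespace (spaces, tabs, newlines).
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /** Returns true if the name contains no whitespace at all. */
    public static boolean hasNoWhitespace(String name) {
        return !WHITESPACE.matcher(name).find();
    }

    /** Removes all whitespace, e.g. "Test Function" becomes "TestFunction". */
    public static String stripWhitespace(String name) {
        return WHITESPACE.matcher(name).replaceAll("");
    }

    public static void main(String[] args) {
        String original = "Test Function";
        String safe = stripWhitespace(original);
        System.out.println(original + " -> " + safe); // prints "Test Function -> TestFunction"
    }
}
```

In the question's `Main`, this would be used as `.name(FunctionNames.stripWhitespace("Test Function"))`. Judging by the log above, the subscription name "Test Function Sub" can keep its space, since the consumer subscribed successfully with it.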
