Looking for a very simple EsperIO Kafka example



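I am desperately looking for sample code for the Esper CEP Kafka adapter. I have installed Kafka and used a producer to write data to a Kafka topic, and now I want to process that data with Esper CEP. Unfortunately, the Esper documentation for the Kafka adapter is not very helpful. Does anyone have a very simple example?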

Edit:

So far I have added an adapter, and it seems to work. However, I do not know how to read from the adapter, or how to link a CEP pattern to it. This is my code so far:

config.addImport(KafkaOutputDefault.class);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "test123");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());
Configuration config2 = new Configuration();
config2.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

I ran into the same problem. I created a sample project that you can take a look at, especially the plain Esper branch.

A more simplified version is:

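// Uses the Esper 8 API; needs the Esper, EsperIO Kafka and kafka-clients jars on the classpath.
// Each public class in this listing goes in its own file.
import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.*;
import com.espertech.esperio.kafka.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;

public class KafkaExample implements Runnable {
    private String runtimeURI;

    public KafkaExample(String runtimeURI) {
        this.runtimeURI = runtimeURI;
    }

    public static void main(String[] args) {
        new KafkaExample("KafkaExample").run();
    }

    @Override
    public void run() {
        Configuration configuration = new Configuration();
        configuration.getCommon().addImport(KafkaOutputDefault.class);
        // Registers java.lang.String as an event type named "String".
        configuration.getCommon().addEventType(String.class);

        Properties consumerProps = new Properties();
        // Kafka Consumer Properties
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
        // EsperIO Kafka Input Adapter Properties
        consumerProps.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, Consumer.class.getName());
        consumerProps.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, InputProcessor.class.getName());
        consumerProps.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());
        // The plugin loader starts the input adapter together with the runtime.
        configuration.getRuntime().addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), consumerProps, null);

        String stmt = "@name('sampleQuery') select * from String";
        EPCompiled compiled;
        try {
            compiled = EPCompilerProvider.getCompiler().compile(stmt, new CompilerArguments(configuration));
        } catch (EPCompileException ex) {
            throw new RuntimeException(ex);
        }

        EPRuntime runtime = EPRuntimeProvider.getRuntime(runtimeURI, configuration);
        EPDeployment deployment;
        try {
            deployment = runtime.getDeploymentService().deploy(compiled, new DeploymentOptions().setDeploymentId(UUID.randomUUID().toString()));
        } catch (EPDeployException ex) {
            throw new RuntimeException(ex);
        }

        // Print every event that the statement emits.
        EPStatement statement = runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), "sampleQuery");
        statement.addListener((newData, oldData, sta, run) -> {
            for (EventBean nd : newData) {
                System.out.println(nd.getUnderlying());
            }
        });

        // Block the main thread so the adapter can keep delivering events.
        // Sleeping replaces the original busy loop "while (true) {}", which pins a CPU core.
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}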
public class Consumer implements EsperIOKafkaInputSubscriber {
    @Override
    public void subscribe(EsperIOKafkaInputSubscriberContext context) {
        // Subscribe the adapter's Kafka consumer to the topic named "input".
        Collection<String> collection = new ArrayList<String>();
        collection.add("input");
        context.getConsumer().subscribe(collection);
    }
}
public class InputProcessor implements EsperIOKafkaInputProcessor {
    private EPRuntime runtime;

    @Override
    public void init(EsperIOKafkaInputProcessorContext context) {
        this.runtime = context.getRuntime();
    }

    @Override
    public void process(ConsumerRecords<Object, Object> records) {
        for (ConsumerRecord<Object, Object> record : records) {
            if (record.value() != null) {
                // Forward the record value to the runtime as an event of type "String".
                runtime.getEventService().sendEventBean(record.value().toString(), "String");
            }
        }
    }

    @Override
    public void close() {}
}
Here is some sample code. It assumes that the topic already contains some messages. It does not loop and wait for more messages.
// ip (bootstrap servers), topic (topic name), runtime and MyEvent are assumed to be defined elsewhere.
// topic is not part of the original snippet; it is added here because poll() fails without a subscription.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(java.util.Collections.singletonList(topic));
// A single poll that waits up to one second for records.
ConsumerRecords<String, String> rows = consumer.poll(java.time.Duration.ofMillis(1000));
for (ConsumerRecord<String, String> row : rows) {
    MyEvent event = new MyEvent(row.value()); // transform string to event
    // process event
    runtime.sendEvent(event); // pre-Esper 8 API; Esper 8 uses runtime.getEventService().sendEventBean(event, typeName)
}
consumer.close();
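
Since the snippet above polls only once, here is a rough sketch of what "loop and wait for more messages" could look like. To keep it runnable without a broker, it uses a plain in-memory BlockingQueue as a stand-in for the Kafka consumer and a Predicate as a stand-in for the call that sends the event to Esper. PollLoopSketch, pump and the "quit" message are made-up names for illustration only; none of this is EsperIO or Kafka API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class PollLoopSketch {

    /**
     * Polls source until handler returns false or the thread is interrupted.
     * Returns the number of values passed to the handler.
     */
    static int pump(BlockingQueue<String> source, Predicate<String> handler, long timeoutMillis) {
        int handled = 0;
        boolean keepGoing = true;
        while (keepGoing) {
            String value;
            try {
                // Plays the role of consumer.poll(timeout): wait a bounded time for data.
                value = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and leave the loop
                break;
            }
            if (value == null) {
                continue; // nothing arrived in time: poll again instead of exiting
            }
            handled++;
            // Real code would build the event and send it to the Esper runtime here.
            keepGoing = handler.test(value);
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        // Messages are added from another thread while the loop is polling.
        Thread producer = new Thread(() -> {
            for (String m : new String[] {"a", "b", "quit"}) {
                topic.add(m);
            }
        });
        producer.start();
        int n = pump(topic, v -> {
            System.out.println("event: " + v);
            return !"quit".equals(v); // stop once the hypothetical "quit" message is seen
        }, 100);
        System.out.println(n + " events handled");
    }
}
```

With a real KafkaConsumer the shape stays the same: poll with a timeout inside the loop, skip empty batches, and have some condition that ends the loop so the consumer can be closed.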
