Apache Flink CEP: pattern not matching



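I'm new to Flink CEP and am trying out the basics. In the code below, I expect every input element to match the pattern and be printed as a match. However, matechedStream.print() prints nothing. Any idea why?

Any suggestions or help would be greatly appreciated.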

package com.o9.flink;
import com.o9.flink.asyncio.DemandSupply;
import org.apache.flink.cep.CEP;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class DemandSupplyPattern {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> keyedInputStream = env.fromElements("AAA", "BBB", "CCC");
        Pattern<String, ?> dspattern = Pattern.<String>begin("start");
        PatternStream<String> patternStream = CEP.pattern(keyedInputStream, dspattern);
        DataStream<String> matechedStream = patternStream.process(new PatternProcessFunction<String, String>() {
            @Override
            public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
                collector.collect(map.get("start").toString());
            }
        });
        matechedStream.print();
        env.execute("DemandSupply-CEP");
    }
}

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.asynchttpclient</groupId>
        <artifactId>async-http-client</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
</dependencies>

Thanks,
Mahendra

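FlinkCEP uses event time by default, and event time relies on watermarks to make progress through the events. The job above never generates watermarks, so switching to processing time may help: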

PatternStream<String> patternStream = CEP.pattern(keyedInputStream, dspattern).inProcessingTime();

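If you are not familiar with time semantics or with the difference between event time and processing time, see the Notions of Time section of the Flink documentation.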
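To make the role of watermarks more concrete, here is a toy model in plain Java of an operator that holds events back until a watermark passes their timestamp. This is only a sketch for intuition, not Flink's actual implementation: the class WatermarkBufferSketch and its methods add, advanceWatermark and pending are hypothetical names invented for this example, and the timestamps 1, 2, 3 are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (NOT Flink code): events are released only once a watermark reaches their timestamp. */
final class WatermarkBufferSketch {
    // Buffered events, ordered by their (hypothetical) event timestamp.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event. Nothing is emitted here, no matter how many events arrive. */
    void add(long timestamp, String event) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
    }

    /** Advances the watermark and returns all events with timestamp <= watermark, in timestamp order. */
    List<String> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<String> released = new ArrayList<>();
        Map<Long, List<String>> ready = buffer.headMap(currentWatermark, true);
        for (List<String> events : ready.values()) {
            released.addAll(events);
        }
        ready.clear(); // headMap returns a view, so this also removes the entries from the buffer
        return released;
    }

    /** Number of events still waiting for a watermark. */
    int pending() {
        return buffer.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        WatermarkBufferSketch op = new WatermarkBufferSketch();
        op.add(1L, "AAA");
        op.add(2L, "BBB");
        op.add(3L, "CCC");
        // No watermark has arrived yet, so every event is still buffered.
        System.out.println("pending before any watermark: " + op.pending());
        System.out.println("released at watermark 2: " + op.advanceWatermark(2L));
        System.out.println("released at watermark 3: " + op.advanceWatermark(3L));
    }
}
```

In this model, a stream that never advances the watermark never releases anything, which mirrors the symptom in the question. Processing time avoids the issue because it does not wait for watermarks at all.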
