如何在 flink CEP 中将子类型()转换为元组


Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin( "start" )            
            .next( "3" ).where( new FilterFunction< Tuple3< String, String, String > >() {
                @Override
                public boolean filter ( Tuple3< String, String, String > value ) throws Exception {
                    return value.f2.equals( "3" );
                }
            } )
            .next( "4" ).subtype(Tuple.getTupleClass( 2 )).where( new FilterFunction< Tuple2< String, String> >() {
                @Override
                public boolean filter ( Tuple2< String, String > value ) throws Exception {
                    return value.f1.equals( "3" );
                }
            } )

子类型(Tuple.getTupleClass( 2 )),并导致错误 Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'

我应该修改这个吗?但是如何修改?Pattern< Tuple3< String, String, String >, ? > pattern


按2017012更新

JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where< String >.EqualTo
        joinedStreams = someStream
        .join( otherStream )
        .where( value -> value.f1 )
        .equalTo( value -> value.f1 );
Pattern< Tuple, ? > pattern = Pattern.< Tuple > begin( "start" )
        .subtype( Tuple3.class )
        .where( evt -> evt.f2.equals( "3" ) )
        .next( "4" )
        .subtype( Tuple2.class )
        .where( evt -> evt.f1.equals( "3" ) )
        .within( Time.seconds( 10 ) );
PatternStream< ...> patternStream = CEP.pattern( joinedStreams, pattern );

试过这个,不知道我应该填写什么PatternStream< ...>.感谢任何可以提供帮助的人。

试试这段代码:

Pattern<Tuple, ?> pattern =
    Pattern.<Tuple>begin("start")
    .next("3")
        .subtype(Tuple3.class)
        .where(new FilterFunction<Tuple3>() {
            @Override
            public boolean filter(Tuple3 value) throws Exception {
                return value.f2.equals("3");
            }
        })
    .next("4")
        .subtype(Tuple2.class)
        .where(new FilterFunction<Tuple2>() {
            @Override
            public boolean filter(Tuple2 value) throws Exception {
                return value.f1.equals("3");
            }
        });

从通用类型Tuple开始,对子事件使用具体的Tuple2Tuple3类型。此模式的数据流必须具有Tuple类型。

这个

呢:

    Pattern<Tuple, ?> pattern = Pattern.<Tuple>begin("start")
            .subtype(Tuple3.class)
            .where(evt -> evt.f2.equals("3"))
            .next("4")
            .subtype(Tuple2.class)
            .where(evt -> evt.f1.equals("3"))
            .within(Time.seconds(10));
  1. 无需在开始后添加下一步
  2. 请注意子类型的字面含义,元组 3 和元组 2 应扩展元组。

如果要连接两个不同的数据流。

DataStream<Tuple2> someStream = //...
DataStream<Tuple3> otherStream = //...
ConnectedStreams<Tuple2, Tuple3> connectedStreams = someStream.connect(otherStream);

然后你可以使用 CoMap、CoFlatMap 来获取相同的类型,例如将 Tuple2、Tuple3 转换为 String:ConnectedStreams → DataStream

connectedStreams.flatMap(new CoFlatMapFunction<Tuple2, Tuple3, String>() {
   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(Tuple2.toString());
   }
   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(Tuple3.toString);
       }
   }
});

以下是一些有用的链接,介绍了一个很好的用例:

  1. 使用 Apache Flink 引入复杂事件处理 (CEP)
  2. 我翻译的中文版

相关内容

  • 没有找到相关文章

最新更新