在另一个中创建新的数据流



我有两种数据类型。

type1 and type2 

我有一个type1的数据流。

DataStream<type1> stream1 =... 

stream1内部,我想创建type2的对象,并且我想收集type1type2的对象。

是否可以使用一个数据流拥有一种输入类型和两种输出类型?还是可以在stream1内部创建一个新的数据流(DataStream<type2> stream2)?

还是有其他方法可以收集一种从一种类型评估的两种不同类型的数据?

您需要首先创建包装器类型,然后以后将流进行分配和选择。对于包装器,只有一个成员是 not-null ;

class TypeWrapper {
    // keeping this short for brevity
    public TypeA firstType;
    public TypeB secondType;
}

拆分并选择:

DataStream<TypeWrapper> stream1 = ...
DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.firstType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeA>() {
    @Override
    public TypeA map(TypeWrapper value) throws Exception {
        return value.firstType;
    }
});
DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.secondType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeB>() {
    @Override
    public TypeB map(TypeWrapper value) throws Exception {
        return value.secondType;
    }
});

因为filter()map()将被链接到stream1都在同一线程上执行并且操作很便宜。

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
case class Type1(){}
case class Type2(){}

object MultipleOutputJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stream of Type1
    val stream1 = env.addSource((sc: SourceFunction.SourceContext[Type1]) => {
      while(true){
        Thread.sleep(1000)
        sc.collect(Type1())
      }
    })
    // Mapping from Type1 to Type2
    val stream2 = stream1.map(t1 => Type2())
    // Collect both the original and the derived data
    stream1.print
    stream2.print
    env.execute()
  }
}

相关内容

  • 没有找到相关文章

最新更新