我开始学习flink,并查看了其中一个官方教程。
据我所知,本练习的目标是连接时间属性上的两个流。
任务:
此练习的结果是Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略END事件,并且只加入每次骑行的START事件其对应的票价数据。
生成的流应打印为标准输出。
问题:EnrichmentFunction如何连接这两个流(又名)。它怎么知道参加哪场比赛?我希望它能缓冲多个集市/游乐设施,直到有一个匹配的合作伙伴。
在我看来,它只是保存了它看到的每一次骑行/博览会,并将其与下一次最好的骑行/博览会相结合。为什么这是一个合适的连接?
提供的解决方案:
/*
* Copyright 2017 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dataartisans.flinktraining.solutions.datastream_java.state;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
/**
* Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
* (http://training.data-artisans.com).
*
* The goal for this exercise is to enrich TaxiRides with fare information.
*
* Parameters:
* -rides path-to-input-file
* -fares path-to-input-file
*
*/
public class RidesAndFaresSolution extends ExerciseBase {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String ridesFile = params.get("rides", pathToRideData);
final String faresFile = params.get("fares", pathToFareData);
final int delay = 60; // at most 60 seconds of delay
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
.keyBy("rideId");
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
.keyBy("rideId");
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
.connect(fares)
.flatMap(new EnrichmentFunction());
printOrTest(enrichedRides);
env.execute("Join Rides with Fares (java RichCoFlatMap)");
}
public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
// keyed, managed state
private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;
@Override
public void open(Configuration config) {
rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
}
@Override
public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
out.collect(new Tuple2(ride, fare));
} else {
rideState.update(ride);
}
}
@Override
public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
out.collect(new Tuple2(ride, fare));
} else {
fareState.update(fare);
}
}
}
}
在这个关于状态丰富的特定训练练习的上下文中,每个乘车ID值都有三个事件——TaxiRide开始事件、TaxiRider结束事件和TaxiFare。此练习的目的是将每个TaxiRide开始事件与具有相同乘客ID的一个TaxiFare事件连接起来——或者换句话说,在知道每个事件只有一个的情况下,加入乘客ID上的乘车流和票价流。
本练习演示了关键帧状态在Flink中是如何工作的。键控状态实际上是一个分片的键值存储。当我们有一个ValueState
的项,例如ValueState<TaxiRide> rideState
时,Flink将在其状态后端为键的每个不同值(rideId
)存储一个单独的记录。
每次调用flatMap1
和flatMap2
时,上下文中都隐含着一个键(rideId
),当我们调用rideState.update(ride)
或rideState.value()
时,我们不是在访问单个变量,而是使用rideId
作为键在键值存储中设置和获取一个条目。
在本练习中,两个流都由rideId
进行键控,因此对于每个不同的rideId
,可能存在rideState
的一个元素和fareState
的一个单元。因此,所提供的解决方案是缓冲大量的乘车和票价,但每个rideId
只有一个(这就足够了,因为乘车和票价在这个数据集中是完美配对的)。
所以,你问:
EnrichmentFunction如何能够连接这两个流,也就是。它怎么知道加入哪趟车的票价?
答案是
它加入了具有相同
rideId
的票价。
您询问的这个特定练习展示了如何实现简单的富集联接,以了解键控状态和连接流的概念。但是Flink当然可以实现更复杂的连接。请参阅有关使用DataStream API联接、使用Flink的Table API联接和使用Flink SQL联接的文档。