Why does Apache Flink drop events from the data stream?



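In the unit test case below, the number of events given by numberOfElements is generated and fed in as a data stream. The test fails at random on this assertion:

assertEquals(numberOfElements, CollectSink.values.size());

Is there any explanation for why Apache Flink skips events?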

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
public class FlinkTest {
    StreamExecutionEnvironment env;

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.createLocalEnvironment();
    }

    @Test
    public void testStream1() throws Exception {
        testStream();
    }

    @Test
    public void testStream2() throws Exception {
        testStream();
    }

    @Test
    public void testStream3() throws Exception {
        testStream();
    }

    @Test
    public void testStream4() throws Exception {
        testStream();
    }

    @Test
    public void testStream() throws Exception {
        final int numberOfElements = 50;
        DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
        CollectSink.values.clear();
        tupleStream.addSink(new CollectSink());
        env.execute();
        sleep(2000);
        assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
        assertEquals(numberOfElements, CollectSink.values.size());
    }

    public static List<Tuple2<String, Integer>> getCollectionOfBucketImps(int numberOfElements) throws InterruptedException {
        List<Tuple2<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < numberOfElements; i++) {
            records.add(new Tuple2<>(Integer.toString(i % 10), i));
        }
        return records;
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {
        public static final List<Tuple2<String, Integer>> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            values.add(value);
        }
    }
}

For example, any one of the testStreamX cases fails at random.

Context: the code runs with a parallelism of 8, because the CPU it runs on has 8 cores.

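I don't know what parallelism your job runs with (I assume it is the maximum Flink can assign). It looks like you may have a race condition when the sink adds values.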

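Solution

I ran your example code with the environment parallelism set to 1, and everything worked. The testing example in the Flink documentation uses this same solution.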

@Before
public void setup() {
    env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(1);
}

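Even better

You can set the parallelism to 1 on the sink operator only and keep the parallelism of the rest of the pipeline. In the example below I added an extra map function and forced a parallelism of 8 for that map operator.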

import org.apache.flink.api.common.functions.MapFunction;

@Test
public void testStream() throws Exception {
    final int numberOfElements = 50;
    DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
    CollectSink.values.clear();
    tupleStream
            .map(new MapFunction<Tuple2<String,Integer>, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String,Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    stringIntegerTuple2.f0 += "- concat something";
                    return stringIntegerTuple2;
                }
            }).setParallelism(8)
            .addSink(new CollectSink()).setParallelism(1);
    env.execute();
    sleep(2000);
    assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
    assertEquals(numberOfElements, CollectSink.values.size());
}

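When the environment parallelism is greater than 1, Flink creates several CollectSink instances, one per parallel subtask, and they all add to the same static ArrayList. Declaring invoke as synchronized only locks the individual sink instance, so it does not stop two instances from calling values.add() at the same time. ArrayList is not thread-safe, so concurrent adds can overwrite each other and elements are lost.

Here are two ways to avoid the race condition: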
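To see the mechanism without Flink, here is a minimal plain-Java sketch. It assumes each parallel sink instance is driven by its own thread, which is how Flink runs parallel subtasks. `InstanceLockRace` and `FakeSink` are hypothetical names standing in for the test and `CollectSink`; the instance-level `synchronized` mirrors the question's sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class InstanceLockRace {
    // Shared by all instances, like CollectSink.values in the question.
    static final List<Integer> values = new ArrayList<>();

    // Hypothetical stand-in for one parallel CollectSink instance.
    static class FakeSink {
        // Locks only `this`, so two different FakeSink objects never exclude each other.
        synchronized void invoke(int value) {
            values.add(value);
        }
    }

    /** Runs `parallelism` sinks on separate threads and returns how many elements survived. */
    static int run(int parallelism, int perSink) throws InterruptedException {
        values.clear();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            FakeSink sink = new FakeSink(); // one instance per simulated subtask
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads at once to maximize overlap
                    for (int i = 0; i < perSink; i++) {
                        sink.invoke(i);
                    }
                } catch (InterruptedException | RuntimeException e) {
                    // e.g. ArrayIndexOutOfBoundsException from a racing ArrayList.add
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            t.join(); // join() makes the list's final state visible to this thread
        }
        return values.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int expected = 8 * 10_000;
        int collected = run(8, 10_000);
        // The collected count can fall short of `expected`; it varies from run to run.
        System.out.println("expected " + expected + ", collected " + collected);
    }
}
```

Because the outcome depends on thread scheduling, the shortfall is not reproducible on every run, which matches the randomly failing tests in the question.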

  1. Synchronize on the class object
private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {
    public static final List<Tuple2<String, Integer>> values = new ArrayList<>();
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // every CollectSink instance shares this single lock, so the adds are serialized
        synchronized(CollectSink.class) {
            values.add(value);
        }
    }
}
  2. Use Collections.synchronizedList()
import java.util.Collections;
private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {
    // the wrapper synchronizes each add() on the shared list itself
    public static final List<Tuple2<String, Integer>> values = Collections.synchronizedList(new ArrayList<>());
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        values.add(value);
    }
}
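A third option, not part of the answer above and swapped in here as an assumption, is a lock-free thread-safe collection such as `java.util.concurrent.ConcurrentLinkedQueue`; in a real test the `values` field of `CollectSink` would then be declared as a `Queue` instead of a `List`. The sketch below checks that idea in plain Java, again simulating each sink subtask with its own thread; `ConcurrentSinkCheck` and `FakeSink` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConcurrentSinkCheck {
    // Thread-safe shared collection: concurrent add() calls never lose elements.
    static final Queue<Integer> values = new ConcurrentLinkedQueue<>();

    // Hypothetical stand-in for one parallel CollectSink instance; no extra locking needed.
    static class FakeSink {
        void invoke(int value) {
            values.add(value);
        }
    }

    /** Simulates `parallelism` sink subtasks, each receiving `perSink` records. */
    static int run(int parallelism, int perSink) throws InterruptedException {
        values.clear();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            FakeSink sink = new FakeSink();
            Thread t = new Thread(() -> {
                for (int i = 0; i < perSink; i++) {
                    sink.invoke(i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // after all joins, size() is exact
        }
        return values.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(8, 10_000)); // prints 80000
    }
}
```

Unlike the instance-locked version, every element arrives regardless of scheduling. The trade-off is that a queue has no index-based access, so a test that needs `get(i)` is better served by one of the two list-based fixes.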
