Flink Kafka-如何并行运行应用程序



我正在Flink创建一个应用程序

  1. 阅读主题的消息
  2. 对此做一些简单的过程
  3. 将结果写入另一个主题

我的代码确实有效,但是不在并行运行
我该怎么做?
看来我的代码仅在一个线程/块上运行?

在Flink Web仪表板上:

  • 应用程序转到运行状态
  • 但是,概述子任务中只有一个块
  • 和接收/发送的字节,收到/发送的记录始终为零(无更新)

这是我的代码,请帮助我学习如何将我的应用分开以并行运行,并且我是否正确编写了该应用程序?

public class SimpleApp {
    public static void main(String[] args) throws Exception {
        // create execution environment INPUT
        StreamExecutionEnvironment env_in  =    
                 StreamExecutionEnvironment.getExecutionEnvironment();
        // event time characteristic
        env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // production Ready (Does NOT Work if greater than 1)
        env_in.setParallelism(Integer.parseInt(args[0].toString()));
        // configure kafka consumer
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");
        // create a kafka consumer
        final DataStream<String> consumer = env_in
                .addSource(new FlinkKafkaConsumer09<>("test", new   
                            SimpleStringSchema(), properties));
        // filter data
        SingleOutputStreamOperator<String> result = consumer.filter(new  
            FilterFunction<String>(){
            @Override
            public boolean filter(String s) throws Exception {
                return s.substring(0, 2).contentEquals("PS");
            }
        });
        // Process Data
        // Transform String Records to JSON Objects
        SingleOutputStreamOperator<JSONObject> data = result.map(new 
                MapFunction<String, JSONObject>()
        {
            @Override
            public JSONObject map(String value) throws Exception
            {
                JSONObject jsnobj = new JSONObject();
                if(value.substring(0, 2).contentEquals("PS"))
                {
                    // 1. Raw Data
                    jsnobj.put("Raw_Data", value.substring(0, value.length()-6));
                    // 2. Comment
                    int first_index_comment = value.indexOf("$");
                    int last_index_comment  = value.lastIndexOf("$") + 1;
                    //   - set comment
                    String comment          =  
                    value.substring(first_index_comment, last_index_comment);
                    comment = comment.substring(0, comment.length()-6);
                    jsnobj.put("Comment", comment);
                }
                else {
                    jsnobj.put("INVALID", value);
                }
                return jsnobj;
            }
        });
        // Write JSON to Kafka Topic
        data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
                "FilteredData",
                new SimpleJsonSchema()));
        env_in.execute();
    }
}

我的代码确实有效,但似乎仅在一个线程上运行(显示的一个块)在Web界面中(没有数据传递,因此发送/接收的字节未更新)。

如何并行运行?

要在并行运行您的作业,您可以做2件事:

  1. 增加在Env级别的工作的并行性 - 即做
  2. 之类的事情

streamExecutionEnvironment env_in = streamExecutionEnvironment.getExecutionEnvironment()。SetParallelism(4);

,但这只会在读取数据后在Flink End上增加并行性,因此,如果源较快地生成数据可能不会被充分利用。

  1. 要完全平行您的工作,为您的Kafka主题设置多个分区,理想情况下,您想要的flink工作想要的并行量。因此,当您创建Kafka主题时,您可能想做以下操作:

bin/kafka-topics.sh- create -zookeeper localhost:2181 - 复制因子3-分区4-主题测试

相关内容

  • 没有找到相关文章

最新更新