我正在Flink创建一个应用程序
- 阅读主题的消息
- 对此做一些简单的过程
- 将结果写入另一个主题
我的代码确实有效,但是不在并行运行
我该怎么做?
看来我的代码仅在一个线程/块上运行?
在Flink Web仪表板上:
- 应用程序转到运行状态
- 但是,概述子任务中只有一个块
- 和接收/发送的字节,收到/发送的记录始终为零(无更新)
这是我的代码,请帮助我学习如何将我的应用分开以并行运行,并且我是否正确编写了该应用程序?
public class SimpleApp {
public static void main(String[] args) throws Exception {
// create execution environment INPUT
StreamExecutionEnvironment env_in =
StreamExecutionEnvironment.getExecutionEnvironment();
// event time characteristic
env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// production Ready (Does NOT Work if greater than 1)
env_in.setParallelism(Integer.parseInt(args[0].toString()));
// configure kafka consumer
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("auto.offset.reset", "earliest");
// create a kafka consumer
final DataStream<String> consumer = env_in
.addSource(new FlinkKafkaConsumer09<>("test", new
SimpleStringSchema(), properties));
// filter data
SingleOutputStreamOperator<String> result = consumer.filter(new
FilterFunction<String>(){
@Override
public boolean filter(String s) throws Exception {
return s.substring(0, 2).contentEquals("PS");
}
});
// Process Data
// Transform String Records to JSON Objects
SingleOutputStreamOperator<JSONObject> data = result.map(new
MapFunction<String, JSONObject>()
{
@Override
public JSONObject map(String value) throws Exception
{
JSONObject jsnobj = new JSONObject();
if(value.substring(0, 2).contentEquals("PS"))
{
// 1. Raw Data
jsnobj.put("Raw_Data", value.substring(0, value.length()-6));
// 2. Comment
int first_index_comment = value.indexOf("$");
int last_index_comment = value.lastIndexOf("$") + 1;
// - set comment
String comment =
value.substring(first_index_comment, last_index_comment);
comment = comment.substring(0, comment.length()-6);
jsnobj.put("Comment", comment);
}
else {
jsnobj.put("INVALID", value);
}
return jsnobj;
}
});
// Write JSON to Kafka Topic
data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
"FilteredData",
new SimpleJsonSchema()));
env_in.execute();
}
}
我的代码确实有效,但似乎仅在一个线程上运行(显示的一个块)在Web界面中(没有数据传递,因此发送/接收的字节未更新)。
如何并行运行?
要在并行运行您的作业,您可以做2件事:
- 增加在Env级别的工作的并行性 - 即做 之类的事情
streamExecutionEnvironment env_in = streamExecutionEnvironment.getExecutionEnvironment()。SetParallelism(4);
,但这只会在读取数据后在Flink End上增加并行性,因此,如果源较快地生成数据可能不会被充分利用。
- 要完全平行您的工作,为您的Kafka主题设置多个分区,理想情况下,您想要的flink工作想要的并行量。因此,当您创建Kafka主题时,您可能想做以下操作:
bin/kafka-topics.sh- create -zookeeper localhost:2181 - 复制因子3-分区4-主题测试