如何增加Flink taskmanager.numberOfTaskSlots以在没有Flink服务器的情况下运行它(在



我有一个问题,关于在IDE中运行Flink流媒体作业,或者作为fat jar运行,而不将其部署到Flink服务器。

问题是,当我的工作中有多个taskslot时,我无法在IDE中运行它。

public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "test");
env.setParallelism(1);
DataStream<String> kafkaSource = env
.addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
.name("Kafka-Source")
.slotSharingGroup("Kafka-Source");
kafkaSource.print().slotSharingGroup("Print");
env.execute("Flink Streaming Java API Skeleton");
}

}

我知道这个作业需要2个插槽,我可以在Flink集群中有两个任务管理器,但我如何在IDE中本地运行它。

目前,我必须为本地所有运营商指定相同的slotSharingGroup名称才能拥有一个插槽。但它并不灵活。

你是怎么处理的?

这是您正在描述的一个已知错误。您可以在此处找到相应的JIRA问题。

解决这个问题的方法是手动设置启动TaskExecutor的任务槽的数量。您可以通过TaskManagerOptions.NUM_TASK_SLOTS配置选项进行此操作:

final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);

最新更新