在 Apache Spark 上为每个工作线程创建一个单一实例



假设我想使用一个创建成本很高的对象来映射RDD。我想每个工作线程/线程都有一个这个对象,并且必须在处理每个工作线程上的RDD分区项目之前创建它。

我的解决方案是:

final Function0<ModelEvaluator> f = () -> {
if (ModelEvaluator.getInstance() == null) {
ModelEvaluator m = new ModelEvaluator(script);
ModelEvaluator.setInstance(m);
}
return ModelEvaluator.getInstance();
};
JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
(t) -> {
try {
double val = f.call().evaluateModel(t);
return new Tuple2<>(val, t);
} catch (Exception ex) {
return null;
}
}
);

public class ModelEvaluator {
private static ModelEvaluator instance;
public static void setInstance(ModelEvaluator instance) {
ModelEvaluator.instance = instance;
}
public static ModelEvaluator getInstance() {
return instance;
} 
...

在这种情况下,"ModelEvaluator"对象解析脚本,然后使用"服务"对象列表来配置模型参数,以便计算该参数配置的关联响应指标。但我不想在每次处理RDD行时都解析脚本。

我还将我的集群配置为为每个集群创建一个进程,每个进程只会生成一个工作线程,因为同一进程中的多个工作线程同时访问具有可变状态的单一实例会有问题。

对于我的问题,有没有更优雅的解决方案?

这可以通过Broadcast变量来实现。 这将允许你在驱动程序上创建一个对象,并且它将根据需要为每个工作线程发送一次。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script));
JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
(t) -> {
try {
double val = model.value().evaluateModel(t);
return new Tuple2<>(val, t);
} catch (Exception ex) {
return null;
}
}
);

相关内容

最新更新