如何避免一个 Spark 流式处理窗口在运行一些本机 Python 代码的情况下阻止另一个窗口



我正在使用两个不同的窗口运行Spark Streaming(窗口用于使用SKLearn训练模型,另一个窗口用于基于该模型预测值),我想知道如何避免一个窗口("慢速"训练窗口)来训练模型,而不会"阻塞"快速"预测窗口。
我的简化代码如下所示:

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)

import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
    try:
       # ... rdd conversion to df, feature extraction etc...
       # regular python code 
       X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
       pred = Custom_ModelContainer.getmodel().predict(X)
       # send prediction to GUI
    except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)

### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
    # ... rdd conversion to df, feature extraction etc...
    X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
    y = np.array(df.map(lambda lp: lp.label).collect())
    # train test split etc...
    model = SVR().fit(X_train, y_train)
    Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)

(注意:Custom_ModelContainer是我编写的一个类,用于保存和检索训练好的模型)

我的设置通常工作正常,除了每次在第二个窗口中训练新模型(大约需要一分钟)时,第一个窗口不会计算预测,直到模型训练完成。实际上,我想这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置 - 由于 SKLearn)。

所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我如何实现后者,这真的能解决我的问题吗?

如果没有,关于如何在不延迟窗口 1 中的计算的情况下使这样的设置工作的任何其他建议?

任何帮助将不胜感激。

编辑:我想更普遍的问题是:如何在两个不同的工作线程上并行运行两个不同的任务?

免责声明:这只是一组想法。这些都没有在实践中经过测试。


您可以尝试以下几种方法:

  1. 不要collect predict. scikit-learn模型通常是可序列化的,因此可以在群集上轻松处理预测过程:

    def predict(time, rdd):
        ... 
        model = Custom_ModelContainer.getmodel()
        pred = (df.rdd.map(lambda lp: lp.features.toArray())
            .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
        ...
    

    它不仅应该并行化预测,而且,如果原始数据没有传递给GUI,还应该减少必须收集的数据量。

  2. 尝试异步collect和发送数据。PySpark 不提供collectAsync方法,但您可以尝试使用 concurrent.futures 实现类似目标:

    from pyspark.rdd import RDD
    from concurrent.futures import ThreadPoolExecutor
    executor = ThreadPoolExecutor(max_workers=4)
    def submit_to_gui(*args): ...
    def submit_if_success(f):
        if not f.exception():
            executor.submit(submit_to_gui, f.result())
    

    从 1 继续。

    def predict(time, rdd):
        ...
        f = executor.submit(RDD.collect, pred)
        f.add_done_callback(submit_if_success)
        ...
    
  3. 如果您真的想使用本地scikit-learn模型,请尝试如上所述使用期货进行collectfit。您也可以尝试只收集一次,尤其是在数据未缓存的情况下:

    def collect_and_train(df):
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
        ...
        return SVR().fit(X_train, y_train)
    def set_if_success(f):
        if not f.exception():
            Custom_ModelContainer.setModel(f.result())  
    def trainModel(time, rdd): 
       ...
        f = excutor.submit(collect_and_train, df)
        f.add_done_callback(set_if_success) 
       ...
    
  4. 使用现有的解决方案(如spark-sklearn)或自定义方法将训练过程移动到群集:

    • 朴素的解决方案 - 使用 mapPartitions 准备数据、coalesce(1)和训练单个模型。
    • 分布式解决方案 - 使用 mapPartitions 为每个分区创建和验证单独的模型,收集模型并用作集成,例如通过进行平均或中位数预测。
  5. 扔掉scikit-learn,使用可以在分布式流环境中训练和维护的模型(例如StreamingLinearRegressionWithSGD)。

    您当前的方法使Spark过时。如果可以在本地训练模型,则很有可能在本地计算机上更快地执行所有其他任务。否则,您的程序将仅在collect 上失败。

我认为您要查找的是默认为1的属性:"spark.streaming.concurrentJobs"。增加这个应该允许你并行运行多个foreachRDD函数。

在 JobScheduler.scala 中:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
只是提醒一下,如果您要并行更改

和读取,还要注意自定义模型容器上的线程安全性。 :)

相关内容

  • 没有找到相关文章