如何将 Spark 用于具有大模型的机器学习工作流



是否有一种内存效率的方法可以将大型(>4GB(模型应用于Spark数据帧而不会遇到内存问题?

我们最近将一个自定义管道框架移植到 Spark(使用 python 和 pyspark(,在将 Word2Vec 和 Autoencoders 等大型模型应用于标记化文本输入时遇到了问题。首先,我非常天真地将转换调用转换为udf(熊猫和火花"原生"调用(,这很好,只要使用的模型/实用程序足够小,可以广播或重复实例化:

@pandas_udf("array<string>")
def tokenize_sentence(sentences: pandas.Series):
return sentences.map(lambda sentence: tokenize.word_tokenize(sentence))

在大型模型上尝试相同的方法(例如,通过word2vec将这些令牌嵌入向量空间(导致性能不佳,我明白为什么:

@pandas_udf("array<array<double>>")
def rows_to_lists_of_vectors(rows):
model = api.load('word2vec-google-news-300')
def words_to_vectors(words) -> List[List[float]]:
vectors = []
for word in words:
if word in model:
vec = model[word]
vectors.append(vec.tolist())
return vectors
return rows.map(words_to_vectors)

上面的代码会反复实例化~4Gb word2vec模型,将其从磁盘加载到RAM中,这非常慢。我可以通过使用mapPartition来解决此问题,它至少每个分区只加载一次。但更重要的是,如果我没有严格限制任务的数量,这会导致内存相关问题(至少在我的开发机器上(崩溃,这反过来又会使小 udf 非常慢。例如,将任务数限制为 2 可以解决内存崩溃问题,但会使标记化变得非常缓慢。

我知道Spark中有一个完整的管道框架,可以满足我们的需求,但在承诺之前,我想了解我遇到的问题是如何解决的。也许我们可以使用一些关键实践,而不必重写我们的框架。

因此,我的实际问题是双重的:

  1. 使用Spark管道框架是否可以解决我们在性能和内存方面的问题,假设我们为Spark开箱即用的步骤编写了自定义的估计器和转换器(例如Tokenizers和Word2Vec(。
  2. Spark如何解决这些问题,如果有的话?我可以改进当前的方法吗,或者使用 python 是不可能的(据我所知,进程不共享内存空间(。

如果以上任何一项让你相信我错过了Spark的核心原则,请指出来,毕竟我才刚刚开始使用Spark。

这在各种因素(模型、集群资源、管道(上有很大差异,但试图回答您的主要问题:

1(. Spark 管道可能会解决您的问题,如果它们符合您在分词器、Words2Vec 等方面的需求。然而,那些并不像货架上已经可用并装有api.load的那个那么强大。你可能还想看看Deeplearning4J,它把这些带到了Java/Apache Spark,看看它如何做同样的事情:tokenize,word2vec等

。2(. 按照当前的方法,我会看到在foreachParitionmapPartition中加载模型,并确保模型可以放入每个分区的内存中。您可以根据群集资源将分区大小缩小到更实惠的数字,以避免内存问题(例如,不是为每行创建数据库连接,而是为每个分区创建一个数据库连接时,情况相同(。

通常,当您应用一种对火花友好的业务逻辑而不是混合第 3 个外部方时,Spark udf 是很好的。

最新更新