Using a tensorflow.keras model in a pyspark pandas_udf generates a pickle error



I want to use a tensorflow.keras model in a pyspark pandas_udf. However, I get a pickle error when the model is serialized before being sent to the workers. I am not sure I am using the best method to do what I want, so I will expose a minimal but complete example.

Packages:

  • tensorflow-2.2.0 (but all previous versions trigger the error as well)
  • pyspark-2.4.5

The import statements are:

import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from pyspark.sql import SparkSession, functions as F, types as T

The PySpark UDF is a pandas_udf:

def compute_output_pandas_udf(model):
    '''Spark pandas udf for model prediction.'''
    @F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
    def compute_output(inputs1, inputs2, inputs3):
        pdf = pd.DataFrame({
            'input1': inputs1,
            'input2': inputs2,
            'input3': inputs3
        })
        pdf['predicted_output'] = model.predict(pdf.values)
        return pdf['predicted_output']
    return compute_output

The main code:

# Model parameters
weights = np.array([[0.5], [0.4], [0.3]])
bias = np.array([1.25])
activation = 'linear'
input_dim, output_dim = weights.shape
# Initialize model
model = Sequential()
layer = Dense(output_dim, input_dim=input_dim, activation=activation)
model.add(layer)
layer.set_weights([weights, bias])
# Initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()
# Create pandas df with inputs and run model
pdf = pd.DataFrame({
    'input1': np.random.randn(200),
    'input2': np.random.randn(200),
    'input3': np.random.randn(200)
})
pdf['predicted_output'] = model.predict(pdf[['input1', 'input2', 'input3']].values)
# Create spark df with inputs and run model using udf
sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('predicted_output', compute_output_pandas_udf(model)('input1', 'input2', 'input3'))
sdf.limit(5).show()

Calling compute_output_pandas_udf(model) triggers this error:

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
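The error itself can be reproduced without Spark or Keras: pickling any object that holds a thread lock fails in the same way (the assumption, which the traceback suggests, is that the keras model holds such locks internally). `HoldsLock` below is a hypothetical stand-in, not part of the original code:

```python
import pickle
import threading

class HoldsLock:
    """Minimal stand-in: any attribute holding an RLock makes pickling fail."""
    def __init__(self):
        self.lock = threading.RLock()

try:
    pickle.dumps(HoldsLock())
except TypeError as e:
    print(e)  # message mentions being unable to pickle the _thread.RLock object
```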

I found this page about pickling a keras model and tried it with tensorflow.keras, but I got the following error when the model's predict function is called inside the UDF (so serialization works, but deserialization doesn't?):

AttributeError: 'Sequential' object has no attribute '_distribution_strategy'

Does anyone know how to proceed? Thank you in advance!

PS: Note that I am not using a model from the keras library directly, because I get another error that appears periodically and seems harder to solve. Serializing that model, however, does not generate an error the way the tensorflow.keras model does.

So, instead of using that solution to extend __getstate__ and __setstate__ directly in the tensorflow.keras.models.Model class, the solution is to use the wrapper class suggested by Erp12 in this post.

import tempfile
import tensorflow

class ModelWrapperPickable:
    def __init__(self, model):
        self.model = model

    def __getstate__(self):
        model_str = ''
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True)
            model_str = fd.read()
        d = {'model_str': model_str}
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            self.model = tensorflow.keras.models.load_model(fd.name)
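The mechanism can be sanity-checked without tensorflow: the wrapper pickles only the recoverable state and rebuilds the object on deserialization. A minimal sketch, using a hypothetical FakeModel that is itself unpicklable (both classes below are illustrations, not the original code):

```python
import pickle
import threading

class FakeModel:
    """Hypothetical stand-in for a keras model: unpicklable because of the lock."""
    def __init__(self, weights):
        self.weights = weights
        self._lock = threading.RLock()

class Wrapper:
    """Same idea as ModelWrapperPickable: pickle only the recoverable state."""
    def __init__(self, model):
        self.model = model
    def __getstate__(self):
        return {'weights': self.model.weights}
    def __setstate__(self, state):
        self.model = FakeModel(state['weights'])

restored = pickle.loads(pickle.dumps(Wrapper(FakeModel([0.5, 0.4, 0.3]))))
print(restored.model.weights)  # [0.5, 0.4, 0.3]
```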

The UDF becomes:

def compute_output_pandas_udf(model_wrapper):
    '''Spark pandas udf for model prediction.'''
    @F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
    def compute_output(inputs1, inputs2, inputs3):
        pdf = pd.DataFrame({
            'input1': inputs1,
            'input2': inputs2,
            'input3': inputs3
        })
        pdf['predicted_output'] = model_wrapper.model.predict(pdf.values)
        return pdf['predicted_output']
    return compute_output

The main code:

# Model parameters
weights = np.array([[0.5], [0.4], [0.3]])
bias = np.array([1.25])
activation = 'linear'
input_dim, output_dim = weights.shape
# Initialize keras model
model = Sequential()
layer = Dense(output_dim, input_dim=input_dim, activation=activation)
model.add(layer)
layer.set_weights([weights, bias])
# Initialize model wrapper
model_wrapper = ModelWrapperPickable(model)
# Initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()
# Create pandas df with inputs and run model
pdf = pd.DataFrame({
    'input1': np.random.randn(200),
    'input2': np.random.randn(200),
    'input3': np.random.randn(200)
})
pdf['predicted_output'] = model_wrapper.model.predict(pdf[['input1', 'input2', 'input3']].values)
# Create spark df with inputs and run model using udf
sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('predicted_output', compute_output_pandas_udf(model_wrapper)('input1', 'input2', 'input3'))
sdf.limit(5).show()

The easiest solution is to broadcast the model's weights and to load them inside the pandas_udf. Here is a demo example:

import pandas as pd
import numpy as np
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType
# spark = SparkSession.builder.xxx.getOrCreate()
# sc = spark.sparkContext

def build_model():
    inputs = Input(shape=(3,), name='inputs')
    d1 = Dense(20, name='dense_01')(inputs)
    d2 = Dense(50, name='dense_02')(d1)
    o = Dense(1, activation='sigmoid', name='output')(d2)
    net = Model(inputs=inputs, outputs=o)
    return net

net = build_model()
ws = net.get_weights()
# Broadcast only the weights, not the model object itself
bc_model_state = sc.broadcast(ws)

@pandas_udf(FloatType())
def batch_predict(data):  # input: pd.Series; output: pd.Series
    # Rebuild the model on the worker and restore the broadcast weights
    mdl = build_model()
    mdl.set_weights(bc_model_state.value)
    prediction = mdl.predict(data.values)
    return pd.Series(prediction[:, 0])

This solution works not only for tensorflow.keras models, but also for pytorch models. Check this
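The reason this works: the keras model object itself is not picklable, but the weights returned by get_weights() are plain numpy arrays, which broadcast (i.e. pickle) without issue. A quick check, with no Spark or tensorflow dependency:

```python
import pickle
import numpy as np

# Plain numpy arrays round-trip through pickle fine,
# which is what makes broadcasting the weights possible
ws = [np.random.randn(3, 20), np.zeros(20)]
restored = pickle.loads(pickle.dumps(ws))
assert all(np.array_equal(a, b) for a, b in zip(ws, restored))
```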
