I have a pyspark df like this:
id desc
1 abd hdbh jbj
2 sgjhd jhdgh gjhg
3 bvj hvhgvgh
4 jkjb bhj
Now I want to convert my desc column into a vector, so I am using the Google Universal Sentence Encoder as a UDF. Here is my code:
import tensorflow_hub as hub
from pyspark.sql import functions as f

module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
model = hub.load(module_url)

def embedding(input):
    return model(input)

df.withColumn("Embedding", list(embedding(f.lit("desc"))))
Here is the error log:
ValueError Traceback (most recent call last)
/tmp/ipykernel_13810/1173342766.py in <module>
----> 1 df_shirt_sample.withColumn("Embedding", list(embedding(f.lit("desc"))))
/tmp/ipykernel_13810/446837446.py in embedding(input)
1 def embedding(input):
----> 2 return (model(input))
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/saved_model/load.py in _call_attribute(instance, *args, **kwargs)
684
685 def _call_attribute(instance, *args, **kwargs):
--> 686 return instance.__call__(*args, **kwargs)
687
688
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/util/traceback_utils.py in error_handler(*args, **kwargs)
151 except Exception as e:
152 filtered_tb = _process_traceback_frames(e.__traceback__)
--> 153 raise e.with_traceback(filtered_tb) from None
154 finally:
155 del filtered_tb
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/eager/function_spec.py in _convert_inputs_to_signature(inputs, input_signature, flat_input_signature)
521 need_packing = True
522 except ValueError:
--> 523 raise ValueError("When input_signature is provided, all inputs to "
524 "the Python function must be convertible to "
525 "tensors:\n"
ValueError: When input_signature is provided, all inputs to the Python function must be convertible to tensors:
inputs: (
Column<b'desc'>)
input_signature: (
TensorSpec(shape=<unknown>, dtype=tf.string, name=None)).
Can someone tell me what I am doing wrong?
Following this post, I tried loading the model from inside the UDF: Spark broadcast a trained tensorflow SavedModel. This loads a copy of the model on each worker, which can then run predictions.
Example:
import tensorflow_hub as hub
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
def embedding(x):
    model_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
    model = hub.load(model_url, tags=["serve"])
    return model([x])

@F.udf(returnType=ArrayType(ArrayType(FloatType())))
def infer(data):
    outputs = embedding(data)
    return outputs.numpy().tolist()
spark = SparkSession.builder.getOrCreate()
data = [{"text": "abd hdbh jbj"}]
df = spark.createDataFrame(data=data)
df = df.withColumn("embedding", infer("text"))
df.show(10)
df.printSchema()
which gives:
+------------+--------------------+
| text| embedding|
+------------+--------------------+
|abd hdbh jbj|[[0.054931916, -0...|
+------------+--------------------+
root
|-- text: string (nullable = true)
|-- embedding: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: float (containsNull = true)
See this guide on deploying a TF 2.0 SavedModel on Spark with PySpark: https://github.com/tensorflow/tensorflow/issues/31421
Another approach is to use Petastorm to convert the Spark DataFrame into a TensorFlow dataset that can be fed to a distributed model; see: https://www.databricks.com/notebooks/simple-aws/petastorm-spark-converter-tensorflow.html