ValueError: When input_signature is provided, all inputs to the Python function must be convertible to tensors



I have a PySpark df like this:

id        desc
1      abd hdbh jbj
2      sgjhd jhdgh gjhg
3      bvj hvhgvgh
4      jkjb bhj

Now I want to convert my desc column into vectors, so I am using the Google Universal Sentence Encoder as a UDF. Here is my code:

import tensorflow_hub as hub
from pyspark.sql import functions as f

module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
model = hub.load(module_url)

def embedding(input):
    return model(input)

df.withColumn("Embedding", list(embedding(f.lit("desc"))))

Here is the error log:

ValueError                                Traceback (most recent call last)
/tmp/ipykernel_13810/1173342766.py in <module>
----> 1 df_shirt_sample.withColumn("Embedding", list(embedding(f.lit("desc"))))
/tmp/ipykernel_13810/446837446.py in embedding(input)
1 def embedding(input):
----> 2     return (model(input))
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/saved_model/load.py in _call_attribute(instance, *args, **kwargs)
684 
685 def _call_attribute(instance, *args, **kwargs):
--> 686   return instance.__call__(*args, **kwargs)
687 
688 
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/util/traceback_utils.py in error_handler(*args, **kwargs)
151     except Exception as e:
152       filtered_tb = _process_traceback_frames(e.__traceback__)
--> 153       raise e.with_traceback(filtered_tb) from None
154     finally:
155       del filtered_tb
~/miniconda3/envs/dev_env_37/lib/python3.7/site-packages/tensorflow/python/eager/function_spec.py in _convert_inputs_to_signature(inputs, input_signature, flat_input_signature)
521         need_packing = True
522       except ValueError:
--> 523         raise ValueError("When input_signature is provided, all inputs to "
524                          "the Python function must be convertible to "
525                          "tensors:\n"
ValueError: When input_signature is provided, all inputs to the Python function must be convertible to tensors:
inputs: (
Column<b'desc'>)
input_signature: (
TensorSpec(shape=<unknown>, dtype=tf.string, name=None)).

Can someone tell me what I am doing wrong?

The problem is that `embedding(f.lit("desc"))` passes a PySpark `Column` object straight into the TensorFlow model, and a `Column` cannot be converted to a tensor (note also that `f.lit("desc")` is the literal string "desc", not the column). The model call has to happen inside a UDF so it runs per row on the workers. Load the model from within the UDF, following this post: Spark broadcast a trained tensorflow SavedModel. This loads the model on each worker, which can then run the predictions.

Example:

import tensorflow_hub as hub
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

def embedding(x):
    # Load the Universal Sentence Encoder; TF Hub caches the download,
    # so later calls on the same worker reuse the local copy.
    model_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
    model = hub.load(model_url, tags=["serve"])
    return model([x])

@F.udf(returnType=ArrayType(ArrayType(FloatType())))
def infer(data):
    # model([x]) returns a (1, 512) tensor; convert it to a nested
    # Python list so Spark can serialize it.
    outputs = embedding(data)
    return outputs.numpy().tolist()

spark = SparkSession.builder.getOrCreate()
data = [{"text": "abd hdbh jbj"}]
df = spark.createDataFrame(data=data)
df = df.withColumn("embedding", infer("text"))
df.show(10)
df.printSchema()

Which gives:

+------------+--------------------+                                             
|        text|           embedding|
+------------+--------------------+
|abd hdbh jbj|[[0.054931916, -0...|
+------------+--------------------+
root
|-- text: string (nullable = true)
|-- embedding: array (nullable = true)
|    |-- element: array (containsNull = true)
|    |    |-- element: float (containsNull = true)
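
Note that this UDF calls hub.load once per row; it works because TF Hub caches the downloaded module on disk, but it is still wasteful. Below is a minimal batched sketch using a pandas UDF (Spark 3+); the flat ArrayType(FloatType()) return type and the name infer_batch are my own choices, not from the post above:

import pandas as pd
import tensorflow_hub as hub
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

@F.pandas_udf(ArrayType(FloatType()))
def infer_batch(texts: pd.Series) -> pd.Series:
    # Loaded once per batch rather than once per row; TF Hub's disk cache
    # makes repeated loads on the same worker cheap after the first download.
    model = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
    # The encoder accepts a list of strings and returns a (batch, 512) tensor.
    embeddings = model(texts.tolist()).numpy()
    return pd.Series(embeddings.tolist())

df = df.withColumn("embedding", infer_batch("text"))

Each row then gets a flat 512-float vector instead of the nested [[...]] array produced by model([x]).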

See this guide to serving a TF 2.0 SavedModel on Spark with PySpark: https://github.com/tensorflow/tensorflow/issues/31421

Another approach is to use Petastorm to convert the Spark DataFrame into a TensorFlow dataset that can be fed to a distributed model; see: https://www.databricks.com/notebooks/simple-aws/petastorm-spark-converter-tensorflow.html
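
A minimal sketch of that Petastorm route, assuming petastorm is installed and using a placeholder cache directory:

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Petastorm materializes the DataFrame as Parquet under this cache directory
# (the URL below is a placeholder).
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               "file:///tmp/petastorm_cache")

converter = make_spark_converter(df.select("text"))
with converter.make_tf_dataset() as dataset:
    # `dataset` is a tf.data.Dataset yielding namedtuple batches
    # with one field per selected column.
    for batch in dataset.take(1):
        print(batch.text)

This moves the heavy TensorFlow work out of the Spark executors, which is the pattern shown in the linked Databricks notebook.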
