PySpark UDF - 生成的 DF 无法显示"value error: "我的列" name is not in list"



场景与这篇文章非常相似,但有一些变化: Pyspark 不支持的文字类型类 java.util.ArrayList

我有这种格式的数据:

data.show()
+---------------+--------------------+--------------------+
|       features|                meta|           telemetry|
+---------------+--------------------+--------------------+
|   [seattle, 3]|[seattle, 3, 5344...|[[47, 1, 27, 92, ...|
|     [miami, 1]|[miami, 1, 236881...|[[31, 84, 24, 67,...|
|     [miami, 3]|[miami, 3, 02f4ca...|[[84, 5, 4, 93, 2...|
|   [seattle, 3]|[seattle, 3, ec48...|[[43, 16, 94, 93,...|
|   [seattle, 1]|[seattle, 1, 7d19...|[[70, 22, 45, 74,...|
|[kitty hawk, 3]|[kitty hawk, 3, d...|[[46, 15, 56, 94,...|

可以从以下链接下载生成的 .json 示例:https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json?st=2019-08-22T15%3A20%3A20Z&se=2026-08-23T15%3A20%3A00Z&sp=rl&sv=2018-03-28&sr=b&sig=tsYh4oTNZXWbLnEgYypNqIsXH3BXOG8XyAH5ODi8iQg%3D

特别是,您可以看到其中每个数据中的实际数据实际上是一个字典:"特征"列是我们感兴趣的列,其形式是这样的:{"factory_id":"西雅图","line_id":"3"}

我正在尝试对功能中的数据进行编码,以便通过经典的功能方式进行one_hot。

见下文:

def one_hot(value, categories_list):
num_cats = len(categories_list)
one_hot = np.eye(num_cats)[categories_list.index(value)]
return one_hot
def one_hot_features(row, feature_keys, u_features):
"""
feature_keys must be sorted.
"""
cur_key = feature_keys[0]
vector = one_hot(row["features"][cur_key], u_features[cur_key])
for i in range(1, len(feature_keys)):
cur_key = feature_keys[i]
n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
vector = np.concatenate((vector,  n_vector), axis=None)
return vector

本例中的feature_keys和u_features包含以下数据:

feature_keys = ['factory_id', 'line_id']
u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'], 'line_id': ['1', '2', '3']}

我已经创建了一个 udf,并正在尝试创建一个新的数据帧,并使用此 udf 添加新列。 代码如下:

def calc_onehot_udf(feature_keys, u_features):
return udf(lambda x: one_hot_features(x, feature_keys, u_features))
n_data = data.withColumn("hot_feature", calc_onehot_udf(feature_keys, 
u_features)( col("features") ))
n_data.show()

这会导致以下错误集:


Py4JJava错误:调用 o148257.showString 时出错。 :org.apache.spark.SparkException:作业由于阶段失败而中止:阶段 91.0 中的任务 0 失败 4 次,最近一次失败:阶段 91.0 中丢失的任务 0.3(TID 1404,10.139.64.5,执行程序 1(:org.apache.spark.api.python.PythonException:回溯(最近一次调用(: 文件 "/databricks/spark/python/pyspark/sql/types.py",第 1514 行,在getitemidx = self 中。字段.索引(项( 值错误:"功能"不在列表中

在处理上述异常期间,发生了另一个异常:

回溯(最近一次调用(: 文件 "/databricks/spark/python/pyspark/worker.py",第 480 行,在主 进程(( 文件 "/databricks/spark/python/pyspark/worker.py",第 472 行,正在处理中 serializer.dump_stream(out_iter,输出文件( 文件 "/databricks/spark/python/pyspark/serializers.py",第 456 行,dump_stream self.serializer.dump_stream(self._batched(迭代器(,流( 文件 "/databricks/spark/python/pyspark/serializers.py",第 149 行,dump_stream 对于迭代器中的 obj: 文件 "/databricks/spark/python/pyspark/serializers.py",第 445 行,_batched 对于迭代器中的项: 文件 ",第 1 行,在 文件 "/databricks/spark/python/pyspark/worker.py",第 87 行,在 返回λ *a: f(*a( 文件 "/databricks/spark/python/pyspark/util.py",第 99 行,包装器 返回 f(*args, **kwargs( 文件 ",第 4 行,在 文件 ",第 11 行,one_hot_features 文件 "/databricks/spark/python/pyspark/sql/types.py",第 1519 行,在getitemraise ValueError(item( 中 值错误:功能


任何帮助将不胜感激。 我正在积极调查此事。

理想的输出是带有列的新数据帧:"hot_features",其中包含特征列中的一维一热编码数组。

事实证明,有几个关键问题:

  1. 您应该必须指定 UDF 中的返回类型。 在本例中,它是 ArrayType(FloatType(((
  2. 我没有从one_hot_features返回一个 nd 数组,而是调用了 vectors.tolist((
  3. 传递 col("features"( 会逐行发送特征列内的实际值,而不是实际的行数据;因此,像最初那样调用 row["features"] 是不正确的,因为没有访问器,因为我已经有了该行的值。 因此,我将第一个参数重命名为"features_val"而不是"行",以更好地反映预期的输入。

下面用于one_hot_features的新代码。

def one_hot_features(features_val, feature_keys, u_features):
cur_key = feature_keys[0]
vector = one_hot(features_val[cur_key], u_features[cur_key])
for i in range(1, len(feature_keys)):
cur_key = feature_keys[i]
n_vector = one_hot(features_val[cur_key], u_features[cur_key])
vector = np.concatenate((vector,  n_vector), axis=None)
return vector.tolist()

根据其他各种文档,我发现在撰写本文时,numpy 数组似乎不能很好地处理 spark 数据帧,因此最好将它们转换为更通用的 python 类型。 这似乎已经解决了这里面临的问题。

更新了以下 UDF 定义的代码:

def calc_onehot_udf(feature_keys, u_features):
return udf(lambda x: one_hot_features(x, feature_keys, u_features), 
ArrayType(FloatType()))
n_data = data.withColumn("hot_feature", calc_onehot_udf(feature_keys, 
u_features)(col("features")))
n_data.show()

如果你遇到这个问题,祝你好运;希望在这里记录有所帮助。

相关内容

最新更新