Changing values in AWS Glue DynamicFrame columns



I'm trying to change the values in some columns of a DynamicFrame in an AWS Glue job. There is a Map transform that looks useful for this, but I can't get it to work.

Here is my code:

def map_values_in_columns(self, df):
    df = Map.apply(frame = df, f = self._map_values_in_columns)
    return df

def _map_values_in_columns(self, rec):
    for k, v in self.config['value_mapping'].items():
        column_name = self.config['value_mapping'][k]['column_name']
        values = self.config['value_mapping'][k]['values']
        for old_value, new_value in values.items():
            if rec[column_name] == old_value:
                rec[column_name] = new_value
    return rec

My config file is a YAML file with the following structure:

value_mapping:
  column_1:
    column_name: asd
    values:
      - old_value_1: new_value_1
      - old_value_2: new_value_2
  column_2:
    column_name: dsa
      - old_value_1: new_value_1
      - old_value_2: new_value_2

The method above throws a serialization error:

_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o81.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist

I'm not sure whether this comes from how I implemented the Map call, or whether I should be using a completely different approach.

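So the question is: how can I change multiple values in multiple columns of an AWS Glue DynamicFrame while avoiding converting back and forth between DynamicFrames and DataFrames?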

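There are a few issues in your code and YAML config that I'm not going to debug here. Below is a working example, which can also be run locally in a Jupyter notebook.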
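One probable cause of the PicklingError (a hedged reading of the trace, not something confirmed above): `Map.apply` has to serialize `f` and ship it to the executors. Passing the bound method `self._map_values_in_columns` drags `self` along with it, and if `self` holds a GlueContext or SparkContext, those are py4j proxies to JVM objects (the `o81` in the trace) that cannot be pickled. A common workaround is to pass a function that closes over nothing but plain Python data. In this sketch `make_value_mapper` is a hypothetical helper name, and plain dicts stand in for Glue's DynamicRecords.

```python
from typing import Any, Callable, Dict


def make_value_mapper(mapping: Dict[str, Dict[Any, Any]]) -> Callable[[Any], Any]:
    """Return a record-mapping function that captures only a plain dict.

    `mapping` is {column_name: {old_value: new_value}}. The returned closure
    references nothing else (no `self`, no GlueContext).
    """
    # Copy into a fresh dict so the closure shares no state with the caller.
    lookup = {col: dict(values) for col, values in mapping.items()}

    def map_values(rec):
        for column, values in lookup.items():
            current = rec[column]
            # Replace only when the current value has an entry in the mapping.
            if current is not None and current in values:
                rec[column] = values[current]
        return rec

    return map_values


# Hypothetical use inside the Glue job class:
#   def map_values_in_columns(self, df):
#       # Build the plain dict here, outside the function passed to Map.
#       mapping = {...}  # {column_name: {old_value: new_value}}
#       return Map.apply(frame=df, f=make_value_mapper(mapping))

mapper = make_value_mapper({"asd": {"old_value_1": "new_value_1"}})
print(mapper({"id": "1", "asd": "old_value_1"}))
```

Because the returned function only references `lookup`, PySpark's cloudpickle-based function serializer should be able to ship it without touching the job object.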

I've simplified the YAML to keep parsing simple.
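For reference, the question's original structure can be flattened into the simplified `{column_name: {old_value: new_value}}` shape in plain Python. This also points at a second problem in the question's code: `values` is a list of one-pair dicts, so `values.items()` would raise `AttributeError` even once the pickling issue is gone. The sketch assumes `column_2` was meant to have a `values:` key too (as written, it has no `values` entry for the code to read), and `normalize_value_mapping` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def normalize_value_mapping(value_mapping: Dict[str, Any]) -> Dict[str, Dict[Any, Any]]:
    """Flatten the question's config into {column_name: {old_value: new_value}}.

    Assumes each entry has a 'column_name' key and a 'values' key holding a
    list of single-pair dicts, e.g. [{'old_value_1': 'new_value_1'}, ...].
    """
    flat: Dict[str, Dict[Any, Any]] = {}
    for entry in value_mapping.values():
        column = entry["column_name"]
        pairs: List[Dict[Any, Any]] = entry.get("values", [])
        merged: Dict[Any, Any] = {}
        for pair in pairs:
            # Each list item is a small dict; merge them into one lookup table.
            merged.update(pair)
        flat[column] = merged
    return flat


# The question's structure as a YAML loader would return it,
# with the assumed 'values' key added under column_2.
config = {
    "value_mapping": {
        "column_1": {
            "column_name": "asd",
            "values": [{"old_value_1": "new_value_1"}, {"old_value_2": "new_value_2"}],
        },
        "column_2": {
            "column_name": "dsa",
            "values": [{"old_value_1": "new_value_1"}, {"old_value_2": "new_value_2"}],
        },
    }
}

print(normalize_value_mapping(config["value_mapping"]))
```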

from awsglue.context import GlueContext
from awsglue.transforms import *  # provides Map
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
import yaml

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session  # `spark` is used below but was never defined

columns = ["id", "asd", "dsa"]
data = [("1", "retain", "old_val_dsa_1"), ("2", "old_val_asd_1", "old_val_dsa_2"), ("3", "old_val_asd_2", "retain"), ("4", None, "")]
df = spark.createDataFrame(data).toDF(*columns)
dyF = DynamicFrame.fromDF(df, glueContext, "test_dyF")

# safe_load: calling yaml.load without a Loader argument fails in PyYAML >= 6
config = yaml.safe_load('''value_mapping:
  asd:
    old_val_asd_1: new_val_asd_1
    old_val_asd_2: new_val_asd_2
  dsa:
    old_val_dsa_1: new_val_dsa_1
    old_val_dsa_2: new_val_dsa_2''')

def map_values(rec):
    # Replace a column value only if the mapping has an entry for it
    for k, v in config['value_mapping'].items():
        if rec[k] is not None:
            replacement_val = v.get(rec[k])
            if replacement_val is not None:
                rec[k] = replacement_val
    return rec

print("-- dyF --")
dyF.toDF().show()
mapped_dyF = Map.apply(frame = dyF, f = map_values)
print("-- mapped_dyF --")
mapped_dyF.toDF().show()
Output:

-- dyF --
+---+-------------+-------------+
| id|          asd|          dsa|
+---+-------------+-------------+
|  1|       retain|old_val_dsa_1|
|  2|old_val_asd_1|old_val_dsa_2|
|  3|old_val_asd_2|       retain|
|  4|         null|             |
+---+-------------+-------------+
-- mapped_dyF --
+-------------+-------------+---+
|          asd|          dsa| id|
+-------------+-------------+---+
|       retain|new_val_dsa_1|  1|
|new_val_asd_1|new_val_dsa_2|  2|
|new_val_asd_2|       retain|  3|
|         null|             |  4|
+-------------+-------------+---+
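To check the replacement logic without a Glue environment, the same `map_values` logic can be run on plain dicts that mirror the four sample rows (plain dicts standing in for DynamicRecords is an assumption of this sketch). It shows that `None` and the empty string pass through unchanged, because neither has an entry in the mapping.

```python
# Same mapping as the YAML in the example, written as a dict so that
# neither Glue nor PyYAML is needed.
config = {
    "value_mapping": {
        "asd": {"old_val_asd_1": "new_val_asd_1", "old_val_asd_2": "new_val_asd_2"},
        "dsa": {"old_val_dsa_1": "new_val_dsa_1", "old_val_dsa_2": "new_val_dsa_2"},
    }
}


def map_values(rec):
    # Same logic as map_values in the Glue example.
    for k, v in config["value_mapping"].items():
        if rec[k] is not None:
            replacement_val = v.get(rec[k])
            if replacement_val is not None:
                rec[k] = replacement_val
    return rec


rows = [
    {"id": "1", "asd": "retain", "dsa": "old_val_dsa_1"},
    {"id": "2", "asd": "old_val_asd_1", "dsa": "old_val_dsa_2"},
    {"id": "3", "asd": "old_val_asd_2", "dsa": "retain"},
    {"id": "4", "asd": None, "dsa": ""},
]

for row in rows:
    # Copy each row so the originals stay untouched.
    print(map_values(dict(row)))
```

One side effect of the `replacement_val is not None` check: a mapping entry whose new value is `None` is silently skipped.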
