我试图在AWS Glue作业中更改DynamicFrame的某些列中的值。我看到有一个Map函数似乎对这个任务很有用,但是我不能使它工作。
这是我的代码:
def map_values_in_columns(self, df):
df = Map.apply(frame = df, f = self._map_values_in_columns)
return df
def _map_values_in_columns(self, rec):
for k, v in self.config['value_mapping'].items():
column_name = self.config['value_mapping'][k]['column_name']
values = self.config['value_mapping'][k]['values']
for old_value, new_value in values.items():
if rec[column_name] == old_value:
rec[column_name] = new_value
return rec
我的配置文件是一个yaml文件,结构如下:
value_mapping:
column_1:
column_name: asd
values:
- old_value_1: new_value_1
- old_value_2: new_value_2
column_2:
column_name: dsa
- old_value_1: new_value_1
- old_value_2: new_value_2
上面的方法抛出序列化错误:
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o81.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
我不确定这是由于我如何实现Map方法,还是我应该使用一个完全不同的方法。
所以问题是:我如何使用AWS DynamicFrame在多个列中更改多个值,试图避免在DynamicFrames和dataframe之间来回转换?
您的代码和yaml配置中有一些问题,我不打算在这里调试它们。请参阅下面的工作示例,这也可以在jupyter笔记本中本地执行。
我已经简化了yaml以保持较低的解析复杂度。
from awsglue.context import GlueContext
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
columns = ["id", "asd", "dsa"]
data = [("1", "retain", "old_val_dsa_1"), ("2", "old_val_asd_1", "old_val_dsa_2"), ("3", "old_val_asd_2", "retain"), ("4", None, "")]
df = spark.createDataFrame(data).toDF(*columns)
dyF = DynamicFrame.fromDF(df, glueContext, "test_dyF")
import yaml
config = yaml.load('''value_mapping:
asd:
old_val_asd_1: new_val_asd_1
old_val_asd_2: new_val_asd_2
dsa:
old_val_dsa_1: new_val_dsa_1
old_val_dsa_2: new_val_dsa_2''')
def map_values(rec):
for k, v in config['value_mapping'].items():
if rec[k] is not None:
replacement_val = v.get(rec[k])
if replacement_val is not None:
rec[k] = replacement_val
return rec
print("-- dyF --")
dyF.toDF().show()
mapped_dyF = Map.apply(frame = dyF, f = map_values)
print("-- mapped_dyF --")
mapped_dyF.toDF().show()
-- dyF --
+---+-------------+-------------+
| id| asd| dsa|
+---+-------------+-------------+
| 1| retain|old_val_dsa_1|
| 2|old_val_asd_1|old_val_dsa_2|
| 3|old_val_asd_2| retain|
| 4| null| |
+---+-------------+-------------+
-- mapped_dyF --
+-------------+-------------+---+
| asd| dsa| id|
+-------------+-------------+---+
| retain|new_val_dsa_1| 1|
|new_val_asd_1|new_val_dsa_2| 2|
|new_val_asd_2| retain| 3|
| null| | 4|
+-------------+-------------+---+```