我正在尝试模拟具有以下代码的函数write_tables_to_datalake_by_partition_datehour()
。
tables_with_data[table].repartition("datehour_partition").write.format(
"parquet"
).partitionBy("datehour_partition").mode("overwrite").save(
cleansed_file_path + table
)
tables_with_data[table]
属于pyspark.sql.DataFrame
型。查看链接函数时,它会保持DataFrame
,直到write
函数返回DataFrameWriter
。DataFrameWriter
实际上能够使用其余的功能。
我不想将数据写入我们的数据湖。我想嘲笑它。
@mock.patch("module_name.io.DataFrame")
def test_write_tables_to_datalake_by_partition_datehour(mock_df) -> None:
mock_df.return_value.repartition.return_value.write.format.return_value.partitionBy.return_value.mode.return_value.save.return_value = (
"yes!"
)
这不起作用,因为DataFrame
没有方法save
,这仅在DataFrameWriter
中可用。但是我无法修补DataFrameWriter
因为我没有在我的模块中导入它,只有DataFrame
。
我也试过
with mock.patch.object(DataFrameWriter, "save") as mock1:
mock1.return_value.format.partitionBy.mode.save.return_value = "test"
result = write_tables_to_datalake_by_partition_datehour(
tablenames,
dataframes_cleansed,
cleansed_file_path,
dataframes_cleansed,
quarantine_file_path,
)
这也行不通。我怎样才能以正确的方式模拟它?
我通过在模块中导入DataFrameWriter
来让它工作。然后我模拟save
返回值,现在它不会写入我们的数据湖。非常简单的解决方案。
@mock.patch("mymodule.io.DataFrameWriter.save", return_value="1")
def test_write_tables_to_datalake_by_partition_datehour(mock_dfw) -> None:
# implementation
在我的模块中,我在顶部添加了这个,以便我的mock.patch
可以找到实际的类
from pyspark.sql import DataFrameWriter
即使它是一个未使用的导入,模拟也需要它。