Py4J 序列化: 属性错误: 'dict'对象没有属性'_get_object_id'



使用 Py4J,我无法将 Python 字典对象解析为底层 JVM 实例。

我写了一个PySpark代码,我在RDD上运行UDF/lambda函数。我的目标是在RDD的每一行上运行一段Python代码,该代码需要连接到底层JVM,每个执行器核心,以便从自定义jar(源代码在Scala中)获取Java类实例,我在Spark提交期间使用--jars函数部署。我已经尝试了以下示例,但收到提到的错误。此代码在具有 Cloudera 发行版的 Spark 2.3 集群中的每个执行器核心上运行。出于安全原因,稍微屏蔽了代码。

from py4j.java_gateway import JavaGateway, launch_gateway
gateway = JavaGateway().launch_gateway(classpath='custom-code.jar:scala-library-2.11.8.jar:spark-catalyst_2.11-2.3.0.cloudera3.jar:scala-reflect-2.11.8.jar', jarpath='<path-to-py4j-jar>/py4j-0.10.7.jar')
jvm = gateway.jvm
input_dict = {0: 123}
temp = jvm.com.my.code.PyUtil.fullMap(input_dict)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
args_command, temp_args = self._build_args(*args)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in _build_args
[get_command_part(arg, self.pool) for arg in new_args])
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in <listcomp>
[get_command_part(arg, self.pool) for arg in new_args])
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 298, in get_command_part
command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'dict' object has no attribute '_get_object_id'

奇怪的是,当我运行完全相同的代码,但调用我的自定义 jar 的不同函数时,它接受字符串而不是字典,它运行得很好,我得到了一个 py4j.java_gateway。JavaObject 回来了,这就是我打算在这里得到的。

查看py4j的发行说明,我知道从0.5版本开始就支持字典,所以我不确定我在这里到底做错了什么?我怀疑网关启动有问题。

任何方向性的帮助将不胜感激。

找到答案,列在这里:

https://www.py4j.org/advanced_topics.html#converting-python-collections-to-java-collections

我只需要自己手动转换它。

from py4j.java_collections import MapConverter
input_dict = {0: 123}
mc_run_map_dict = MapConverter().convert(input_dict, gateway_handle._gateway_client)
temp = jvm.com.my.code.PyUtil.fullMap(mc_run_map_dict)

最新更新