气流迁移时Pyspark中的 Pickle错误



我正在从旧气流环境迁移到新的气流环境。在迁移到新的气流环境时,我遇到了pickle错误。
我理解因为RDD操作闭包需要pickle对象。
我也明白,我声明的对象不能被pickle,因为redis集群库的一些问题。

rc = RedisCluster(
startup_nodes=config["redis"]["mydashboard"]["nodes"],
password=redis_pwd,
decode_responses=True,
)

def write_to_redis(row) -> None:
rc.set(
name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
)
result_df.rdd.foreach(write_to_redis)

但是我不明白为什么这个代码在旧的气流环境中执行得很好。
两个环境的区别

  1. 在执行python代码时,旧环境使用python命令而不是pyspark或spark-submit。在新环境中使用pyspark命令
  2. 旧的气流环境使用celay执行器,而新环境使用k8s执行器

两个环境都使用master选项作为yarn。如果你需要更多的信息,尽管问我。谢谢你

我简单解释一下为什么会出现选择错误:

当您执行foreach方法时,可调用函数(在您的示例中是write_to_redis)在Spark的执行节点上执行。这意味着,你的类(rc)在驱动节点上初始化,它在执行节点上使用(通常是完全其他服务器/实例等)。

Spark尝试做的是pickle初始化的类,然后"copy";将其放入函数使用的所有执行器节点中。一些不幸的是,初始化的类不能被pickle(例如boto3连接)。

要解决这个问题,您可以尝试在函数中调用的方法中初始化类,如下所示:

def write_to_redis(row) -> None:
rc = RedisCluster(
startup_nodes=config["redis"]["mydashboard"]["nodes"],
password=redis_pwd,
decode_responses=True,
)
rc.set(
name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
)
result_df.rdd.foreach(write_to_redis)

如果你要使用这个函数,请注意你将打开到Redis集群的连接数量,因为每个调用该函数的执行节点将打开一个连接。

最新更新