Spark 2.1.0 会话配置设置 (PYspark)



我正在尝试覆盖 spark 会话/spark 上下文默认配置,但它正在选择整个节点/群集资源。

 spark  = SparkSession.builder
                      .master("ip")
                      .enableHiveSupport()
                      .getOrCreate()
 spark.conf.set("spark.executor.memory", '8g')
 spark.conf.set('spark.executor.cores', '3')
 spark.conf.set('spark.cores.max', '3')
 spark.conf.set("spark.driver.memory",'8g')
 sc = spark.sparkContext

当我将配置放入火花提交时,它工作正常

spark-submit --master ip --executor-cores=3 --diver 10G code.py

您实际上并没有使用此代码覆盖任何内容。只是为了让您自己看看,请尝试以下操作。

一旦你开始 pyspark shell 类型:

sc.getConf().getAll()

这将显示所有当前的配置设置。然后尝试您的代码并再次执行。什么都没有改变。

相反,您应该做的是创建一个新配置并使用它来创建SparkContext。这样做:

conf = pyspark.SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')])
sc.stop()
sc = pyspark.SparkContext(conf=conf)

然后,您可以像上面一样检查自己:

sc.getConf().getAll()

这应该反映您想要的配置。

Spark 2.3.1 中的更新配置

要更改默认的 Spark 配置,您可以按照以下步骤操作:

导入所需的类

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

获取默认配置

spark.sparkContext._conf.getAll()

更新默认配置

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

停止当前 Spark 会话

spark.sparkContext.stop()

创建 Spark 会话

spark = SparkSession.builder.config(conf=conf).getOrCreate()

在配置中将"spark.driver.host"设置为"localhost"对我有用

spark = SparkSession 
    .builder 
    .appName("MyApp") 
    .config("spark.driver.host", "localhost") 
    .getOrCreate()

你也可以在启动 pyspark 时设置配置,就像 spark-submit 一样:

pyspark --conf property=value

这是一个例子

-bash-4.2$ pyspark
Python 3.6.8 (default, Apr 25 2019, 21:02:35) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /__ / .__/_,_/_/ /_/_   version 2.4.0-cdh6.2.0
      /_/
Using Python version 3.6.8 (default, Apr 25 2019 21:02:35)
SparkSession available as 'spark'.
>>> spark.conf.get('spark.eventLog.enabled')
'true'
>>> exit()
-bash-4.2$ pyspark --conf spark.eventLog.enabled=false
Python 3.6.8 (default, Apr 25 2019, 21:02:35) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /__ / .__/_,_/_/ /_/_   version 2.4.0-cdh6.2.0
      /_/
Using Python version 3.6.8 (default, Apr 25 2019 21:02:35)
SparkSession available as 'spark'.
>>> spark.conf.get('spark.eventLog.enabled')
'false'

我有一个非常不同的要求,我必须检查我是否正在获取执行器和驱动程序内存大小的参数,如果得到,则必须仅用执行器和驱动程序的更改替换配置。以下是步骤:

  1. 导入库
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
  1. 定义 Spark 并获取默认配置
spark = (SparkSession.builder
        .master("yarn")
        .appName("experiment") 
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
        .getOrCreate())
conf = spark.sparkContext._conf.getAll()
    检查
  1. 执行器和驱动程序大小是否存在(我在这里给出伪代码 1 条件检查,其余您可以创建案例),然后根据参数使用给定的配置或跳到默认配置。
if executor_mem is not None and driver_mem  is not None:
    conf = spark.sparkContext._conf.setAll([('spark.executor.memory',executor_mem),('spark.driver.memory',driver_mem)])
    spark.sparkContext.stop()
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
else:
    spark = spark

不要忘记停止 Spark 上下文,这将确保执行器和驱动程序内存大小在传入参数时有所不同。希望这有帮助!

我知道这是一个有点旧的帖子,并且已经有一些被接受的帖子,但我只是想发布一个相同的工作代码。

from pyspark.sql import SparkSession
spark = SparkSession 
    .builder 
    .appName("MyApp") 
    .config("spark.driver.host", "localhost") 
    .getOrCreate()
default_conf = spark.sparkContext._conf.getAll()
print(default_conf)
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'),
                                        ('spark.app.name', 'Spark Updated Conf'),
                                        ('spark.executor.cores', '4'),
                                        ('spark.cores.max', '4'),
                                        ('spark.driver.memory','4g')])
spark.sparkContext.stop()
spark = SparkSession 
    .builder 
    .appName("MyApp") 
    .config(conf=conf) 
    .getOrCreate()

default_conf = spark.sparkContext._conf.get("spark.cores.max")
print("updated configs " , default_conf)

相关内容

  • 没有找到相关文章

最新更新