In the services UI I see that I can create a Spark cluster. I also see that there is a Spark Operator runtime available when executing jobs. What is the use case for each, and why would I choose one over the other?
There are two ways to use Spark in Iguazio:
- Create a standalone Spark cluster via the Iguazio UI (this is what you find on the services page). This is a persistent cluster that you can associate with multiple jobs, Jupyter notebooks, and so on. It is a good choice for long-running computations with a static pool of resources. An overview of the Spark service in Iguazio, along with some ingestion examples, can be found here.
- When creating a JupyterLab instance in the UI, there is an option to associate it with an existing Spark cluster. This lets you use PySpark out of the box (see the sketch after this list).
- Create an ephemeral Spark cluster via the Spark Operator. This is a temporary cluster that exists only for the duration of the job. It is a good choice for shorter, one-off jobs with static or variable resource pools. If a persistent Spark cluster is not needed, the Spark Operator runtime is usually the better choice. Some examples of using the Spark Operator on Iguazio can be found here and below.
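For the standalone-cluster route, a minimal sketch of what a PySpark session in an attached Jupyter notebook might look like is shown below. This assumes the JupyterLab service was associated with the Spark service, so the Spark defaults already point at the cluster; the file path is only an illustration:

from pyspark.sql import SparkSession

# The attached JupyterLab service is preconfigured to talk to the standalone cluster,
# so no explicit master URL is needed here
spark = SparkSession.builder.appName("standalone-cluster-example").getOrCreate()

# Read a CSV from the shared file system (path is illustrative)
df = spark.read.csv("/User/examples/iris.csv", header=True, inferSchema=True)
df.show(5)

spark.stop()

The Spark Operator example below, by contrast, packages the Spark code as an MLRun function and spins up a dedicated cluster for each run: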
import mlrun
import os
# set up new spark function with spark operator
# command will use our spark code which needs to be located on our file system
# the name param can contain only lowercase letters (k8s convention)
sj = mlrun.new_function(kind='spark', command='spark_read_csv.py', name='sparkreadcsv')
# set spark driver config (gpu_type & gpus=<number_of_gpus> supported too)
sj.with_driver_limits(cpu="1300m")
sj.with_driver_requests(cpu=1, mem="512m")
# set spark executor config (gpu_type & gpus=<number_of_gpus> are supported too)
sj.with_executor_limits(cpu="1400m")
sj.with_executor_requests(cpu=1, mem="512m")
# adds fuse, daemon & iguazio's jars support
sj.with_igz_spark()
# set spark driver volume mount
# sj.with_driver_host_path_volume("/host/path", "/mount/path")
# set spark executor volume mount
# sj.with_executor_host_path_volume("/host/path", "/mount/path")
# args are also supported
sj.spec.args = ['-spark.eventLog.enabled','true']
# add python module
sj.spec.build.commands = ['pip install matplotlib']
# Number of executors
sj.spec.replicas = 2
# Rebuild the image with MLRun - needed in order to support artifact logging etc.
sj.deploy()
# Run task while setting the artifact path on which our run artifact (if any) will be saved
sj.run(artifact_path='/User')
The spark_read_csv.py file is as follows:
from pyspark.sql import SparkSession
from mlrun import get_or_create_ctx
context = get_or_create_ctx("spark-function")
# build spark session
spark = SparkSession.builder.appName("Spark job").getOrCreate()
# read csv
df = spark.read.load('iris.csv', format="csv",
sep=",", header="true")
# sample for logging
df_to_log = df.describe().toPandas()
# log final report
context.log_dataset("df_sample",
df=df_to_log,
format="csv")
spark.stop()
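As a hedged follow-up (the exact accessors may differ slightly between MLRun versions), the dataset logged by the job can typically be retrieved from the run object returned by sj.run():

run = sj.run(artifact_path='/User')
# Load the logged dataset back as a pandas DataFrame
df_sample = run.artifact('df_sample').as_df()
print(df_sample.head())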