I want to install some Python packages (e.g. python-json-logger) on Dataproc Serverless. Is there a way to run something like an initialization action on Dataproc Serverless to install Python packages? Please let me know.
You have two options:
- Using a gcloud command in the terminal:
You can create a custom image containing your dependencies (Python packages) in GCR (Google Container Registry on GCP) and pass its URI as a parameter in the command below:
e.g.
$ gcloud beta dataproc batches submit \
    --container-image=gcr.io/my-project-id/my-image:1.0.1 \
    --project=my-project-id --region=us-central1 \
    --jars=file:///usr/lib/spark/external/spark-avro.jar \
    --subnet=projects/my-project-id/regions/us-central1/subnetworks/my-subnet-name
See the guide "Create a custom container image for Dataproc Serverless for Spark" for how to build such an image.
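For illustration, a hypothetical way to bake the packages into such an image and publish it is sketched below. The base image, package list and image tag are placeholders, and the guide above lists the additional requirements a Dataproc Serverless container must meet, so treat this as a sketch rather than a drop-in image.

$ cat > Dockerfile <<'EOF'
# Hypothetical sketch only: adds python-json-logger on top of a slim base.
# Follow the guide above for the full Dataproc Serverless image requirements.
FROM python:3.9-slim
RUN pip install python-json-logger
EOF
$ docker build -t gcr.io/my-project-id/my-image:1.0.1 .
$ docker push gcr.io/my-project-id/my-image:1.0.1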
- Using Airflow with the DataprocCreateBatchOperator operator:
Add the script below to the python-file: it installs the required package and then loads it onto the container path (Dataproc Serverless). The file must be saved in a bucket; this example uses the Secret Manager package.
python-file.py
import sys
import pip
import importlib
from warnings import warn
from dataclasses import dataclass


def load_package(package, path):
    warn("Update path order. Watch out for importing errors!")
    if path not in sys.path:
        sys.path.insert(0, path)
    module = importlib.import_module(package)
    return importlib.reload(module)


@dataclass
class PackageInfo:
    import_path: str
    pip_id: str


packages = [PackageInfo("google.cloud.secretmanager", "google-cloud-secret-manager==2.4.0")]
path = '/tmp/python_packages'

# Install the packages into a writable path, then make them importable.
pip.main(['install', '-t', path, *[package.pip_id for package in packages]])
for package in packages:
    load_package(package.import_path, path=path)
...
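Once load_package has run, the rest of the PySpark script can import and use the installed package. As a hedged illustration with the Secret Manager client (the project and secret names are placeholders, not part of the original script):

from google.cloud import secretmanager  # importable thanks to load_package above

# Hypothetical usage; the project and secret names are placeholders.
client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(
    name="projects/my-project-id/secrets/my-secret/versions/latest"
)
print(response.payload.data.decode("utf-8"))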
Finally, the operator calls python-file.py:
create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": "gs://bucket-name/python-file.py",
            "args": ["value1", "value2"],
            "jar_file_uris": ["gs://bucket-name/jar-file.jar"],
        },
        "environment_config": {
            "execution_config": {
                "subnetwork_uri": "projects/my-project-id/regions/us-central1/subnetworks/my-subnet-name"
            }
        },
    },
    batch_id="batch-create",
)
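For completeness, here is a hedged sketch of the import and DAG boilerplate this task would normally live in; the DAG id, dates, schedule and region are placeholders rather than part of the original answer:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

# Hypothetical DAG wrapper; ids, dates, schedule and region are placeholders.
with DAG(
    dag_id="dataproc_serverless_batch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        region="us-central1",
        batch={"pyspark_batch": {"main_python_file_uri": "gs://bucket-name/python-file.py"}},
        batch_id="batch-create",
    )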