我正试图在发布/子主题的情况下触发一个云函数来创建一个新的dataproc集群,每当任何dataproc群集创建失败,并且在云日志记录(dataproc活动日志(中状态代码为零以外的数字(status_code!=0(时,该主题都会收到消息。
我已经为上面的场景编写了一个python代码,但这个云函数在被pub/sub触发后立即崩溃。
我的代码中有什么错误,需要根据上面的场景修改什么才能成功执行。
import base64
import json
import googleapiclient.discovery
from google.cloud import dataproc_v1 as dataproc
def dataproc_workflow(event, context):
"""
Triggered by a Cloud Pub/Sub message containing a Dataproc status code != 0
audit activity Stackdriver log message
"""
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
msg_json = json.loads(pubsub_message)
proto_payload = msg_json['protoPayload']
resource_name = proto_payload['resourceName']
email = proto_payload['authenticationInfo']['principalEmail']
client = dataproc.ClusterControllerClient()
create_cluster(client, project_id, zone, region, cluster_name)
print(f"Cluster created: {cluster_name}.")
def create_cluster(client, project_id, zone, region, cluster_name):
print('Creating cluster...')
cluster_data = {
'project_id': *********************,
'cluster_name': simple,
'region' : ******************,
'config': {
'config_bucket': ************,
'gce_cluster_config': {
'zone_uri': ********************,
'subnetwork_uri': ********************,
'internal_ip_only': true,
'service_account_scopes': [
'https://www.googleapis.com/auth/cloud-platform'
],
'tags': [
'dataproc-rule',
]
},
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n1-standard-1'
},
'worker_config': {
'num_instances': 2,
'machine_type_uri': 'n1-standard-1'
}
}
}
response = client.create_cluster(project_id, region, cluster_data)
result = response.result()
print("After cluster create")
return result
从片段中可以看出,似乎所有的Dataproc集群配置都是硬编码的?例如project_id、region等,而不是从方法参数中读取。此外,create_cluster(client, project_id, zone, region, cluster_name)
中传递的参数似乎没有在任何地方定义,这肯定会在触发工作流时使程序崩溃。
如果这不是问题所在,并且这些变量被故意忽略,那么请附上您的完整堆栈。