I want to create an HTTP Cloud Function that submits a Spark job to a cluster in GCP Dataproc. I built a sample microservice that does this, following this question: How to submit a Spark job with the Google Dataproc Java client, using a jar file and class from an associated GS bucket?. Now I want to implement the same thing inside a GCP Cloud Functions service method. This is the code from my microservice:
public static void main(String a[]) throws IOException {
    GoogleCredential credential = GoogleCredential.fromStream(new FileInputStream("My credential file location")).createScoped(
            java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));

    Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
            .setApplicationName("my-webabb/1.0")
            .build();

    String curJobId = "spark-job-" + UUID.randomUUID().toString();

    Job jobSnapshot = null;
    jobSnapshot = dataproc.projects().regions().jobs().submit(
            "gcp-project-name", "cluster-region", new SubmitJobRequest()
                    .setJob(new Job()
                            .setReference(new JobReference()
                                    .setJobId(curJobId))
                            .setPlacement(new JobPlacement()
                                    .setClusterName("cluster-name"))
                            .setSparkJob(new SparkJob()
                                    .setMainClass("MainMethod")
                                    .setJarFileUris(ImmutableList.of("jarfilelocation"))
                            )))
            .execute();
}
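To make the goal concrete, this is roughly the shape of the HTTP Cloud Function I would like to end up with. It is only a sketch: the class name SubmitDataprocJob and the helper submitSparkJob() are placeholder names of mine, not an existing API; the open question is what goes inside.

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;

// Hypothetical sketch of the HTTP Cloud Function I want to build.
// SubmitDataprocJob and submitSparkJob() are placeholder names, not a real API.
public class SubmitDataprocJob implements HttpFunction {
    @Override
    public void service(HttpRequest request, HttpResponse response) throws Exception {
        // Here I would like to reuse the submission logic from the microservice above,
        // ideally without bundling a credential file with the function.
        String jobId = submitSparkJob(); // placeholder for the Dataproc submission code
        response.getWriter().write("Submitted Dataproc job " + jobId + "\n");
    }

    private String submitSparkJob() {
        // TODO: same Dataproc submission code as in the microservice's main method.
        return "spark-job-placeholder";
    }
}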
When the microservice code above is executed, the job is created on the cluster, which is fine. My doubt is: if I want to do the same steps from a Cloud Function, what credentials do I need to pass? If I put the Cloud Function and the cluster on the same network it might work without credentials, but is it even possible to build the Dataproc client without credentials in the following code?
Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
        .setApplicationName("my-webabb/1.0")
        .build();
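What I had in mind is roughly the following: instead of loading a key file from disk, pick up the credentials of whatever service account the Cloud Function runs as, via Application Default Credentials. This is only a sketch of my assumption, and exactly the part I am unsure about:

// Sketch only: relies on Application Default Credentials (the service account
// attached to the Cloud Function) instead of a credential file on disk.
GoogleCredential credential = GoogleCredential.getApplicationDefault()
        .createScoped(java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
        .setApplicationName("my-webabb/1.0")
        .build();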
I have also tried using the google-cloud-dataproc client library:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-dataproc</artifactId>
    <version>1.5.2</version>
</dependency>
and this is the code:
public static void main(String a[]) throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", "us-central1");

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client, but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
        // Configure cluster placement for the job.
        JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName("myclusterName").build();

        // Configure Spark job settings.
        SparkJob sparkJob =
                SparkJob.newBuilder()
                        .setMainClass("mymain")
                        .addJarFileUris("myJarFile")
                        .build();

        Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

        // Submit an asynchronous request to execute the job.
        OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
                jobControllerClient.submitJobAsOperationAsync("myProjectId", "us-central1", job);
        Job response = submitJobAsOperationAsyncRequest.get();

        // Parse the driver output location (a gs://bucket/object URI) from the finished job.
        Matcher matches =
                Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
        matches.matches();
    } catch (ExecutionException e) {
        // If the job does not complete successfully, print the error message.
        System.err.println(String.format("submitJob: %s ", e.getMessage()));
    }
}
With the dependency above I could not find a way to set the jobId. How can I set the JobId with this client?
Can anyone suggest a possible approach?
Thanks in advance.
Actually, you set the job ID when you submit the job to the Dataproc cluster. You can find this in the Dataproc API if you look closely at the Job object that you set in the request parameters.
It contains a job UUID field whose comment is explicit: output only, use reference.job_id to set the job id.
So, moving on to the reference object, you can define a job ID and a project ID there.
Now let's do this with the library: in the Job object, add a reference object that carries the project ID and the job ID.
Job job = Job.newBuilder()
        .setReference(JobReference.newBuilder().setJobId("123").setProjectId("myProjectId").build())
        .setPlacement(jobPlacement)
        .setSparkJob(sparkJob)
        .build();
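As a usage sketch (reusing myProjectId, us-central1, the "123" job id from above, and the jobControllerClient from your second snippet), the job built with that reference is submitted exactly as before, and the same id can then be used to look the job up again:

// Submit the job that carries the explicit job id (names reused from the snippets above).
OperationFuture<Job, JobMetadata> request =
        jobControllerClient.submitJobAsOperationAsync("myProjectId", "us-central1", job);
Job finished = request.get();

// The job can now be fetched back by the id you chose.
Job fetched = jobControllerClient.getJob("myProjectId", "us-central1", "123");
System.out.println(fetched.getStatus().getState());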