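I'm new to gcloud and BigQuery and want to read data from BigQuery using Spark. I used the Google API Client Library for Java and was able to connect to BigQuery. I get a com.google.api.services.bigquery.Bigquery object and can print the datasets, table IDs, and table data that I read.
My question is:
How can I connect this BigQuery authentication object (the Credential object) to Spark, or is there a way to use this object with the Hadoop API?
If that is not possible, how can I pass the Credential object to newAPIHadoopRDD?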
GoogleAuthorizationCodeFlow flow = getFlow();
GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
    .setRedirectUri(REDIRECT_URI)
    .execute();
// A null user ID means the credential is not saved to a persisted credential store.
Credential credential = flow.createAndStoreCredential(response, null);
return credential;
This is my Hadoop API code, where I want to use the Credential object:
val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])
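I think the BigQuery Connector for Hadoop can solve your problem without you having to write your own low-level client. Take a look: https://cloud.google.com/hadoop/bigquery-connector
Here is an example that uses it to connect Spark to BigQuery: https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example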
Thanks @michael, with the help of your links I found the solution.
Just disable the service account on the Hadoop configuration:
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
The following code works:
import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, GsonBigQueryInputFormat}
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable

val hadoopConfiguration = sc.hadoopConfiguration
// Note: spark.serializer is a Spark setting that is normally set on SparkConf;
// it is kept here only because it was part of the code that worked for me.
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
hadoopConfiguration.set("fs.gs.project.id", projectId)
// Disable service-account auth so the client ID, client secret, and token file below are used.
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id", clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret", clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
// Path to the JSON file that holds the stored user credential (including the refresh token).
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath)
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
// Configure input for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
  hadoopConfiguration,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])
where tokenPath points to a file that contains the refresh token:
{
"credentials": {
"user": {
"access_token": "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
"expiration_time_millis": 1460473581255,
"refresh_token": "XXXXXXXXXxxxxxxxxx"
}
}
}
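If you already have the tokens from the OAuth flow in the question, a file with this layout could be produced with something like the sketch below. It uses only the JDK and simply mirrors the JSON shown above; the class name TokenFileWriter, its method names, the default file name bigquery-token.json, and the placeholder token values are all made up for illustration. I have not checked that a file written this way is accepted by the connector, so treat it as a starting point. In a real program the three values would presumably come from the Credential object (getAccessToken(), getExpirationTimeMilliseconds(), getRefreshToken()).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: writes a token file in the layout shown in the answer above.
public class TokenFileWriter {

    // Escapes backslashes and double quotes so the values remain valid JSON strings.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the JSON document: credentials -> user -> token fields.
    static String toJson(String accessToken, long expirationTimeMillis, String refreshToken) {
        return "{\n"
            + "  \"credentials\": {\n"
            + "    \"user\": {\n"
            + "      \"access_token\": \"" + escape(accessToken) + "\",\n"
            + "      \"expiration_time_millis\": " + expirationTimeMillis + ",\n"
            + "      \"refresh_token\": \"" + escape(refreshToken) + "\"\n"
            + "    }\n"
            + "  }\n"
            + "}\n";
    }

    // Writes the JSON to tokenPath (UTF-8), replacing any existing file.
    static Path write(Path tokenPath, String accessToken, long expirationTimeMillis,
                      String refreshToken) throws IOException {
        String json = toJson(accessToken, expirationTimeMillis, refreshToken);
        return Files.write(tokenPath, json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values; substitute the tokens obtained from your own OAuth flow.
        Path tokenPath = Paths.get(args.length > 0 ? args[0] : "bigquery-token.json");
        write(tokenPath, "ACCESS_TOKEN_PLACEHOLDER", 1460473581255L, "REFRESH_TOKEN_PLACEHOLDER");
        System.out.println("Wrote " + tokenPath.toAbsolutePath());
    }
}
```

The resulting path is what you would pass as tokenPath for "fs.gs.auth.client.file". Since the file contains a refresh token, keep it out of version control and restrict its file permissions.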