将BigQuery连接器与Spark一起使用



我没有得到谷歌的示例工作

https://cloud.google.com/hadoop/examples/bigquery-connector-spark-examplePySpark

我认为代码中有一些错误,比如:

'#输出参数'mapred.bq.project.id':'',

应为:'mapred.bq.output.project.id':'',

'#将数据写回新的BigQuery表
'#BigQueryOutputFormat丢弃键,因此将键设置为None
(单词计数
.map(lambda对:None,json.dumps(对))
.saveAsNewAPIHadoopDataset(conf)

将给出错误消息。如果我将其更改为:
(单词计数
.map(lambda对:(None,json.dumps(对))
.saveAsNewAPIHadoopDataset(conf)

我收到错误消息:
org.apache.hadoop.io.Text无法转换为com.google.gson.JsonObject

无论我做什么,我都做不到
BigQuery中创建了一个数据集,我在"conf"中为其命名,后面有一个"_hadoop_temporary_job_201512081419_0008"
并创建一个末尾为"_attempt_201512081419_0008_r_ 000000_0"的表。但是总是空的

有人能帮我吗
感谢

我们正在努力更新文档,因为正如您所指出的,在这种情况下,文档是不正确的。抱歉!当我们正在更新文档时,我想尽快给你回复。

铸造问题

你提到的最重要的问题是选角问题。不幸的是,PySpark无法使用BigQueryOutputFormat来创建Java GSON对象。解决方案(变通方法)是将输出数据保存到谷歌云存储(GCS)中,然后使用bq命令手动加载。

代码示例

下面是一个代码示例,它导出到GCS并将数据加载到BigQuery中。您也可以使用subprocess和Python以编程方式执行bq命令。

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)
# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline delimited json in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]
(word_counts
 .map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
 .saveAsTextFile(output_directory))
# Manually clean up the input_directory, otherwise there will be BigQuery export
# files left over indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
print """
###########################################################################
# Finish uploading data to BigQuery using a client e.g.
bq load --source_format NEWLINE_DELIMITED_JSON 
    --schema 'word:STRING,word_count:INTEGER' 
    wordcount_dataset.wordcount_table {files}
# Clean up the output
gsutil -m rm -r {output_directory}
###########################################################################
""".format(
    files=','.join(output_files),
    output_directory=output_directory)

相关内容

最新更新