How to execute a separate function after writing to BigQuery in a Dataflow batch pipeline using Python



After a file has been successfully loaded into BigQuery, I am trying to move it to a different bucket, but the separate function runs before the pipeline even starts. How can I make the separate function process() run only after the load into BigQuery has succeeded?

import argparse

import apache_beam as beam
import pandas as pd
from apache_beam.dataframe import convert
from apache_beam.options.pipeline_options import PipelineOptions
from avro.datafile import DataFileReader
from avro.io import DatumReader
from google.cloud import storage


def process(file_name):
    """Moves a blob from one bucket to another."""
    storage_client = storage.Client()
    source_bucket = storage_client.bucket('source_bucket')
    destination_bucket = storage_client.bucket('destination_bucket')
    source_blob = source_bucket.blob(file_name)
    destination_blob_name = file_name
    blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
    source_bucket.delete_blob(file_name)
    print('File {} is transferred from {} to {}'.format(file_name, source_bucket, destination_bucket))


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as p:
        client = storage.Client()
        bucket = client.bucket('sourcebucket')
        blob = bucket.get_blob('sourcefile.avro')
        downloaded_blob = "temporary.avro"
        blob.download_to_filename(downloaded_blob)
        reader = DataFileReader(open(downloaded_blob, "rb"), DatumReader())
        file_name = blob.name
        records = [r for r in reader]
        # Populate pandas.DataFrame with records
        df = pd.DataFrame.from_records(records)

        (
            convert.to_pcollection(df, pipeline=p, label="pcollection")
            | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                'projectID:datasetID.table',
                schema='SCHEMA_AUTODETECT',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
            # process(file_name) is called here, while the pipeline graph is
            # being built -- this is the step that runs too early.
            | 'move' >> beam.FlatMap(process(file_name))
        )


if __name__ == "__main__":
    run()
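For context on the symptom above: the argument passed to beam.FlatMap is evaluated in plain Python while the pipeline graph is being constructed, so process(file_name) fires immediately, long before WriteToBigQuery ever runs. A minimal, self-contained sketch of the difference between an eager and a deferred call (the names here are placeholders, and deferring alone does not yet order the move after the BigQuery load):

import apache_beam as beam


def process(file_name):
    # Placeholder for the real GCS move.
    print('moving {}'.format(file_name))


with beam.Pipeline() as p:
    rows = p | beam.Create(['row1', 'row2'])

    # Eager: process('sourcefile.avro') would be called right here, during
    # graph construction, and FlatMap would receive its return value (None):
    # rows | beam.FlatMap(process('sourcefile.avro'))

    # Deferred: the lambda only runs when elements flow through the step.
    rows | 'Move' >> beam.Map(lambda row: process('sourcefile.avro'))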

I don't come from a Python background, but I have implemented something similar in Java. The way to achieve this is to use FileIO.match(), which returns the metadata of the new files to you. You then split this collection into two branches: one (PCollection1) takes the metadata, applies FileIO.readMatches() or TextIO.read(), processes that data further, and writes it to the sink.
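In the Python SDK, the rough counterparts of FileIO.match() and FileIO.readMatches() are apache_beam.io.fileio.MatchFiles and ReadMatches. A minimal sketch of that first branch, with the bucket name and file pattern as placeholders:

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    # FileIO.match() analogue: yields FileMetadata records for each match.
    matches = p | 'Match' >> fileio.MatchFiles('gs://source_bucket/*.avro')

    # FileIO.readMatches() analogue: turns metadata into readable file handles.
    readables = matches | 'ReadMatches' >> fileio.ReadMatches()

    # Further processing would go here, ending in the sink
    # (e.g. parsing the Avro records and WriteToBigQuery).
    contents = readables | 'ReadBytes' >> beam.Map(lambda f: f.read())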

In the other branch you would apply Wait.on(PCollection1) followed by ParDo.of(new YourDeleteFn()), and inside that ParDo create a storage client and move the already-processed file to the other bucket.
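The Python SDK does not ship a direct Wait.on() transform; a common substitute is to pass the processing branch's output into the move/delete DoFn as a side input, since in batch a side input is only materialized once its producing PCollection is complete. A hedged sketch of the two-branch layout, with bucket names and the 'Process' step as placeholders (if you need to wait on the BigQuery load job itself, recent SDK versions expose load-job results on the object returned by WriteToBigQuery, which could serve as the signal instead):

import apache_beam as beam
from apache_beam.io import fileio
from google.cloud import storage


class MoveFileFn(beam.DoFn):
    """Copies a processed blob to another bucket and deletes the original."""

    def process(self, file_metadata, wait_on):
        # `wait_on` is unused; it only forces the runner to finish the
        # processing branch before this DoFn executes (the Wait.on analogue).
        client = storage.Client()
        source_bucket = client.bucket('source_bucket')            # placeholder
        destination_bucket = client.bucket('destination_bucket')  # placeholder
        blob_name = file_metadata.path.split('/', 3)[-1]  # strip gs://bucket/
        source_blob = source_bucket.blob(blob_name)
        source_bucket.copy_blob(source_blob, destination_bucket, blob_name)
        source_bucket.delete_blob(blob_name)
        yield blob_name


with beam.Pipeline() as p:
    matches = p | 'Match' >> fileio.MatchFiles('gs://source_bucket/*.avro')

    # Branch 1: read and process the matched files; in the real pipeline this
    # would be the Avro parsing followed by WriteToBigQuery.
    processed = (
        matches
        | 'ReadMatches' >> fileio.ReadMatches()
        | 'Process' >> beam.Map(lambda readable_file: readable_file.read())
    )

    # Branch 2: move the files, gated on branch 1 via the side input.
    moved = matches | 'Move' >> beam.ParDo(
        MoveFileFn(), wait_on=beam.pvalue.AsIter(processed))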
