Apache Beam Dataflow job issue - "Steps not specified for runnable workflow"

I am trying to create a Dataflow job from a custom template, but it fails with the error "Steps not specified for runnable workflow". The logs contain nothing beyond that message. Am I missing a step?

I created a virtual environment and ran the following code.

I feel like I am almost there. Any help with this error would be much appreciated.

The code is:

import sys
import os
import apache_beam as beam
import google.cloud.logging
import google.auth
from google.cloud import storage
import pandas as pd
import io
import gcsfs as gcs
from io import BytesIO
import re
from datetime import date
import datetime
import logging
import argparse
from pyspark.context import SparkContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from py4j.java_gateway import java_import
from pyspark.sql.functions import udf
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import urllib
import urllib.request
from urllib.request import urlretrieve
#import requests,zipfile
#from zipfile import zipFile
#from zipfile import is_zipfile
from zipfile import ZipFile, is_zipfile

date = datetime.datetime.now()
#monthname = date.strftime("%B")
monthname = 'September'
#monthno = date.strftime("%m")
monthno = '9'
yearname = date.strftime("%Y")
print(monthname)
print(monthno)
print(yearname)
url = 'https://www.abc.gov/files/zip/statecontract-'+monthname+'-'+yearname+'-employee.zip'
destination_zip_name = 'upload.zip'
def upload_blob(bucket_name, url, destination_zip_name, argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        storage_client = storage.Client()
        source_bucket = storage_client.get_bucket(bucket_name)
        print('source bucket - ', source_bucket)
        destination_bucket_name = storage_client.get_bucket(bucket_name)  # destination_bucket_name
        print('destination bucket - ', destination_bucket_name)

        # Download the zip from the URL and upload it to the bucket.
        my_file = urllib.request.urlopen(url)
        blob1 = source_bucket.blob(destination_zip_name)
        blob1.upload_from_string(my_file.read(), content_type='application/zip')
        destination_blob_pathname = destination_zip_name
        print('destination_blob_pathname - ', destination_blob_pathname)
        blob = source_bucket.blob(destination_blob_pathname)
        zipbytes = io.BytesIO(blob.download_as_string())

        if is_zipfile(zipbytes):
            with ZipFile(zipbytes, 'r') as myzip:
                for contentfilename in myzip.namelist():
                    contentfile = myzip.read(contentfilename)
                    #print('contentfile - ', contentfile)

                    # Unzip csv files only; leave out if you don't need this.
                    if '.csv' in contentfilename.casefold():
                        output_file = f'/tmp/{contentfilename.split("/")[-1]}'
                        print('output_file - ', output_file)
                        with open(output_file, 'wb') as outfile:
                            outfile.write(contentfile)
                        blob = source_bucket.blob(
                            f'{destination_zip_name.rstrip(".zip")}/{contentfilename}'
                        )
                        with open(output_file, "rb") as my_csv:
                            blob.upload_from_file(my_csv)

        blob1.delete()
    print('done running function')

if __name__ == '__main__':
    upload_blob('testbucket', url, destination_zip_name)

I don't think you need Apache Beam for this at all. You never use the pipeline variable p and you never create a PCollection, so no transforms are ever added to the pipeline; the job graph submitted to Dataflow is empty, which is exactly what "Steps not specified for runnable workflow" means.
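For contrast, here is a minimal sketch of a pipeline that does have steps; the gs:// paths and step labels are placeholders, not from your code. Every transform applied to p becomes a step in the job graph that Dataflow runs:

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    with beam.Pipeline(options=pipeline_options) as p:
        # Each ">>" application below registers a transform, i.e. a step.
        (p
         | 'Read' >> ReadFromText('gs://testbucket/upload/*.csv')      # placeholder path
         | 'LineLength' >> beam.Map(len)
         | 'Format' >> beam.Map(str)
         | 'Write' >> WriteToText('gs://testbucket/output/lengths'))   # placeholder path

if __name__ == '__main__':
    run()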

Instead, you should put this code in a Cloud Function and run it there. It should work.
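A rough sketch of that approach, assuming an HTTP-triggered function and that your download/unzip/upload logic (without the beam.Pipeline wrapper, which adds nothing here) lives in the same main.py; the function name and deploy command details are illustrative:

# main.py - HTTP-triggered Cloud Function (functions-framework style).
# Deploy with something like:
#   gcloud functions deploy unzip_to_gcs --runtime python310 --trigger-http
import functions_framework

@functions_framework.http
def unzip_to_gcs(request):
    # Assumes upload_blob, url and destination_zip_name from the
    # question are defined in this module, minus the Beam pipeline.
    upload_blob('testbucket', url, destination_zip_name)
    return 'done running function'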
