我正在使用 AWS Glue 处理一些 S3 TSV 到 S3 Parquet。 由于非 UTF-8 传入文件,我被迫使用 DataFrame 而不是 DynamicFrame 来处理我的数据(这是一个已知的问题,没有任何工作,DynamicFrame 在处理任何非 UTF8 字符时完全失败)。 这似乎也意味着我无法使用 Glue 中的作业书签来跟踪我已经处理过的 S3 TSV 文件。
我的代码如下所示:
# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame
# @params: [JOB_NAME, s3target]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define massive list of fields in the schema
fields = [
StructField("accept_language", StringType(), True),
StructField("browser", LongType(), True),
.... huge list ...
StructField("yearly_visitor", ShortType(), True),
StructField("zip", StringType(), True)
]
schema = StructType(fields)
# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", """).option("delimiter", u'u0009').option("charset", 'utf-8').schema(schema).load(args['s3source'] + "/*.tsv.gz")
# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')
# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))
# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])
job.commit()
我的问题是 - 每次运行时都没有作业书签,它会一遍又一遍地处理相同的 s3 文件。 如何将源 s3 存储桶中已处理的文件移动到子文件夹或其他内容,或者避免双重处理文件?
我不确定这里的诀窍是什么,Spark是一个并行系统,甚至不知道文件是什么。 我想我可以使用 Python Shell 作业类型创建第二个 Glue 作业,然后立即删除传入的文件,但即便如此,我也不确定要删除哪些文件等。
谢谢
克里斯
要将处理后的文件标记为输入源前缀之外,您必须使用boto3
(或直接使用 awscli)来移动或删除文件。
要确定要处理的文件,您可以通过 2 种不同的方式继续操作:
- 在使用 Spark 之前,使用 BOTO3 和
s3client.list_objects()
解决文件全局args['s3source'] + "/*.tsv.gz"
。 您可以提供一组已解析的文件,而不是要spark.read.load
的 glob 。
import boto3
client = boto3.client('s3')
# get all the available files
# Note: if you expect a lot of files, you need to iterate on the pages of results
response = client.list_objects_v2(Bucket=your_bucket_name,Prefix=your_path_prefix)
files=['s3://'+your_bucket_name+obj['Key'] for obj in response['Contents'] if obj.endswith('tsv.gz')]
... initialize your job as before ...
df0 = df0 = spark.read.format("com.databricks.spark.csv").option("quote", """).option("delimiter", u'u0009').option("charset", 'utf-8').schema(schema).load(files)
... do your work as before ...
- 使用 Spark 跟踪其所有输入文件的事实,在成功保存后对其进行后处理:
... process your files with pyspark as before...
# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
files.append([f.filePath() for f in p.files().array()])
获得文件列表后,删除、重命名或将它们添加到元数据存储中以在下一个作业中过滤掉它们非常简单。
例如,要删除它们:
# initialize a S3 client if not already done
from urlparse import urlparse # python 2
import boto3
client = boto3.client('s3')
# do what you want with the uris, for example delete them
for uri in files:
parsed = urlparse(uri)
client.delete_object(Bucket=parsed.netloc, Key=parsed.path)
如果您不担心再次处理相同的源文件(相对于时间限制),并且您的用例是目标中没有重复的数据,则可以考虑在写入数据帧时将保存模式更新为"覆盖">
https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/DataFrameWriter.html
我用于通过 AWS 胶水开发的 ETL 流程之一的解决方案是首先使用 boto3 API 列出 s3 中的文件并将其移动到"WORK"文件夹。此过程不应花费任何时间,因为您只是更改 s3 对象名称而不是任何物理移动。
完成上述步骤后,您可以使用"WORK"文件夹作为SPARK数据帧的输入,而新文件可以继续推送到其他s3文件夹。
我不确定您的用例,但我们使用当前系统日期时间来创建"WORK"文件夹,以便如果我们发现几天后加载的进程或数据有任何问题,我们可以调查或重新运行任何文件。
最终工作代码:
# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
import boto3
from urlparse import urlparse
# Read arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])
# Initialise boto3
client = boto3.client('s3')
# Get all the available files
response = client.list_objects_v2(Bucket = "xxx")
files = [ "s3://xxx/" + obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.tsv.gz') ]
# Initialise the glue job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define massive list of fields in the schema
fields = [
StructField("accept_language", StringType(), True),
StructField("browser", LongType(), True),
.... huge list ...
StructField("yearly_visitor", ShortType(), True),
StructField("zip", StringType(), True)
]
schema = StructType(fields)
# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", """).option("delimiter", u'u0009').option("charset", 'utf-8').schema(schema).load(files)
# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')
# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))
# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])
# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
files.extend([f.filePath() for f in p.files().array()])
# Move files to the processed folder
for uri in files:
parsed = urlparse(uri)
client.copy_object(CopySource = {'Bucket': parsed.netloc, 'Key': parsed.path.lstrip('/')}, Bucket = parsed.netloc, Key = 'processed' + parsed.path)
client.delete_object(Bucket = parsed.netloc, Key = parsed.path.lstrip('/'))
job.commit()