I am trying to write to Redshift through PySpark. My Spark version is 3.2.0 and my Scala version is 2.12.15.
I tried to follow the guide here to do the write. I also tried writing via aws_iam_role, as explained in the link, but it led to the same error. All of my dependencies are built for Scala 2.12, which is the version my Spark uses.
Environment:
Spark 3.2
Scala 2.12.15
PySpark 3.2.3
Java 11
Ubuntu 22.04 LTS
Python 3.8
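For reference, the versions above can be confirmed at runtime with something like this standalone sketch (reading the Scala version through the py4j gateway is just one way to do it):

# Quick sanity check of the running Spark/Scala versions (run on its own)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.version)                                                   # e.g. 3.2.0
print(spark.sparkContext._jvm.scala.util.Properties.versionString())   # e.g. "version 2.12.15"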
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc') \
    .config("spark.jars.packages", "com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.3.2,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

df = spark.read.option("header", True) \
    .csv("demo.csv")

df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
    .option("dbtable", "demo") \
    .option("forward_spark_s3_credentials", "True") \
    .option("tempdir", "s3a://mubucket/folder") \
    .mode("append") \
    .save()
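For completeness, the aws_iam_role variant I mentioned above looked roughly like this sketch; the role ARN is a placeholder, and the option replaces forward_spark_s3_credentials:

# Hypothetical aws_iam_role variant of the same write; the ARN is a placeholder
df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
    .option("dbtable", "demo") \
    .option("aws_iam_role", "arn:aws:iam::123456789012:role/redshift-copy-role") \
    .option("tempdir", "s3a://mubucket/folder") \
    .mode("append") \
    .save()

It led to the same error as the forward_spark_s3_credentials version.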
This throws the following error:
23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
Credentials were removed for posting. With the same credentials I am able to create databases and tables, and the same user can also create files on S3 and has full access to it.
I was trying to write to Redshift through Spark. I kept following the guide but could not get the write to work. I tried multiple times with the different methods provided in the manual, and they all led to the same error. Here is the manual.
This does not seem to work, so for now I have built a custom solution that writes the data to S3 as Parquet through Spark and then runs a COPY command on the database. I have also opened an issue about this on GitHub; you can check it here.
from pyspark.sql import SparkSession
import psycopg2
import boto3

def query_redshift(current_query, fetch, url):
    # Run a query against Redshift over psycopg2; fetch=1 returns the result rows
    conn_string = url
    conn = psycopg2.connect(conn_string)
    conn.autocommit = True
    cursor = conn.cursor()
    cursor.execute(current_query)
    if fetch == 1:
        records = cursor.fetchall()
        conn.commit()
        return records
    cursor.close()
    conn.close()
    print("S3 to Redshift Transfer Successful")
def write_to_redshift(df, folder, arn, tablename, jdbc_url, bucket, aws_access_key_id, aws_secret_access_key):
    # Stage the dataframe on S3 as Parquet, COPY it into Redshift, then clean up
    staging = "s3://" + bucket + "/" + folder
    s3a = staging.replace("s3://", "s3a://")
    df.write.parquet(s3a)
    query = f"""
    COPY {tablename}
    FROM '{staging}'
    CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
    FORMAT AS PARQUET;
    """
    try:
        print(query)
        resp = query_redshift(query, 0, jdbc_url)
    except Exception as e:
        print(str(e))
    finally:
        # Delete the staged Parquet files from the bucket
        s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
        bucket = s3.Bucket(bucket)
        delete = bucket.objects.filter(Prefix=folder + "/").delete()
        print(delete)
def main():
    aws_access_key_id = 'etc'
    aws_secret_access_key = 'etc'
    spark = SparkSession.builder.appName('abc') \
        .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,org.apache.hadoop:hadoop-aws:3.2.2") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .getOrCreate()

    df = spark.read.option("header", True) \
        .csv("demo.csv")  # replace with whatever dataframe you have
    df.show()

    tablename = 'public.demo'
    iam_role = ""
    bucket_name = 'bucket'
    # S3 Credentials Option 1: key-based access via the psycopg2 connection string
    jdbc = "host = 'host' port ='5439' dbname = 'dev' user = 'user' password = 'pass' connect_timeout = 30000"
    folder = "cache8"
    write_to_redshift(df, folder, iam_role, tablename, jdbc, bucket_name, aws_access_key_id, aws_secret_access_key)

main()
This writes your dataframe to S3 as Parquet, then runs a COPY command on the database from that data, and finally deletes the staged files from the bucket.
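The arn argument of write_to_redshift is currently unused; if your cluster has an attached role you could swap the key-based CREDENTIALS clause for IAM_ROLE. A minimal sketch, assuming a placeholder role ARN, table, and staging path (the helper name is hypothetical):

# Hypothetical variant of the COPY statement built in write_to_redshift:
# authorize the load through an IAM role attached to the cluster instead of
# embedding access keys in the statement.
def build_copy_with_iam_role(tablename, staging, arn):
    return f"""
    COPY {tablename}
    FROM '{staging}'
    IAM_ROLE '{arn}'
    FORMAT AS PARQUET;
    """

print(build_copy_with_iam_role("public.demo", "s3://bucket/cache8",
                               "arn:aws:iam::123456789012:role/redshift-copy-role"))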