Converting a large 200 MB CSV file to a Parquet file using Apache Spark

I am trying to convert a CSV file to a Parquet file using Apache Spark. It works fine for small CSVs, but for a large CSV the job keeps running and never finishes, and it produces the error below. I am not sure what I am doing wrong or what configuration I need to add. Please help.

20:02:27.232 [Executor task launch worker for task 0.0 in stage 2.0 (TID 3)] DEBUG org.apache.parquet.crypto.EncryptionPropertiesFactory - EncryptionPropertiesFactory is not configured - name not found in hadoop config
20:02:27.232 [Executor task launch worker for task 0.0 in stage 2.0 (TID 3)] DEBUG org.apache.hadoop.fs.FileSystem - NativeIO.createDirectoryWithMode error, path = D:\bigcsv\test1.parquet\_temporary\_temporary\attempt_202204252002273263990563677151010_0002_m_000000_3, mode = 755
org.apache.hadoop.io.nativeio.NativeIOException: Cannot create a file when that file already exists.
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode(NativeIO.java:708)
at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:647)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:424)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:433)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)

My code is below:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
        .builder()
        .appName("csv2parquet")
        .master("local")
        .getOrCreate();
// Read the CSV with a header row; inferSchema makes Spark scan the file an
// extra time to guess the column types
final String dir = "D:/bigcsv/Data.csv";
Dataset<Row> ds = spark.read().option("header", true).option("inferSchema", true).csv(dir);
final String parquetFile = "D:/bigcsv/test1.parquet";
final String codec = "parquet"; // despite the name, this is the output format; compression is set separately below
// For mode I also tried "overwrite" but it's still not working
ds.write().option("compression", "gzip").mode("append").format(codec).save(parquetFile);
spark.stop();

After changing the write to the line below, it started working, but for the 200 MB file it takes far too long. How can I reduce the time?

ds.write().option("maxRecordsPerFile", 10000).option("compression", "gzip").mode("overwrite").format(codec).save(parquetFile);

Maybe try removing ".parquet" from parquetFile.

For example, change "D:/bigcsv/test1.parquet" to "D:/bigcsv/test1": the path passed to save() is a directory that Spark fills with part files, not a single file. If you want to write only one Parquet file instead of several, you should use .coalesce(1), but writing it will take more time, as sketched below.
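
A minimal sketch of that suggestion, reusing the ds variable from the question; the directory name "D:/bigcsv/test1" is just the original path with the extension dropped:

// Save to a directory without the ".parquet" extension, coalescing to a single
// partition so only one part file is produced. Funnelling all data through one
// partition is what makes this write slower.
final String parquetDir = "D:/bigcsv/test1"; // was "D:/bigcsv/test1.parquet"
ds.coalesce(1)
  .write()
  .option("compression", "gzip")
  .mode("overwrite")
  .format("parquet")
  .save(parquetDir);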
