Dask dataframe to Spark works differently than pandas dataframe to Spark



I am reading some wind speed data from a netcdf file. This produces an xarray dataset, which I can convert to a pandas and/or dask dataframe. Ultimately, because of the volume of data, I want to convert to a dask dataframe and then to pyspark. However, when converting from dask to spark I get an error that I do not get when doing the equivalent pandas-to-spark conversion. See the code below.
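For reference, a minimal sketch of the kind of setup assumed in the snippets below; the file path is hypothetical, and u10 stands for the xarray Dataset holding the wind variables:

import xarray as xr

# Open the netcdf file as an xarray Dataset (hypothetical path);
# u10 is the Dataset referenced throughout the snippets below.
u10 = xr.open_dataset("era5_wind_202103.nc")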

Pandas first:

df=u10.isel(time0=0).to_dataframe().reset_index()
print(df.head())
print("=======================================================")
print(df.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(df) 
pyspark_df.show(5)

lon    lat      time0  eastward_wind_at_100_metres  
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   
eastward_wind_at_10_metres  northward_wind_at_100_metres  
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   
northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float64
lat                                    float64
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|lon|  lat|              time0|eastward_wind_at_100_metres|eastward_wind_at_10_metres|northward_wind_at_100_metres|northward_wind_at_10_metres|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|0.0| 90.0|2021-03-01 00:00:00|                     -0.375|                   -0.1875|                      -0.125|                        0.0|
|0.0|89.75|2021-03-01 00:00:00|                       -7.0|                   -5.0625|                        0.75|                     0.0625|
|0.0| 89.5|2021-03-01 00:00:00|                     -7.125|                   -5.1875|                      0.8125|                      0.125|
|0.0|89.25|2021-03-01 00:00:00|                      -7.25|                    -5.375|                      1.0625|                      0.375|
|0.0| 89.0|2021-03-01 00:00:00|                      -7.25|                      -6.0|                      1.5625|                      1.125|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
only showing top 5 rows

Now for dask:

ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf) 
pyspark_df.show(5)
lon    lat      time0  eastward_wind_at_100_metres  
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   
eastward_wind_at_10_metres  northward_wind_at_100_metres  
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   
northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float32
lat                                    float32
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_18808/850613924.py in <module>
4 print(ddf.dtypes)
5 print("=======================================================")
----> 6 pyspark_df = spark.createDataFrame(ddf)
7 pyspark_df.show(5)
...
...
...
1063 
1064     else:
-> 1065         raise TypeError("Can not infer schema for type: %s" % type(row))
1066 
1067     fields = []
TypeError: Can not infer schema for type: <class 'str'>

If anyone can explain why this is happening and offer a solution, that would be great.

Update: added a schema to the createDataFrame call

from pyspark.sql.types import StructType, StructField, FloatType, TimestampType

fields = [
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
    StructField("time0", TimestampType(), True),
    StructField("eastward_wind_at_100_metres", FloatType(), True),
    StructField("eastward_wind_at_10_metres", FloatType(), True),
    StructField("northward_wind_at_100_metres", FloatType(), True),
    StructField("northward_wind_at_10_metres", FloatType(), True),
]

schema = StructType(fields)
ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf,schema) 
pyspark_df.show(10)
lon    lat      time0  eastward_wind_at_100_metres  
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   
eastward_wind_at_10_metres  northward_wind_at_100_metres  
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   
northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float32
lat                                    float32
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
...
...
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
1394                     verifier(d.get(f))
1395             else:
-> 1396                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1397                                         % (obj, type(obj))))
1398         verify_value = verify_struct
TypeError: StructType can not accept object 'lon' in type <class 'str'>

Full traceback:

TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_5368/3996125343.py in <module>
15 print(ddf.dtypes)
16 print("=======================================================")
---> 17 pyspark_df = spark.createDataFrame(ddf,schema)
18 pyspark_df.show(10)
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
673             return super(SparkSession, self).createDataFrame(
674                 data, schema, samplingRatio, verifySchema)
--> 675         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
676 
677     def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
698             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
699         else:
--> 700             rdd, schema = self._createFromLocal(map(prepare, data), schema)
701         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
702         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _createFromLocal(self, data, schema)
507         # make sure data could consumed multiple times
508         if not isinstance(data, list):
--> 509             data = list(data)
510 
511         if schema is None or isinstance(schema, (list, tuple)):
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in prepare(obj)
680 
681             def prepare(obj):
--> 682                 verify_func(obj)
683                 return obj
684         elif isinstance(schema, DataType):
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify(obj)
1407     def verify(obj):
1408         if not verify_nullability(obj):
-> 1409             verify_value(obj)
1410 
1411     return verify
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
1394                     verifier(d.get(f))
1395             else:
-> 1396                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1397                                         % (obj, type(obj))))
1398         verify_value = verify_struct
TypeError: StructType can not accept object 'lon' in type <class 'str'>

An aside: are you sure you want spark to handle this data? Dask integrates well with xarray, and you may find the conversion isn't worth it, especially since you would need to copy and convert the data and run two cluster systems.
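For instance, heavy computations can be pushed down to dask without ever leaving the xarray API. A minimal sketch, assuming the same hypothetical file as above:

import xarray as xr

# Opening with chunks gives dask-backed arrays; everything stays
# lazy until .compute() is called.
u10 = xr.open_dataset("era5_wind_202103.nc", chunks={"time0": 1})
mean_wind = u10["eastward_wind_at_100_metres"].mean(dim="time0").compute()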

In short: createDataFrame supports pandas exactly, not "pandas-like objects" such as dask dataframes. From the docstring:

data : :class:`RDD` or iterable
an RDD of any kind of SQL data representation (:class:`Row`,
:class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`, or
:class:`pandas.DataFrame`.
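This also explains the exact errors above: since a dask dataframe is neither a pandas.DataFrame nor an RDD, createDataFrame falls back to treating it as a plain iterable of rows (the list(data) call visible in the traceback). Iterating over a dask dataframe, like iterating over a pandas one, yields its column names as strings, so the first "row" spark sees is the string 'lon':

# What spark's row iterator actually receives from the dask dataframe:
print(list(ddf))
# ['lon', 'lat', 'time0', 'eastward_wind_at_100_metres', ...]

Hence "Can not infer schema for type: <class 'str'>" without a schema, and "StructType can not accept object 'lon'" with one.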

A dask dataframe is made of lazy partitions, but you are asking spark to send them to the spark workers. This is not a simple operation! You could decide to create a spark dataframe from each dask partition and concatenate them on the spark side -- this might achieve what you want, but it is not at all efficient.

import functools
import dask
from pyspark.sql import DataFrame

to_spark_df = dask.delayed(spark.createDataFrame)  # one spark df per dask partition
pieces = dask.compute(*[to_spark_df(d) for d in ddf.to_delayed()])
spark_df = functools.reduce(DataFrame.unionAll, pieces)

(Note that this applies the union in a chain; there does not seem to be a simple concat operation in spark.)
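If the data for a single time step actually fits in driver memory, the simplest workaround follows directly from the docstring above: materialize the dask dataframe into a real pandas dataframe first. A sketch; whether this is feasible depends on your data volume:

# .compute() collapses the lazy dask partitions into one in-memory
# pandas dataframe, which createDataFrame supports natively.
pyspark_df = spark.createDataFrame(ddf.compute(), schema)
pyspark_df.show(5)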
