Reading a binary file in PySpark



I have a binary file that I can read with numpy and pandas:

import numpy
import pandas

dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
df = pandas.DataFrame(
    numpy.fromfile(file, dtype=dt),
    columns=dt.names
)

I would like to use PySpark instead of building a pandas data frame first, because the file may be larger than memory.

One recommended approach I have seen is:

df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()

But this does not let me specify the type of each column. Besides, even with that test file I get a java.lang.OutOfMemoryError.

So now I am trying to load it into an RDD:

rdd = spark.sparkContext.binaryFiles(file)

and then apply a map, as suggested for Scala:

import java.nio.ByteBuffer

val result = YourRDD.map(x => (ByteBuffer.wrap(x.take(4)).getInt,
  ByteBuffer.wrap(x.drop(4).take(2)).getShort,
  ByteBuffer.wrap(x.drop(6)).getLong))

But I am having trouble getting this to work. For example, when I try rdd.first() I get the whole file back. Here is my attempt:

rdd = spark.sparkContext.binaryFiles(file)

def func1(x):
    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )
    return (df.col1, df.col2, df.col3)

result = rdd.mapValues(lambda x: func1(x))
result.first()

But this gives me a single entry whose values are the entire columns:

('file',
(0          2317613314222
...      
4026940    7317606063913
Name: col1, Length: 4026941, dtype: int64,
0          1.551823
...   
4026940    2.379845
Name: col2, Length: 4026941, dtype: float32,
0             556
...  
4026940    131336
Name: col3, Length: 4026941, dtype: int32))

How can I load this file?

Edit: a small excerpt of the file:

with open(file, mode="rb") as open_file:
    contents = open_file.readlines()

contents[0:5]

Result:

[b'xaexb0x84x9cx1bx02x00x00 xa2xc6?,x02x00x00x0cBx95x9cx1bx02x00x00xe0ax9a?x19x02x02x00x0fxf7xa4x9cx1bx02x00x00`xe9x82?0x03x02x00x96@x03x9dx1bx02x00x00xd0Hx05@;x02x02x00xd5n',
b'n',
b'x9dx1bx02x00x00x00^xa1?x0fx01x00x00nq,x9dx1bx02x00x00xe0x89xad?xaex03x02x00Fx8exb6x9dx1bx02x00x00@Uxd1?<x03x02x00xc3_xfax9dx1bx02x00x00@}x87?)x02x02x00xacx92Kx9ex1bx02x00x00P/x1f@n',
b"x02x04x00x07Qx9ax9ex1bx02x00x00PIx04@,x01x02x00x04-xb2x9ex1bx02x00x00x80xdcxf0?x1dx00x04x00x0cwxbdx9ex1bx02x00x00xa0-xef?x0cx02x02x00xb0x86xcfx9ex1bx02x00x00 xc2xb4?,x02x00x00x12x03x1ex9fx1bx02x00x00x80xb6x85?)x02x02x00Exc9wx9fx1bx02x00x000xf3x03@x13x00x04x00Px1bx91x9fx1bx02x00x00x00xeax06@%x00x00x00x9b:x9cx9fx1bx02x00x00xe0Tx0b@x06x03x00x00x9bx9fxa4x9fx1bx02x00x00xc0/xf4?x06x03x00x00Zxcbxb8x9fx1bx02x00x00x00Axe1?!x02x02x00xbcJxbdx9fx1bx02x00x00xe0xc9xd2?!x02x04x00x06]xd0x9fx1bx02x00x00`Dxda?x1dx00x04x00hBxdex9fx1bx02x00x00xe0x10xff?x1dx01x02x00x9fi0xa0x1bx02x00x00xa0fxec?x86x03x00x00xf5Wsxa0x1bx02x00x00 xd5xca?x1dx00x04x00Lxa0x8dxa0x1bx02x00x00xc0#xda?x1dx00x04x00|<,xa1x1bx02x00x00 xbcxd1?x1dx00x04x00x8bxfb2xa1x1bx02x00x00xa0xbfxcb?x08x02x02x00dxd2Xxa1x1bx02x00x00 xc6xb4?5x00x04x00xaex1fcxa1x1bx02x00x00@x07x90?1x03x02x00xf3x80gxa1x1bx02x00x00`xbdxde?4x00x04x00gx1dmxa1x1bx02x00x00@7x96?x98x03x00x00xb8@|xa1x1bx02x00x00PKx11@x06x03x00x00xedjx83xa1x1bx02x00x00xc0x11xdd?,x02x00x00xb1xbdx8exa1x1bx02x00x00xa0xc7xc5?rx02x02x00xbdx0fxbaxa1x1bx02x00x00 xe3xc1?x1fx01x02x00xf5xa6xf5xa1x1bx02x00x00x80xdfxcd?x06x01x00x00'xb5 xa2x1bx02x00x00x00x02xb6?x1dx00x04x00xfas/xa2x1bx02x00x00xc0xb2xbb?x98x03x02x00=xaanxa2x1bx02x00x00`xafxe8?x08x02x02x00xa2x83x8fxa2x1bx02x00x00x00x02xcd?x1dx00x04x00xb2xcexcbxa2x1bx02x00x00`x9exc1?x1ax03x00x00x95x9fxefxa2x1bx02x00x00xe0xa4x8c?)x02x02x005x86xfaxa2x1bx02x00x00 x86xc8?x98x03x02x00bHx12xa3x1bx02x00x00xf0x1bx1e@x8cx02x02x00xa6xfax1bxa3x1bx02x00x00xe0n",
b'xaf?x1dx00x04x00xfbxcd3xa3x1bx02x00x00`xd2xde?x84x03x00x00x81xcaQxa3x1bx02x00x00 xc0xdb?8x01x02x00tx01x9axa3x1bx02x00x00x803xf8?#x01x02x00@xdbxa8xa3x1bx02x00x00@kx02@x84x03x00x00rx8e&xa4x1bx02x00x00xe0x96xc8?x1dx01x02x00xa3x05?xa4x1bx02x00x00xa0vxd2?x1dx00x04x00Exc3x8cxa4x1bx02x00x000#x0c@x02x01x02x00xf3nx9fxa4x1bx02x00x00xf0x06x13@8x01x02x00xac<xc5xa4x1bx02x00x00x80Ax9f?x8ax03x00x00}xfcxe5xa4x1bx02x00x00x80x00xc4?x19x02x02x00x126xffxa4x1bx02x00x00 x1dxac?n']
head(5) of the data frame:
col1      col2    col3
0  2317613314222  1.551823     556
1  2317614400012  1.206112  131609
2  2317615429391  1.022747  131888
3  2317621608598  2.082569  131643
4  2317622053589  1.260681     271

One problem is that spark.sparkContext.binaryFiles() is meant for reading several whole files. In this case, spark.sparkContext.binaryRecords() should be used instead.

In my case:

import numpy
import pandas
from pyspark.sql import types

# each record is 16 bytes: int64 + float32 + int32
rdd = spark.sparkContext.binaryRecords(file, 16)

def func1(x):
    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )
    return df.to_dict("index")[0]

result = rdd.map(func1)

columns = types.StructType([
    types.StructField('col1', types.LongType(), True),
    types.StructField('col2', types.FloatType(), True),
    types.StructField('col3', types.IntegerType(), True)
])
df = result.toDF(schema=columns)
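
As a quick check (assuming the conversion above worked), the resulting DataFrame can be inspected with:

df.printSchema()
df.show(5)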

The struct module can also be used instead of numpy.
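
A minimal sketch of that variant, assuming the same little-endian 16-byte record layout (int64, float32, int32) and the col1/col2/col3 names used above, with spark and file as before. Since struct.unpack returns plain Python values, no per-record pandas DataFrame is needed:

import struct

from pyspark.sql import types

# "<qfi" = little-endian int64, float32, int32 -> 16 bytes per record
record = struct.Struct("<qfi")

rdd = spark.sparkContext.binaryRecords(file, record.size)
result = rdd.map(lambda raw: record.unpack(raw))

columns = types.StructType([
    types.StructField('col1', types.LongType(), True),
    types.StructField('col2', types.FloatType(), True),
    types.StructField('col3', types.IntegerType(), True)
])
df = result.toDF(schema=columns)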
