如何将Python bytestring存储在Spark DataFrame中



我正在看一个谜。我在RDD中有一堆长文档作为Python Bytestrings(b"I'm a byte string")。现在,我将此RDD转换为DataFrame将其连接到另一个DataFrame。我这样做是这样的:

Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)
Data_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", StringType(), True),
])
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)
print(Data_DF.show(5))
+--------------------+-----------+
|                 URI|    Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows

这些简短的"[B@10628e42"字符串对我来说似乎没有用,可能是某种指针。在RDD中,字节仍然是"完整"的,因为我仍然可以访问它们。因此,在从RDD到DataFrame的转换中,问题发生了。现在,我试图将其副本存储在具有其他类型的字段中,即ByteType()BinaryType()。两者都无法正常工作,因为这些错误消息不接受:

TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>

,但它甚至变得奇怪。当我设置一个小规模实验时:

ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))
DF2_schema = StructType([
    StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)
print(DF_ByteStrings.show())

也不允许像在弦类型列中那样允许小的字节。当我尝试运行此错误时,我会收到此错误消息:

StructType can not accept object b'one' in type <class 'bytes'>

当我尝试让Spark推断出一种类型时,此消息也会失败:

TypeError: Can not infer schema for type: <class 'bytes'>

因此,任何想法我如何将字节存储在DataFrame中,而无需.decode()。这是我只有在将两个DataFrames加入两个CC_10之后才能做的,因为另一个人持有解码信息。

我使用Python 3.5和Spark 2.0.1

预先感谢!

这不是一个谜。逐步:

  • Spark使用黄线石在Python和Java类型之间转换。
  • bytes的Java类型是byte[],等效于Scala中的Array[Byte]
  • 您将列定义为StringType,因此Array[Byte]将在存储在DataFrame中之前将CC_15转换为String
  • Arrays在Scala中是丑陋的Java伪像,除其他问题外,没有有用的toString方法:

    Array(192, 168, 1, 1).map(_.toByte)
    
    Array[Byte] = Array(-64, -88, 1, 1)
    
    Array(192, 168, 1, 1).map(_.toByte).toString
    
    String = [B@6c9fe061
    

    这就是您获得列的内容的方式。

Spark SQL中没有直接映射到Python bytes的类型。我个人会加入join RDD

from collections import namedtuple
Record = namedtuple("Record", ["url", "content"])
rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()
df.printSchema()
root
 |-- url: string (nullable = true)
 |-- content: binary (nullable = true)

它不会给您可以在本地使用(JVM)或有意义的字符串表示形式:

+-------+----------+
|    url|   content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+

但无损:

df.rdd.map(lambda row: bytes(row.content)).first()
b'foo'

可以在Python udf中访问:

from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union
def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
    def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
        if bs is not None:
            return bytes(bs).decode(enc)
        except UnicodeDecodeError:
            pass 
    return udf(decode_)(col)
df.withColumn("decoded", decode("content")).show()
+-------+----------+-------+
|    url|   content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]|    foo|
|none://|[62 61 72]|    bar|
+-------+----------+-------+

相关内容

  • 没有找到相关文章

最新更新