我正在看一个谜。我在RDD中有一堆长文档作为Python Bytestrings(b"I'm a byte string"
)。现在,我将此RDD转换为DataFrame
将其连接到另一个DataFrame
。我这样做是这样的:
Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)
Data_schema = StructType([
StructField("URI", StringType(), True),
StructField("Content", StringType(), True),
])
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)
print(Data_DF.show(5))
+--------------------+-----------+
| URI| Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows
这些简短的"[B@10628e42"
字符串对我来说似乎没有用,可能是某种指针。在RDD中,字节仍然是"完整"的,因为我仍然可以访问它们。因此,在从RDD到DataFrame
的转换中,问题发生了。现在,我试图将其副本存储在具有其他类型的字段中,即ByteType()
和BinaryType()
。两者都无法正常工作,因为这些错误消息不接受:
TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>
,但它甚至变得奇怪。当我设置一个小规模实验时:
ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))
DF2_schema = StructType([
StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)
print(DF_ByteStrings.show())
也不允许像在弦类型列中那样允许小的字节。当我尝试运行此错误时,我会收到此错误消息:
StructType can not accept object b'one' in type <class 'bytes'>
当我尝试让Spark推断出一种类型时,此消息也会失败:
TypeError: Can not infer schema for type: <class 'bytes'>
因此,任何想法我如何将字节存储在DataFrame
中,而无需.decode()
。这是我只有在将两个DataFrames
加入两个CC_10之后才能做的,因为另一个人持有解码信息。
我使用Python 3.5和Spark 2.0.1
预先感谢!
这不是一个谜。逐步:
- Spark使用黄线石在Python和Java类型之间转换。
-
bytes
的Java类型是byte[]
,等效于Scala中的Array[Byte]
。 - 您将列定义为
StringType
,因此Array[Byte]
将在存储在DataFrame
中之前将CC_15转换为String
。 -
Arrays
在Scala中是丑陋的Java伪像,除其他问题外,没有有用的toString
方法:Array(192, 168, 1, 1).map(_.toByte)
Array[Byte] = Array(-64, -88, 1, 1)
Array(192, 168, 1, 1).map(_.toByte).toString
String = [B@6c9fe061
这就是您获得列的内容的方式。
Spark SQL中没有直接映射到Python bytes
的类型。我个人会加入join
RDD
from collections import namedtuple
Record = namedtuple("Record", ["url", "content"])
rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()
df.printSchema()
root
|-- url: string (nullable = true)
|-- content: binary (nullable = true)
它不会给您可以在本地使用(JVM)或有意义的字符串表示形式:
+-------+----------+
| url| content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+
但无损:
df.rdd.map(lambda row: bytes(row.content)).first()
b'foo'
可以在Python udf
中访问:
from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union
def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
if bs is not None:
return bytes(bs).decode(enc)
except UnicodeDecodeError:
pass
return udf(decode_)(col)
df.withColumn("decoded", decode("content")).show()
+-------+----------+-------+
| url| content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]| foo|
|none://|[62 61 72]| bar|
+-------+----------+-------+