Decoding a string URL column in PySpark



I am using Python 2.7 and Spark 2.2.0. I create a DataFrame in PySpark with a single string column that contains URLs:

df = spark.createDataFrame([('example.com?title=%D0%BF%D1%80%D0%B0%D0%B2%D0%BE%D0%B2%D0%B0%D1%8F+%D0%B7%D0%B0%D1%89%D0%B8%D1%82%D0%B0',)], ['url'])
df.show(1, False)
+-------------------------------------------------------------------------------------------------------+
|url                                                                                                    |
+-------------------------------------------------------------------------------------------------------+
|example.com?title=%D0%BF%D1%80%D0%B0%D0%B2%D0%BE%D0%B2%D0%B0%D1%8F+%D0%B7%D0%B0%D1%89%D0%B8%D1%82%D0%B0|
+-------------------------------------------------------------------------------------------------------+

To decode all the URLs in the column, I tried using urllib and wrapping it in a UDF, like this:

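import urllib
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# StringType() is the UDF's return type, so it is passed to udf(), not returned by the lambda
decode_url = udf(lambda val: urllib.unquote(val).decode('utf8'), StringType())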

After applying the UDF to my column, I expected data like this:

+---------------------------------+
|url                              |
+---------------------------------+
|example.com?title=правовая+защита|
+---------------------------------+

but I get an error like this:

UnicodeEncodeError: 'ascii' codec can't encode characters in position 18-33: ordinal not in range(128)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

However, if I take a URL from the column and decode it manually, it works fine:

import urllib 
url='example.com?title=%D0%BF%D1%80%D0%B0%D0%B2%D0%BE%D0%B2%D0%B0%D1%8F+%D0%B7%D0%B0%D1%89%D0%B8%D1%82%D0%B0'
print urllib.unquote(url).decode('utf8')
example.com?title=правовая+защита

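It looks like some odd encoding is happening under the hood. On Python 2 the column value most likely reaches the UDF as a unicode object rather than a byte string, so urllib.unquote returns unicode and the following .decode('utf8') first tries an implicit ASCII encode, which fails. Why not encode it explicitly?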

>>> decode_udf= udf(lambda val: urllib.unquote(val.encode('utf-8')).decode('utf-8'), StringType())
>>> df.withColumn('decoded_url', decode_udf('url')).show(truncate=False)
+-------------------------------------------------------------------------------------------------------+---------------------------------+
|url                                                                                                    |decoded_url                      |
+-------------------------------------------------------------------------------------------------------+---------------------------------+
|example.com?title=%D0%BF%D1%80%D0%B0%D0%B2%D0%BE%D0%B2%D0%B0%D1%8F+%D0%B7%D0%B0%D1%89%D0%B8%D1%82%D0%B0|example.com?title=правовая+защита|
+-------------------------------------------------------------------------------------------------------+---------------------------------+
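
For readers on Python 3, the failure and the fix can be imitated without Spark. This is only a sketch under stated assumptions: it assumes Python 3's urllib.parse.unquote, it imitates the Python 2 behaviour by percent-decoding as Latin-1 and then encoding to ASCII, and decode_url is a hypothetical helper name that is not part of the original answer.

```python
from urllib.parse import unquote

URL = ("example.com?title=%D0%BF%D1%80%D0%B0%D0%B2%D0%BE%D0%B2%D0%B0%D1%8F"
       "+%D0%B7%D0%B0%D1%89%D0%B8%D1%82%D0%B0")

# Assumption: imitate Python 2, where unquoting a unicode value turns every
# %XX byte into one character instead of decoding the bytes as UTF-8.
garbled = unquote(URL, encoding="latin-1")

# The implicit ASCII encode that Python 2 performs before .decode('utf8')
# cannot represent those characters, so it raises UnicodeEncodeError.
try:
    garbled.encode("ascii")
    error_span = None
except UnicodeEncodeError as exc:
    # exc.end is exclusive; subtract 1 to match the wording of the message.
    error_span = (exc.start, exc.end - 1)


def decode_url(val):
    """Percent-decode a URL as UTF-8; None (a SQL NULL) passes through."""
    if val is None:
        return None
    return unquote(val, encoding="utf-8")


decoded = decode_url(URL)
print(error_span)
print(decoded)
```

The reported span covers the first run of percent-decoded characters after "example.com?title=". On Python 3 a function like decode_url could be passed to udf(decode_url, StringType()) in place of the lambda above, since there is no separate byte-string step to get wrong.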

If you want to do this in Spark SQL instead, you can do the following.

Note: the encoded field is named dctr.

Input: im_segments%3Debejz4gv%2Ck1GmZLwg%2C8zY92P4g%2Cka6ee4eb%2CgPKlZXXb%2CqkVvpGk9%2Cky1ee4Dk%2CgvqKoW0b%2CgO5l6Zrk%2CgO5lGpdk%2CxkD6AYgm%2CgO5rENWk%2Cg7VrxvDb

Expected output: im_segments=ebejz4gv,k1GmZLwg,8zY92P4g,ka6ee4eb,gPKlZXXb,qkVvpGk9,ky1ee4Dk,gvqKoW0b,gO5l6Zrk,gO5lGpdk,xkD6AYgm,gO5rENWk,g7VrxvDb

Answer:

select distinct reflect('java.net.URLDecoder','decode', dctr , 'utf-8') as dctr from table
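
To sanity-check the expected output locally without a Spark session, the same percent-decoding can be reproduced in plain Python. This is a sketch, assuming Python 3; it uses urllib.parse.unquote_plus because, like java.net.URLDecoder.decode, it also turns "+" into a space. The variable names are illustrative only.

```python
from urllib.parse import unquote_plus

# Sample value of the dctr field, copied from the input above.
ENCODED = ("im_segments%3Debejz4gv%2Ck1GmZLwg%2C8zY92P4g%2Cka6ee4eb"
           "%2CgPKlZXXb%2CqkVvpGk9%2Cky1ee4Dk%2CgvqKoW0b%2CgO5l6Zrk"
           "%2CgO5lGpdk%2CxkD6AYgm%2CgO5rENWk%2Cg7VrxvDb")

# %3D decodes to "=" and %2C decodes to ","; all other characters are kept.
decoded = unquote_plus(ENCODED, encoding="utf-8")
print(decoded)
```

Note that this differs from the UDF above in one respect: urllib's unquote leaves "+" untouched, which is why the decoded title there still contains a "+", whereas URLDecoder would turn it into a space.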
