我正在使用Impala中的内置功能:
select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table
现在我要迁移到SparkSQL(在Jupyter笔记本中使用Pyspark):
my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()
但是,我有以下错误:
NameError: name 'parse_url' is not defined
还尝试了下面的尝试:
my_table.registerTempTable("my_table")
sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)
,但所有的new_url
变为null
。
知道我在这里错过了什么吗?另外,人们将如何处理这样的问题?谢谢!
一些丢失的部分:
- 您不能用Spark执行Impala功能。
- 有一个具有相同名称和语法的Hive UDF,可以与Spark一起使用,但没有本机实现和功能包装器。这就是为什么可以使用
HiveContext
/SparkSession
并获得HIVE支持的SQL调用它的原因。
通常,它应该很好地工作:
spark.sql("""SELECT parse_url(
'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
| foo|
+-----------------------------------------------------------------+
和 NULL
输出表示给定零件无法匹配:
spark.sql("""SELECT parse_url(
'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
| null|
+---------------------------------------------------------+
您可以使用UDF获得类似的结果,但它会明显慢。
from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType
def parse_args(col: str) -> Dict[str, str]:
"""
http://stackoverflow.com/a/21584580/6910411
"""
try:
return dict(parse_qsl(urlsplit(col).query))
except:
pass
parse_args_ = udf(parse_args, MapType(StringType(), StringType()))
数据定义为:
df = sc.parallelize([
("http://example.com?bar=foo", ),
("http://example.com?extensionId=foo", ),
]).toDF(["url"])
可以用如下:
df.select(parse_args_("url")["extensionId"]).show()
结果是:
+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
| null|
| foo|
+----------------------------+