从Impala迁移到SparkSQL时,Impala内置功能不可用



我正在使用Impala中的内置功能:

select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table

现在我要迁移到SparkSQL(在Jupyter笔记本中使用Pyspark):

my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()

但是,我有以下错误:

NameError: name 'parse_url' is not defined

还尝试了下面的尝试:

my_table.registerTempTable("my_table")
sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)

,但所有的new_url变为null

知道我在这里错过了什么吗?另外,人们将如何处理这样的问题?谢谢!

一些丢失的部分:

  • 您不能用Spark执行Impala功能。
  • 有一个具有相同名称和语法的Hive UDF,可以与Spark一起使用,但没有本机实现和功能包装器。这就是为什么可以使用HiveContext/SparkSession并获得HIVE支持的SQL调用它的原因。

通常,它应该很好地工作:

spark.sql("""SELECT parse_url(
    'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
|                                                              foo|
+-----------------------------------------------------------------+

NULL输出表示给定零件无法匹配:

spark.sql("""SELECT parse_url(
    'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
|                                                     null|
+---------------------------------------------------------+

您可以使用UDF获得类似的结果,但它会明显慢。

from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType
def parse_args(col: str) -> Dict[str, str]:
    """
    http://stackoverflow.com/a/21584580/6910411
    """
    try:
        return dict(parse_qsl(urlsplit(col).query))
    except:
        pass
parse_args_ = udf(parse_args, MapType(StringType(), StringType()))

数据定义为:

df = sc.parallelize([
    ("http://example.com?bar=foo", ),
    ("http://example.com?extensionId=foo", ),
]).toDF(["url"])

可以用如下:

df.select(parse_args_("url")["extensionId"]).show()

结果是:

+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
|                        null|
|                         foo|
+----------------------------+

相关内容

  • 没有找到相关文章

最新更新