I'm new to PySpark, and I want to translate the following Python script to PySpark:
api_param_df = pd.DataFrame([[row[0][0], np.nan] if row[0][1] == '' else row[0] for row in http_path.values], columns=["api", "param"])
df = pd.concat([df['raw'], api_param_df], axis=1)
But I'm running into the following error; the traceback is:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-18-df055fb7d6a1> in <module>()
21 # Notice we also make ? and the second capture group optional so that when there are no query parameters in http path, it returns NaN.
22
---> 23 api_param_df = pd.DataFrame([[row[0][0], np.nan] if row[0][1] == '' else row[0] for row in http_path.values], columns=["api", "param"])
24 df = pd.concat([df['raw'], api_param_df], axis=1)
25
/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py in __getattr__(self, name)
1642 if name not in self.columns:
1643 raise AttributeError(
-> 1644 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
1645 jc = self._jdf.apply(name)
1646 return Column(jc)
AttributeError: 'DataFrame' object has no attribute 'values'
The full code is below, with comments explaining how a regex is applied to the http_path column of df to parse out api and param, which are then merged/joined back into df.
#Extract features from http_path ["API URL", "URL parameters"]
regex = r'(?:https?://[^/]+/)?(?P<api>[^?]+)\??(?P<param>.+)?'
http_path = df.filter(df['http_path'].rlike(regex))
# http_path
#0 https://example.org/path/to/file?param=42#frag...
#1 https://example.org/path/to/file
# api param
#0 https://example.org/path/to/file param=42#fragment
#1 https://example.org/path/to/file NaN
#where in regex pattern:
#- (?:https?://[^/]+/)? optionally matches domain but doesn't capture it
#- (?P<api>[^?]+) matches everything up to ?
#- \? matches ? literally
#- (?P<param>.+) matches everything after ?
# Notice we also make ? and the second capture group optional so that when there are no query parameters in http_path, it returns NaN.
api_param_df = pd.DataFrame([[row[0][0], np.nan] if row[0][1] == '' else row[0] for row in http_path.values], columns=["api", "param"])
df = pd.concat([df['raw'], api_param_df], axis=1)
df
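The regex behavior described in the comments above can be checked on the two sample paths with plain `re`, independent of pandas or Spark. This standalone sketch drops the optional non-capturing domain group so that api keeps the full URL, matching the expected output shown in the comments:

```python
import re

# The pattern from the comments, minus the non-capturing domain group,
# so that the full URL stays in the api group as in the expected output.
pattern = re.compile(r'(?P<api>[^?]+)\??(?P<param>.+)?')

m = pattern.match("https://example.org/path/to/file?param=42#fragment")
print(m.group("api"))    # https://example.org/path/to/file
print(m.group("param"))  # param=42#fragment

m = pattern.match("https://example.org/path/to/file")
print(m.group("param"))  # None -- no query string, so param is empty
```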
Any help would be greatly appreciated.
That syntax is valid for a pandas DataFrame, but the attribute does not exist on a DataFrame created by PySpark. You can check this link for the documentation.
In general, the collect() method or the .rdd attribute will help you accomplish these kinds of tasks.
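For contrast, here is what `.values` gives you on a pandas DataFrame: a NumPy array of rows, which is what the original list comprehension iterated over. This is a plain-pandas illustration with made-up data:

```python
import pandas as pd

# pandas only: .values materializes the frame as a NumPy array of rows.
pdf = pd.DataFrame({"http_path": ["file?param=42", "file"]})
rows = pdf.values
print(rows.shape)  # (2, 1)

# A Spark DataFrame has no .values attribute; the rough equivalents are
# sdf.collect() (a list of Row objects) or sdf.rdd for row-wise transforms.
```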
You can use the snippet below to generate the desired result:
http_path = sdf.rdd.map(lambda row: row['http_path'].split('?'))
api_param_df = pd.DataFrame([[row[0], np.nan] if len(row) == 1 else row for row in http_path.collect()], columns=["api", "param"])
sdf = pd.concat([sdf.toPandas()['raw'], api_param_df], axis=1)
Note that I removed the comments to make it easier to read, and I also replaced the regex with a simple split.
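The pad-with-NaN logic in that comprehension can be exercised without a Spark session by faking what http_path.collect() would return (the sample paths here are made up):

```python
import numpy as np
import pandas as pd

# Each element mimics row['http_path'].split('?'): a one- or two-element list.
collected = [
    "https://example.org/path/to/file?param=42".split('?'),
    "https://example.org/path/to/file".split('?'),
]

# One-element rows (no query string) are padded with NaN in the param column.
api_param_df = pd.DataFrame(
    [[row[0], np.nan] if len(row) == 1 else row for row in collected],
    columns=["api", "param"],
)
print(api_param_df)  # row 0 gets param='param=42'; row 1 gets NaN
```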