Reading data from a Cassandra database with PySpark
Imports:
from pyspark.ml.feature import SQLTransformer
from transform.Base import Transform
I have loaded the data as shown below:
+----+--------------+-----+---+
|time|MEM UTI PERC %|devId|Lid|
+----+--------------+-----+---+
| 482|   8.661052632|    6| 20|
| 654|   9.162190612|    6| 20|
| 364|   8.219230769|    6| 20|
+----+--------------+-----+---+
When I apply the SQLTransformer with the following SQL statement,
self.sqlstatement = "SELECT Time,MEM UTI PERC % FROM __THIS__ WHERE "
sqltrans = SQLTransformer()
sqltrans.setStatement(self.sqlstatement)
new_df = sqltrans.transform(sparkdf)
it throws the error:
mismatched input 'UTI' expecting {<EOF>, ';'}(line 1, pos 19)
So I modified the SQL statement to wrap the column name containing spaces in double/single quotes, like this:
SELECT Time,"MEM UTI PERC %" FROM __THIS__ WHERE
This time the transformer does not throw an exception, but instead it replaces every value in that column with the column name itself, as shown below:
+----+--------------+
|Time|MEM UTI PERC %|
+----+--------------+
| 212|MEM UTI PERC %|
| 26|MEM UTI PERC %|
I want to get the data correctly, like:
+----+--------------+
|Time|MEM UTI PERC %|
+----+--------------+
| 212|          20.7|
|  26|          40.0|
+----+--------------+
In Spark SQL, column names that contain spaces or special characters must be wrapped in backticks, not quotes. Single and double quotes both denote string literals, which is why every row came back as the literal text MEM UTI PERC %. Try this:

self.sqlstatement = "SELECT Time, `MEM UTI PERC %` FROM __THIS__"
sqltrans = SQLTransformer()
sqltrans.setStatement(self.sqlstatement)
new_df = sqltrans.transform(sparkdf)

Also note that the trailing bare WHERE in your original statement will fail to parse on its own; either complete the condition or drop the clause. Cheers!
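If you build statements for several such columns, a small Spark-free helper keeps the quoting logic in one place. This is just a sketch; `quote_identifier` is a name I made up, and it follows Spark SQL's convention of doubling any embedded backticks inside a backquoted identifier:

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks for Spark SQL, doubling any
    embedded backticks (Spark's escape for backquoted identifiers)."""
    return "`" + name.replace("`", "``") + "`"

# Build the statement for the space-containing column from the question.
statement = "SELECT Time, {} FROM __THIS__".format(
    quote_identifier("MEM UTI PERC %"))
print(statement)  # SELECT Time, `MEM UTI PERC %` FROM __THIS__
```

The resulting string can then be passed to setStatement() on the SQLTransformer exactly as shown above.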