I have a problem when exporting data from Hive through Paramiko. Normally, when working on the same server, I use the following code to work around bad-line errors:

with open('xxx.tsv', 'r') as temp_f:
    # get No of columns in each line
    col_count = [ len(l.split(",")) for l in temp_f.readlines() ]
### Generate column names (names will be 0, 1, 2, ..., maximum columns - 1)
column_names = [i for i in range(0, max(col_count))]
### Read csv
df2 = pd.read_csv('xxx.tsv', header=None,
                  delimiter="\t", names=column_names)
df2 = df2.rename(columns=df2.iloc[0]).drop(df2.index[0])
df2 = df2[['content_id', 'title','product_id', 'type', 'episode_total','template_model','tags_name','grade','isdeleted' ,'actor']]

What I want to do now is combine the code above with my own code, like this:

import paramiko
import traceback
from io import StringIO
import pandas as pd

host = 'xxxx'
conn_obj = paramiko.SSHClient()
conn_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
conn_obj.connect(host, username="xxxx",
                 password='xxxx')  # insert username and password
query='"select content_id as content_id, title as title,product_id as product_id, type as type, episode_total as episode_total, template_model as template_model, tags_name as tags_name,grade as grade, isdeleted as isdeleted, actor as actor from aaa.aaa;"'
hive_query = 'beeline xxxx --outputformat=tsv2 -e '+ query
print(hive_query)
std_in, std_out, std_err = conn_obj.exec_command(hive_query)
edge_out_str = str(std_out.read())
edge_out_str_n = "\n".join(edge_out_str.split("\\n"))
edge_out_csv = StringIO(edge_out_str_n)
with open(edge_out_csv) as temp_f:
    # get No of columns in each line
    col_count = [ len(l.split(",")) for l in temp_f.readlines() ]
### Generate column names (names will be 0, 1, 2, ..., maximum columns - 1)
column_names = [i for i in range(0, max(col_count))]
### Read csv
df2 = pd.read_csv(temp_f, header=None, delimiter="\t", names=column_names)
df2 = df2.rename(columns=df2.iloc[0]).drop(df2.index[0])
df2 = df2[['content_id', 'title','product_id', 'type', 'episode_total', 'template_model', 'tags_name','grade','isdeleted' ,'actor']]
conn_obj.close()

When I run the script, I get the following error:

Error :Traceback (most recent call last):
File "<ipython-input-13-360c6dba28e1>", line 21
with open(edge_out_csv) as temp_f:
TypeError: expected str, bytes or os.PathLike object, not _io.StringIO

StringIO is already a file-like object, so there is no need to call open() on it. Use it directly in place of the temp_f file:

with StringIO(edge_out_str_n) as edge_out_csv:
    # get No of columns in each line
    col_count = [ len(l.split(",")) for l in edge_out_csv.readlines() ]
    ### Generate column names (names will be 0, 1, 2, ..., maximum columns - 1)
    column_names = [i for i in range(0, max(col_count))]
    # Seek back to the beginning, because readlines() left the position at the end
    edge_out_csv.seek(0)
    ### Read csv (pass the StringIO itself; temp_f no longer exists)
    df2 = pd.read_csv(edge_out_csv, header=None, delimiter="\t", names=column_names)
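
A related detail worth checking: `str(std_out.read())` returns the repr of the bytes object (a string of the form `b'...'` containing literal backslash-n sequences), which is presumably why the question has to split on `"\\n"`. Decoding the bytes avoids that. Below is a minimal sketch of the idea, not a drop-in replacement. The helper name `buffer_from_tsv_bytes` and the sample bytes are made up for illustration, and it assumes the output is UTF-8 and that columns should be counted by tabs (rather than commas, as in the question), since beeline is asked for tsv2.

```python
from io import StringIO


def buffer_from_tsv_bytes(raw, encoding="utf-8"):
    """Decode raw command output and return (buffer, column_names).

    Hypothetical helper: `raw` stands in for std_out.read().
    """
    text = raw.decode(encoding)  # real newlines, no b'...' wrapper
    buf = StringIO(text)         # already file-like, so no open() is needed
    # Count tab-separated fields on each non-empty line
    col_count = [
        len(line.rstrip("\n").split("\t")) for line in buf if line.strip()
    ]
    column_names = list(range(max(col_count)))
    buf.seek(0)                  # rewind so the next reader starts at the top
    return buf, column_names


# Sample data invented for this example; the last row has one extra field
sample = b"content_id\ttitle\n1\tfoo\n2\tbar\textra\n"
buf, names = buffer_from_tsv_bytes(sample)
print(names)
```

The returned buffer can then be passed straight to `pd.read_csv(buf, header=None, delimiter="\t", names=names)`.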

Obligatory warning: do not use AutoAddPolicy. By doing so you lose protection against MITM attacks. For a correct solution, see Paramiko "Unknown Server".
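
One way to keep host-key checking is sketched below, under the assumption that the server's key is already in your `~/.ssh/known_hosts` (for example because you have connected once with the regular `ssh` client). The function name `connect_verified` is hypothetical; `load_system_host_keys` and `RejectPolicy` are Paramiko's own.

```python
def connect_verified(host, username, password):
    """Open an SSH connection that refuses unknown host keys (sketch)."""
    # Imported inside the function so the sketch can be defined
    # even where Paramiko is not installed.
    import paramiko

    client = paramiko.SSHClient()
    # Trust only the keys already recorded in ~/.ssh/known_hosts
    client.load_system_host_keys()
    # Fail the connection instead of silently accepting an unknown host
    client.set_missing_host_key_policy(paramiko.RejectPolicy())
    client.connect(host, username=username, password=password)
    return client
```

With this in place, `conn_obj = connect_verified(host, "xxxx", "xxxx")` would replace the three setup lines in the question, and a connection to a host with an unknown key raises an exception instead of proceeding.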