我正试图开发一个脚本,将运行所有的火花sql查询保存在一个目录。我已经能够在Python中做到这一点,但pyspark是一个不同的游戏。下面是我用来读取和执行目录中所有查询文件的python脚本。
导入sys,csv,sqlite3,codecs,unicodedata, string,glob, os,c康涅狄格州= psycopg2。连接(数据库="xxx",用户="xxxx",密码="xxxx",Host ="localhost", port="5432") cur = conn.cursor() print("done")
with open("*.txt", "r") as ins: for line in ins:
Words =line.split('|') print(Words) query= Words [0]
pmicode =话说[1]打印(查询)Cur = conn.cursor()Cur.execute (query) conn.commit() conn.close()
可以在PySpark中复制这个吗?
谢谢,Pankaj
我猜你想要pyspark从你在这个python脚本中使用的postgres数据库中提取数据。
如果Python中的当前代码类似于:
import sys, csv, sqlite3, codecs, unicodedata, string, glob, os
conn = psycopg2.connect(database="xxx", user="xxxx", password="xxxx", host="localhost", port="5432")
cur = conn.cursor()
print("done")
def runSQL(query):
cur = conn.cursor()
cur.execute(query)
conn.commit()
with open("*.txt", "r") as ins:
for line in ins:
words = line.split('|')
print(words)
query = words[0]
pmicode = words[1]
print(query)
conn.close()
相当于使用JDBC连接和执行带有sqlContext的命令:
import sys, csv, sqlite3, codecs, unicodedata, string, glob, os
postgres_url = 'jdbc:postgresql://localhost:5432/database'
properties = {"user": "xxxx", "password": "xxxx"}
print("done")
def runSQL(query):
return sqlContext.read.jdbc(
url=postgres_url,
table="( {0} ) TEMPDB_SPARK_DELINQ".format(query)
with open("*.txt", "r") as ins:
for line in ins:
words = line.split('|')
print(words)
query = words[0]
pmicode = words[1]
print(query)
runSQL(query)