PySpark: run all the query files in a directory



I am trying to develop a script that will run all the Spark SQL queries saved in a directory. I have been able to do this in Python, but PySpark is a different game. Below is the Python script I use to read and execute all the query files in a directory.

import sys, csv, sqlite3, codecs, unicodedata, string, glob, os
import psycopg2

conn = psycopg2.connect(database="xxx", user="xxxx", password="xxxx",
                        host="localhost", port="5432")
cur = conn.cursor()
print("done")

with open("*.txt", "r") as ins:
    for line in ins:
        words = line.split('|')
        print(words)
        query = words[0]
        pmicode = words[1]
        print(query)
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()

conn.close()

Can this be replicated in PySpark?

Thanks, Pankaj

I am guessing you want PySpark to pull data from the Postgres database you are using in this Python script.

If your current Python code is similar to:

import glob
import psycopg2

conn = psycopg2.connect(database="xxx", user="xxxx", password="xxxx",
                        host="localhost", port="5432")
print("done")

def runSQL(query):
    cur = conn.cursor()
    cur.execute(query)
    conn.commit()

# open() does not expand wildcards, so glob over the query files instead
for filename in glob.glob("*.txt"):
    with open(filename, "r") as ins:
        for line in ins:
            words = line.split('|')
            print(words)
            query = words[0]      # the SQL statement
            pmicode = words[1]    # trailing code on each line
            print(query)
            runSQL(query)

conn.close()

then the equivalent is to open a JDBC connection and run the queries through sqlContext:

import glob
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

postgres_url = 'jdbc:postgresql://localhost:5432/database'
properties = {"user": "xxxx", "password": "xxxx",
              "driver": "org.postgresql.Driver"}
print("done")

def runSQL(query):
    # Spark reads the result set of the query by wrapping it as an aliased subquery
    return sqlContext.read.jdbc(
        url=postgres_url,
        table="( {0} ) TEMPDB_SPARK_DELINQ".format(query),
        properties=properties)

# open() does not expand wildcards, so glob over the query files instead
for filename in glob.glob("*.txt"):
    with open(filename, "r") as ins:
        for line in ins:
            words = line.split('|')
            print(words)
            query = words[0]
            pmicode = words[1]
            print(query)
            runSQL(query)
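
Keep in mind that read.jdbc returns a DataFrame lazily, so nothing is fetched from Postgres until you call an action on the result. A minimal usage sketch (the table and column names here are hypothetical, not from your files) might be:

# Hypothetical query for illustration; in practice it would come from a .txt line
df = runSQL("SELECT id, name FROM customers WHERE state = 'CA'")
df.show()          # triggers the JDBC read and prints sample rows
print(df.count())  # number of rows the query returned

You will also need the Postgres JDBC driver jar on the Spark classpath, e.g. spark-submit --jars postgresql-<version>.jar run_queries.py (the jar and script names are placeholders). Note also that this approach only works for queries Postgres can treat as a subquery, i.e. SELECT statements; INSERT/UPDATE/DDL statements would still need a plain psycopg2-style connection.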
