I can execute the stored procedure, but I cannot get the data it returns; the call only returns the value true.
Calling the stored procedure ([image of the stored procedure]):
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
print('Trying to connect to DB')
source_jdbc_conf = glueContext.extract_jdbc_conf('sgc_con')
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print('Trying to connect to DB success!')
print(conn.getMetaData())
print('prepareCall')
statement = "EXEC MNA.dbo.zz_MNAvArticulosListar"
exec_statement = conn.prepareCall(statement)
print('execute')
rs = exec_statement.execute()
print(rs)  # prints True, not the result set
exec_statement.close()
So far I have only managed to connect to SQL Server and execute the stored procedure, not to read its results.
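In JDBC, execute() returns a boolean that only says whether the first result is a ResultSet; it never returns the rows themselves, which is why the script above prints true. A minimal sketch of fetching the rows with the standard getResultSet() call, assuming the exec_statement from the code above and placed before exec_statement.close():

if exec_statement.execute():
    rs = exec_statement.getResultSet()
    while rs.next():
        print(rs.getString(1))  # first column of each row, as a string
    rs.close()
# If the procedure emits update counts first (SET NOCOUNT OFF in SQL Server),
# getMoreResults() may be needed to advance to the ResultSet.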
Fixed: I can now get the data the SQL Server stored procedure returns from an AWS Glue job. The key change is using prepareStatement with executeQuery(), which returns the ResultSet directly, instead of prepareCall with execute(), which only reports a boolean. For now the job still hard-codes the column names; obtaining them dynamically is left pending (see the sketch after the code).
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
#Creates Empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)
# Defining the schema of the DataFrame
schema = StructType([
    StructField('Id', StringType(), True),
    StructField('IdArticulo', StringType(), True),
    StructField('Referencia', StringType(), True),
    StructField('Descripcion', StringType(), True)
])
#Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)
df.printSchema()
# Hard-coded list of column names (to be made dynamic later)
columns = ['Id', 'IdArticulo', 'Referencia', 'Descripcion']
print('Trying to connect to DB')
source_jdbc_conf = glueContext.extract_jdbc_conf('sgc_con')
con = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print('Trying to connect to DB success!')
print(con.getMetaData())
print('prepareStatement')
stmt = con.prepareStatement("EXEC MNA.dbo.zz_MNAvArticulosListar")
# Execute the stored procedure; executeQuery() returns the ResultSet directly
rs = stmt.executeQuery()
meta = rs.getMetaData()
print(meta.getColumnCount())
# Get the column names from the ResultSet metadata (JDBC columns are 1-based)
columnNames = []
for i in range(meta.getColumnCount()):
    columnNames.append(meta.getColumnName(i + 1))
print(columnNames)
# Fetch the rows returned by the stored procedure
data = []
while rs.next():
    data.append((rs.getString("Id"), rs.getString("IdArticulo"), rs.getString("Referencia"), rs.getString("Descripcion")))
print(data)
#Create final dataframe
second_df = spark.createDataFrame(data, columns)
second_df.show()
# Pending: obtain the columns and data dynamically, to make this a generic job (see the sketch below)
rs.close()
stmt.close()
[image of the output DataFrame]
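For the pending part, here is a minimal, untested sketch of how the hard-coded schema, column list, and fetch loop above could be replaced so the job works for any stored procedure. It reads every column of every row as a string, using only the ResultSet metadata:

rs = stmt.executeQuery()
meta = rs.getMetaData()
num_cols = meta.getColumnCount()
# Column names straight from the metadata (JDBC columns are 1-based);
# getColumnLabel() would honor SQL aliases, getColumnName() the base names.
columnNames = [meta.getColumnName(i + 1) for i in range(num_cols)]
# Read every column of every row as a string
data = []
while rs.next():
    data.append(tuple(rs.getString(i + 1) for i in range(num_cols)))
rs.close()
generic_df = spark.createDataFrame(data, columnNames)
generic_df.show()

Note that createDataFrame needs at least one row here, since the string types are inferred from the data; for a possibly empty result you would build a StructType of StringType fields from columnNames instead, as in the schema block above.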