我的python
代码如下所示,我从Redshift
卸载数据到Amazon S3
桶。我试图从Redshift
和S3 bucket
中获取行数,以确保加载所有数据。此外,我还想从S3桶获得最后上传日期,以便我知道上次卸载是何时执行的。请建议代码并说明
提前感谢您的时间和努力!
import csv
import redshift_connector
import sys
CSV_FILE="Tables.csv"
CSV_DELIMITER=';'
S3_DEST_PATH="s3://..../"
DB_HOST="MY HOST"
DB_PORT=1234
DB_DB="MYDB"
DB_USER="MY_READ"
DB_PASSWORD="MY_PSWD"
IM_ROLE="arn:aws:iam::/redshift-role/unload data","arn:aws::iam::/write in bucket"
def get_tables(path):
tables=[]
with open (path, 'r') as file:
csv_reader = csv.reader (file,delimiter=CSV_DELIMITER)
header = next(csv_reader)
if header != None:
for row in csv_reader:
tables.append(row)
return tables
def unload(conn, tables, s3_path):
cur = conn.cursor()
for table in tables:
print(f">{table[0]}.{table[1]}")
try:
query= f'''unload('select * from {table[0]}.{table[1]}' to '{s3_path}/{table[1]}/'
iam_role '{IAM_ROLE}'
CSV
PARALLEL FALSE
CLEANPATH;'''
print(f"loading in progress")
cur.execute(query)
print(f"Done.")
except Esception as e:
print("Failed to load")
print(str(e))
sys.exit(1)
cur.close()
def main():
try:
conn = redshift_connector.connect(
host=DB_HOST,
port=DB_PORT,
database= DB_DB,
user= DB_USER,
password=DB_PASSWORD
)
tables = get_tables(CSV_FILE)
unload(conn,tables,S3_DEST_PATH)
conn.close()
except Exception as e:
print(e)
sys.exit(1)
根据SO用户的注释更新代码
tables=['schema1.tablename','schema2.table2']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
cur.execute ('select count(*) from {',' .join("'"+y+"'" for y in tables)}')
results=cur.fetchall()
print("The table {} contained".format(tables[0]),*result[0],"rows"+"n" ) #Printing row counts along with table names
cur.close()
conn.close()
2日更新:
表= [' schema1.tablename ', ' schema2.table2 ']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
results=cur.fetchone()
for row in result:
print("The table {} contained".format(tables[0]),result[0],"rows"+"n" ) #Printing row counts along with table names
获取行数的简单查询是
query = "select count(*) from {table_name}"
对于红移,你所需要做的就是
cur.execute(query)
row_count = cur.fetchall()
使用boto3
,您可以使用类似的SQL查询来获取S3行数,如本答案所述。
编辑:
稍微更正你的更新方法:
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
result=cur.fetchone()
count = result[0] if result else 0
print(f"The table {table} contained {count} rows.n" )