Error connecting to AWS Redshift using Dask



I'm using Dask to connect to AWS Redshift and query the database. When I try to pass the connection string to the read_sql_query method, I get an error.

config.py

import os
# Create dictionary of environment variables
env_var_dict = {
    # Amazon Redshift
    'host': 'xxxx.us-east-1.redshift.amazonaws.com',
    'database': 'db1',
    'port': '5439',
    'user': 'user1',
    'password': 'xxxxx'
}

# set environment variables
os.environ['host'] = env_var_dict['host']
os.environ['database'] = env_var_dict['database']
os.environ['port'] = env_var_dict['port']
os.environ['user'] = env_var_dict['user']
os.environ['password'] = env_var_dict['password']
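
The five assignments above could equally be written as a single loop over the dictionary:

# Equivalent setup: copy every entry of env_var_dict into the environment
for key, value in env_var_dict.items():
    os.environ[key] = value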


# connect to aws redshift cluster
import redshift_connector
conn = redshift_connector.connect(
    host=os.environ['host'],
    database=os.environ['database'],
    port=int(os.environ['port']),
    user=os.environ['user'],
    password=os.environ['password']
)
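
Before bringing Dask in, the raw connection can be sanity-checked with a trivial query (this assumes the connect() call above succeeded; redshift_connector follows the DB-API, so cursor/execute/fetchall work as usual):

# Sanity check: run a trivial query over the raw redshift_connector connection
cursor = conn.cursor()
cursor.execute('SELECT 1')
print(cursor.fetchall())  # expect a single row containing 1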
import sqlalchemy as sa
host=os.environ['host'],
database=os.environ['database'],
port=int(os.environ['port']),
user=os.environ['user'],
password=os.environ['password']
conn_str = f'redshift+redshift_connector://{user}:{password}@{host}:{port}/{database}'
# dask
import dask.dataframe as dd

# The resulting connection string:
"redshift+redshift_connector://('user',):pwd@hostname,):('5439',)/('tracking',)"
# Query table using dask dataframe
query = '''
SELECT * 
FROM tbl
WHERE type = 'xxx'
and created_at >= '2023-01-01 00:00:00'
and created_at <= '2023-12-01 00:00:00'
'''
df = dd.read_sql_query(query, conn_str, index_col='id')

This raises:

ValueError: invalid literal for int() with base 10: "('5439',)"
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <command-2539550446659032>:10
1 # Query table using dask dataframe
2 query = '''
3         SELECT * 
4         FROM pmf
(...)
7         and created_at <= '2023-12-01 00:00:00'
8         '''
---> 10 df = dd.read_sql_query(query, conn_str, index_col = 'id')
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/dask/dataframe/io/sql.py:107, in read_sql_query(sql, con, index_col, divisions, npartitions, limits, bytes_per_chunk, head_rows, meta, engine_kwargs, **kwargs)
104     raise TypeError("Must supply either 'divisions' or 'npartitions', not both")
106 engine_kwargs = {} if engine_kwargs is None else engine_kwargs
--> 107 engine = sa.create_engine(con, **engine_kwargs)

I tried passing port as both an int and a str. How can I connect to AWS Redshift and run the query with Dask?

This connection string

"redshift+redshift_connector://('user',):pwd@hostname,):('5439',)/('tracking',)"

definitely does not look right! It should probably be

"redshift+redshift_connector://user:pwd@hostname:5439/tracking"

It looks like every variable interpolated into the f-string is a tuple rather than a plain value (str/int). Since you don't show how these values were assigned it's hard to be sure, but the cause may be as simple as the stray commas scattered through your code.
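
That is exactly what a trailing comma does in Python: it turns a plain assignment into a one-element tuple. A minimal demonstration:

host = 'hostname',       # trailing comma: host is the tuple ('hostname',)
port = '5439',           # trailing comma: port is the tuple ('5439',)
print(f'{host}:{port}')  # prints ('hostname',):('5439',) - the garbage seen above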

Specifically: change

host=os.environ['host'],
database=os.environ['database'],
port=int(os.environ['port']),
user=os.environ['user'],

to

host=os.environ['host']
database=os.environ['database']
port=int(os.environ['port'])
user=os.environ['user']
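
Putting it together, a minimal sketch of the fixed connection code (assuming the environment variables from config.py are set, and that the sqlalchemy-redshift package is installed so create_engine can resolve the redshift+redshift_connector dialect):

import os
import dask.dataframe as dd

# Plain assignments - no trailing commas, so each name holds a str/int
host = os.environ['host']
database = os.environ['database']
port = int(os.environ['port'])
user = os.environ['user']
password = os.environ['password']

conn_str = f'redshift+redshift_connector://{user}:{password}@{host}:{port}/{database}'
# e.g. redshift+redshift_connector://user1:xxxxx@xxxx.us-east-1.redshift.amazonaws.com:5439/db1

df = dd.read_sql_query(query, conn_str, index_col='id')

Note that, depending on your Dask version, read_sql_query may also require the query as a SQLAlchemy Selectable rather than a raw string; if create_engine now succeeds but a "Selectable" error follows, build the query with sqlalchemy constructs instead of passing plain SQL text.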