输出pyodbc游标结果为python字典



如何将pyodbc游标输出(来自.fetchone, .fetchmany.fetchall)序列化为Python字典?

我使用的是bottlepy,需要返回dict,所以它可以返回JSON

如果您事先不知道列,可以使用Cursor.description构建列名列表,并对每行进行压缩以生成字典列表。本例假设建立了连接和查询:

>>> cursor = connection.cursor().execute(sql)
>>> columns = [column[0] for column in cursor.description]
>>> print(columns)
['name', 'create_date']
>>> results = []
>>> for row in cursor.fetchall():
...     results.append(dict(zip(columns, row)))
...
>>> print(results)
[{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},   
 {'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
 {'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},     
 {'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]

使用@Beargle的结果和bottlepy,我能够创建这个非常简洁的查询,暴露端点:

@route('/api/query/<query_str>')
def query(query_str):
    cursor.execute(query_str)
    return {'results':
            [dict(zip([column[0] for column in cursor.description], row))
             for row in cursor.fetchall()]}

对于游标不可用的情况—例如,当某些函数调用或内部方法返回行时,您仍然可以通过使用row.cursor_description

创建字典表示。
def row_to_dict(row):
    return dict(zip([t[0] for t in row.cursor_description], row))

这里有一个简短的版本,你可以使用

>>> cursor.select("<your SQL here>")
>>> single_row = dict(zip(zip(*cursor.description)[0], cursor.fetchone()))
>>> multiple_rows = [dict(zip(zip(*cursor.description)[0], row)) for row in cursor.fetchall()]

您可能已经意识到,当您将*添加到列表中时,您基本上剥离了列表,将单个列表条目作为您正在调用的函数的参数。通过使用zip,我们选择第1到n个条目并将它们拉在一起,就像裤子上的拉链一样。

使用

zip(*[(a,1,2),(b,1,2)])
# interpreted by python as zip((a,1,2),(b,1,2))

得到

[('a', 'b'), (1, 1), (2, 2)]

由于description是一个包含元组的元组,其中每个元组描述每个列的标题和数据类型,因此可以使用

提取每个元组中的第一个
>>> columns = zip(*cursor.description)[0]

等价于

>>> columns = [column[0] for column in cursor.description]

主要针对@Torxed响应,我创建了一个完整的通用函数集,用于在字典中查找模式和数据:

def schema_dict(cursor):
    cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
    schema = {}
    for it in cursor.fetchall():
        if it[0] not in schema:
            schema[it[0]]={'scheme':[]}
        else:
            schema[it[0]]['scheme'].append(it[1])
    return schema

def populate_dict(cursor, schema):
    for i in schema.keys():
        cursor.execute("select * from {table};".format(table=i))
        for row in cursor.fetchall():
            colindex = 0
            for col in schema[i]['scheme']:
                if not 'data' in schema[i]:
                    schema[i]['data']=[]
                schema[i]['data'].append(row[colindex])
                colindex += 1
    return schema
def database_to_dict():
    cursor = connect()
    schema = populate_dict(cursor, schema_dict(cursor))

可以随意使用代码高尔夫来减少行数;但与此同时,它是有效的!

我喜欢@bryan和@foo-stack答案。如果您正在使用postgresql并且正在使用psycopg2,则可以使用psycopg2中的一些功能,通过在从连接创建游标时将光标工厂指定为DictCursor来实现相同的功能,如下所示:

cur = conn.cursor( cursor_factory=psycopg2.extras.DictCursor )

现在你可以执行你的sql查询,你会得到一个字典来获取你的结果,而不需要手动映射它们。

cur.execute( sql_query )
results = cur.fetchall()
for row in results:
    print row['row_no']

请注意,你必须要import psycopg2.extras的工作

假设您知道列名!此外,这里有三种不同的解决方案,
你可能想看看最后一个!

colnames = ['city', 'area', 'street']
data = {}
counter = 0
for row in x.fetchall():
    if not counter in data:
        data[counter] = {}
    colcounter = 0
    for colname in colnames:
        data[counter][colname] = row[colcounter]
        colcounter += 1
    counter += 1

这是一个索引版本,不是最漂亮的解决方案,但它会工作。另一种方法是将列名索引为字典键,每个键中有一个按行号顺序包含数据的列表。通过:

colnames = ['city', 'area', 'street']
data = {}
for row in x.fetchall():
    colindex = 0
    for col in colnames:
        if not col in data:
            data[col] = []
        data[col].append(row[colindex])
        colindex += 1
写这篇文章,我明白做for col in colnames可以被for colindex in range(0, len())取代,但你明白了。当不是获取所有数据,而是一次获取一行数据时,后面的示例将非常有用,例如:

对每一行数据使用字典

def fetchone_dict(stuff):
    colnames = ['city', 'area', 'street']
    data = {}
    for colindex in range(0, colnames):
        data[colnames[colindex]] = stuff[colindex]
    return data
row = x.fetchone()
print fetchone_dict(row)['city']

获取表名(i think.)感谢Foo Stack):
一个更直接的解决方案从beargle下面!

cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
    if it[0] in schema:
       schema[it[0]].append(it[1])
    else:
        schema[it[0]] = [it[1]]

步骤如下:

  1. 进口填词:
    from pandas import DataFrame
    import pyodbc
    import sqlalchemy
  • 从本地数据库获取结果:
  • db_file = r'xxx.accdb'
    odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)
    conn = pyodbc.connect(odbc_conn_str)
    cur = conn.cursor() 
    qry = cur.execute("SELECT * FROM tbl")
    columns = [column[0] for column in cur.description]
    results = []
    for row in cur.fetchall():
     
       results.append(dict(zip(columns, row)))
    df = DataFrame(results) 
    df
    

    我所需要的,这与OP要求的略有不同:
    如果您想要完全泛化执行SQL Select查询的例程,但需要通过索引号而不是名称引用结果,则可以使用列表的列表而不是字典来实现这一点。

    返回的每一行数据在返回的列表中表示为字段(列)值的列表。
    列名可以作为返回列表的第一个条目提供,因此在调用例程中解析返回列表非常容易和灵活。
    通过这种方式,执行数据库调用的例程不需要知道它正在处理的数据的任何信息。下面是这样一个例程:

        def read_DB_Records(self, tablename, fieldlist, wherefield, wherevalue) -> list:
            DBfile = 'C:/DATA/MyDatabase.accdb'
            # this connection string is for Access 2007, 2010 or later .accdb files
            conn = pyodbc.connect(r'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ='+DBfile)
            cursor = conn.cursor()
            # Build the SQL Query string using the passed-in field list:
            SQL = "SELECT "
            for i in range(0, len(fieldlist)):
                SQL = SQL + "[" + fieldlist[i] + "]"
                if i < (len(fieldlist)-1):
                    SQL = SQL + ", "
            SQL = SQL + " FROM " + tablename
            # Support an optional WHERE clause:
            if wherefield != "" and wherevalue != "" :
                SQL = SQL + " WHERE [" + wherefield + "] = " + "'" + wherevalue + "';"
            results = []    # Create the results list object
            cursor.execute(SQL) # Execute the Query
            # (Optional) Get a list of the column names returned from the query:
            columns = [column[0] for column in cursor.description]
            results.append(columns) # append the column names to the return list
            # Now add each row as a list of column data to the results list
            for row in cursor.fetchall():   # iterate over the cursor
                results.append(list(row))   # add the row as a list to the list of lists
            cursor.close()  # close the cursor
            conn.close()    # close the DB connection
            return results  # return the list of lists
    

    我知道它很老,我只是在回顾别人已经说过的话。但我发现这种方法很简洁,因为它也可以安全注射。

    def to_dict(row):
        return dict(zip([t[0] for t in row.cursor_description], row))
    def query(cursor, query, params=[], cursor_func=to_dict):
        cursor.execute(query, params) 
        results = [cursor_func(row) for row in cursor.fetchall()]
        return results
    quotes = query(cursor, "select * from currency where abbreviation like ?", ["USD"])
    

    最新更新