使用 Python 的 sqlparse 解析 SQL,以获取列名和数据类型



我正在使用 Python 的 sqlparse 来获取表信息。目前我可以得到表名和架构(schema)名。

import sqlparse
# Sample DDL used throughout the question: two PostgreSQL-style tables
# followed by two tables that carry ENCODE / SORTKEY / DISTSTYLE clauses.
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
parse = sqlparse.parse(line)
# Collect the text of every token of the first statement that has no ttype,
# then show the first of them (the sample output below is the table name).
untyped_texts = [str(tok) for tok in parse[0].tokens if tok.ttype is None]
print(untyped_texts[0])
Output: 
public.actor

但是,如果我还想得到列名和数据类型,应该使用哪个标记(token)来打印这些 DDL 中的信息?

期望的输出大致如下(不必完全相同):

table: public.actor

逐个打印列名和数据类型(可能在 for 循环中):

column: actor_id
data type: integer
column: first_name
data type: character varying

代码说明见内联注释:

import sqlparse
# Sample DDL fed to sqlparse below: four CREATE TABLE statements, the last two
# of which carry ENCODE / SORTKEY / DISTSTYLE clauses after the column list.
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
def get_table_name(tokens):
    """Return the value of the last token in *tokens* whose ``ttype`` is None.

    The caller passes the tokens that precede the column-list parenthesis of
    a CREATE TABLE statement; the closest preceding token without a ttype is
    taken to be the table name.

    Args:
        tokens: Sequence of objects exposing ``ttype`` and ``value``.

    Returns:
        The ``value`` of the last token with ``ttype is None``, or the
        single-space string ``" "`` when there is no such token.
    """
    # Scan right-to-left so the token nearest the parenthesis wins.
    return next(
        (token.value for token in reversed(tokens) if token.ttype is None),
        " ",
    )
def _split_top_level_commas(text):
    """Split *text* on commas that are not nested inside parentheses."""
    parts = []
    depth = 0
    start = 0
    for pos, char in enumerate(text):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append(text[start:pos])
            start = pos + 1
    parts.append(text[start:])
    return parts


parse = sqlparse.parse(line)
for stmt in parse:
    # Get all the tokens except whitespaces
    tokens = [
        t for t in sqlparse.sql.TokenList(stmt.tokens)
        if t.ttype != sqlparse.tokens.Whitespace
    ]
    is_create_stmt = False
    for i, token in enumerate(tokens):
        # Is it a create statement?
        if token.match(sqlparse.tokens.DDL, 'CREATE'):
            is_create_stmt = True
            continue

        # If it was a create statement and the current token starts with "("
        if is_create_stmt and token.value.startswith("("):
            # Get the table name by looking at the tokens in reverse order
            # till you find a token with None type
            print(f"table: {get_table_name(tokens[:i])}")
            # Now parse the columns: take the text between the outer
            # parentheses and split it into one fragment per column.
            # Commas nested in parentheses (e.g. numeric(10,2)) must not
            # split a column, hence the depth-aware helper.
            # NOTE: commas inside quoted string defaults are still treated
            # as separators.
            txt = token.value
            columns = _split_top_level_commas(txt[1:txt.rfind(")")])
            for column in columns:
                # str.split() already discards newlines and repeated blanks,
                # so no separate newline removal is needed.
                c = column.split()
                if len(c) < 2:
                    # Empty or name-only fragment (e.g. trailing comma).
                    continue
                c_name = c[0].replace('"', "")
                # Condensed type information (first word only); use
                # " ".join(c[1:]) instead for the detailed type information.
                c_type = c[1]
                print(f"column: {c_name}")
                print(f"date type: {c_type}")
            print("---" * 20)
            break

输出:

table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------

您可以使用 python-sqlparse 官方示例脚本中提供的函数来提取列定义:

def extract_definitions(token_list):
    """Split the contents of a parenthesis group into column definitions.

    Assumes *token_list* is a parenthesis group, i.e. its first flattened
    non-whitespace token is the opening ``(``.

    Whitespace and parenthesis tokens are dropped. The remaining tokens are
    grouped into one list per definition, separated at commas that belong to
    the outer parenthesis. Commas nested deeper (e.g. ``numeric(10,2)``) stay
    inside the current definition.

    Args:
        token_list: Object whose ``flatten()`` yields tokens exposing
            ``is_whitespace`` and ``match(ttype, value)``.

    Returns:
        A list of token lists, one per definition; the first token of each
        list is the column name.
    """
    definitions = []
    tmp = []
    par_level = 0
    for token in token_list.flatten():
        if token.is_whitespace:
            continue
        if token.match(sqlparse.tokens.Punctuation, '('):
            par_level += 1
            continue
        if token.match(sqlparse.tokens.Punctuation, ')'):
            # Leaving one nesting level; once the outer parenthesis closes
            # there is nothing more to collect.
            par_level -= 1
            if par_level <= 0:
                break
            continue
        if token.match(sqlparse.tokens.Punctuation, ',') and par_level <= 1:
            # Top-level comma: the current definition is complete.
            if tmp:
                definitions.append(tmp)
            tmp = []
        else:
            tmp.append(token)
    if tmp:
        definitions.append(tmp)
    return definitions

并使用for循环打印有关列名及其数据类型的信息:

# Only the first statement of the parsed SQL is inspected here.
parsed = sqlparse.parse(line)[0]
# extract the parenthesis which holds column definitions
_, par = parsed.token_next_by(i=sqlparse.sql.Parenthesis)
if par is None:
    # No parenthesis group was recognised in the statement; fail with a
    # clear message instead of an AttributeError inside extract_definitions.
    raise ValueError("no column-definition parenthesis found in statement")
columns = extract_definitions(par)
for column in columns:
    # The first token is the column name; the rest describe its type and
    # constraints.
    print(f"column: {column[0]}")
    print(f"data type: {' '.join(str(t) for t in column[1:])}")

此代码产生以下输出:

column: actor_id
data type: integer DEFAULT nextval 'public.actor_actor_id_seq' :: regclass NOT NULL
column: first_name
data type: character varying 45 NOT NULL
column: last_name
data type: character varying 45 NOT NULL
column: last_update
data type: timestamp without time zone DEFAULT now NOT NULL

它给出的信息实际上比你想要的多一些。不过,这些字符串应该很容易进一步处理,以便只提取出基本数据类型。

正如评论中已经提到的,使用特殊信息(例如 DISTSTYLE)会导致解析器无法识别出 sqlparse.sql.Parenthesis 实例,从而报错。因此,在解析之前,需要先从 SQL 语句中删除这些信息。

最新更新