Converting many files in a directory to BLOBs in DB2



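I am fairly new to this. I need to take all files uploaded to a directory (.csv and .xlsx), e.g. Supermarket.xlsx, sales.csv, marketing.xlsx, and store them as BLOB data in DB2, in the table SB_DATA_BLOB_TEST with the columns "data_column", "ingestion_date_time", "ingestion_file_name" and "row_id".

So far I have inserted only one file, passing the timestamp, file name and row_id explicitly. How can I apply the same function to the whole list of files uploaded to that directory, setting the timestamp, file name and row_id for each one, without typing in the row_id by hand?

Code: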

import os
import pandas as pd
from subprocess import Popen, PIPE, run
import jaydebeapi
from project_lib import Project

constants = {
    'INPUT_DIR': '/project_data/data_asset/'
}
file_names = {
    'Supermart': 'Supermart.xlsx'
}
schema_name = 'ABC.'
table_prefix = 'SB_'
timestamp = pd.Timestamp.now("Asia/Singapore").strftime("%Y%m%d %H%M%S")
file = constants['INPUT_DIR'] + file_names['Supermart']  ## data
filename = constants['INPUT_DIR'] + file_names['Supermart']  ## ingestion_file_name


def convertToBinaryData(filename):
    # Read the file and return its raw bytes
    with open(filename, 'rb') as file:
        binaryData = file.read()
    return binaryData


def insertBLOB(data, ingestion_datetime, ingestion_filename, row_id):
    print("Inserting BLOB into ABC SB_Data_Blob table")
    # Initialise both handles so the finally block is safe if connecting fails
    abc_sb_connection = None
    curs = None
    try:
        project = Project.access()
        abc_sb_credentials = project.get_connection(name="abc_sb")
        print(abc_sb_credentials)
        abc_sb_connection = jaydebeapi.connect('com.ibm.db2.jcc.DB2Driver',
            '{}://{}:{}/{}:user={};password={};'.format('jdbc:db2',
            abc_sb_credentials['host'],
            abc_sb_credentials['port'],
            abc_sb_credentials['database'],
            abc_sb_credentials['username'],
            abc_sb_credentials['password']))

        curs = abc_sb_connection.cursor()
        sql_insert_blob_query = """ INSERT INTO ABC.SB_DATA_BLOB_TEST
        (data_column, ingestion_date_time, ingestion_file_name, row_id) VALUES (?,?,?,?)"""

        file = convertToBinaryData(data)
        # Convert data into tuple format
        insert_blob_tuple = (jaydebeapi.Binary(file), ingestion_datetime, ingestion_filename, row_id)
        result = curs.execute(sql_insert_blob_query, insert_blob_tuple)
        abc_sb_connection.commit()
        print("File is inserted successfully as a BLOB into SB_DATA_BLOB table", result)
    except Exception as error:
        print(f"{error}")
        print("Failed inserting BLOB data into DB2 table SB_DATA_BLOB")
    finally:
        # Close only what was actually opened
        if curs is not None:
            curs.close()
        if abc_sb_connection is not None:
            abc_sb_connection.close()
        print("DB2 connection is closed")


insertBLOB(file, timestamp, filename, '2')

"I need to take all files uploaded to a directory (.csv and .xlsx), e.g. Supermarket.xlsx, sales.csv, marketing.xlsx"

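List all the files in the directory. Assuming the directory is given by constants['INPUT_DIR'] and the file names should be filtered by the extensions .xlsx and .csv, you can use the Python module glob and its function glob: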

import glob
import os

# Full paths of all files with the given extensions in the given directory
csv_filenames = glob.glob(constants['INPUT_DIR'] + '*.csv')
xlsx_filenames = glob.glob(constants['INPUT_DIR'] + '*.xlsx')
all_filenames = csv_filenames + xlsx_filenames
row_id = 1  # row id for the first file; incremented after each insert
# Insert each of those files as a BLOB
for path in all_filenames:  # glob already returns the directory plus the file name
    ingestion_datetime = pd.Timestamp.now("Asia/Singapore").strftime("%Y%m%d %H%M%S")  # or timestamp of the file
    ingestion_filename = os.path.basename(path)  # only the filename, no directory
    insertBLOB(path, ingestion_datetime, ingestion_filename, row_id)
    row_id += 1  # increase by one
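
The loop above can also be split into a step that only gathers the values for each row, which makes the row_id numbering easy to check before anything is written to the database. The following is a minimal sketch, not the asker's method: the helper name collect_blob_rows, its parameters start_row_id and extensions, and the constant SGT are hypothetical names introduced for illustration, and the timestamp is built with the standard library instead of pandas so the block has no third-party dependencies.

```python
import glob
import os
from datetime import datetime, timedelta, timezone

# Assumption: Singapore time is modelled as a fixed UTC+8 offset (no DST),
# which avoids depending on a time zone database.
SGT = timezone(timedelta(hours=8))


def collect_blob_rows(input_dir, start_row_id=1, extensions=(".csv", ".xlsx")):
    """Return one (data, timestamp, file_name, row_id) tuple per matching file.

    Hypothetical helper: the tuple order mirrors the column order of the
    INSERT statement in the question.
    """
    paths = []
    for ext in extensions:
        paths.extend(glob.glob(os.path.join(input_dir, "*" + ext)))
    paths.sort()  # glob order is arbitrary; sort so row ids are reproducible

    rows = []
    # enumerate hands out consecutive row ids, so none is typed in by hand
    for row_id, path in enumerate(paths, start=start_row_id):
        with open(path, "rb") as fh:
            data = fh.read()
        timestamp = datetime.now(SGT).strftime("%Y%m%d %H%M%S")
        rows.append((data, timestamp, os.path.basename(path), row_id))
    return rows
```

Each tuple could then be passed to insertBLOB or to a cursor's execute call. The raw bytes would presumably still need wrapping with jaydebeapi.Binary, as the question's code does, before being handed to the driver.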

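Suppose your directory contains:

  • Supermart.xlsx
  • Supermart.csv

Then all_filenames holds the full paths, because glob.glob keeps the directory part of the pattern in every match: ['/project_data/data_asset/Supermart.csv', '/project_data/data_asset/Supermart.xlsx']. To get only the names 'Supermart.csv' and 'Supermart.xlsx', apply os.path.basename to each path.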
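
To see what glob returns for a directory like this without touching the real project data, here is a small sketch that recreates the two example files in a temporary directory. The variable names input_dir, matches and base_names are illustrative only.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as input_dir:
    # Create two empty files with the names from the example above
    for name in ("Supermart.xlsx", "Supermart.csv"):
        open(os.path.join(input_dir, name), "wb").close()

    # Each match is the directory joined with the file name, not the bare name
    matches = sorted(
        glob.glob(os.path.join(input_dir, "*.csv"))
        + glob.glob(os.path.join(input_dir, "*.xlsx"))
    )
    # Strip the directory part to get the value for ingestion_file_name
    base_names = [os.path.basename(path) for path in matches]

print(matches)
print(base_names)
```

The first list shows paths inside the temporary directory, and the second shows only the file names.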

See also:

  • How to list all files in a directory
