Python [OOP] - Data class with generators for lazy evaluation



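I want to build a class that describes a data object coming from a DB. The table may be large, so I am considering using generators and yield to hand the data out row by row or in chunks. I also want to add several functions to the class. First, what is wrong with my __iter__ function? I only ever get the first row.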


import sqlite3

class Data(object):
    DB_LOCATION = "./data.db"

    def __init__(self, db_location=None):
        """Initialize db class variables"""
        if db_location is not None:
            self.connection = sqlite3.connect(db_location)
        else:
            self.connection = sqlite3.connect(self.DB_LOCATION)
        self.cur = self.connection.cursor()
        self.loop_ok = True

    def __iter__(self):
        while self.loop_ok:
            row = self.cur.execute("select * from customers").fetchone()
            if row:
                yield row
            else:
                self.loop_ok = False

    def transform1(self):
        pass

    def transform2(self):
        pass

    def load(self):
        pass

test = iter(Data())
print(next(test))
# getting first row
print(next(test))
# getting first row again

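Second, how can I pipe the data through the class's transform functions? I think something like the following could work for a single row, but I am not sure how to implement it so that it processes every row in the table (pseudocode):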

test = iter(Data())
etl_steps = [
    transform_1,
    transform_2,
    load
]
for step in etl_steps:
    test = step(test)
for i in test:
    print(i)
### Update 24/05

I modified the code so that data is fetched from the table lazily:


import sqlite3

class Data(object):
    DB_LOCATION = "./data.db"

    def __init__(self, db_location=None):
        """Initialize db class variables"""
        if db_location is not None:
            self.connection = sqlite3.connect(db_location)
        else:
            self.connection = sqlite3.connect(self.DB_LOCATION)
        self.cur = self.connection.cursor()
        self.cur.execute("select * from customers")

    def __iter__(self):
        for row in self.cur.fetchall():
            yield row

def transform1(row) -> list:
    return list(row)

def transform2(my_list):
    return list(map(lambda x: x.upper() if isinstance(x, str) else x, my_list))

def load():
    pass

test = iter(Data())
# Working as expected
print(transform2(transform1(next(test))))
# now getting second row
print(transform2(transform1(next(test))))

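What I am unsure about is how the transform functions should be implemented to get real "streaming". When I use yield instead of return, it does not work (even if I wrap each row in an iterator). Second, wrapping each function call inside the next is a very ugly solution.

I tried the following, but it did not work: I get too many results, and the transformations do not seem to be applied as expected.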


etl_steps = [
    transform1,
    transform2
]
for step in etl_steps:
    test = step(test)
print(all(test))
for a in test:
    print(a)

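OK, I think I understand now. You want to use __init__ to execute the query. In my example I just use a list instead of setting up a db file and so on. Then have __iter__ return self, and use a __next__ method to fetch one row and return it.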
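Applied to a real SQLite cursor, the same idea might look like the sketch below. It does not use the asker's actual database: the in-memory database, the `customers` columns, and the `chunk_size` parameter are all assumptions made for illustration. The query is executed once in `__init__`, and `__iter__` pulls rows in chunks with `fetchmany`, so the whole table is never held in memory at once. The original version kept returning the first row because it called `execute` again on every pass through the loop, which restarts the query.

```python
import sqlite3


class Data:
    """Iterate lazily over the rows of the customers table."""

    def __init__(self, connection, chunk_size=2):
        self.connection = connection
        self.chunk_size = chunk_size  # hypothetical tuning knob
        self.cur = self.connection.cursor()
        # Execute the query once; re-executing it would restart at row one.
        self.cur.execute("select id, first_name from customers order by id")

    def __iter__(self):
        while True:
            # Fetch at most chunk_size rows per round trip to the cursor.
            rows = self.cur.fetchmany(self.chunk_size)
            if not rows:
                return
            yield from rows


# Demo data in an in-memory database (assumed schema, not the asker's).
conn = sqlite3.connect(":memory:")
conn.execute("create table customers (id integer, first_name text)")
conn.executemany(
    "insert into customers values (?, ?)",
    [(1, "Bob"), (2, "Mary"), (3, "Joe")],
)

rows = iter(Data(conn))
first = next(rows)      # first row
second = next(rows)     # second row, not the first one again
remaining = list(rows)  # whatever is left
```

A sqlite3 cursor is itself iterable, so `yield from self.cur` would also stream rows one at a time if chunking is not needed.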

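As for the second question, you just need each "step" function to be a generator that loops over its input and yields the transformed result. You can then nest them however you like: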

class Data(object):
    def __init__(self):
        pass  # set up db connection
        self.cur = [  # execute cursor
            {'id': 1, 'firstName': 'Bob', 'lastName': 'Jones'},
            {'id': 2, 'firstName': 'Mary', 'lastName': 'Jane'},
            {'id': 3, 'firstName': 'Joe', 'lastName': 'Smith'},
            {'id': 4, 'firstName': 'Jane', 'lastName': 'Doe'}
        ]

    def __iter__(self):
        return self

    def __next__(self):
        try:
            row = self.cur.pop(0)  # fetch next row from cursor
            return row
        except IndexError:
            raise StopIteration

def transform1(data):
    for row in data:
        row['firstName'] = row['firstName'].upper()
        yield row

def transform2(data):
    for row in data:
        row['middleInitial'] = 'X'
        yield row

def load(data):
    for row in data:
        print(row)
        yield row

etl_steps = [transform1, transform2, load]
data = iter(Data())
for step in etl_steps:
    data = step(data)
# all() consumes the generator, pulling every row through the pipeline
# (it would stop early if a step yielded a falsy row)
all(data)
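The loop over `etl_steps` can also be folded into a small helper. The sketch below is one possible arrangement rather than part of the answer above: `run_pipeline`, `drain`, the step names, and the sample rows are all hypothetical. It exhausts the generator with `collections.deque(maxlen=0)`, since `all(data)` stops at the first falsy row. Each step receives the whole iterator, not a single row, which is why per-row functions such as `list(row)` give surprising results when they are chained this way.

```python
from collections import deque
from functools import reduce


def upper_first_name(rows):
    # Yield a new dict per row instead of mutating the input row.
    for row in rows:
        yield {**row, "firstName": row["firstName"].upper()}


def add_middle_initial(rows):
    for row in rows:
        yield {**row, "middleInitial": "X"}


loaded = []  # stands in for the real load target


def load(rows):
    for row in rows:
        loaded.append(row)
        yield row


def run_pipeline(source, steps):
    """Chain generator steps; nothing runs until the result is consumed."""
    return reduce(lambda rows, step: step(rows), steps, source)


def drain(iterator):
    """Consume an iterator completely, discarding the results."""
    deque(iterator, maxlen=0)


source = iter([
    {"id": 1, "firstName": "Bob"},
    {"id": 2, "firstName": "Mary"},
])
pipeline = run_pipeline(source, [upper_first_name, add_middle_initial, load])
loaded_before = list(loaded)  # still empty: building the pipeline is lazy
drain(pipeline)               # now every row flows through all three steps
```

Because every step is a generator, only one row is in flight at a time, regardless of how many steps are chained.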
