我有一个字典列表:
# Sample dataset: one dict per friend, every dict carrying the same three keys.
friends = [
    {'name': 'Sam', 'gender': 'male', 'sport': 'Basketball'},
    {'name': 'Emily', 'gender': 'female', 'sport': 'volleyball'},
]
我需要创建函数 `query`、`select` 和 `field_filter`,用于处理这类列表。这些函数必须支持选择所需的列,并按这些列进行过滤。
示例:
# Example call: keep three columns and only the rows whose sport is one of
# the listed values and whose gender is 'male'.
result = query(friends,
               select('name', 'gender', 'sport'),
               field_filter('sport', *('Basketball', 'volleyball')),
               field_filter('gender', *('male',)))
# Expected value of `result`:
# [{'gender': 'male', 'name': 'Sam', 'sport': 'Basketball'}]
我需要使用预定义的代码:
from typing import Dict, Any, Callable, Iterable
# A dataset: any iterable of dicts with string keys and arbitrary values.
DataType = Iterable[Dict[str, Any]]
# A callable that takes one dataset and returns a dataset.
ModifierFunc = Callable[[DataType], DataType]
def query(data: DataType, selector: ModifierFunc,
          *filters: ModifierFunc) -> DataType:
    """Query data with column selection and filters.

    :param data: List of dictionaries with columns and values
    :param selector: result of `select` function call
    :param filters: Any number of results of `field_filter` function calls
    :return: Filtered data
    """
    pass  # template stub: the body is not implemented here
def select(*columns: str) -> ModifierFunc:
    """Return function that selects only specific columns from dataset.

    :param columns: names of the columns to keep
    :return: function mapping a dataset to a dataset
    """
    pass  # template stub: the body is not implemented here
def field_filter(column: str, *values: Any) -> ModifierFunc:
    """Return function that filters specific column to be one of `values`.

    :param column: name of the column to test
    :param values: accepted values for that column
    :return: function mapping a dataset to a dataset
    """
    pass  # template stub: the body is not implemented here
虽然这个问题暗示 `select` 和 `field_filter` 可能应当实现为类,但我认为没有必要;我只是让它们返回普通的元组:
def select(*columns):
    """Return the given column names unchanged, packed into a tuple."""
    return columns


def field_filter(*column_and_values):
    """Return the given arguments unchanged, packed into a tuple.

    Meant to be called as ``field_filter(column, *values)``, which yields
    ``(column, value1, value2, ...)``.
    """
    return column_and_values
然后 `query` 只是一个列表推导式加字典推导式:遍历字典列表,对于满足所有 `field_filter` 条件的字典,返回其中被 `select` 选中的字段:
def query(data, keys, *filters):
    """Return the `keys` columns of every row of `data` that passes all filters.

    :param data: iterable of dicts (rows)
    :param keys: column names to copy into each returned row
    :param filters: sequences of the form ``(column, *allowed_values)``; a row
        is kept only if ``row[column]`` is one of the allowed values for
        every filter
    :return: list of new dicts, one per kept row
    :raises KeyError: if a row lacks a column that is looked up for
        filtering or selection
    """
    return [
        {key: row[key] for key in keys}
        for row in data
        # `column, *allowed` splits each filter into its column name and
        # the list of accepted values.
        if all(row[column] in allowed for column, *allowed in filters)
    ]
# Demo data: two rows sharing the same keys.
friends = [
    {'name': 'Sam', 'gender': 'male', 'sport': 'Basketball'},
    {'name': 'Emily', 'gender': 'female', 'sport': 'volleyball'},
]

# Keep all three columns; keep rows whose sport is listed and gender is male.
result = query(
    friends,
    select('name', 'gender', 'sport'),
    field_filter('sport', 'Basketball', 'volleyball'),
    field_filter('gender', 'male'),
)
print(result)  # [{'name': 'Sam', 'gender': 'male', 'sport': 'Basketball'}]
据我理解,`select` 和 `field_filter` 函数都需要一个内部函数(即函数中的函数/闭包)。这些内部函数需要以原始数据集作为参数进行迭代。
可行的解决方案如下:
def query(data: DataType, selector: ModifierFunc,
          *filters: ModifierFunc) -> DataType:
    """Apply `selector` to `data`, then apply every filter in order.

    :param data: dataset to query
    :param selector: callable applied first to `data`
    :param filters: callables applied one after another, each receiving the
        output of the previous step
    :return: output of the last filter, or of `selector` when no filters
        are given
    """
    result = selector(data)
    # Named `apply_filter` so the builtin `filter` is not shadowed.
    for apply_filter in filters:
        result = apply_filter(result)
    return result
def select(*columns: str) -> ModifierFunc:
    """Return a function that keeps only `columns` in every row of a dataset.

    :param columns: names of the columns to keep
    :return: function taking a dataset and returning a list of new dicts;
        each new dict holds the row's items whose key is in `columns`, in the
        row's own key order. Columns missing from a row are skipped.
    """
    def selector(data: DataType) -> DataType:
        return [
            {key: value for key, value in row.items() if key in columns}
            for row in data
        ]

    return selector
def field_filter(column: str, *values: Any) -> ModifierFunc:
    """Return a function that filters a dataset on one column.

    :param column: name of the column to test
    :param values: accepted values for `column`
    :return: function taking a dataset and returning a list of the rows
        whose `column` value is one of `values`; rows that do not contain
        `column` at all are kept as well
    """
    def keep_rows(data: DataType) -> DataType:
        # A row without `column` passes the filter; otherwise its value
        # must be one of the accepted ones.
        return [
            row for row in data
            if column not in row or row[column] in values
        ]

    return keep_rows
作为对上面答案的补充:如果想支持多个过滤器,需要在 `query` 中添加以下几行:
# Chain the filters: each one receives the output of the previous one,
# starting from the selected data. An empty intermediate result stays empty
# instead of restarting from `selected_data`, and with no filters at all the
# selected data is passed through unchanged.
filtered_data = selected_data
for data_filter in filters:
    filtered_data = data_filter(filtered_data)
并去掉 `field_filter` 内部函数中"行里没有该列时也保留该行"的那个条件,改为:
# Keep the row only when it has the column and the value is an accepted one.
if flt_key in i and i[flt_key] in flt_values:
    filtered_list.append(i)