计数的Flask SQLAlchemy查询



我使用的是Flask SQLAlchemy,使用的是一对多关系。两种型号

class Request(db.Model):
id = db.Column(db.Integer, primary_key = True)
r_time = db.Column(db.DateTime, index = True, default=datetime.utcnow)
org = db.Column(db.String(120))
dest = db.Column(db.String(120))
buyer_id = db.Column(db.Integer, db.ForeignKey('buyer.id'))
sale_id = db.Column(db.Integer, db.ForeignKey('sale.id'))
cost = db.Column(db.Integer)
sr = db.Column(db.Integer)
profit = db.Column(db.Integer)
def __repr__(self):
return '<Request {} by {}>'.format(self.org, self.buyer_id)
class Buyer(db.Model):
id  = db.Column(db.Integer, primary_key = True)
name = db.Column(db.String(120), unique = True)
email = db.Column(db.String(120), unique = True)
requests = db.relationship('Request', backref = 'buyer', lazy='dynamic')
def __repr__(self):
return '<Buyer {}>'.format(self.name)

我需要确定哪位买家的最低要求买家。

我可以通过创建额外的列表并将所有请求并搜索列表。但我相信还有另一种简单的方法可以通过SQLAlchemy查询实现

您可以使用CTE(公共表表达式(为select执行此操作,该表达式将生成买家ID及其请求计数,因此

buyer_id|request_count:--------|:------------1|52|33|14|1

您可以在此处筛选必须大于0才能列出的计数。

然后,你可以加入买家表,对照它来生产:

buyer_id|buyer_name|buyeer_email|request_count:--------|:--------------------------------------|:---------------|:------------1|foo|foo@example.com|52|条形|bar@example.com|33|baz|baz@example.com|14|垃圾邮件|spam@example.com|1

但由于我们使用的是CTE,您也可以查询CTE以获得最低计数值。在上面的示例中,这是1,您可以使用cte计数查询向加入的买家添加WHERE子句,以将结果筛选到request_count值等于最小值的行。

对此的SQL查询是

WITH request_counts AS (
SELECT request.buyer_id AS buyer_id, count(request.id) AS request_count
FROM request GROUP BY request.buyer_id
HAVING count(request.id) > ?
)
SELECT buyer.*
FROM buyer
JOIN request_counts ON buyer.id = request_counts.buyer_id
WHERE request_counts.request_count = (
SELECT min(request_counts.request_count)
FROM request_counts
)

WITH request_counts AS (...)定义了一个CTE,它将生成具有buyer_idrequest_count的第一个表。然后request_count表与request连接,WHERE子句对min(request_counts.request_count)值进行过滤。

将以上内容翻译为Flask SQLAlchemy代码:

request_count = db.func.count(Request.id).label("request_count")
cte = (
db.select([Request.buyer_id.label("buyer_id"), request_count])
.group_by(Request.buyer_id)
.having(request_count > 0)
.cte('request_counts')
)
min_request_count = db.select([db.func.min(cte.c.request_count)]).as_scalar()
buyers_with_least_requests = Buyer.query.join(
cte, Buyer.id == cte.c.buyer_id
).filter(cte.c.request_count == min_request_count).all()

演示:

>>> __ = db.session.bulk_insert_mappings(
...     Buyer, [{"name": n} for n in ("foo", "bar", "baz", "spam", "no requests")]
... )
>>> buyers = Buyer.query.order_by(Buyer.id).all()
>>> requests = [
...     Request(buyer_id=b.id)
...     for b in [*([buyers[0]] * 3), *([buyers[1]] * 5), *[buyers[2], buyers[3]]]
... ]
>>> __ = db.session.add_all(requests)
>>> request_count = db.func.count(Request.id).label("request_count")
>>> cte = (
...     db.select([Request.buyer_id.label("buyer_id"), request_count])
...     .group_by(Request.buyer_id)
...     .having(request_count > 0)
...     .cte("request_counts")
... )
>>> buyers_w_counts = Buyer.query.join(cte, cte.c.buyer_id == Buyer.id)
>>> for buyer, count in buyers_w_counts.add_column(cte.c.request_count):
...     # print out buyer and request count for this demo
...     print(buyer, count, sep=": ")
<Buyer foo>: 3
<Buyer bar>: 5
<Buyer baz>: 1
<Buyer spam>: 1
>>> min_request_count = db.select([db.func.min(cte.c.request_count)]).as_scalar()
>>> buyers_w_counts.filter(cte.c.request_count == min_request_count).all()
[<Buyer baz>, <Buyer spam>]

我还创建了一个数据库<>在这里摆弄,包含相同的查询。

最新更新