peewee:在没有主键或唯一约束的情况下执行 upsert



我的数据库中有以下示例数据:

= 'BTCUSDT' and timeframe_id = '30m') or (symbol_id = 'ATOMUSDT' and timeframe_id = '30m');
id          timeframe_id  symbol_id   open_time      open        high        low         close       close_time     base_volume  quote_volume
----------  ------------  ----------  -------------  ----------  ----------  ----------  ----------  -------------  -----------  ---------------
21          1h            BTCUSDT     1631610000000  45969.99    46200.0     45754.5     45853.87    1631613599999  14494.688    665884011.49137
22          1h            BTCUSDT     1631613600000  45853.88    46085.0     45812.0     46036.14    1631617199999  6797.182     312263363.68921
23          1h            BTCUSDT     1631617200000  46036.13    46157.18    45865.72    45986.45    1631620799999  6843.614     314729653.50406
24          1h            BTCUSDT     1631620800000  45986.45    46647.0     45771.11    46438.53    1631624399999  30800.488    1424879182.6599
405         1h            ATOMUSDT    1631610000000  35.598      35.66       35.029      35.229      1631613599999  814367.91    28761577.13938
406         1h            ATOMUSDT    1631613600000  35.23       35.609      34.371      35.095      1631617199999  1020950.95   35708827.7814
407         1h            ATOMUSDT    1631617200000  35.093      35.908      34.7        34.966      1631620799999  1170633.58   41321501.81656
408         1h            ATOMUSDT    1631620800000  34.971      35.347      34.203      34.978      1631624399999  1031529.26   35908013.68847
17          30m           BTCUSDT     1631617200000  46036.13    46088.0     45923.44    46069.72    1631618999999  2946.376     135544292.71177
18          30m           BTCUSDT     1631619000000  46069.71    46157.18    45865.72    45986.45    1631620799999  3897.238     179185360.79229
19          30m           BTCUSDT     1631620800000  45986.45    46068.0     45771.11    45791.45    1631622599999  6421.929     294839291.18631
20          30m           BTCUSDT     1631622600000  45791.46    46647.0     45780.61    46440.82    1631624399999  24385.586    1130366232.9054
401         30m           ATOMUSDT    1631617200000  35.093      35.908      34.974      35.482      1631618999999  702385.57    24928097.46603
402         30m           ATOMUSDT    1631619000000  35.479      35.534      34.7        34.966      1631620799999  468248.01    16393404.35053
403         30m           ATOMUSDT    1631620800000  34.971      35.347      34.203      34.403      1631622599999  556383.46    19316380.04181
404         30m           ATOMUSDT    1631622600000  34.412      35.212      34.354      34.987      1631624399999  475295.69    16596878.52596
Run Time: real 0.000 user 0.000218 sys 0.000084
sqlite>

我需要插入可能包含重复数据的数据行。例如,timeframe_id 和 symbol_id 会重复,open_time 也可能重复,但只能在每个 timeframe_id 和每个 symbol_id 的基础上重复。

我正试图使用upsert添加一行,但我遇到了我试图使用的各种.on_conflict()方法的问题。

使用 conflict_target、preserve 和 update 调用 .on_conflict()(如下面的示例代码所示)会返回 ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint;考虑到表结构和要 upsert 的数据,这个结果在意料之中。

.on_conflict_replace() 和 .on_conflict_ignore() 都会插入一个新行,这不是我想要的。

是否有比upsert更好的方法来处理这个问题,或者upsert是否有一些额外的功能,可以让我做我想做的事情,我没有看到?

代码示例:

from peewee import *
from playhouse.sqlite_ext import SqliteExtDatabase
# SQLite database handle used by the models below; foreign-key enforcement
# and WAL journaling are requested through pragmas.
db = SqliteExtDatabase(
    'test.db',
    pragmas={'foreign_keys': 1, 'journal_mode': 'wal'},
)
class BaseModel(Model):
    """Common base class that binds its subclasses to the ``db`` database."""

    class Meta:
        database = db
class Symbols(BaseModel):
    """Lookup table of symbols; ``name`` is the primary key."""

    name = CharField(primary_key=True, unique=True)
class Timeframes(BaseModel):
    """Lookup table of timeframes; ``name`` is the primary key."""

    name = CharField(primary_key=True, unique=True)
class Candles(BaseModel):
    """One candle for a given timeframe and symbol.

    NOTE: no unique constraint is declared on any combination of
    ``timeframe``, ``symbol`` and ``open_time``, so an ON CONFLICT clause
    that names those columns has no constraint to match.
    """

    timeframe = ForeignKeyField(Timeframes)
    symbol = ForeignKeyField(Symbols)
    open_time = DateTimeField()
    open = FloatField()
    high = FloatField()
    low = FloatField()
    close = FloatField()
    close_time = DateTimeField()
    # Volumes are fractional quantities (e.g. 14494.688); an integer field
    # would truncate them, so they are stored as floats like the prices.
    base_volume = FloatField()
    quote_volume = FloatField()
# Create the tables backing the three models.
db.create_tables([Symbols, Timeframes, Candles])

# Sample stream message carrying a single candle under data -> k.
stream = {
    'data': {
        'k': {
            'i': '1h',
            's': 'BTCUSDT',
            't': 1631610000000,
            'o': 45969.99,
            'h': 46500.0,
            'l': 45700.0,
            'c': 45950.00,
            'T': 1631613599999,
            'v': 6400,
            'q': 300000000,
        },
    },
}
# Try to upsert the candle from ``stream``: insert it and, on a conflict over
# the listed target columns, overwrite the remaining columns. An
# OperationalError raised along the way is printed instead of propagating.
try:
    kline = stream['data']['k']
    key_columns = [
        Candles.timeframe,
        Candles.symbol,
        Candles.open_time,
        Candles.open,
    ]
    insert_stmt = Candles.insert(
        timeframe=kline['i'],
        symbol=kline['s'],
        open_time=kline['t'],
        open=kline['o'],
        high=kline['h'],
        low=kline['l'],
        close=kline['c'],
        close_time=kline['T'],
        base_volume=kline['v'],
        quote_volume=kline['q'],
    )
    upsert_stmt = insert_stmt.on_conflict(
        conflict_target=key_columns,
        preserve=list(key_columns),
        update={
            Candles.high: kline['h'],
            Candles.low: kline['l'],
            Candles.close: kline['c'],
            Candles.close_time: kline['T'],
            Candles.base_volume: kline['v'],
            Candles.quote_volume: kline['q'],
        },
    )
    query = upsert_stmt.execute()
except OperationalError as e:
    print(e)
执行:

% test2.py
ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint

我的解决方案是先用一个查询查出 id,再把它包含在 preserve 中。[1]

[1]我从SQLite更改为MySQL,不是因为它与这个问题有任何关系,而是因为我在前者中有数据库锁。因此,由于一路上数据库的变化,答案的格式与原始问题不同。

由于我了解有限,我确实觉得这是一个笨重、开销可能也很大的解决方案,但也许我错了。

# Upsert the candle carried by ``stream``: when a row already exists for its
# (timeframe, symbol, open_time) the row is overwritten through its id,
# otherwise a new row is inserted.
kline = stream['data']['k']
Candles = c_db.Candles

try:
    # Resolve the names to ids with filtered queries instead of loading both
    # lookup tables and scanning them in Python. An unknown symbol or
    # timeframe raises DoesNotExist here (and propagates) rather than leaving
    # the id variable unbound or stale.
    symbol_id = c_db.Symbols.get(c_db.Symbols.name == kline['s']).id
    timeframe_id = c_db.Timeframes.get(
        c_db.Timeframes.name == kline['i']
    ).id

    # Column values shared by both insert paths, keyed by field name.
    values = {
        'timeframe': timeframe_id,
        'symbol': symbol_id,
        'open_time': kline['t'],
        'open': kline['o'],
        'high': kline['h'],
        'low': kline['l'],
        'close': kline['c'],
        'close_time': kline['T'],
        'base_volume': kline['v'],
        'quote_volume': kline['q'],
    }

    try:
        # .get() returns a model instance; keep only its primary key so a
        # plain id is what gets sent to the database.
        current_id = (
            Candles
            .select(Candles.id)
            .where(
                Candles.timeframe == timeframe_id,
                Candles.symbol == symbol_id,
                Candles.open_time == kline['t'],
            )
            .get()
            .id
        )
    except peewee.DoesNotExist:
        # No stored candle yet: plain insert. It sits inside the outer
        # handler so a database error is reported the same way on both paths.
        insert_query = Candles.insert(**values).execute()
    else:
        # Re-insert under the existing id; the resulting primary-key
        # conflict makes the database update the stored row instead.
        new_values = {Candles.id: current_id}
        for field_name, value in values.items():
            new_values[getattr(Candles, field_name)] = value
        insert_query = (
            Candles
            .insert(id=current_id, **values)
            .on_conflict(
                preserve=[
                    Candles.id,
                    Candles.timeframe_id,
                    Candles.symbol_id,
                ],
                update=new_values,
            )
            .execute()
        )
except peewee.OperationalError as e:
    # A failed database operation is an error, not routine information.
    logging.error("%s", e)

相关内容

  • 没有找到相关文章