我的数据库中有以下示例数据:
= 'BTCUSDT' and timeframe_id = '30m') or (symbol_id = 'ATOMUSDT' and timeframe_id = '30m');
id timeframe_id symbol_id open_time open high low close close_time base_volume quote_volume
---------- ------------ ---------- ------------- ---------- ---------- ---------- ---------- ------------- ----------- ---------------
21 1h BTCUSDT 1631610000000 45969.99 46200.0 45754.5 45853.87 1631613599999 14494.688 665884011.49137
22 1h BTCUSDT 1631613600000 45853.88 46085.0 45812.0 46036.14 1631617199999 6797.182 312263363.68921
23 1h BTCUSDT 1631617200000 46036.13 46157.18 45865.72 45986.45 1631620799999 6843.614 314729653.50406
24 1h BTCUSDT 1631620800000 45986.45 46647.0 45771.11 46438.53 1631624399999 30800.488 1424879182.6599
405 1h ATOMUSDT 1631610000000 35.598 35.66 35.029 35.229 1631613599999 814367.91 28761577.13938
406 1h ATOMUSDT 1631613600000 35.23 35.609 34.371 35.095 1631617199999 1020950.95 35708827.7814
407 1h ATOMUSDT 1631617200000 35.093 35.908 34.7 34.966 1631620799999 1170633.58 41321501.81656
408 1h ATOMUSDT 1631620800000 34.971 35.347 34.203 34.978 1631624399999 1031529.26 35908013.68847
17 30m BTCUSDT 1631617200000 46036.13 46088.0 45923.44 46069.72 1631618999999 2946.376 135544292.71177
18 30m BTCUSDT 1631619000000 46069.71 46157.18 45865.72 45986.45 1631620799999 3897.238 179185360.79229
19 30m BTCUSDT 1631620800000 45986.45 46068.0 45771.11 45791.45 1631622599999 6421.929 294839291.18631
20 30m BTCUSDT 1631622600000 45791.46 46647.0 45780.61 46440.82 1631624399999 24385.586 1130366232.9054
401 30m ATOMUSDT 1631617200000 35.093 35.908 34.974 35.482 1631618999999 702385.57 24928097.46603
402 30m ATOMUSDT 1631619000000 35.479 35.534 34.7 34.966 1631620799999 468248.01 16393404.35053
403 30m ATOMUSDT 1631620800000 34.971 35.347 34.203 34.403 1631622599999 556383.46 19316380.04181
404 30m ATOMUSDT 1631622600000 34.412 35.212 34.354 34.987 1631624399999 475295.69 16596878.52596
Run Time: real 0.000 user 0.000218 sys 0.000084
sqlite>
我需要插入可能包含重复数据的数据行。例如,timeframe_id
和symbol_id
将是重复的,open_time
可能是重复的,但只能在每个timeframe_id
和每个symbol_id
的基础上。
我正试图使用upsert
添加一行,但我遇到了我试图使用的各种.on_conflict()
方法的问题。
.on_conflict()
使用conflict_target
,preserve
和update
(如下面的示例代码所示)返回ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint
,这是基于模式和作为upsert
的数据所期望的。
.on_conflict_replace()
和on_conflict_ignore()
都插入了一个新行,这不是我想要的。
是否有比upsert
更好的方法来处理这个问题,或者upsert
是否有一些额外的功能,可以让我做我想做的事情,我没有看到?
代码示例:
from peewee import *
from playhouse.sqlite_ext import SqliteExtDatabase
db = SqliteExtDatabase('test.db', pragmas={'foreign_keys': 1, 'journal_mode': 'wal'})
class BaseModel(Model):
class Meta:
database = db
class Symbols(BaseModel):
name = CharField(unique=True, primary_key = True)
class Timeframes(BaseModel):
name = CharField(unique=True, primary_key = True)
class Candles(BaseModel):
timeframe = ForeignKeyField(Timeframes)
symbol = ForeignKeyField(Symbols)
open_time = DateTimeField()
open = FloatField()
high = FloatField()
low = FloatField()
close = FloatField()
close_time = DateTimeField()
base_volume = IntegerField()
quote_volume = IntegerField()
db.create_tables([Symbols, Timeframes, Candles])
stream={}
stream['data'] = {}
stream['data']['k'] = {}
stream['data']['k']['i'] = '1h'
stream['data']['k']['s'] = 'BTCUSDT'
stream['data']['k']['t'] = 1631610000000
stream['data']['k']['o'] = 45969.99
stream['data']['k']['h'] = 46500.0
stream['data']['k']['l'] = 45700.0
stream['data']['k']['c'] = 45950.00
stream['data']['k']['T'] = 1631613599999
stream['data']['k']['v'] = 6400
stream['data']['k']['q'] = 300000000
try:
query = (Candles
.insert(
timeframe = stream['data']['k']['i'],
symbol = stream['data']['k']['s'],
open_time = stream['data']['k']['t'],
open = stream['data']['k']['o'],
high = stream['data']['k']['h'],
low = stream['data']['k']['l'],
close = stream['data']['k']['c'],
close_time = stream['data']['k']['T'],
base_volume = stream['data']['k']['v'],
quote_volume = stream['data']['k']['q']
)
.on_conflict(
conflict_target = [
Candles.timeframe,
Candles.symbol,
Candles.open_time,
Candles.open
],
preserve = [ Candles.timeframe, Candles.symbol, Candles.open_time, Candles.open ],
update = {
Candles.high: stream['data']['k']['h'],
Candles.low: stream['data']['k']['l'],
Candles.close: stream['data']['k']['c'],
Candles.close_time: stream['data']['k']['T'],
Candles.base_volume: stream['data']['k']['v'],
Candles.quote_volume: stream['data']['k']['q']
}
)
.execute()
)
except OperationalError as e:
print(e)
执行:% test2.py
ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint
我的解决方案是创建一个查询来查找id
,并将其包含在preserve
[1]
[1]我从SQLite
更改为MySQL
,不是因为它与这个问题有任何关系,而是因为我在前者中有数据库锁。因此,由于一路上数据库的变化,答案的格式与原始问题不同。
在我的无知中,我确实觉得这是一个沉重且可能昂贵的解决方案,但也许我错了。
try:
symbols_query = c_db.Symbols.select().dicts()
for symbol in symbols_query:
if symbol['name'] == stream['data']['k']['s']:
symbol_id = symbol['id']
timeframes_query = c_db.Timeframes.select().dicts()
for timeframe in timeframes_query:
if timeframe['name'] == stream['data']['k']['i']:
timeframe_id = timeframe['id']
current_id = (c_db.Candles
.select(c_db.Candles.id)
.where(
c_db.Candles.timeframe == timeframe_id,
c_db.Candles.symbol == symbol_id,
c_db.Candles.open_time == stream['data']['k']['t']
)
.get()
)
insert_query = (c_db.Candles
.insert(
id = current_id,
timeframe = timeframe_id,
symbol = symbol_id,
open_time = stream['data']['k']['t'],
open = stream['data']['k']['o'],
high = stream['data']['k']['h'],
low = stream['data']['k']['l'],
close = stream['data']['k']['c'],
close_time = stream['data']['k']['T'],
base_volume = stream['data']['k']['v'],
quote_volume = stream['data']['k']['q']
)
.on_conflict(
preserve=[ c_db.Candles.id, c_db.Candles.timeframe_id, c_db.Candles.symbol_id ],
update = {
c_db.Candles.id: current_id,
c_db.Candles.timeframe: timeframe_id,
c_db.Candles.symbol: symbol_id,
c_db.Candles.open_time: stream['data']['k']['t'],
c_db.Candles.open: stream['data']['k']['o'],
c_db.Candles.high: stream['data']['k']['h'],
c_db.Candles.low: stream['data']['k']['l'],
c_db.Candles.close: stream['data']['k']['c'],
c_db.Candles.close_time: stream['data']['k']['T'],
c_db.Candles.base_volume: stream['data']['k']['v'],
c_db.Candles.quote_volume: stream['data']['k']['q']
}
)
.execute()
)
except peewee.DoesNotExist:
insert_query = (c_db.Candles
.insert(
timeframe = timeframe_id,
symbol = symbol_id,
open_time = stream['data']['k']['t'],
open = stream['data']['k']['o'],
high = stream['data']['k']['h'],
low = stream['data']['k']['l'],
close = stream['data']['k']['c'],
close_time = stream['data']['k']['T'],
base_volume = stream['data']['k']['v'],
quote_volume = stream['data']['k']['q']
)
.execute()
)
except peewee.OperationalError as e:
logging.info(f"{e}")