队列或其他方法处理即时报价数据?



在我们的电子交易系统中,我们需要根据来自 100+ 合约的即时报价数据进行计算。

合约的即时报价数据不会在一条消息中接收。一条消息仅包含一份合约的即时报价数据。合约的时间戳略有不同(有时差异很大,但让我们忽略这种情况(。

eg: (first column is timestamp. Second is contract name)
below 2 data has 1ms diff
10:34:03.235,10002007,510050C2006A03500  ,0.0546
10:34:03.236,10001909,510050C2003A02750  ,0.3888
below 2 data has 3ms diff
10:34:03.594,10002154,510300C2003M03700  ,0.4985
10:34:03.597,10002118,510300C2001M03700  ,0.4514

只有那些有价格变化的人才会有数据。所以我无法计算合约编号来知道我是否收到了这个即时报价的所有数据。

但另一方面,我们不想等到收到所有即时报价数据,因为有时数据可能会延迟很长时间,我们会想要排除它们。

需要低延迟。所以我认为我们将定义一个窗口 - 比如 50 毫秒 - 并开始根据我们在过去 50 毫秒内收到的任何数据进行计算。

处理此类用例的最佳方法是什么?

最初我想使用 redis 流来维护一个小队列,每当收到合约的数据时,我都会将其推送到 redis 流。但是我无法弄清楚在特定时间(例如 50 毫秒(过去后立即提取数据的最佳方法是什么。

我在考虑也许我应该使用其他一些技术? 任何建议不胜感激。

使用XRANGE myStream - + COUNT 1获取第一个条目。

使用XREVRANGE myStream + - COUNT 1获取最后一个条目。

XINFO STREAM myStream还带来了第一个和最后一个条目,但文档说它是O(log N)

假设您使用时间戳作为 ID 或字段,则可以计算时差。

如果您使用的是 Redis Streams 自动 ID (XADD myStream * ...(,则 ID 的第一部分是以毫秒为单位的 UNIX 时间戳。

假设上述情况,您可以使用Lua脚本以原子方式进行检查:

EVAL "local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1') local firstTime = {} if next(first) == nil then     return redis.error_reply('Stream is empty or key doesn`t exist') end for str in string.gmatch(first[1][1], '([^-]+)') do     table.insert(firstTime, tonumber(str)) end local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1') local lastTime = {} for str in string.gmatch(last[1][1], '([^-]+)') do     table.insert(lastTime, tonumber(str)) end local ms = lastTime[1] - firstTime[1] if ms >= tonumber(ARGV[1]) then     return redis.call('XRANGE', KEYS[1], '-', '+') else     return redis.error_reply('Only '..ms..' ms') end" 1 myStream 50

参数numKeys(1 here) streamKey timeInMs(50 here)1 myStream 50

以下是Lua脚本的友好视图:

local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1')
local firstTime = {}
if next(first) == nil then
return redis.error_reply('Stream is empty or key doesn`t exist')
end
for str in string.gmatch(first[1][1], '([^-]+)') do
table.insert(firstTime, tonumber(str))
end
local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1')
local lastTime = {}
for str in string.gmatch(last[1][1], '([^-]+)') do
table.insert(lastTime, tonumber(str))
end
local ms = lastTime[1] - firstTime[1]
if ms >= tonumber(ARGV[1]) then
return redis.call('XRANGE', KEYS[1], '-', '+')
else
return redis.error_reply('Only '..ms..' ms')
end

它返回:

  • (error) Stream is empty or key doesn`t exist
  • (error) Only 34 ms如果我们没有经过所需的时间
  • 实际的条目列表(如果第一条消息和最后一条消息之间已过所需的时间(。

请务必查看 Redis Streams 简介以熟悉 Redis Streams,并查看 EVAL 命令以了解 Lua 脚本。

最新更新