我正在寻找一些关于我的要求的建议。以下是我对要求的描述。请随时与我联系以获取任何详细信息。甚至关于如何更清楚地描述我的问题的一些建议也非常感谢:)
需求描述
我有一些数据,格式如下:
router, interface,timestamp, src_ip, dst_ip, src_port, dst_port, protocol, bits
r1, 1, 1453016443, 10.0.0.1, 10.0.0.2, 100, 200, tcp, 108
r2, 1, 1453016448, 10.0.0.3, 10.0.0.8, 200, 200, udp, 100
如您所见,它是一些网络原始数据。我省略了一些列,只是为了让它看起来更清晰。数据量非常大。而且它的生成速度非常快,就像每 5 分钟产生 10 亿行......
我想要的是对这些数据进行一些实时分析。例如:
使用时间戳画一条线
选择总和(位),按路由器分组raw_data时间戳,接口 = 1,路由器 = R1。
找出哪个 3 个src_ip为一个接口发送的数据最多
从路由器=R1和接口=2的raw_data中选择总和(位) 按src_ip排序顺序 总和(位) 描述限制 3
我已经尝试了一些解决方案,每种解决方案都不太适合它。例如:
RDBMS
MySQL看起来很好,除了几个问题:
- 数据太大
- 我的列比我在这里描述的要多得多。为了提高我的查询速度,我必须在大多数列上进行一些索引。但是我认为在大表上创建索引并且包含太多列的索引不是很好,对吧?
打开TSDB
OpenTSDB是一个很好的时间序列数据库。但也不适合我的要求。
openTSDB在解决TOP N问题时遇到了问题。在我的要求"获取发送最多数据的前 3 名src_ip"中,openTSDB 无法解决这个问题。
火花
我知道 apache spark 可以像 RDBMS 一样使用。它具有称为spark SQL的功能。我没有尝试,但我想性能应该不能满足实时分析/查询要求,对吧?毕竟,火花更适合离线计算,对吧?
弹性搜索
当我知道这个项目时,我真的对 ES 寄予了很大的希望。但它也不合适。因为当你聚合多个列时,你必须在 elasticsearch 中使用所谓的嵌套桶聚合。并且此聚合的结果无法排序。您必须检索所有结果并自行排序。就我而言,结果太多了。对结果进行排序将非常困难
所以。。。。我被困在这里。任何人都可以给出一些建议吗?
我不明白为什么 ES 无法满足您的要求。我想你误解了这部分
但它也不合适。因为当你聚合多个列时,你必须在 elasticsearch 中使用所谓的嵌套桶聚合。并且此聚合的结果无法排序。
使用时间戳绘制一条线的第一个要求可以通过如下所示的查询/聚合轻松实现:
{
"query": {
"bool": {
"must": [
{
"term": {
"interface": 1
}
},
{
"term": {
"router": "r1"
}
}
]
}
},
"aggs": {
"by_minute": {
"date_histogram": {
"field": "timestamp",
"interval": "1m"
},
"aggs": {
"sum_bits": {
"sum": {
"field": "bits"
}
}
}
}
}
}
至于您的第二个要求,找出哪个 3 src_ip为一个接口发送的数据最多,也可以通过这样的查询/聚合轻松实现:
{
"query": {
"bool": {
"must": [
{
"term": {
"interface": 2
}
},
{
"term": {
"router": "r1"
}
}
]
}
},
"aggs": {
"by_src_ip": {
"terms": {
"field": "src_ip",
"size": 3,
"order": {
"sum_bits": "desc"
}
},
"aggs": {
"sum_bits": {
"sum": {
"field": "bits"
}
}
}
}
}
}
更新
根据您的评论,您上面的第二个要求可能会发生变化,以找到 src_ip/dst_ip 的前 3 个组合。这可以通过使用 script
而不是构建 src/dest 组合并为每对提供位总和的项进行terms
聚合来实现,如下所示:
{
"query": {
"bool": {
"must": [
{
"term": {
"interface": 2
}
},
{
"term": {
"router": "r1"
}
}
]
}
},
"aggs": {
"by_src_ip": {
"terms": {
"script": "[doc.src_ip.value, doc.dst_ip.value].join('-')",
"size": 3,
"order": {
"sum_bits": "desc"
}
},
"aggs": {
"sum_bits": {
"sum": {
"field": "bits"
}
}
}
}
}
}
请注意,为了运行最后一个查询,您需要启用动态脚本。此外,由于您将拥有数十亿个文档,因此脚本可能不是最佳解决方案,但在进一步深入研究之前值得尝试一下。另一种可能的解决方案是在索引时添加一个combination
字段 ( src_ip-dst_ip
),以便您可以将其用作术语聚合中的字段,而无需诉诸脚本。
您可以尝试Axibase时序数据库,该数据库是非关系的,但除了类似休息的API外,还支持SQL查询。下面是一个前 N 个查询示例:
SELECT entity, avg(value) FROM cpu_busy
WHERE time between now - 1 * hour and now
GROUP BY entity
ORDER BY avg(value) DESC
LIMIT 3
https://axibase.com/docs/atsd/sql/#grouping
ATSD社区版是免费的。
披露:我在Axibase工作