使用 Altair 将可视化保存到 Kibana 仪表板时出错



大家好,我在 Altair、Vega 与 Elasticsearch 的交互上遇到了问题。具体情况如下:我有一个由 38.8 MB 数据生成的索引模式(index pattern),数据量约为 9 万行。我想用这个索引模式创建一个可以按两个值进行筛选的可视化。我的想法是按时间展示数据,并把这个可视化插入到 Kibana 仪表板中;我应该能够与可视化交互,并根据所设置的过滤器进行筛选。这一点我已经做到了,图表是用下面的代码创建的:其中可以看到函数 saveVegaLiteVis(def saveVegaLiteVis)以及从链接中获取的保存对象的代码,随后是我想要的、已构建完成并能正常工作的图表。我想解决的是数据量增大之后出现的问题:当数据小于 3 MB(约 1 万行)时,可视化没有任何问题。

def saveVegaLiteVis(client, index, visName, altairChart, resultSize=100, timeField=True):
    """Save an Altair chart as a Kibana "vega" visualization saved object.

    The chart's JSON spec is rewritten so that its top-level ``data.url`` is
    an Elasticsearch query against ``index``. The spec is then wrapped in a
    ``visualization`` saved object and indexed into ``.kibana`` under the id
    ``visualization:<visName>``.

    Args:
        client: Object exposing ``index(index=..., id=..., body=...)``
            (presumably an ``elasticsearch.Elasticsearch`` client).
        index: Name of the index (pattern) the visualization queries.
        visName: Visualization title; also used to build the document id.
        altairChart: Object exposing ``to_json()`` that returns a JSON string
            whose top level contains a ``data`` object.
        resultSize: Value written to the ``size`` of the query body.
        timeField: Controls the ``%timefield%`` entry of the query.
            ``True`` (the default) uses the field name ``"timestamp"``; a
            non-empty string is used as the field name itself (for example
            ``"@timestamp"``); any falsy value omits ``%timefield%``.

    Returns:
        Whatever ``client.index`` returns.
    """
    # NOTE(review): if the chart was built from an in-memory DataFrame,
    # to_json() presumably embeds every row under a top-level "datasets"
    # key, which would make the saved object very large -- confirm.
    chart_json = json.loads(altairChart.to_json())

    es_query = {
        "%context%": True,
        "index": index,
        "body": {
            "size": resultSize,
        },
    }
    if timeField:
        # Keep the historical default name when the caller only passes True.
        es_query["%timefield%"] = timeField if isinstance(timeField, str) else "timestamp"
    chart_json['data']['url'] = es_query

    visState = {
        "type": "vega",
        "aggs": [],
        "params": {
            "spec": json.dumps(chart_json, sort_keys=True, indent=4, separators=(',', ': ')),
        },
        "title": visName,
    }

    # The trailing "Z" marks the timestamp as UTC, so it must be built from
    # the UTC clock rather than from local time.
    updated_at = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    visSavedObject = {
        "visualization": {
            "title": visName,
            "visState": json.dumps(visState, sort_keys=True, indent=4, separators=(',', ': ')),
            "uiStateJSON": "{}",
            "description": "",
            "version": 1,
            "kibanaSavedObjectMeta": {
                "searchSourceJSON": json.dumps({
                    "query": {
                        "language": "kuery",
                        "query": "",
                    },
                    "filter": [],
                }),
            },
        },
        "type": "visualization",
        "references": [],
        "migrationVersion": {
            "visualization": "7.7.0",
        },
        "updated_at": updated_at,
    }

    return client.index(index='.kibana', id='visualization:' + visName, body=visSavedObject)
def saveVegaVis(client, index, visName, altairChart, resultSize=100, timeField=True):
    """Save a chart whose data lives under ``spec`` as a Kibana "vega" visualization.

    Unlike the top-level ``data.url`` layout, this function writes the
    Elasticsearch query to ``spec.data.url`` of the chart's JSON. The result
    is wrapped in a ``visualization`` saved object and indexed into
    ``.kibana`` under the id ``visualization:<visName>``.

    Args:
        client: Object exposing ``index(index=..., id=..., body=...)``
            (presumably an ``elasticsearch.Elasticsearch`` client).
        index: Name of the index (pattern) the visualization queries.
        visName: Visualization title; also used to build the document id.
        altairChart: Object exposing ``to_json()`` that returns a JSON string
            containing a ``spec`` object with a nested ``data`` object.
        resultSize: Value written to the ``size`` of the query body.
        timeField: Controls the ``%timefield%`` entry of the query.
            ``True`` (the default) uses the field name ``"timestamp"``; a
            non-empty string is used as the field name itself (for example
            ``"@timestamp"``); any falsy value omits ``%timefield%``.

    Returns:
        Whatever ``client.index`` returns.
    """
    chart_json = json.loads(altairChart.to_json())

    es_query = {
        "%context%": True,
        "index": index,
        "body": {
            "size": resultSize,
        },
    }
    if timeField:
        # Keep the historical default name when the caller only passes True.
        es_query["%timefield%"] = timeField if isinstance(timeField, str) else "timestamp"
    chart_json['spec']['data']['url'] = es_query

    visState = {
        "type": "vega",
        "aggs": [],
        "params": {
            "spec": json.dumps(chart_json, sort_keys=True, indent=4, separators=(',', ': ')),
        },
        "title": visName,
    }

    # The trailing "Z" marks the timestamp as UTC, so it must be built from
    # the UTC clock rather than from local time.
    updated_at = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    visSavedObject = {
        "visualization": {
            "title": visName,
            "visState": json.dumps(visState, sort_keys=True, indent=4, separators=(',', ': ')),
            "uiStateJSON": "{}",
            "description": "",
            "version": 1,
            "kibanaSavedObjectMeta": {
                "searchSourceJSON": json.dumps({
                    "query": {
                        "language": "kuery",
                        "query": "",
                    },
                    "filter": [],
                }),
            },
        },
        "type": "visualization",
        "references": [],
        "migrationVersion": {
            "visualization": "7.7.0",
        },
        "updated_at": updated_at,
    }

    return client.index(index='.kibana', id='visualization:' + visName, body=visSavedObject)
def getDataBasedonIndex(index, idValue, es):
    """Fetch one document by id, print its ``_source`` and return it.

    Args:
        index: Name of the index to read from.
        idValue: Id of the document to fetch.
        es: Object exposing ``indices.refresh(index=...)`` and
            ``get(index=..., id=...)`` (presumably an
            ``elasticsearch.Elasticsearch`` client).

    Returns:
        The ``_source`` entry of the response returned by ``es.get``.
    """
    # The index is refreshed before the lookup, as the original code did.
    es.indices.refresh(index=index)
    response = es.get(index=index, id=idValue)
    source = response['_source']
    print(source)
    # Returning the document lets callers use it instead of only seeing it printed.
    return source

# Dropdown bound to the NUMBER field; every option is itself a list of values.
number_options = [
    [1, 2, 3, 4, 5], [4], [3], [1], [2], [5],
    [1, 4, 5], [3, 4, 5], [1, 2, 3, 4], [1, 3, 5], [1, 3, 4],
    [1, 2, 5], [1, 2, 4], [1, 2, 3],
    [4, 5], [4, 2], [2, 3], [1, 2], [5, 3], [3, 4], [2, 5],
    [1, 4], [1, 5], [1, 2], [1, 3],
]
input_dropdown = alt.binding_select(options=number_options)
selection = alt.selection_single(
    fields=['NUMBER'],
    bind=input_dropdown,
    name='FIELD: ',
)

# Second dropdown, bound to the SHAPE TYPE field.
shape_type_options = [['M3', 'M4', 'M5', 'M6'], ['M4'], ['M3'], ['M6'], ['M5']]
input_dropdown1 = alt.binding_select(options=shape_type_options)
selection1 = alt.selection_single(
    fields=['SHAPE TYPE'],
    bind=input_dropdown1,
    name='FIELD2: ',
)

# Legend-bound multi selection that drives the point shape.
selection_Operation = alt.selection_multi(fields=['NUMBER:N'], bind='legend')
shape_Operation = alt.condition(
    selection_Operation,
    alt.Shape('NUMBER:N'),
    alt.value('lightgray'),
)

# Points matching the first dropdown are colored by SHAPE TYPE, others are gray.
color = alt.condition(
    selection,
    alt.Color('SHAPE TYPE:N'),
    alt.value('lightgray'),
)

# Pan/zoom bound to the scales, active only while the Alt key is held.
interaction1 = alt.selection_interval(
    bind='scales',
    on="[mousedown[event.altKey], mouseup] > mousemove",
    translate="[mousedown[event.altKey], mouseup] > mousemove!",
    zoom="wheel![event.altKey]",
)

# Pan/zoom restricted to the x encoding, active only while Shift is held.
interactionY = alt.selection_interval(
    bind='scales',
    encodings=['x'],
    on="[mousedown[event.shiftKey], mouseup] > mousemove",
    translate="[mousedown[event.shiftKey], mouseup] > mousemove!",
    zoom="wheel![event.shiftKey]",
)

# Scatter plot of value over time, filtered by both dropdown selections.
ScatterLine = (
    alt.Chart(df)
    .mark_point(filled=True)
    .encode(
        x=alt.X('@timestamp:T', title='TIMESTAMP'),
        y=alt.Y('value:Q', title='value'),
        color=color,
        shape=shape_Operation,
        tooltip=[
            'value:N',
            'NUMBER:N',
            'SHAPE TYPE:N',
            alt.Tooltip('@timestamp:T', format='%Y-%m-%d %H:%M'),
            'ID:N',
        ],
    )
    .add_selection(interaction1, interactionY, selection, selection1, selection_Operation)
    .resolve_scale(x='independent')
    .transform_filter(selection & selection1)
)
ScatterLine

# Store the chart in Kibana as a Vega visualization.
saveVegaLiteVis(es, 'index-pattern1', 'RapresentationPoint', ScatterLine, timeField=True)

如下所示,错误仅由 saveVegaLiteVis 函数产生。我尝试修改 saveVega 的调用,传入 resultSize=1000000,但仍然会产生以下错误。我该如何解决这个问题?我想确认问题只出在保存这些信息以便将其插入仪表板这一步上,也就是说,可视化本身是有效的。

---------------------------------------------------------------------------
timeout                                   Traceback (most recent call last)
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
420                     # Otherwise it looks like a bug in the code.
--> 421                     six.raise_from(e, None)
422         except (SocketTimeout, BaseSSLError, SocketError) as e:
/usr/lib/python3/dist-packages/six.py in raise_from(value, from_value)
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
415                 try:
--> 416                     httplib_response = conn.getresponse()
417                 except BaseException as e:
/usr/lib/python3.8/http/client.py in getresponse(self)
1346             try:
-> 1347                 response.begin()
1348             except ConnectionError:
/usr/lib/python3.8/http/client.py in begin(self)
306         while True:
--> 307             version, status, reason = self._read_status()
308             if status != CONTINUE:
/usr/lib/python3.8/http/client.py in _read_status(self)
267     def _read_status(self):
--> 268         line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
269         if len(line) > _MAXLINE:
/usr/lib/python3.8/socket.py in readinto(self, b)
668             try:
--> 669                 return self._sock.recv_into(b)
670             except timeout:
timeout: timed out
During handling of the above exception, another exception occurred:
ReadTimeoutError                          Traceback (most recent call last)
~/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py in perform_request(self, method, url, params, body, timeout, ignore, headers)
250 
--> 251             response = self.pool.urlopen(
252                 method, url, body, retries=Retry(False), headers=request_headers, **kw
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
718 
--> 719             retries = retries.increment(
720                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
/usr/lib/python3/dist-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
375             # Disabled, indicate to re-raise the error.
--> 376             raise six.reraise(type(error), error, _stacktrace)
377 
/usr/lib/python3/dist-packages/six.py in reraise(tp, value, tb)
702                 raise value.with_traceback(tb)
--> 703             raise value
704         finally:
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
664             # Make the request on the httplib connection object.
--> 665             httplib_response = self._make_request(
666                 conn,
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
422         except (SocketTimeout, BaseSSLError, SocketError) as e:
--> 423             self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
424             raise
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _raise_timeout(self, err, url, timeout_value)
329         if isinstance(err, SocketTimeout):
--> 330             raise ReadTimeoutError(
331                 self, url, "Read timed out. (read timeout=%s)" % timeout_value
ReadTimeoutError: HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10)
During handling of the above exception, another exception occurred:
ConnectionTimeout                         Traceback (most recent call last)
<ipython-input-31-45635f0d8635> in <module>
----> 1 saveVegaLiteVis(es, 'index-pattern', 'RapresentationPoint', ScatterLine, timeField=True,resultSize=100000)
<ipython-input-3-984bfed0a208> in saveVegaLiteVis(client, index, visName, altairChart, resultSize, timeField)
46     }
47 
---> 48     return client.index(index='.kibana',id='visualization:'+visName,body=visSavedObject)
49 
50 def saveVegaVis(client, index, visName, altairChart, resultSize=100, timeField=True):
~/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py in _wrapped(*args, **kwargs)
166                 if p in kwargs:
167                     params[p] = kwargs.pop(p)
--> 168             return func(*args, params=params, headers=headers, **kwargs)
169 
170         return _wrapped
~/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py in index(self, index, body, doc_type, id, params, headers)
404             doc_type = "_doc"
405 
--> 406         return self.transport.perform_request(
407             "POST" if id in SKIP_IN_PATH else "PUT",
408             _make_path(index, doc_type, id),
~/.local/lib/python3.8/site-packages/elasticsearch/transport.py in perform_request(self, method, url, headers, params, body)
413                         raise e
414                 else:
--> 415                     raise e
416 
417             else:
~/.local/lib/python3.8/site-packages/elasticsearch/transport.py in perform_request(self, method, url, headers, params, body)
379 
380             try:
--> 381                 status, headers_response, data = connection.perform_request(
382                     method,
383                     url,
~/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py in perform_request(self, method, url, params, body, timeout, ignore, headers)
261                 raise SSLError("N/A", str(e), e)
262             if isinstance(e, ReadTimeoutError):
--> 263                 raise ConnectionTimeout("TIMEOUT", str(e), e)
264             raise ConnectionError("N/A", str(e), e)
265 
ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10))

我通过在创建 es 客户端时设置更长的请求超时时间,解决了这个问题。

# Raise the client's request timeout to 30 s; the traceback above failed with "read timeout=10".
es = Elasticsearch([{'host': HOST_ADDRESS, 'port': THE_PORT}], timeout=30)

(此信息来自另一个问题的链接)

为了避免系统崩溃,我还需要增加 Kibana 可用的内存(Node.js 堆)空间。为此,我在 kibana.yml 的环境部分插入了 "--max-old-space-size=2048"。

最新更新