我需要帮助处理气流上的弹性搜索挂钩。
我导入了以下内容:
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
并运行搜索索引的命令:
elastic_hook = ElasticsearchHook(elasticsearch_conn_id = 'ELK')
elastic_conn = elastic_hook.get_conn()
res = elastic_conn.search(index=es_index,
size = 10000,
from_= 0,
request_timeout=1000,
body = body, scroll='2m', )
但我犯了一个错误;连接对象没有属性搜索";。我试着在钩子对象上运行dir命令,得到了以下结果:
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_generate_insert_sql', '_serialize_cell', '_set_context', 'bulk_dump', 'bulk_load', 'conn_name_attr', 'conn_type', 'connection', 'connector', 'default_conn_name', 'elasticsearch_conn_id', 'get_autocommit', 'get_conn', 'get_connection', 'get_connections', 'get_cursor', 'get_first', 'get_hook', 'get_pandas_df', 'get_records', 'get_sqlalchemy_engine', 'get_uri', 'hook_name', 'insert_rows', 'log', 'run', 'schema', 'set_autocommit', 'supports_autocommit']
然而,我不知道如何用其中一个函数转换我的搜索命令(紧接着滚动(。
有什么想法吗?
对于apache-airflow-providers-elasticsearch>=4.1.0
:
有一个ElasticsearchPythonHook
公开了Elasticsearch Python SDK中的search
端点。它是由PR添加的示例用法:
def use_elasticsearch_hook():
es_hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
query = {"query": {"match_all": {}}}
result = es_hook.search(query=query)
print(result)
return True
对于apache-airflow-providers-elasticsearch<4.1.0
:
钩子中没有search
函数。您只能使用SQL与Elasticsearch进行交互。钩子具有get_conn&get_uri函数。由于钩子继承自DbApiHook
,因此它的所有功能都可用。您可以使用测试作为如何使用它的示例。或者,您可以安装elasticsearch SDK,并通过创建自定义挂钩直接与之交互。