Unable to import a custom operator from the plugins folder in Airflow



I am a beginner trying to import a custom operator from the plugins folder into my DAG. Here is the file structure:

├── dags
│   ├── my_dag.py
├── myrequirements.txt
├── plugins
│   ├── __init__.py
│   ├── my_airflow_plugin.py
│   └── operators
│       ├── __int__.py
│       └── my_airflow_operator.py

my_dag.py

from operators.my_airflow_operator import AwsLambdaInvokeFunctionOperator

my_airflow_plugin.py

from airflow.plugins_manager import AirflowPlugin
from operators.my_airflow_operator import AwsLambdaInvokeFunctionOperator

class lambda_operator(AwsLambdaInvokeFunctionOperator):
    pass

class my_plugin(AirflowPlugin):
    name = 'my_airflow_plugin'
    operators = [lambda_operator]

my_airflow_operator.py

import json

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.aws_lambda_hook import AwsLambdaHook

class AwsLambdaExecutionError(Exception):
    """
    Raised when there is an error executing the function.
    """

class AwsLambdaPayloadError(Exception):
    """
    Raised when there is an error with the Payload object in the response.
    """

class AwsLambdaInvokeFunctionOperator(BaseOperator):
    """
    Invoke AWS Lambda functions with a JSON payload.
    The check_success_function signature should be a single param which will
    receive a dict. The dict will be the "Response Structure" described in
    """

    @staticmethod
    def succeeded(response):
        # Example check_success_function.
        payload = json.loads(response['Payload'].read())
        # do something with payload

    @apply_defaults
    def __init__(
        self,
        function_name,
        region_name,
        payload,
        check_success_function,
        log_type="None",
        qualifier="$LATEST",
        aws_conn_id=None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.function_name = function_name
        self.region_name = region_name
        self.payload = payload
        self.log_type = log_type
        self.qualifier = qualifier
        self.check_success_function = check_success_function
        self.aws_conn_id = aws_conn_id

    def get_hook(self):
        """
        Initialises an AWS Lambda hook
        :return: airflow.contrib.hooks.AwsLambdaHook
        """
        return AwsLambdaHook(
            self.function_name,
            self.region_name,
            self.log_type,
            self.qualifier,
            aws_conn_id=self.aws_conn_id,
        )

    def execute(self, context):
        self.log.info("AWS Lambda: invoking %s", self.function_name)
        response = self.get_hook().invoke_lambda(self.payload)
        try:
            self._validate_lambda_api_response(response)
            self._validate_lambda_response_payload(response)
        except (AwsLambdaExecutionError, AwsLambdaPayloadError):
            self.log.error(response)
            raise
        self.log.info("AWS Lambda: %s succeeded!", self.function_name)

    def _validate_lambda_api_response(self, response):
        """
        Check the invoke response for a function error.
        (Assumed implementation: execute() calls this method, but it was
        missing from the snippet above.)
        :param response: HTTP Response from AWS Lambda.
        :type response: dict
        :return: None
        """
        if "FunctionError" in response:
            raise AwsLambdaExecutionError(
                "AWS Lambda: error occurred during execution!"
            )

    def _validate_lambda_response_payload(self, response):
        """
        Call a user provided function to validate the Payload object for errors.
        :param response: HTTP Response from AWS Lambda.
        :type response: dict
        :return: None
        """
        if not self.check_success_function(response):
            raise AwsLambdaPayloadError(
                "AWS Lambda: error validating response payload!"
            )
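As an aside on the check_success_function contract described in the docstring: the function receives the invoke response dict, whose 'Payload' entry is a file-like object that can be read once. Here is a minimal, Airflow-free sketch of such a checker; the `check_success` name, the `{"status": ...}` payload shape, and the fake response are invented for illustration:

```python
import io
import json

def check_success(response):
    # Example check_success_function: read the streaming Payload once,
    # parse it as JSON, and report success. The {"status": ...} shape is
    # hypothetical; real payloads depend on what your Lambda returns.
    payload = json.loads(response["Payload"].read())
    return payload.get("status") == "ok"

# Fake invoke response standing in for the real AWS one, illustration only.
fake_response = {
    "StatusCode": 200,
    "Payload": io.BytesIO(json.dumps({"status": "ok"}).encode("utf-8")),
}

print(check_success(fake_response))  # → True
```

Because the Payload stream is exhausted after one read, a checker that needs the payload twice should cache the parsed dict rather than call `.read()` again.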

But I get this error: No module named 'operators'.

I tried changing the import statement in my_dag.py to:

from airflow.operators.my_airflow_plugin import AwsLambdaInvokeFunctionOperator

and I get this error: No module named 'airflow.operators.my_airflow_plugin'.

Can anyone suggest what is wrong here? (The Airflow version is 1.10.12.)

The __init__.py files are empty.
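A quick, Airflow-free way to see where "No module named 'operators'" comes from: `from operators.my_airflow_operator import ...` can only succeed if the directory *containing* the `operators` package is on `sys.path`, which Airflow sets up at startup for the dags folder (and for the plugins folder when plugins load). A minimal sketch, with a temporary directory standing in for the plugins folder and a stub class standing in for the real operator:

```python
import os
import sys
import tempfile

# Stand-in for the plugins folder; the real one is wherever Airflow's
# plugins_folder setting points.
root = tempfile.mkdtemp()
pkg = os.path.join(root, "operators")
os.makedirs(pkg)

# The package marker must be named __init__.py exactly; a file named
# __int__.py (as in the question's tree) does not make a package.
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "my_airflow_operator.py"), "w") as f:
    f.write("class AwsLambdaInvokeFunctionOperator:\n    pass\n")

# This is what Airflow does for the dags/plugins folders at startup;
# without it, the import below raises ModuleNotFoundError.
sys.path.insert(0, root)

from operators.my_airflow_operator import AwsLambdaInvokeFunctionOperator
print(AwsLambdaInvokeFunctionOperator.__name__)  # → AwsLambdaInvokeFunctionOperator
```

So the import style in my_dag.py is fine in principle; the question is whether the folder that holds `operators` is actually on the interpreter's path when the DAG file is parsed.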

What do you need the plugin for? Is it just to expose your custom operator in a DAG? That is not really what plugins are for in Airflow's architecture, and as of version 2 this usage is no longer supported.

Instead of using a plugin, you can create a module containing your operators and import them into the DAG from that module. You can follow these instructions.

My Airflow version is 1.10.12, and I still could not access the custom operator from the plugins folder; it kept showing a "No module found" error. It finally worked when I moved the custom operator file into the dags folder.
