我在Google Cloud Run上部署了一个gRPC服务,我想从Composer调用。
我已经将roles/iam.serviceAccountTokenCreator
角色分配给我的编写器工作节点运行的服务帐户,并且我没有挂载任何自定义服务密钥文件或设置GOOGLE_APPLICATION_CREDENTIALS
环境变量。
使用气流gRPC钩子中的JWT_GOOGLE
认证选项,我得到以下错误:
[2022-05-31 14:20:16,082] {grpc.py:90} INFO - Calling gRPC service
[2022-05-31 14:20:16,097] {taskinstance.py:1152} ERROR - 'Credentials' object has no attribute 'signer_email'
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/airflow/airflow/providers/grpc/operators/grpc.py", line 95, in execute
for response in responses:
File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 136, in run
with self.get_conn() as channel:
File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 104, in get_conn
jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials(credentials)
File "/opt/python3.6/lib/python3.6/site-packages/google/auth/jwt.py", line 695, in from_signing_credentials
kwargs.setdefault("issuer", credentials.signer_email)
AttributeError: 'Credentials' object has no attribute 'signer_email'
[2022-05-31 14:20:16,100] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task, execution_date=20220531T135709, start_date=20220531T142015, end_date=20220531T142016
[2022-05-31 14:20:23,826] {local_task_job.py:102} INFO - Task exited with return code 1
有没有人知道我的证书如何/为什么不包括我需要的领域?
在与Google Cloud讨论后找到了解决方案-基本上,看起来JWT_GOOGLE
认证方法没有为GCE服务帐户设置,所以我走了CUSTOM
认证路线:
import google.auth.transport.grpc
import google.auth.transport.requests
import google.oauth2.credentials
import google.oauth2.id_token
from airflow.providers.grpc.operators.grpc import GrpcOperator
def connection_func(conn):
"""Custom connection function for gRPC authentication.
Args:
conn: Airflow Connection object
Returns:
An instantiated gRPC channel for making calls to our remote service.
"""
request = google.auth.transport.requests.Request()
if not str(conn.host).startswith("https://"):
audience = f"https://{conn.host}"
else:
audience = conn.host
token = google.oauth2.id_token.fetch_id_token(request, audience)
creds = google.oauth2.credentials.Credentials(token)
base_url = conn.host
if conn.port:
base_url = f"{base_url}:{conn.port}"
channel = google.auth.transport.grpc.secure_authorized_channel(
creds, None, base_url
)
return channel
return GrpcOperator(
...
custom_connection_func=connection_func,
)
它使用这里看到的方法为给定的受众获取ID令牌,然后从那里创建一组凭证,最后实例化gRPC安全通道以供操作符使用。