适用于 Google App Engine 的 BigQuery cron job credentials for Go



更新:我已经缩小了问题的范围,所以我删除了不必要的代码和示例:

更新 2:在让 cron 作业以 12 小时间隔运行相当长的一段时间后(每个作业都以成功告终,但没有用 BQ 编写任何内容),我们震惊地发现,大约一周后,其中一个 cron 作业确实成功写入了 BigQuery,以及 Stackdriver 日志,指出"此请求导致为您的应用程序启动一个新进程(...)", 如下文所述。以下作业再次停止写入。现在我想知道这是否以某种方式连接到缓存的应用程序状态(具有一些到期期限)或凭据到期日期,这以某种方式阻止了在第一次之后进一步写入 BigQuery,但不会导致错误。

问题描述:

我正在尝试在 App Engine(标准)中设置一个 cron 作业,以从 BigQuery 查询数据并将其写回 BigQuery(数据集与部署的应用程序位于同一项目中),并且 cron 作业成功执行,但仅在部署后的第一次执行时写入 BigQuery,之后它们仍然成功执行但不写入。

我发现的主要区别在于 Stackdriver 日志,对于正确写入的执行,有额外的调试和信息,对于后续执行,没有这样的消息:

2018-04-19 04:44:03.933 CEST
Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None) (/base/data/home/apps/e~<redacted>/lib/urllib3/util/retry.py:200)
2018-04-19 04:44:04.154 CEST
Making request: POST https://accounts.google.com/o/oauth2/token (/base/data/home/apps/e~<redacted>/lib/google/auth/transport/requests.py:117)
2018-04-19 04:44:04.160 CEST
Starting new HTTPS connection (1): accounts.google.com (/base/data/home/apps/e~<redacted>/lib/urllib3/connectionpool.py:824)
2018-04-19 04:44:04.329 CEST
https://accounts.google.com:443 "POST /o/oauth2/token HTTP/1.1" 200 None (/base/data/home/apps/e~<redacted>/lib/urllib3/connectionpool.py:396)
2018-04-19 04:44:04.339 CEST
Starting new HTTPS connection (1): www.googleapis.com (/base/data/home/apps/e~<redacted>/lib/urllib3/connectionpool.py:824)
2018-04-19 04:44:04.802 CEST
https://www.googleapis.com:443 "POST /bigquery/v2/projects/<redacted>/jobs HTTP/1.1" 200 None (/base/data/home/apps/e~<redacted>/lib/urllib3/connectionpool.py:396)
2018-04-19 04:44:04.813 CEST
This request caused a new process to be started for your application, and thus caused your application code to be loaded for the first time. This request may thus take longer and use more CPU than a typical request for your application.

我试过:

  • 为默认应用引擎服务帐户添加了 BigQuery 数据所有者和用户权限,但没有效果。

  • 有人提到标准应用程序引擎不完全支持 google.cloud 库,所以我尝试使用 OAuth2/httplib2/googleapiclient 凭据进行身份验证,但这是我第一次尝试,我不明白如何将这些部分放在一起,没有 google.cloud 库,我什至不知道如何为 BQ 编写正确的查询

  • 下面建议的其他凭据设置方法,但似乎连接到BQ不是问题,它们都连接和写入(一次),只是在已经部署的应用程序引擎中重复它。

以下是完整的实现:

app.yaml:

runtime: python27
api_version: 1
threadsafe: true
handlers:
- url: /bigquerycron
script: bigquerycron.app
login: admin
libraries:
- name: ssl
version: latest
env_variables:
GAE_USE_SOCKETS_HTTPLIB : 'true'

bigquerycron.py

from __future__ import absolute_import
from google.cloud import bigquery
import webapp2
class MainPage(webapp2.RequestHandler):
def get(self):
self.response.headers['Content-Type'] = 'text/plain'
self.response.write('CRON test page')          
def writeDataTest(dataset_id = '<redacted>',table_id='<redacted>'):
client = bigquery.Client.from_service_account_json("credentials.json")
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset(dataset_id).table(table_id)
job_config.destination = table_ref
job_config.write_disposition = 'WRITE_APPEND'
query_job = client.query(
"""SELECT CURRENT_DATETIME() AS Datetime, 'CRON' as Source""", job_config=job_config)
writeDataTest()
app = webapp2.WSGIApplication([
('/bigquerycron', MainPage),
], debug=True)

cron.yaml:

cron:
- url: /bigquerycron
schedule: every 30 minutes

在这种特定情况下,凭据不是问题,问题只是由于对App Engine工作原理的误解而导致函数调用的位置。bigquery 的函数调用应该在 MainPage 类定义中移动,修复 bigquerycron.py 如下所示(仅移动一行代码):

from __future__ import absolute_import
from google.cloud import bigquery
import webapp2
class MainPage(webapp2.RequestHandler):
def get(self):
self.response.headers['Content-Type'] = 'text/plain'
self.response.write('CRON test page')          
writeDataTest()
def writeDataTest(dataset_id = '<redacted>',table_id='<redacted>'):
client = bigquery.Client.from_service_account_json("credentials.json")
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset(dataset_id).table(table_id)
job_config.destination = table_ref
job_config.write_disposition = 'WRITE_APPEND'
query_job = client.query(
"""SELECT CURRENT_DATETIME() AS Datetime, 'CRON' as Source""", job_config=job_config)
app = webapp2.WSGIApplication([
('/bigquerycron', MainPage),
], debug=True)

OP 中的版本确实只写入一次 BigQuery,当 App Engine 应用程序首次加载时,所有后续调用都只执行 MainPage 类,在这种情况下,该类不执行任何操作,因为实际的 BigQuery 代码在它之外。

此外,在不使用 google-cloud-python 库的情况下重写应用程序将是有益的,GAE 标准 (https://github.com/GoogleCloudPlatform/google-cloud-python/issues/1893) 不支持该库。这尤其不幸,因为即使是 python (https://cloud.google.com/bigquery/docs/) 的官方 bigquery 文档也使用了这个库。但是,有多种解决方法可以继续使用它,包括链接的github问题和此处提到的一些解决方法: 在GAE中使用gcloud-python和类似的解决方法在本例中使用。

但如前所述,最好使用专用的 Python Google API 客户端库: https://developers.google.com/api-client-library/python/

我怀疑如果你删除app.yaml的"登录:管理员"部分,它会起作用。

如果这是问题所在,请确保您具有正确的 X-Appengine 标头设置

以下是一些关于任务队列和 cron 作业的文档。

虽然我不确定原因,但我认为授权 App Engine 的服务帐户不足以访问 BigQuery。

要授权您的应用访问 BigQuery,您可以执行以下任一方法:

  1. 在 app.yaml 文件中,配置一个环境变量,该变量指向具有正确授权配置的服务帐户密钥文件,以便对 BigQuery 进行授权:

    env_variables: GOOGLE_APPLICATION_CREDENTIALS=[YOURKEYFILE].json

  2. 您的代码从存储桶中执行授权服务帐户密钥的提取,然后在云存储客户端库的帮助下加载它。看到你的运行时是python,你应该使用的代码如下:

    ....

    从 google.cloud 导入存储

    ....

    def download_key():

    "">

    从存储桶下载密钥。"">

    storage_client = 存储。客户端()

    bucket = storage_client.get_bucket('YOURBUCKET')

    blob = bucket.blob('Keynameinthebucket.json')

    blob.download_to_filename('keynameinyourapp.json')

    ....

    #within 代码:

    download_key()

    客户端=大查询。Client.from_service_account_json('keynameinyourapp.json')

最新更新