Google Data Fusion:"Looping"输入数据,然后为每个输入行执行多个 Restful API 调用



我有以下挑战,我想最好在谷歌数据融合中解决:
我有一个web服务,它返回大约30-50个元素,在JSON负载中描述发票,如下所示:

{
"invoice-services": [
{
"serviceId": "[some-20-digit-string]",
// some other stuff omitted
},
[...]
]
}

对于每次出现的serviceId,我需要重复调用另一个Web服务https://example.com/api/v2/services/{serviceId}/items,其中每个serviceId来自第一次调用。我只对第二次调用中要持久化到BigQuery中的数据感兴趣。第二个服务调用不支持通配符或任何其他聚合项的机制,即,如果第一个调用中有30个serviceId,则需要调用第二个Web服务30次。

我已经完成了第一次调用,第二次调用使用了硬编码的serviceId,以及对BigQuery的持久性。这些调用只需使用Data Fusion HTTP适配器。

然而,我如何使用第一个服务的输出,以便为第一个调用返回的每一行发出一个第二个服务的Web服务调用,从而有效地在所有serviceId上循环?

我完全理解这在Python代码中非常容易,但为了可维护性和适合我们的环境,我更愿意在Data Fusion中解决这个问题,或者需要谷歌提供的任何其他即服务产品。

非常感谢您的帮助!J

附言:这不是一个大数据问题——我看到了大约50个serviceId,可能还有300个项目。

这有点棘手,因为HTTP插件似乎没有默认的功能来迭代来自另一个响应的ID。

经过一点实验,我们在一个HTTP插件中找到了一种方法来实现这一点;"变通方法":

  1. 将您的列表URL/API端点写入HTTP插件的URL字段。这将用于第一个请求
  2. 在寻呼部分中;自定义"选项在那里,您必须提供一个包含分页逻辑的Python脚本。脚本函数在HTTP插件收到每个响应后调用一次。请注意,这是Python 2代码
    该脚本用于解析项目列表/ID,并对所有ID进行迭代,以从不同的API端点获取详细信息
import json
import os
# this is a predefined function that is called after every successful HTTP response
# url: string of the last called URL
# page: string of the response content (our JSON)
# headers: response headers
# This function needs to return a string of the next URL to call or None to stop
def get_next_page_url(url, page, headers):
# parse the content as JSON object
page_json = json.loads(page)
# we need some way to differentiate between the list API endpoint and details API endpoint
# this is the branch for the list API endpoint to collect all item IDs
if url == "<your-list-api-endpoint>":
# iterating over all item IDs in the list
item_ids = []
for item in page_json["invoice-services"]:
item_ids.append(item["serviceId"])      

# we need to write the collected item IDs to the local file system to have them present in the next function call
with open("/tmp/id_list", "w") as file:
for item_id in item_ids:
file.write("%sn" % str(item_id))
# return the URL of the first item details to continue
return "https://example.com/api/v2/services/" + str(item_ids[0]) + "/items"
# this branch is used with the details URL to iterate over all IDs
else:
# get all your item IDs by reading them from file system again
with open("/tmp/id_list", "r") as file:
item_ids = [item_id.strip() for item_id in file.readlines()]
# get the last used id from the url
current_id = url.split("/")[-2]
# find the index of the current item ID in the list
pos_index = item_ids.index(current_id)
# if last item in ID list is reached, stop
if len(item_ids) == pos_index + 1:
return None
# continue with next item id
else:
return "https://example.com/api/v2/services/" + str(item_ids[pos_index+1]) + "/items"
  1. 此过程导致HTTP插件输出两种不同类型的内容作为记录:列表JSON和详细信息JSON。为了解决这个问题,您可以将HTTP插件的输出格式设置为";文本">
    之后,您可以使用JSON解析器插件来解析您的项目详细信息。当这个插件接收到列表JSON内容时,它会将每个条目放在";空";(因为它找不到为详细信息定义的字段(
    然后您可以使用Wrangler插件过滤掉所有emtpy(null(记录,以清除列表内容

最新更新