每秒调用最大请求数的管道设计

  • 本文关键字:管道 请求 调用 apache-beam
  • 更新时间 :
  • 英文 :


我的目标是创建一个管道,以每秒最多次数调用后端(云托管)服务…我怎样才能做到呢?

背景故事:想象一个使用单个输入调用并返回单个输出的后端服务。此服务具有与之关联的配额,允许每秒最大请求数(假设每秒10个请求)。现在想象一个无界源PCollection,我希望通过后端服务传递输入中的元素来转换它们。我可以设想ParDo为输入PCollection中的每个元素调用一次后端服务。但是,这不会对后端执行任何类型的流控制。

我可以想象我的DoFn逻辑测试来自后端响应的响应,并重试直到成功,但这感觉不对。如果我有100个工作人员,那么我似乎会消耗大量资源,并在后端增加负载。我想做的是限制从管道到后端的调用。

你好,科尔班。除了Bruno Volpato的RampupThrottlingFn示例外,我还看到了以下示例的组合。请不要犹豫,让我知道我如何才能更清楚地更新这个例子。

  1. PeriodicImpulse -以固定的指定间隔发出一个瞬间。
  2. 如果使用数据流运行器,则固定maxNumWorkersnumWorkers的工人数量(请参阅数据流管道选项)。
  3. Beam Metrics API,用于监控一段时间内的实际资源请求计数并设置警报。当使用Dataflow时,Beam Metrics API自动连接到云监控作为自定义指标

下面显示了从整个管道开始的简短代码,后面跟着一些必要的细节,以提供清晰度。它假设目标是10工人,使用参数为--maxNumWorkers=10--numWorkers=10的数据流,目标是将所有工人之间的资源请求限制为10 requests per second。这转换为1 request per second per worker

PeriodicImpulse限制每秒创建1个请求

public class MyPipeline {
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(/* Usually with options */);
PCollection<Response> responses = pipeline.apply(
"PeriodicImpulse",
PeriodicImpulse
.create()
.withInterval(Duration.standardSeconds(1L))
).apply(
"Build Requests",
ParDo.of(new RequestFn())
)
.apply(ResourceTransform.create());
}
}

RequestFn DoFn从PeriodicImpulse发出的每瞬间发出请求

class RequestFn extends DoFn<Instant, Request> {
@ProcessElement
public void process(@Element Instant instant, OutputReceiver<Request> receiver) {
receiver.output(
Request.builder().build()
);
}
}

ResourceTransform将请求转换为响应,递增一个计数器

class ResourceTransform extends PTransform<PCollection<Request>, PCollection<Response>> {
static ResourceTransform create() {
return new ResourceTransform();
}
public PCollection<Response> expand(PCollection<Request> input) {
return ParDo.of("Consume Resource", new ResourceFn());
}
}
class ResourceFn extends DoFn<Request, Response> {
private Counter counter = Metrics.counter(ResourceFn.class, "some:resource");
private transient ResourceClient client = null;
@Setup
public void setup() {
client = new ResourceClient();
}
@ProcessElement
public void process(@Element Request request, OutputReceiver<> receiver) 
{
counter.inc(); // Increment the counter.
// not showing error handling
Response response = client.execute(request);
receiver.output(response);
}
}

请求和响应类

(旁白:考虑为请求输入和响应输出类创建Schema。下面的例子使用了AutoValue和AutoValueSchema)

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Request {
/* abstract Getters. */
abstract String getId();
@AutoValue.Builder
static abstract class Builder {
/* abstract Setters. */
abstract Builder setId(String value);
abstract Request build();
}
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Response {
/* abstract Getters. */
abstract String getId();
@AutoValue.Builder
static abstract class Builder {
/* abstract Setters. */
abstract Builder setId(String value);
abstract Response build();
}
}

最新更新