使用typescript向AWS Elasticsearch服务发出签名请求



我需要的是使用类型脚本lambda将流式数据从DynamoDB加载到Amazon Elasticsearch服务中。

流数据按预期到达,但我还没能弄清楚如何用签名验证HTTP请求。AWS似乎有一些用于签名和发布HTTP请求的专用API,这些请求在JS代码中被引用(用于从S3向ES加载数据(:

/*
* Add the given document to the ES domain.
* If all records are successfully added, indicate success to lambda
* (using the "context" parameter).
*/
function postDocumentToES(doc, context) {
var req = new AWS.HttpRequest(endpoint);
req.method = 'POST';
req.path = path.join('/', esDomain.index, esDomain.doctype);
req.region = esDomain.region;
req.body = doc;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
// Sign the request (Sigv4)
var signer = new AWS.Signers.V4(req, 'es');
signer.addAuthorization(creds, new Date());
// Post document to ES
var send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function(httpResp) {
var body = '';
httpResp.on('data', function (chunk) {
body += chunk;
});
httpResp.on('end', function (chunk) {
numDocsAdded ++;
if (numDocsAdded === totLogLines) {
// Mark lambda success.  If not done so, it will be retried.
console.log('All ' + numDocsAdded + ' log records added to ES.');
context.succeed();
}
});
}, function(err) {
console.log('Error: ' + err);
console.log(numDocsAdded + 'of ' + totLogLines + ' log records added to ES.');
context.fail();
});
}

来源:https://github.com/aws-samples/amazon-elasticsearch-lambda-samples/blob/master/src/s3_lambda_es.js

但是当在TS中编写时,我似乎无法导入Signers.V4NodeHttpClient库,我想是因为它们是";专用API";。

另一个令人困惑的因素是,在本文档中,我们被告知aws-sdk会自动验证传出请求,我们不需要自己进行验证:https://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html

请有人深入了解如何使用TypeScript验证/签名,并将带有AWS Lambda凭据的http请求发送到AWS服务,如Elasticsearch?

以下是我的Elasticsearch客户端与Amazon Elasticsearch AWS4签名请求兼容的实现:

import { Client } from "@elastic/elasticsearch";
import * as AWS4 from "aws4";
import { Connection } from "@elastic/elasticsearch";
import { ConnectionOptions } from "@elastic/elasticsearch/lib/Connection";
import * as http from "http";
import { Readable } from "stream";
interface RequestOptions extends http.ClientRequestArgs {
asStream?: boolean;
body?: string | Buffer | Readable | null;
querystring?: string;
}
class AwsEsConnection extends Connection {
constructor(opts?: ConnectionOptions) {
super(opts);
}
getBodyString(body?: string | Buffer | Readable | null): string | null {
if (!body) {
return body as null;
}
if (typeof body === "string" || body instanceof String) {
return body as string;
}
if (body instanceof Buffer) {
return body.toString();
}
if (body instanceof Readable) {
throw new Error("Haven't implemented stream handling!!");
}
return body;
}
public request(
params: RequestOptions,
callback: (
err: Error | null,
response: http.IncomingMessage | null
) => void
): http.ClientRequest {
const body = this.getBodyString(params.body);
const opts = {
method: params.method,
host: params.host,
path: `${params.path}?${params.querystring}`,
service: "es",
region: "eu-west-1",
body: body,
headers: params.headers
};
AWS4.sign(opts);
params.headers = opts.headers;
params.body = opts.body;
return super.request(params, callback);
}
}
export class ElasticClientFactory {
constructor(
private esClusterBaseUrl: string,
) {}
create(): Client {
return new Client({
node: this.esClusterBaseUrl,
Connection: AwsEsConnection
});
}
}

最新更新