Kinesis Firehose到ES使用lambda转换




For processing data sent to Firehose by Cloudwatch Logs subscription filters.
Cloudwatch Logs sends to Firehose records that look like this:
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "log_group_name",
  "logStream": "log_stream_name",
  "subscriptionFilters": [
    "subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208016,
      "message": "log message 1"
    },
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208017,
      "message": "log message 2"
    }
  ]
}
The data is additionally compressed with GZIP.
The code below will:
1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
processing error output. Such records do not contain any log events. You can modify the code to set the result to
Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent method.
6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
const zlib = require('zlib');
const AWS = require('aws-sdk');
/**
 * Transforms one CloudWatch Logs event into the text that is written to the
 * output record.
 *
 * logEvent has this format:
 * {
 *   "id": "01234567890123456789012345678901234567890123456789012345",
 *   "timestamp": 1510109208016,
 *   "message": "log message 1"
 * }
 *
 * The default implementation below just extracts the message and appends a newline to it.
 * The result must be returned in a Promise.
 *
 * @param logEvent - A single entry of the `logEvents` array.
 * @returns A promise resolving to the message followed by a newline delimiter.
 */
function transformLogEvent(logEvent: any): Promise<string> {
    // The delimiter is a real line feed ("\n"); a bare "n" would glue every
    // message together with a stray letter instead of separating them.
    return Promise.resolve(`${logEvent.message}\n`);
}
/**
 * Sends a batch of records to a Firehose delivery stream with PutRecordBatch,
 * retrying the records that failed until `maxAttempts` calls have been made.
 *
 * @param streamName - Name of the delivery stream to write to.
 * @param records - Records to send; on a retry only the failed subset is resent.
 * @param client - Object exposing `putRecordBatch(params, callback)`.
 * @param resolve - Called with `''` once every record has been accepted.
 * @param reject - Called with a message string once the attempts are exhausted.
 * @param attemptsMade - Number of attempts already made before this call.
 * @param maxAttempts - Maximum total number of attempts.
 */
function putRecordsToFirehoseStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
    client.putRecordBatch({
        DeliveryStreamName: streamName,
        Records: records,
    }, (err: any, data: any) => {
        const codes: any[] = [];
        let failed: any[] = [];
        let errMsg = err;

        if (err) {
            // The whole call failed, so every record has to be retried.
            failed = records;
        } else {
            // The call succeeded but individual records may still have been
            // rejected; responses are positionally aligned with the request.
            for (let i = 0; i < data.RequestResponses.length; i++) {
                const code = data.RequestResponses[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
                putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}
/**
 * Sends a batch of records to a Kinesis data stream with PutRecords,
 * retrying the records that failed until `maxAttempts` calls have been made.
 *
 * @param streamName - Name of the Kinesis stream to write to.
 * @param records - Records to send; on a retry only the failed subset is resent.
 * @param client - Object exposing `putRecords(params, callback)`.
 * @param resolve - Called with `''` once every record has been accepted.
 * @param reject - Called with a message string once the attempts are exhausted.
 * @param attemptsMade - Number of attempts already made before this call.
 * @param maxAttempts - Maximum total number of attempts.
 */
function putRecordsToKinesisStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
    client.putRecords({
        StreamName: streamName,
        Records: records,
    }, (err: any, data: any) => {
        const codes: any[] = [];
        let failed: any[] = [];
        let errMsg = err;

        if (err) {
            // The whole call failed, so every record has to be retried.
            failed = records;
        } else {
            // The call succeeded but individual records may still have been
            // rejected; responses are positionally aligned with the request.
            for (let i = 0; i < data.Records.length; i++) {
                const code = data.Records[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
                putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}
/**
 * Builds the record used to re-ingest an input record back into its source stream.
 *
 * @param isSas - True when the source is a Kinesis stream; the record then
 *   also carries the original partition key.
 * @param originalRecord - Input record from the invocation event; its `data`
 *   field is a base64 string.
 * @returns `{ Data }` for Firehose, or `{ Data, PartitionKey }` for Kinesis,
 *   where `Data` is the base64-decoded payload as a Buffer.
 */
function createReingestionRecord(isSas: any, originalRecord: any) {
    // Decode the untouched input payload: re-ingested records must go back
    // exactly as they arrived so they can be transformed on a later invocation.
    const data = Buffer.from(originalRecord.data, 'base64');
    if (isSas) {
        return {
            Data: data,
            PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
        };
    } else {
        return {
            Data: data,
        };
    }
}

/**
 * Projects a prepared re-ingestion record onto the fields accepted by the
 * target service.
 *
 * @param isSas - True when the target is a Kinesis stream (keeps `PartitionKey`).
 * @param reIngestionRecord - Record carrying `Data` and, for Kinesis, `PartitionKey`.
 * @returns A new object containing only `Data` (Firehose) or `Data` and
 *   `PartitionKey` (Kinesis).
 */
function getReingestionRecord(isSas: any, reIngestionRecord: any) {
    if (isSas) {
        return {
            Data: reIngestionRecord.Data,
            PartitionKey: reIngestionRecord.PartitionKey,
        };
    } else {
        return {
            Data: reIngestionRecord.Data,
        };
    }
}
/**
 * Firehose data-transformation entry point for CloudWatch Logs subscription data.
 *
 * For every input record it base64-decodes and decompresses the payload,
 * parses the JSON, and then:
 *   - marks CONTROL_MESSAGE records as 'Dropped',
 *   - transforms each log event of DATA_MESSAGE records with transformLogEvent
 *     and returns the concatenation as base64 with result 'Ok',
 *   - marks anything else (including payloads that fail to decompress) as
 *     'ProcessingFailed'.
 *
 * If the projected response size exceeds 6000000 bytes, 'Ok' records are
 * re-ingested into the source stream (in batches of at most 500) and marked
 * 'Dropped' in the response until the projection fits.
 *
 * @param event - Invocation event; reads `records`, `deliveryStreamArn` and,
 *   when present, `sourceKinesisStreamArn`.
 * @param context - Unused.
 * @param callback - Node-style callback receiving `(error, { records })`.
 */
exports.handler = (event: any, context: any, callback: any) => {
    Promise.all(event.records.map(function (r: any) {
        const buffer = Buffer.from(r.data, 'base64');

        let decompressed;
        try {
            decompressed = zlib.unzipSync(buffer);
        } catch (e) {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }

        const data = JSON.parse(decompressed.toString('utf8'));
        // CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        // They do not contain actual data.
        if (data.messageType === 'CONTROL_MESSAGE') {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'Dropped',
            });
        } else if (data.messageType === 'DATA_MESSAGE') {
            const promises = data.logEvents.map(transformLogEvent);
            return Promise.all(promises)
                .then(transformed => {
                    // Seeding with '' keeps an empty logEvents array from throwing.
                    const payload: any = transformed.reduce(function (a: any, v: any) {
                        return a + v;
                    }, '');
                    // Firehose only accepts base64-encoded data in the response;
                    // a plain string is rejected as an invalid output format.
                    const encoded = Buffer.from(payload).toString('base64');
                    return {
                        recordId: r.recordId,
                        result: 'Ok',
                        data: encoded,
                    };
                });
        } else {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }
    })).then(recs => {
        // A Kinesis-stream-as-source invocation carries sourceKinesisStreamArn.
        const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
        const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
        const region = streamARN.split(':')[3];
        const streamName = streamARN.split('/')[1];
        const result: any = { records: recs };
        let recordsToReingest: any[] = [];
        const putRecordBatches: any = [];
        let totalRecordsToBeReingested = 0;
        const inputDataByRecId: any = {};
        event.records.forEach(function (r: any) { inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r) });

        let projectedSize = recs.filter(function (rec: any) { return rec.result === 'Ok' })
            .map(function (r: any) { return r.recordId.length + r.data.length })
            .reduce((a: number, b: number) => a + b, 0);
        // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
            const rec: any = result.records[idx];
            if (rec.result === 'Ok') {
                totalRecordsToBeReingested++;
                recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
                projectedSize -= rec.data.length;
                // A dropped record must not carry its payload in the response.
                delete rec.data;
                result.records[idx].result = 'Dropped';

                // split out the record batches into multiple groups, 500 records at max per group
                if (recordsToReingest.length === 500) {
                    putRecordBatches.push(recordsToReingest);
                    recordsToReingest = [];
                }
            }
        }

        if (recordsToReingest.length > 0) {
            // add the last batch
            putRecordBatches.push(recordsToReingest);
        }

        if (putRecordBatches.length > 0) {
            const client = isSas ? new AWS.Kinesis({ region: region }) : new AWS.Firehose({ region: region });
            const putRecords = isSas ? putRecordsToKinesisStream : putRecordsToFirehoseStream;
            let recordsReingestedSoFar = 0;
            // One promise per batch: sharing a single promise across batches would
            // settle as soon as the first batch finished and fire the callback
            // while the remaining batches were still in flight.
            const pendingBatches = putRecordBatches.map((recordBatch: any) =>
                new Promise((resolve, reject) => {
                    putRecords(streamName, recordBatch, client, resolve, reject, 0, 20);
                }).then(() => {
                    recordsReingestedSoFar += recordBatch.length;
                    console.log('Reingested %s/%s records out of %s in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
                }));
            Promise.all(pendingBatches).then(
                () => {
                    console.log('Reingested all %s records out of %s in to %s stream', totalRecordsToBeReingested, event.records.length, streamName);
                    callback(null, result);
                },
                failed => {
                    console.log('Failed to reingest records. %s', failed);
                    callback(failed, null);
                });
        } else {
            console.log('No records needed to be reingested.');
            callback(null, result);
        }
    }).catch(ex => {
        console.log('Error: ', ex);
        callback(ex, null);
    });
};


Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed



// Firehose delivery stream (DirectPut) that delivers to the Elasticsearch domain,
// backs up all documents to S3, and runs a Lambda processor on incoming records.
// NOTE(review): `elasticsearchDomain`, `firehoseDeliveryRole`, `firehoseloggroup`,
// `logstream` and `handler` are defined outside this snippet; `handler` is
// presumably the transformation Lambda construct — confirm against the stack.
const firehoseDeliveryStream = new CfnDeliveryStream(this, "FirehoseDeliveryStream", {
    deliveryStreamType: "DirectPut",
    elasticsearchDestinationConfiguration: {
        domainArn: elasticsearchDomain.domainArn,
        roleArn: firehoseDeliveryRole.roleArn,
        indexName: "test",
        s3Configuration: {
            bucketArn: this.logsBucket.bucketArn,
            roleArn: firehoseDeliveryRole.roleArn,
            cloudWatchLoggingOptions: {
                enabled: true,
                logGroupName: firehoseloggroup.logGroupName,
                logStreamName: logstream.logStreamName
            }
        },
        s3BackupMode: "AllDocuments",
        cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: firehoseloggroup.logGroupName,
            logStreamName: logstream.logStreamName
        },
        processingConfiguration: {
            enabled: true,
            processors: [{
                type: "Lambda",
                parameters: [{
                    parameterName: "LambdaArn",
                    parameterValue: handler.functionArn,
                }]
            }]
        }
    }
});

我有一个CloudWatch日志组log-group-1、Kinesis Firehose、Lambda和S3。

log-group-1将日志发送到Kinesis Firehose(使用订阅过滤器)。Kinesis Firehose触发Lambda来处理日志。Lambda将日志返回到Kinesis Firehose,Kinesis Firehose将转换后的日志保存到S3。


{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "approximateArrivalTimestamp": 1625854080200,
      "data": "H4sIAAAAAAAAADWOwQrCM......"
    },
    {
      "recordId": "4961998142166134...",
      "approximateArrivalTimestamp": 1625854100311,
      "data": "H4sIAAAAAAAAADVPy07DMB......"
    }
  ]
}


{
  "records": [
    {
      "recordId": "you better take it from the input",
      "result": "can be Ok, Dropped, ProcessingFailed",
      "data": "must be an encoded base-64 string"
    }
  ]
}


const node_gzip_1 = require("node-gzip");
/**
 * Firehose transformation handler: rewrites every CloudWatch Logs record of the
 * invocation event and returns the event with `records` replaced by the results.
 *
 * Each input record is base64-decoded, gunzipped and parsed as JSON; every log
 * event's message is rewritten into a JSON line, and the lines of one input
 * record are concatenated into a single base64-encoded output record.
 *
 * @param event Firehose invocation event; `event.records[].data` is base64 text.
 * @returns The same event object, with `records` holding one
 *   `{ recordId, result, data }` entry per input record.
 */
async function handler(event) {
    console.log('event: ' + JSON.stringify(event, undefined, 3));
    const result = [];
    // Iterate through records list
    for (const record of event.records) {
        // Transform record data to a human readable string
        const decodedData = Buffer.from(record.data, 'base64');
        const ungziped = await node_gzip_1.ungzip(decodedData);
        console.log('ungziped: ' + ungziped);
        // Parse record data to JSON
        const dataJson = JSON.parse(ungziped.toString());
        // Get a list of log events and transform each element
        const logEventsList = dataJson.logEvents || [];
        const transformedLines = logEventsList.map((logEventValue) => {
            // Create the transformed result from the message saved in CloudWatch
            const transformedResultJson = {
                someRandomNumber: Math.random(), // Some random variable put in the result
                message: logEventValue.message + '-my-custom-change' // Edit the message
            };
            // A real newline keeps one JSON object per line in S3 (optional, but easier to read)
            return JSON.stringify(transformedResultJson) + '\n';
        });
        // Final data must be encoded to base 64
        const messageBase64 = Buffer.from(transformedLines.join('')).toString('base64');
        console.log('messageBase64: ' + messageBase64);
        // Save exactly one transformed result per input record: the recordId
        // must be taken from the input, so emitting one entry per log event
        // would return the same recordId several times.
        result.push({
            recordId: record.recordId,
            result: 'Ok',
            data: messageBase64
        });
    }
    // Replace initial records list with the transformed list
    event.records = result;
    console.log('new event: ' + JSON.stringify(event, undefined, 2));
    // Returned value will go back to kinesis firehose, then S3
    return event;
}
exports.handler = handler;


{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "result": "Ok",
      "data": "eyJzb21lUmF..."
    },
    {
      "recordId": "4961998142166134...",
      "result": "Ok",
      "data": "eyJzb21lUmFuZG9..."
    }
  ]
}

您还可以使用Lambda蓝图“kinesis-firehose-syslog-to-json”。


  • Kinesis Firehose将JSON对象放在S3中而不使用分隔符逗号
