How to enable AWS Lambda source record transformation for Firehose using CDK



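I'm trying to use CDK to enable source record transformation (with Lambda) on a Kinesis Firehose delivery stream. I already know how to do this in the console, but I can't figure out how to do it with the AWS CDK. This is my TypeScript code so far: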

// KINESIS STREAM
const kinesisStream = new kinesis.CfnDeliveryStream(this, `${props.name}-Kinesis`, {
  deliveryStreamName: `${props.name}-Stream`,
  deliveryStreamType: 'DirectPut',
  s3DestinationConfiguration: {
    bucketArn: props.eventsBucketArn,
    bufferingHints: {
      intervalInSeconds: 300,
      sizeInMBs: 5,
    },
    compressionFormat: 'UNCOMPRESSED',
    prefix: 'year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
    errorOutputPrefix: 'Errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}',
    roleArn: kinesisRole.roleArn
  }
});

Thanks in advance for your help!

Take a look at ProcessingConfigurationProperty. In Java:

// Parameters for the Lambda processor: the function ARN and a role ARN.
List<Object> transformParams = new ArrayList<>();
transformParams.add(ProcessorParameterProperty.builder()
        .parameterName("LambdaArn")
        .parameterValue(transform.getFunctionArn())
        .build());
transformParams.add(ProcessorParameterProperty.builder()
        .parameterName("RoleArn")
        .parameterValue(transform.getRole().getRoleArn())
        .build());

// A single processor of type "Lambda" that carries the parameters above.
List<Object> processors = new ArrayList<>();
processors.add(ProcessorProperty.builder()
        .type("Lambda")
        .parameters(transformParams)
        .build());

// The construct id "DeliveryStream" is a placeholder.
CfnDeliveryStream.Builder.create(this, "DeliveryStream")
        .extendedS3DestinationConfiguration(ExtendedS3DestinationConfigurationProperty.builder()
                .cloudWatchLoggingOptions(CloudWatchLoggingOptionsProperty.builder()
                        .enabled(true)
                        .logGroupName(logGroup.getLogGroupName())
                        .logStreamName(logStream.getLogStreamName())
                        .build())
                .bucketArn(bucket.getBucketArn())
                .bufferingHints(BufferingHintsProperty.builder()
                        .intervalInSeconds(180)
                        .sizeInMBs(1)
                        .build())
                .compressionFormat("UNCOMPRESSED")
                .roleArn(role.getRoleArn())
                .processingConfiguration(ProcessingConfigurationProperty.builder()
                        .enabled(Boolean.TRUE)
                        .processors(processors)
                        .build())
                .build())
        .build();
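
Since the question uses TypeScript, here is a rough sketch of the same processing configuration as a plain object. It deliberately avoids importing the CDK so it can be run on its own; the helper name `lambdaProcessingConfiguration` and the interfaces are made up for illustration, and the ARNs in the usage lines are placeholders. The property names mirror the `ProcessingConfiguration` shape used in the Java code above.

```typescript
// Hypothetical local types that mirror the ProcessingConfiguration shape
// expected by CfnDeliveryStream (these are not imported from the CDK).
interface ProcessorParameter {
  parameterName: string;
  parameterValue: string;
}

interface Processor {
  type: string;
  parameters: ProcessorParameter[];
}

interface ProcessingConfiguration {
  enabled: boolean;
  processors: Processor[];
}

// Hypothetical helper: builds a processing configuration with one Lambda processor.
function lambdaProcessingConfiguration(lambdaArn: string, roleArn: string): ProcessingConfiguration {
  return {
    enabled: true,
    processors: [
      {
        type: 'Lambda',
        parameters: [
          { parameterName: 'LambdaArn', parameterValue: lambdaArn },
          { parameterName: 'RoleArn', parameterValue: roleArn },
        ],
      },
    ],
  };
}

// Placeholder ARNs, for illustration only.
const exampleConfig = lambdaProcessingConfiguration(
  'arn:aws:lambda:us-east-1:123456789012:function:example-transform',
  'arn:aws:iam::123456789012:role/example-firehose-role',
);
console.log(JSON.stringify(exampleConfig, null, 2));
```

In the question's code this would probably mean renaming `s3DestinationConfiguration` to `extendedS3DestinationConfiguration` (the plain S3 destination has no processing configuration) and adding something like `processingConfiguration: lambdaProcessingConfiguration(transformFn.functionArn, kinesisRole.roleArn)`, where `transformFn` is a hypothetical Lambda function construct. The role passed as `RoleArn` also needs permission to invoke that function.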
