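I have a data pipeline in which AWS API Gateway triggers a Lambda that puts events into a Kinesis stream; the events are then written to S3 so they can be queried with Athena.
I defined my stack with the Serverless Framework as follows: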
service: analytic-event-collection
provider:
  name: aws
  stage: ${opt:stage, 'staging'}
  region: ${opt:region, 'us-east-1'}
  runtime: nodejs12.x
  httpApi:
    cors: true
  iam:
    role:
      statements:
        - Effect: 'Allow'
          Action:
            - 'kinesis:PutRecord'
          Resource:
            - '*'
custom:
  kinesisSteamName: 'event-collection-stream-${self:provider.stage}'
  s3AnalyticsStore: 'page-view-store-${self:provider.stage}'
  s3AthenaStore: 's3-analytic-data-${self:provider.stage}'
  glueName: 'analytics-store-${self:provider.stage}'
functions:
  collect:
    handler: handler.collect
    events:
      - httpApi:
          path: /collect
          method: post
    environment:
      KINESIS_STREAM_NAME: ${self:custom.kinesisSteamName}
resources:
  Resources:
    S3AnalyticsStore:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.s3AnalyticsStore}
    KinesisStreamData:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ${self:custom.kinesisSteamName}
        RetentionPeriodHours: 24
        StreamModeDetails:
          StreamMode: ON_DEMAND
    FirehoseDeliveryIAMPolicy:
      Type: 'AWS::IAM::Policy'
      Properties:
        PolicyName: "EventCollectionPolicy"
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - 's3:AbortMultipartUpload'
                - 's3:GetBucketLocation'
                - 's3:GetObject'
                - 's3:ListBucket'
                - 's3:ListBucketMultipartUploads'
                - 's3:PutObject'
              Resource:
                - !Join
                  - ''
                  - - 'arn:aws:s3:::'
                    - !Ref S3AnalyticsStore
                - !Join
                  - ''
                  - - 'arn:aws:s3:::'
                    - !Ref S3AnalyticsStore
                    - '/*'
            - Effect: Allow
              Action:
                - 'kinesis:DescribeStream'
                - 'kinesis:GetShardIterator'
                - 'kinesis:GetRecords'
              Resource: !GetAtt
                - KinesisStreamData
                - Arn
        Roles:
          - !Ref FirehoseDeliveryIAMRole
      DependsOn:
        - KinesisStreamData
        - S3AnalyticsStore
    FirehoseDeliveryIAMRole:
      Type: 'AWS::IAM::Role'
      Properties:
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Sid: ''
              Effect: Allow
              Principal:
                Service: firehose.amazonaws.com
              Action: 'sts:AssumeRole'
              Condition:
                StringEquals:
                  'sts:ExternalId': !Ref 'AWS::AccountId'
    KinesisFirehoseDeliveryStream:
      Type: 'AWS::KinesisFirehose::DeliveryStream'
      Properties:
        DeliveryStreamName: !Join
          - ''
          - - '${self:custom.kinesisSteamName}-'
            - 'deliver'
        DeliveryStreamType: KinesisStreamAsSource
        KinesisStreamSourceConfiguration:
          KinesisStreamARN: !GetAtt
            - KinesisStreamData
            - Arn
          RoleARN: !GetAtt
            - FirehoseDeliveryIAMRole
            - Arn
        S3DestinationConfiguration:
          BucketARN: !GetAtt
            - S3AnalyticsStore
            - Arn
          BufferingHints:
            IntervalInSeconds: 300
            SizeInMBs: 5
          CloudWatchLoggingOptions:
            Enabled: 'false'
          CompressionFormat: GZIP
          EncryptionConfiguration:
            NoEncryptionConfig: NoEncryption
          RoleARN: !GetAtt
            - FirehoseDeliveryIAMRole
            - Arn
      DependsOn:
        - FirehoseDeliveryIAMPolicy
        - FirehoseDeliveryIAMRole
    S3AthenaStore:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.s3AthenaStore}
    AnalysisGlueDatabase:
      Type: AWS::Glue::Database
      Properties:
        CatalogId: !Ref AWS::AccountId
        DatabaseInput:
          Name: !Join
            - ''
            - - '${self:custom.glueName}-'
              - 'db'
          Description: "Analysis aws Glue database"
      DependsOn:
        - S3AthenaStore
    AnalyticsGlueRole:
      Type: AWS::IAM::Role
      DependsOn:
        - S3AnalyticsStore
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            -
              Effect: "Allow"
              Principal:
                Service:
                  - "glue.amazonaws.com"
              Action:
                - "sts:AssumeRole"
        Path: "/"
        ManagedPolicyArns:
          ['arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole']
        Policies:
          -
            PolicyName: "S3BucketAccessPolicy"
            PolicyDocument:
              Version: "2012-10-17"
              Statement:
                -
                  Effect: "Allow"
                  Action:
                    - "s3:GetObject"
                    - "s3:PutObject"
                  Resource:
                    !Join
                      - ''
                      - - !GetAtt S3AnalyticsStore.Arn
                        - "*"
    AnalyticsGlueCrawler:
      Type: AWS::Glue::Crawler
      Properties:
        Name: "AnalysisCrawler"
        Role: !GetAtt AnalyticsGlueRole.Arn
        DatabaseName: !Ref AnalysisGlueDatabase
        Targets:
          S3Targets:
            - Path: !Ref S3AnalyticsStore
        SchemaChangePolicy:
          UpdateBehavior: "LOG"
          DeleteBehavior: "LOG"
        Schedule:
          ScheduleExpression: "cron(00 0/1 * * ? *)"
        RecrawlPolicy:
          RecrawlBehavior: CRAWL_NEW_FOLDERS_ONLY
      DependsOn:
        - AnalyticsGlueRole
        - AnalysisGlueDatabase
    AnalyticsAthenaWorkGroup:
      Type: AWS::Athena::WorkGroup
      Properties:
        Name: ${self:service}-${self:provider.stage}-wg
        WorkGroupConfiguration:
          ResultConfiguration:
            OutputLocation:
              !Join
                - ''
                - - 's3://'
                  - !Ref S3AthenaStore
      DependsOn:
        - S3AthenaStore
  Outputs:
    AthenaDataStore:
      Description: "Athena Data Store"
      Value: !Ref S3AthenaStore
      Export:
        Name:
          Fn::Sub: "${AWS::StackName}-athena"
    AthenaDataStorePath:
      Description: "Athena Data store path"
      Value: !Join
        - ''
        - - 's3://'
          - !Ref S3AthenaStore
    AnalyticData:
      Description: "event data"
      Value: !Ref S3AnalyticsStore
      Export:
        Name:
          Fn::Sub: "${AWS::StackName}-data"
    AnalyticDataDB:
      Description: "Glue Database"
      Value: !Ref AnalysisGlueDatabase
      Export:
        Name:
          Fn::Sub: "${AWS::StackName}-db"
    AthenaWorkGroup:
      Description: "Athena work group"
      Value: !Ref AnalyticsAthenaWorkGroup
      Export:
        Name:
          Fn::Sub: "${AWS::StackName}-athena-workgroup"
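The pipeline works up to a point. I have a bucket named page-view-store-staging, and the emitted events are recorded in it.
However, either Athena does not see any records or I am querying the data incorrectly. When I open the Athena console, it shows my database, analytics-store-staging-db, and one table, page_view_store_staging.
My S3 bucket does contain records. They are laid out as bucket/year/month and so on, so a path looks like page-view-store-staging/2022/07/11/20/, and the objects are GZIP-compressed JSON text files, for example: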
{"anonymous_id":"123","url":"-","event_type":"pageView","timestamp":"2022-07-11T20:59:51.144Z","source_ip":"69.113.177.222","user_agent":"curl/7.54.0"}
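A file like this can be sanity-checked locally before involving Glue. The following is a minimal Python sketch, assuming one delivered object has already been downloaded from the bucket; the function name `decode_records` is hypothetical. It decompresses the bytes and parses the JSON objects whether they are separated by newlines or simply concatenated, since that depends on how the producer writes each record.

```python
import gzip
import json


def decode_records(gz_bytes):
    """Decode gzip-compressed bytes holding one or more JSON objects.

    Handles objects that are newline-separated as well as objects that
    are concatenated back to back with no delimiter.
    """
    text = gzip.decompress(gz_bytes).decode("utf-8")
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        # raw_decode returns the parsed object and the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
    return records
```

If this raises `json.JSONDecodeError` or `gzip.BadGzipFile`, the stored objects are not in the shape described above, which would be worth ruling out before looking at the crawler.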
When I try to run a query, the response is:
SELECT * not allowed from relation that has no columns
This query ran against the "analytics-store-staging-db" database, unless qualified by the query.
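I looked at the Glue console. It shows the database and the table, and the table shows objectCount: 0 and UPDATED_BY_CRAWLER. I also tried invoking the crawler manually. It starts and runs, but there are no errors and no records.
What am I doing wrong? How can I check whether Glue is working, and how can I debug it?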
It turns out the syntax of my bucket access policy was incorrect. The correct version is:
Policies:
  -
    PolicyName: "S3BucketAccessPolicy"
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        -
          Effect: "Allow"
          Action:
            - "s3:GetObject"
            - "s3:PutObject"
          Resource:
            - !Join
              - ''
              - - !GetAtt S3AnalyticsStore.Arn
                - "*"
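To see whether the crawler actually picked anything up after a change like this, its last run and the resulting table can be inspected through the Glue API. Below is a rough Python sketch, not a definitive tool: `summarize_crawler` is a hypothetical helper, and it assumes the `glue` argument behaves like `boto3.client("glue")`, using its `get_crawler`, `get_crawler_metrics`, and `get_table` calls. The crawler, database, and table names in the usage comment are the ones from the question.

```python
def summarize_crawler(glue, crawler_name, database, table):
    """Collect the crawler's last-run status and the table's column names.

    `glue` is assumed to expose the same methods as boto3.client("glue").
    """
    crawler = glue.get_crawler(Name=crawler_name)["Crawler"]
    last = crawler.get("LastCrawl", {})

    metrics_list = glue.get_crawler_metrics(
        CrawlerNameList=[crawler_name]
    )["CrawlerMetricsList"]
    metrics = metrics_list[0] if metrics_list else {}

    tbl = glue.get_table(DatabaseName=database, Name=table)["Table"]
    columns = [
        col["Name"]
        for col in tbl.get("StorageDescriptor", {}).get("Columns", [])
    ]

    return {
        "state": crawler.get("State"),
        "last_status": last.get("Status"),
        "last_error": last.get("ErrorMessage"),
        "log_group": last.get("LogGroup"),
        "log_stream": last.get("LogStream"),
        "tables_created": metrics.get("TablesCreated"),
        "tables_updated": metrics.get("TablesUpdated"),
        "columns": columns,
    }


# Usage against a real account (requires boto3 and AWS credentials):
#   import boto3
#   print(summarize_crawler(boto3.client("glue"), "AnalysisCrawler",
#                           "analytics-store-staging-db",
#                           "page_view_store_staging"))
```

An empty `columns` list is consistent with the "no columns" error from Athena. In that case the crawler's CloudWatch log stream (`log_group` and `log_stream` in the result) is the next place to look, since a run can finish without a reported error even when it could not read the objects.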