所以我试图将一些基本的.csv文件直接从S3存储桶插入到弹性搜索中,每次将.csv放入S3时,它都会触发我的lambda,将.csv中的数据馈送到弹性搜索,到目前为止,我得到的是:
import json
import os
import logging
import boto3
from datetime import datetime
import re
import csv
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import RequestsHttpConnection, helpers, Elasticsearch
from core_libs.dynamodbtypescasters import DynamodbTypesCaster
from core_libs.streams import EsStream, EsClient
credentials = boto3.Session().get_credentials()
AWS_REGION = 'eu-west-3'
HOST = MY_PERSONAL_COMPANY_HOST
ES_SERVER = f"https://{HOST}"
AWS_ACCESS_KEY = credentials.access_key
AWS_SECRET_ACCESS_KEY = credentials.secret_key
AWS_SESSION_TOKEN = credentials.token
s3 = boto3.client('s3')
def lambda_handler(event, context):
awsauth = AWSRequestsAuth(
aws_access_key=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_token=AWS_SESSION_TOKEN,
aws_host=HOST,
aws_region=AWS_REGION,
aws_service='es',
)
BUCKET_NAME = record['s3']['bucket']['name']
SOURCE_PATH = record['s3']['object']['key']
SOURCE_FILE = SOURCE_PATH.split('/')[-1]
obj = s3.get_object(Bucket=BUCKET_NAME, Key=SOURCE_PATH)
body = obj['Body'].read()
lines = body.splitlines()
for line in lines:
print(line)
这就是我被卡住的地方。不知道我是否应该使用批量API,是否可以直接插入.csv的json版本,也不知道如何这样做
这并不是你的具体问题的答案。
然而,我想知道为什么你不使用带有Filebeat的SQS设置(推荐/开箱即用功能(,而使用带有CSV处理器的摄取管道?