我可以在戈兰将数据从一个作家流式传输到一个读者吗



我想处理一些文件,这些文件的内容不适合我的工作人员的内存。到目前为止,我发现的解决方案包括在上传到S3之前,将处理结果保存到/tmp目录。

import (
"bufio"
"bytes"
"context"
"fmt"
"log"
"os"
"runtime"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/korovkin/limiter"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
)
func DownloadWarc(
ctx context.Context,
s3Client *s3.Client,
warcs []*types.Warc,
path string,
) error {
key := fmt.Sprintf("parsed_warc/%s.parquet", path)
filename := fmt.Sprintf("/tmp/%s", path)
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("error creating file: %s", err)
}
defer file.Close()
bytesWriter := bufio.NewWriter(file)
pw, err := writer.NewParquetWriterFromWriter(bytesWriter, new(Page), 4)
if err != nil {
return fmt.Errorf("Can't create parquet writer: %s", err)
}
pw.RowGroupSize = 128 * 1024 * 1024 //128M
pw.CompressionType = parquet.CompressionCodec_SNAPPY
mutex := sync.Mutex{}
numWorkers := runtime.NumCPU() * 2
fmt.Printf("Using %d workersn", numWorkers)
limit := limiter.NewConcurrencyLimiter(numWorkers)
for i, warc := range warcs {
limit.Execute(func() {
log.Printf("%d: %+v", i, warc)
body, err := GetWarc(ctx, s3Client, warc)
if err != nil {
fmt.Printf("error getting warc: %s", err)
return
}
page, err := Parse(body)
if err != nil {
key := fmt.Sprintf("unparsed_warc/%s.warc", path)
s3Client.PutObject(
ctx,
&s3.PutObjectInput{
Body:   bytes.NewReader(body),
Bucket: &s3Record.Bucket.Name,
Key:    &key,
},
)
fmt.Printf("error getting page %s: %s", key, err)
return
}
mutex.Lock()
err = pw.Write(page)
pw.Flush(true)
mutex.Unlock()
if err != nil {
fmt.Printf("error writing page: %s", err)
return
}
})
}
limit.WaitAndClose()
err = pw.WriteStop()
if err != nil {
return fmt.Errorf("error writing stop: %s", err)
}
bytesWriter.Flush()
file.Seek(0, 0)
_, err = s3Client.PutObject(
ctx,
&s3.PutObjectInput{
Body:   file,
Bucket: &s3Record.Bucket.Name,
Key:    &key,
},
)
if err != nil {
return fmt.Errorf("error uploading warc: %s", err)
}
return nil
}

有没有办法避免将内容保存到临时文件中,并且在编写器和上传函数之间只使用有限大小的字节缓冲区?

换句话说,我可以在向同一缓冲区写入数据的同时开始向读取器流式传输数据吗?

是的,有一种方法可以将相同的内容写入多个编写器。使用io.MultiWriter可能允许您不使用临时文件。不过,使用临时文件可能还是不错的。

我经常使用io.MultiWriter来写入校验和(sha256…(计算器的列表。事实上,上次我读S3客户端代码时,我注意到它是在后台计算校验和的。MultiWriter对于在云位置之间管道传输大文件非常有用。

此外,如果您最终使用临时文件。您可能需要使用os.CreateTemp来创建临时文件。如果你不这样做,如果你的代码在两个进程中运行,或者你的文件有相同的名称,你可能会遇到创建文件名的问题。

请随时澄清您的问题。我可以试着再次回答:(

最新更新