使用批处理数据设计独立于数据源的应用程序



我们有一个遗留应用程序,它为每个用户从 mongo 读取数据(查询结果根据用户请求从小到大),我们的应用程序为每个用户创建一个文件并放入 FTP 服务器/s3。我们以 mongo 游标的形式读取数据,并在获得批处理数据后立即将每个批次写入文件,因此文件写入性能不错。此应用程序运行良好,但绑定到 mongo 和 mongo 光标。

现在我们必须重新设计这个应用程序,因为我们必须支持不同的数据源,即MongoDB,Postgres DB,Kinesis,S3等。到目前为止,我们的想法如下:

  1. 为每个源生成数据 API,并公开分页的 REST 响应。这是一个可行的解决方案,但对于大型来说可能很慢 查询数据与当前游标响应进行比较。
  2. 通过在 kafka 中馈送批处理数据并在文件生成器中读取批处理数据流来构建数据抽象层,但大多数情况下用户要求排序数据,因此我们需要按顺序读取消息。我们将失去在写入文件之前组合这些数据消息的巨大吞吐量和大量额外工作的好处。

我们正在寻找一种解决方案来替换当前的 mongo 光标并使我们的文件生成器独立于数据源。

因此,听起来您本质上想要创建一个API,您可以在其中尽可能保持流式传输的效率,就像您在读取用户数据时写入文件一样。

在这种情况下,您可能希望为ReadSource定义一个推送解析器 API,该 API 将数据流式传输到您的WriteTargets,后者将数据写入您实现的任何内容。排序将在事物的ReadSource端处理,因为对于某些来源,您可以有序地阅读(例如从数据库中);对于那些无法执行此操作的源,您只需执行中间步骤对数据进行排序(例如写入临时表),然后将其流式传输到WriteTarget

基本实现可能模糊地如下所示:

public class UserDataRecord {
private String data1;
private String data2;
public String getRecordAsString() {
return data1 + "," + data2;
}
}

public interface WriteTarget<Record> {
/** Write a record to the target */
public void writeRecord(Record record);
/** Finish writing to the target and save everything */
public void commit();
/** Undo whatever was written */
public void rollback();
}

public abstract class ReadSource<Record> {
protected final WriteTarget<Record> writeTarget;
public ReadSource(WriteTarget<Record> writeTarget) { this.writeTarget = writeTarget; }
public abstract void read();
}

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class RelationalDatabaseReadSource extends ReadSource<UserDataRecord> {
private Connection dbConnection;
public RelationalDatabaseReadSource (WriteTarget<UserDataRecord> writeTarget, Connection dbConnection) {
super(writeTarget);
this.dbConnection = dbConnection;
}
@Override public void read() {
// read user data from DB and encapsulate it in a record
try (Statement statement = dbConnection.createStatement();
ResultSet resultSet = statement.executeQuery("Select * From TABLE Order By COLUMNS");) {
while (resultSet.next()) {
UserDataRecord record = new UserDataRecord();
// stream the records to the write target
writeTarget.writeRecord(record);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
public class FileWriteTarget implements WriteTarget<UserDataRecord> {
private File fileToWrite;
private PrintWriter writer;
public FileWriteTarget(File fileToWrite) throws IOException {
this.fileToWrite = fileToWrite;
this.writer = new PrintWriter(new FileWriter(fileToWrite));
}
@Override public void writeRecord(UserDataRecord record) {
writer.println(record.getRecordAsString().getBytes(StandardCharsets.UTF_8));
}
@Override public void commit() {
// write trailing records
writer.close();
}
@Override
public void rollback() {
try { writer.close(); } catch (Exception e) { }
fileToWrite.delete();
}
}

这只是一般的想法,需要认真改进。 任何人都可以随时更新此API。

相关内容

最新更新