是否可以在一个时间范围内使用KAFKA STREAMS的记录制作csv文件



我正在尝试使用SCALA使用Kafka Streams从Kafka主题发送数据记录。在每100条记录之后,它应该生成一个csv。不使用Kafka连接有可能吗??

更新:我可以用forEach处理器制作csv。我正在创建一个listBuffer,它存储100条记录,当它达到100条记录时,它会生成一个csv。如果有超过100条记录(比如142条记录(,如果在4秒的时间内没有更多的记录到达,它应该用剩下的42条记录创建csv。但由于foreach处理器在处理完所有记录后处于非活动状态,因此无法计算非活动时间。有没有办法检查下一条记录是否为空,这样我就可以用剩下的记录创建csv??或者有没有办法找到foreach处理器内部的非活动周期??writeToCsvFile是一个单独的函数,用于将记录写入csv。

我在这里张贴代码:


var t :Long = System.currentTimeMillis()
kStream.foreach((key, value) => {
recordFields = value.split(",")
listOfRecords += recordFields
if ((listOfRecords.size >= 100 || System.currentTimeMillis()-t >= 4000)) {
t = System.currentTimeMillis()
size = listOfRecords.size
fileNameVariable = new SimpleDateFormat("yyyy-MMM-dd_HH-mm-ss").format(Calendar.getInstance.getTime)
fileName = f"output/${fileNameVariable}_${System.currentTimeMillis()}${size}_records.csv"
writeToCsvFile(fileName, listOfRecords)
listOfRecords.clear()
}
})

关于最初的问题,是的,这是可能的,假设主题的行数恰好是您想要限制的文件中行数的倍数。例如,您想要写入100行的文件,那么如果您只消耗42条记录,那么就没有文件了。

然后,您为问题添加了额外的约束,例如计时器。如果没有要使用的记录,则不会检查时间条件,因此也不会写入文件。

检查";下一个记录为空";是使用常规Consumer并检查poll()返回的迭代器的大小


如果必须使用Kafka Streams,一种解决方法是定义一个后台线程。

public class FileWriterTask<T> implements Runnable {
private final List<T> buffer = new ArrayList<>(100);
private long lastFlush = System.currentTimeMillis();
// TODO: Constructor

public void add(T t) { buffer.add(t); }
public void flush(long flushTime) {
writeToCsv(buffer);
buffer.clear();
lastFlush = flushTime;
}
AtomicBoolean running = new AtomicBoolean(true);
@Override
public void run() {
while (running.get()) {
long now = System.getCurrentTimeMillis();
if (buffer.size() >= 100 || (now - lastFlush) >= 4000) {
flush(now);
} 
// NOTE: Need to make sure the consumer poll loop allows for one second plus any time it takes to create and write to a file
Thread.sleep(1000); 
}
}
}

然后将该线程传递给流操作。。。代码未经测试,我的Scala很弱,所以使用Java,但应该理解这一点。

final Thread t = new Thread(new FileWriterTask<String>());
t.start();
kStream.foreach((key, value) => { 
recordFields = value.split(",")
recordFields.foreach(t::add);
});
// TODO: add shutdown hook
// t.running.set(false);

请注意,您可能需要像这样(Java(以不同的方式编写它,以便正确地关闭线程。。。类可能也需要提取。

kStream.foreach((key, value) -> new ForeachAction { 

final Thread t = new Thread(new FileWriterTask<String>());
public FoeachAction() {
t.start();
}
@Override
public void apply(? k, String value) { // not sure what your key type is
recordFields = value.split(",")
recordFields.foreach(t::add);
}

});

相关内容

  • 没有找到相关文章

最新更新