我正在尝试使用SCALA使用Kafka Streams从Kafka主题发送数据记录。在每100条记录之后,它应该生成一个csv。不使用Kafka连接有可能吗??
更新:我可以用forEach
处理器制作csv。我正在创建一个listBuffer,它存储100条记录,当它达到100条记录时,它会生成一个csv。如果有超过100条记录(比如142条记录(,如果在4秒的时间内没有更多的记录到达,它应该用剩下的42条记录创建csv。但由于foreach
处理器在处理完所有记录后处于非活动状态,因此无法计算非活动时间。有没有办法检查下一条记录是否为空,这样我就可以用剩下的记录创建csv??或者有没有办法找到foreach
处理器内部的非活动周期??writeToCsvFile
是一个单独的函数,用于将记录写入csv。
我在这里张贴代码:
var t :Long = System.currentTimeMillis()
kStream.foreach((key, value) => {
recordFields = value.split(",")
listOfRecords += recordFields
if ((listOfRecords.size >= 100 || System.currentTimeMillis()-t >= 4000)) {
t = System.currentTimeMillis()
size = listOfRecords.size
fileNameVariable = new SimpleDateFormat("yyyy-MMM-dd_HH-mm-ss").format(Calendar.getInstance.getTime)
fileName = f"output/${fileNameVariable}_${System.currentTimeMillis()}${size}_records.csv"
writeToCsvFile(fileName, listOfRecords)
listOfRecords.clear()
}
})
关于最初的问题,是的,这是可能的,假设主题的行数恰好是您想要限制的文件中行数的倍数。例如,您想要写入100行的文件,那么如果您只消耗42条记录,那么就没有文件了。
然后,您为问题添加了额外的约束,例如计时器。如果没有要使用的记录,则不会检查时间条件,因此也不会写入文件。
检查";下一个记录为空";是使用常规Consumer
并检查poll()
返回的迭代器的大小
如果必须使用Kafka Streams,一种解决方法是定义一个后台线程。
public class FileWriterTask<T> implements Runnable {
private final List<T> buffer = new ArrayList<>(100);
private long lastFlush = System.currentTimeMillis();
// TODO: Constructor
public void add(T t) { buffer.add(t); }
public void flush(long flushTime) {
writeToCsv(buffer);
buffer.clear();
lastFlush = flushTime;
}
AtomicBoolean running = new AtomicBoolean(true);
@Override
public void run() {
while (running.get()) {
long now = System.getCurrentTimeMillis();
if (buffer.size() >= 100 || (now - lastFlush) >= 4000) {
flush(now);
}
// NOTE: Need to make sure the consumer poll loop allows for one second plus any time it takes to create and write to a file
Thread.sleep(1000);
}
}
}
然后将该线程传递给流操作。。。代码未经测试,我的Scala很弱,所以使用Java,但应该理解这一点。
final Thread t = new Thread(new FileWriterTask<String>());
t.start();
kStream.foreach((key, value) => {
recordFields = value.split(",")
recordFields.foreach(t::add);
});
// TODO: add shutdown hook
// t.running.set(false);
请注意,您可能需要像这样(Java(以不同的方式编写它,以便正确地关闭线程。。。类可能也需要提取。
kStream.foreach((key, value) -> new ForeachAction {
final Thread t = new Thread(new FileWriterTask<String>());
public FoeachAction() {
t.start();
}
@Override
public void apply(? k, String value) { // not sure what your key type is
recordFields = value.split(",")
recordFields.foreach(t::add);
}
});