我在Scala中设置了一个简单的Apache Camel Route,如下所示:
import org.apache.camel.builder.RouteBuilder
import akka.actor.ActorRef
import akka.camel._
class SimpleRouteBuilder(val responder: ActorRef) extends RouteBuilder {
def configure {
from("file:C:\datafiles\input?noop=true&readLock=changed").to(responder)
}
}
我的问题是我如何只扫描新文件?现在,它为每个目录中的每个文件发送一条消息。我希望它只为我的 Camel 应用程序启动后创建的文件发送消息。
干杯!
更新 :
好的,我找到了解决方案...实现一个时间戳参与者,其唯一目的是检查(文件创建时间>应用程序开始时间)并将文件消息转发到轮询路由器。
路线现在是:
from(s"file:$directoryName?noop=true&readLock=changed&delay=$delay&recursive=true&include=$include").
convertBodyTo(classOf[java.io.File]).to(timestampActor)
时间戳执行组件的代码大约是:
object TimestampActor {
def apply(timeMillis0: Long): Props = Props(new TimestampActor(timeMillis0))
}
// timeMillis0 is the time when the application has started
class TimestampActor(timeMillis0: Long ) extends Actor {
val messageRouterActor: ActorRef = context.system.actorFor(s"akka://${Constants.ActorSystemName}/user/${Constants.MessageRouterActorName}")
val logger: Logger = LoggerFactory.getLogger(classOf[TimestampActor])
def receive = {
case camelMessage: CamelMessage => {
// Need to unbox
self ! camelMessage.body.asInstanceOf[java.io.File]
}
case file: File => {
try {
val attrs: BasicFileAttributes = Files.readAttributes(file.toPath, classOf[BasicFileAttributes])
if (attrs.creationTime().toMillis >= timeMillis0) {
// Forward the message to the Round Robin message router
messageRouterActor ! file
}
}
catch {
case ioe:IOException => {
logger.error(s"Failed to get creation time for file ${file.getCanonicalPath}", ioe)
// Retry in 20 minutes
import context.dispatcher
context.system.scheduler.scheduleOnce(Duration.create(20, TimeUnit.MINUTES), self, file)
}
}
}
}
}
您需要使用 Apache Camel File 组件的 idempotent
选项。它将记住已经处理过的文件,并且只会使用新文件。默认情况下,它将记住 1000 个条目,键将是文件的绝对路径,但您可以将其更改为所需的路径。它将是这样的:
from("file:C:\datafiles\input?noop=true&readLock=changed&idempotent=true").to(responder)
您还可以使用不同类型的幂等存储库来做更多花哨的事情。
如果您担心丢失原始文件,我建议您使用多播。这样,您可以将文件的副本发送到备份文件夹。
from("file:C:\datafiles\input").multicast().to("backuplocation", "responder");
默认情况下,Camel将处理放置在输入文件夹中的任何文件