Apache Camel文件组件 - 如何仅扫描新文件



我在Scala中设置了一个简单的Apache Camel Route,如下所示:

import org.apache.camel.builder.RouteBuilder
import akka.actor.ActorRef
import akka.camel._
    class SimpleRouteBuilder(val responder: ActorRef) extends RouteBuilder  {
      def configure  {
        from("file:C:\datafiles\input?noop=true&readLock=changed").to(responder)
      }
    }

我的问题是我如何只扫描新文件现在,它为每个目录中的每个文件发送一条消息。我希望它只为我的 Camel 应用程序启动创建的文件发送消息。

干杯!

更新

好的,我找到了解决方案...实现一个时间戳参与者,其唯一目的是检查(文件创建时间>应用程序开始时间)并将文件消息转发到轮询路由器。

路线现在是:

 from(s"file:$directoryName?noop=true&readLock=changed&delay=$delay&recursive=true&include=$include").
  convertBodyTo(classOf[java.io.File]).to(timestampActor)

时间戳执行组件的代码大约是:

object TimestampActor {
  def apply(timeMillis0: Long): Props = Props(new TimestampActor(timeMillis0))
}
// timeMillis0 is the time when the application has started
class TimestampActor(timeMillis0: Long ) extends Actor {
  val messageRouterActor: ActorRef = context.system.actorFor(s"akka://${Constants.ActorSystemName}/user/${Constants.MessageRouterActorName}")
  val logger: Logger = LoggerFactory.getLogger(classOf[TimestampActor])

  def receive = {
      case camelMessage: CamelMessage => {
       // Need to unbox
        self ! camelMessage.body.asInstanceOf[java.io.File]
      }
      case file: File => {
        try {
          val attrs: BasicFileAttributes  = Files.readAttributes(file.toPath, classOf[BasicFileAttributes])
          if (attrs.creationTime().toMillis  >= timeMillis0) {
            // Forward the message to the Round Robin message router
            messageRouterActor ! file
          }
        }
        catch {
          case ioe:IOException => {
            logger.error(s"Failed to get creation time for file ${file.getCanonicalPath}", ioe)
            // Retry in 20 minutes
            import context.dispatcher
            context.system.scheduler.scheduleOnce(Duration.create(20, TimeUnit.MINUTES), self, file)
          }
        }
      }
  }
}

您需要使用 Apache Camel File 组件的 idempotent 选项。它将记住已经处理过的文件,并且只会使用新文件。默认情况下,它将记住 1000 个条目,键将是文件的绝对路径,但您可以将其更改为所需的路径。它将是这样的:

from("file:C:\datafiles\input?noop=true&readLock=changed&idempotent=true").to(responder)

您还可以使用不同类型的幂等存储库来做更多花哨的事情。

如果您担心丢失原始文件,我建议您使用多播。这样,您可以将文件的副本发送到备份文件夹。

from("file:C:\datafiles\input").multicast().to("backuplocation", "responder");

默认情况下,Camel将处理放置在输入文件夹中的任何文件

最新更新