我如何提供一个Scala+Akka+Camel SFTP(或FTP)消费者与文件列表来获取



我有一个应用程序(Scala 2.10.3, Akka 2.3.1, Camel 2.13.0),它订阅了一个JMS主题,当特定的文件可以下载时,它会通过JMS消息得到通知。每个JMS消息都包含可通过SFTP收集的文件的名称+路径。

然后我希望能够通过SFTP获取文件,但只获取我们已经收到JMS消息的文件(以避免我们可能获取正在写入的文件的问题)。

我想要一个适合Akka Camel和Consumer模型的解决方案。我已经阅读了用于SFTP端点的file2和ftp2的Camel选项,但我需要以下帮助:

  • 我如何定义一个类/对象,可以在endpointUri字符串通过&filter=…参数?我希望能够更新过滤器对象,以便每次Consumer轮询文件列表时,都应用更新的过滤器列表。

  • 我如何定义一个自定义的IdempotentRepository,以允许缓存大小大于默认的1000?

我的SFTP消费者参与者目前看起来像这样(一些值被编辑…):

class SftpConsumer extends Actor with ActorLogging with Consumer {
  val host = ...
  val username = ...
  val keyFile = ...
  def endpointUri: String = s"sftp://${host}?username=${username}&privateKeyFile=${keyFile}&idempotent=true"

过滤器idempotentRepository参数需要引用注册表中的对象(按名称)。

对于过滤器,您需要创建一个类的对象,该对象扩展了org.apache.camel.component.file.GenericFileFilter.

对于过滤器和/或idempotentRepository,您需要创建一个注册中心,将注册中心分配给Camel上下文,并将这些对象注册到注册中心,例如

// define a class which extends GenericFileFilter[T], and which
// filters for files which are explicitly allowed via include()
class MyFileFilter extends GenericFileFilter[File] {
  var include = Set[String]()
  def accept(file: GenericFile[File]) = include.contains(file.getFileName)
  def include(filename: String) = include = include + filename
  def exclude(filename: String) = include = include - filename
}
// Create a filter and a registry, add a mapping for the file filter to
// the registry, and assign the registry to the camel context
val myFileFilter = new MyFileFilter()
val simpleRegistry = new SimpleRegistry()
simpleRegistry.put("myFilter", myFileFilter )
camel.context.setRegistry(simpleRegistry);
// create a memory-based idempotent repository with a custom cache size
val cacheSize = 2500
val myRepository = MemoryIdempotentRepository.memoryIdempotentRepository(cacheSize)
simpleRegistry.put("myRepository", myRepository)
// adjust the endpointUri to include the &filter= and &idempotentRepository= parameters
def endpointUri: String = s"sftp://${host}?username=${username}...&idempotent=true&idempotentRepository=#myRepository&filter=#myFilter"

最新更新