Akka 流重试重复结果



我正在为 HTTP 资源实现一个迭代器,我可以恢复分页的元素列表,我试图用普通Iterator来做到这一点,但它是一个阻塞实现,因为我正在使用akka它让我的调度程序有点疯狂。

我的意愿是使用 akka-stream 实现相同的迭代器。问题是我需要不同的重试策略。

该服务返回由id标识的元素列表,有时当我查询下一页时,该服务会在当前页面上返回相同的产品。

我目前的算法是这样的。

var seenIds = Set.empty
var position = 0
def isProblematicPage(elements: Seq[Element]) Boolean = {
  val currentIds = elements.map(_.id)
  val intersection = seenIds & currentIds
  val hasOnlyNewIds = intersection.isEmpty
  if (hasOnlyNewIds) {
    seenIds = seenIds | currentIds
  }
  !hasOnlyNewIds
}
def incrementPage(): Unit = {
  position += 10
}
def doBackOff(attempt: Int): Unit = {
  // Backoff logic
}
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
  if (attempt > MaxRetries) {
    incrementPage()
    return Iterator.empty
  } 
  val eventualPage = service.retrievePage(position, position + 10)
  val page = Await.result(eventualPage, 5 minutes)
  if (isProblematicPage(page)) {
    doBackOff(attempt)
    fetchPage(attempt + 1)
  } else {
    incrementPage()
    page.iterator
  }
}

我正在使用akka-streams进行实现,但我无法弄清楚如何使用流结构累积页面并测试重复。

有什么建议吗?

Flow.scan 方法在这种情况下很有用。

我会从仓位来源开始您的直播:

type Position = Int
//0,10,20,...
def positionIterator() : Iterator[Position] = Iterator from (0,10) 
val positionSource : Source[Position,_] = Source fromIterator positionIterator

然后可以将此位置源定向到一个Flow.scan,该该利用类似于您的fetchPage的功能(旁注:您应该尽可能避免等待,有一种方法可以在您的代码中没有等待,但这超出了您原始问题的范围(。 新函数需要接受已经看到的元素的"状态":

def fetchPageWithState(service : Service)
                      (seenEls : Set[Element], position : Position) : Set[Elements] = {
  val maxRetries = 10
  val seenIds = seenEls map (_.id)
  @tailrec
  def readPosition(attempt : Int) : Seq[Elements] = {
    if(attempt > maxRetries)
      Iterator.empty
    else {
      val eventualPage : Seq[Element] = 
        Await.result(service.retrievePage(position, position + 10), 5 minutes)
      if(eventualPage.map(_.id).exists(seenIds.contains)) {
        doBackOff(attempt)
        readPosition(attempt + 1)
      }
      else 
        eventualPage            
    }
  }//end def readPosition
  seenEls ++ readPosition(0).toSet
}//end def fetchPageWithState

现在可以在Flow中使用:

def fetchFlow(service : Service) : Flow[Position, Set[Element],_] = 
  Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service))

新的流可以轻松连接到您的位置源,以创建Set[Element]源:

def elementsSource(service : Service) : Source[Set[Element], _] = 
  positionSource via fetchFlow(service) 

elementsSource中的每个新值都将是一组不断增长的来自抓取页面的独特元素。

Flow.scan阶段是一个很好的建议,但它缺乏处理期货的功能,所以我实现了它的异步版本,Flow.scanAsync它现在可以在 akka 2.4.12 上使用。

当前的实现是:

val service: WebService
val maxTries: Int
val backOff: FiniteDuration
def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = {
  f.recoverWith {
    case ex if attempt >= maxAttempts =>
      Future(zero)
    case ex =>
      akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f))
  }
}
def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = {
  val lastPageIds = lastPage.map(_.id).toSet
  val currPageIds = currPage.map(_.id).toSet
  val intersection = lastPageIds & currPageIds
  intersection.nonEmpty
}
def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = {
  retry(Seq.empty) {
    service.fetchPage(startIndex).map { currPage: Seq[Element] =>
      if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex)
      else currPage
    }
  }
}

val pagesRange: Range = Range(0, maxItems, pageSize)
val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage))
Source(pagesRange)
  .via(scanAsyncFlow)
  .mapConcat(identity)
  .runWith(Sink.seq)

感谢拉蒙的建议:)

最新更新