我正在尝试使用reactivemongo-akkastream 0.12.1从mongodb流传输数据,然后将结果返回其中一个路由中的CSV流中(使用AKKA-HTTP)。我确实在这里实现了这一点:
http://doc.akka.io/docs/akka-http/10.0.0.0/scala/sscala/http/routing-dsl/source-source-source-streaming-support.html#simple-csimple-csv-csv-csv-treaming-example-example-example-example-example- example 起来正常工作。
我现在面临的唯一问题是如何将标题添加到输出CSV文件中。有什么想法吗?
谢谢
除了该示例并不是生成CSV(不提供适当的逃逸)的真正强大方法,您还需要对其进行仔细研究以添加标头。这是我要做的:
- 制作
Flow
将Source[Tweet]
转换为CSV行源,例如Source[List[String]]
- 将其连接到包含您的标头的源为单个
List[String]
- 调整卫生员以渲染行源而不是推文
这是一些示例代码:
case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
更新:如果您的getTweets
方法返回Future
,则可以通过其源值映射并以这种方式预处标题,例如:
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}