将 HttpEntity.Chunked 转换为 Array[string]



我有以下问题。我正在向服务器查询一些数据并将其作为HttpEntity.Chunked返回。响应字符串如下所示,最多包含 10.000.000 行,如下所示:

[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]

现在我想将传入的数据放入 和 Array[String],其中每个字符串都是响应中的一行,因为稍后它应该导入到 apache spark 数据帧中。目前我这样做是这样的:

//For the http request
trait StartHttpRequest {
  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
      Http().outgoingConnection(host, port = targetPort)
    }
    val responseFuture: Future[HttpResponse] =
      Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
        .via(connectionFlow)
        .runWith(Sink.head)
    responseFuture
  }
}
//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)
//convert to string
responseFuture.flatMap { response =>
        response.status match {
          case StatusCodes.OK =>
            Unmarshal(response.entity).to[String]
    }
}
//and then something like this, but with even more stupid stuff
responseFuture.onSuccess { str:String =>
    masterActor! str.split("""},{""")
}

我的问题是,将结果放入数组的更好方法是什么?如何直接取消组响应实体?因为例如.to[Array[String]]不起作用。而且因为有这么多的线路来,我能不能用一条溪流来做,更有效率?

不按顺序回答您的问题:

如何直接取消组响应实体?

有一个与解组案例类数组相关的现有问题和答案。

将结果放入数组的更好方法是什么?

我会利用分块的性质并使用流。 这允许您同时执行字符串处理和 json 解析。

首先,您需要一个容器类和解析器:

case class Data(name : String, value : Int, time : Long)
object MyJsonProtocol extends DefaultJsonProtocol {
  implicit val dataFormat = jsonFormat3(Data)
}

然后,您必须执行一些操作以使json对象看起来正确:

//Drops the '[' and the ']' characters
val dropArrayMarkers = 
  Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))
val preppendBrace = 
  Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)
val appendBrace = 
  Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)
val parseJson = 
  Flow[String].map(_.parseJson.convertTo[Data])

最后,组合这些流以将字节字符串源转换为数据源对象:

def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] = 
  source.via(dropArrayMarkers)
        .via(Framing.delimiter(ByteString("},{"), 256, true))
        .map(_.utf8String)
        .via(prependBrace)
        .via(appendBrace)
        .via(parseJson)

然后,可以将此源排空到数据对象的Seq中:

val dataSeq : Future[Seq[Data]] = 
  responseFuture flatMap { response =>
    response.status match {
      case StatusCodes.OK =>
        strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
    }
  }

最新更新