Spark 节点在随机播放期间如何通信?



我从这个问题中看到Spark节点有效地"直接通信",但我不太关心理论,而更关心实现。在这里,它显示,在页面底部附近的"###Encryption"部分中,您可以将Spark配置为使用许多SSL协议以确保安全性,至少对我来说,这表明它使用某种形式的HTTP(s(进行通信。我的问题实际上是两部分:Spark节点使用什么协议进行通信,以及如何格式化此传输的数据?

Spark使用RPC(Netty(在执行程序进程之间进行通信。您可以查看NettyRpcEndpointRef类以查看实际实现。

对于洗牌数据,我们从负责提供数据块的BlockManager开始。每个执行程序进程有一个。在内部,一个BlockStoreShuffleReader,它使用SerializerManager管理来自不同执行程序的读取。此管理器持有一个实际的序列化程序,该序列化程序由spark.serializer属性定义:

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

BlockManager尝试读取块时,它会使用该基础配置中的序列化程序。它可以是KryoSerializerJavaSerializer,具体取决于您的设置。

底线,对于读取和写入随机数据,Spark使用用户定义的序列化程序。


对于任务序列化,这有点不同。

Spark使用一个名为closureSerializer的变量,默认为JavaSerializerInstance,意思是Java序列化。您可以在DAGScheduler.submitMissingTasks方法中看到这一点:

val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

序列化并发送到每个执行器的实际对象称为TaskDescription

def encode(taskDescription: TaskDescription): ByteBuffer = {
val bytesOut = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(bytesOut)
dataOut.writeLong(taskDescription.taskId)
dataOut.writeInt(taskDescription.attemptNumber)
dataOut.writeUTF(taskDescription.executorId)
dataOut.writeUTF(taskDescription.name)
dataOut.writeInt(taskDescription.index)
// Write files.
serializeStringLongMap(taskDescription.addedFiles, dataOut)
// Write jars.
serializeStringLongMap(taskDescription.addedJars, dataOut)
// Write properties.
dataOut.writeInt(taskDescription.properties.size())
taskDescription.properties.asScala.foreach { case (key, value) =>
dataOut.writeUTF(key)
// SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
val bytes = value.getBytes(StandardCharsets.UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
// Write the task. The task is already serialized, so write it directly to the byte buffer.
Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)
dataOut.close()
bytesOut.close()
bytesOut.toByteBuffer
}

并从CoarseGrainedSchedulerBackend.launchTasks方法通过 RPC 发送:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

到目前为止,我所展示的内容是关于启动任务的。对于随机数据,Spark 持有一个BlockStoreShuffleReader,用于管理来自不同执行程序的读取。

最新更新