除了挖掘日志输出的日志文件之外,有没有办法获取 Spark 跟踪 URL?



我有一个创建Spark Session的Scala应用程序,并且我已经设置了使用Spark REST API的健康检查。Spark应用程序本身运行在Hadoop Yarn上。REST API URL 当前是通过读取创建 Spark 会话时生成的 Spark 日志记录来检索的。这在大多数情况下都有效,但在我的应用程序中有一些边缘情况,它不能很好地工作。 有谁知道获取此跟踪URL的另一种方法?

"您可以通过从 YARN 的配置和应用程序 ID 中读取yarn.resourcemanager.webapp.address值(在侦听器总线上发送的事件中公开)和现有的 SparkContext 方法

中公开来执行此操作。复制了上面的段落,原样来自开发人员的回复,网址为:https://issues.apache.org/jira/browse/SPARK-20458

更新:

我确实尝试了解决方案并非常接近。以下是一些用于构建该URL的Scala/Spark代码:

@transient val ssc: StreamingContext = StreamingContext.getActiveOrCreate(rabbitSettings.checkpointPath, CreateStreamingContext)
// Update yarn logs URL in Elasticsearch
YarnLogsTracker.update(
ssc.sparkContext.uiWebUrl,
ssc.sparkContext.applicationId,
"test2")

YarnLogsTracker对象是这样的:

object YarnLogsTracker {
private def recoverURL(u: Option[String]): String = u match {
case Some(a) => a.split(":").take(2).mkString(":")
case None => ""
}
def update(rawUrl: Option[String], rawAppId: String, tenant: String): Unit = {
val logUrl = s"${recoverURL(rawUrl)}:8042/node/containerlogs/container${rawAppId.substring(11)}_01_000002/$tenant/stdout/?start=-4096"
...

这会产生这样的东西:http://10.99.25.146:8042/node/containerlogs/container_1516203096033_91164_01_000002/test2/stdout/?start=-4096

我发现了一种"合理"的方法来获得这个。 显然,最好的方法是让 Spark 库直接向启动器应用程序公开它们已经获取的ApplicationReport,因为它们会遇到设置委托令牌等的麻烦。 然而,这似乎不太可能发生。

这种方法是双管齐下的。 首先,它尝试自己构建一个YarnClient,以便获取具有权威跟踪URL的ApplicationReport。 但是,根据我的经验,这可能会失败(例如:如果作业在CLUSTER模式下运行,并且在 Kerberized 环境中--proxy-user,则将无法正确向 YARN 进行身份验证)。

就我而言,我从驱动程序本身调用此帮助程序方法,并将结果报告回我的启动器应用程序。 但是,原则上,任何具有HadoopConfiguration可用的地方都应该工作(可能包括启动器应用程序)。 显然,您可以使用此实现的任一"叉"(或两者),具体取决于您的需求和对复杂性、额外处理等的容忍度。

/**
* Given a Hadoop {@link org.apache.hadoop.conf.Configuration} and appId, use the YARN API (via an
* {@link YarnClient} instance) to get the application report, which includes the trackingUrl.  If this fails,
* then as a fallback, it attempts to "guess" the URL by looking at various YARN configuration properties,
* and assumes that the URL will be something like: <pre>[yarnWebUI:port]/proxy/[appId]</pre>.
*
* @param hadoopConf the Hadoop {@link org.apache.hadoop.conf.Configuration}
* @param appId the YARN application ID
* @return the app trackingUrl, either retrieved using the {@link YarnClient}, or manually constructed using
*         the fallback approach
*/
public static String getYarnApplicationTrackingUrl(org.apache.hadoop.conf.Configuration hadoopConf, String appId) {
LOG.debug("Attempting to look up YARN url for applicationId {}", appId);
YarnClient yarnClient = null;
try {
// do not attempt to fail over on authentication error (ex: running with proxy-user and Kerberos)
hadoopConf.set("yarn.client.failover-max-attempts", "0");
yarnClient = YarnClient.createYarnClient();
yarnClient.init(hadoopConf);
yarnClient.start();
final ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
return report.getTrackingUrl();
} catch (YarnException | IOException e) {
LOG.warn(
"{} attempting to get report for YARN appId {}; attempting to use manually constructed fallback",
e.getClass().getSimpleName(),
appId,
e
);
String baseYarnWebappUrl;
String protocol;
if ("HTTPS_ONLY".equals(hadoopConf.get("yarn.http.policy"))) {
// YARN is configured to use HTTPS only, hence return the https address
baseYarnWebappUrl = hadoopConf.get("yarn.resourcemanager.webapp.https.address");
protocol = "https";
} else {
baseYarnWebappUrl = hadoopConf.get("yarn.resourcemanager.webapp.address");
protocol = "http";
}
return String.format("%s://%s/proxy/%s", protocol, baseYarnWebappUrl, appId);
} finally {
if (yarnClient != null) {
yarnClient.stop();
}
}
}

相关内容

最新更新