Hadoop Azure: problem authenticating to a storage account with MSI in an App Service plan (Azure Functions)



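This is similar to what is discussed here: https://www.mail-archive.com/user@hadoop.apache.org/msg24204.html

The MSI endpoint that the hadoop-azure client expects applies (I think) only to VMs, not to Functions. So I retrieve the MSI endpoint from the environment variable IDENTITY_ENDPOINT: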

sparkContext.hadoopConfiguration().set("fs.azure.account.auth.type", "OAuth");
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider");
// Read the variable once so the null check and the configured value cannot diverge
String msiEndpoint = System.getenv("IDENTITY_ENDPOINT");
if (msiEndpoint != null) {
    sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.msi.endpoint", msiEndpoint);
}
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.msi.tenant", "xx");
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.client.id", "yy");
spark.read().parquet("");

Running the above gives me the following error:

Status code: -1 error code: null error message: Auth failure: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:217)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:911)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:892)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:358)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:932)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:609)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:599)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.exists(AzureBlobFileSystem.java:1177)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenSingleCall(AzureADAuthenticator.java:430)
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:306)
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenFromMsi(AzureADAuthenticator.java:154)
    at org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider.refreshToken(MsiTokenProvider.java:57)
    at org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider.getToken(AccessTokenProvider.java:50)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAccessToken(AbfsClient.java:1055)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:256)
    ... 23 more

The following applies to azure-functions and azure-container-apps. Other hosting services may behave differently.

hadoop-azure provides a mechanism for custom token providers.

package org.acme;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.util.Date;

class CustomToken implements org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee {

    private final Logger log = LoggerFactory.getLogger(getClass());
    private String accountName;
    // Cached token; volatile so a refresh is visible to other threads
    private volatile AccessToken token;

    @Override
    public void initialize(Configuration configuration, String accountName) {
        log.info("Custom Token to be initialized. Config: " + configuration + ". AccountName: " + accountName);
        this.accountName = accountName;
    }

    @Override
    public String getAccessToken() {
        // Reuse the cached token until it is within two hours of its expiry time
        if (token != null && OffsetDateTime.now().isBefore(token.getExpiresAt().minusHours(2))) {
            return token.getToken();
        } else {
            log.info("token has expired or not been set. " + token);
            fetchAndSetToken();
            return token.getToken();
        }
    }

    private void fetchAndSetToken() {
        DefaultAzureCredential creds = new DefaultAzureCredentialBuilder()
                .build();
        TokenRequestContext request = new TokenRequestContext();
        // The storage account host name is used as the token scope
        request.addScopes("https://" + accountName);
        this.token = creds.getToken(request).block();
        log.info("Token has been set. Expires at: " + token.getExpiresAt() + " . " + token.isExpired());
    }

    @Override
    public Date getExpiryTime() {
        // token is null until getAccessToken() has fetched one
        return new Date(token.getExpiresAt().toInstant().toEpochMilli());
    }
}
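
For hadoop-azure to pick up a class like the one above, the ABFS documentation describes setting the auth type to Custom and pointing the provider type at the class name. The following is only a sketch of that wiring: it assumes the class is reachable as org.acme.CustomToken (from the package declaration above), and the names CustomTokenWiring and customTokenSettings are made up for illustration. It builds the settings as a plain map so it runs without Hadoop on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CustomTokenWiring {

    // Hypothetical helper: collects the two ABFS settings that select a custom token provider.
    public static Map<String, String> customTokenSettings(String providerClass) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.azure.account.auth.type", "Custom");
        settings.put("fs.azure.account.oauth.provider.type", providerClass);
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> settings = customTokenSettings("org.acme.CustomToken");
        // In the Spark job, apply each entry to the Hadoop configuration instead of printing it:
        // settings.forEach(sparkContext.hadoopConfiguration()::set);
        settings.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The MSI-specific keys from the question (endpoint, tenant, client id) should not be needed with this approach, since the token comes from DefaultAzureCredential rather than from MsiTokenProvider.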

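To debug further: ssh into the container app or the Azure Functions VM.

Get the MSI secret: echo $MSI_SECRET. Then obtain a token with a curl call to the local MSI endpoint: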

curl -v -H "X-IDENTITY-HEADER: msi_secret_from_above" "http://127.0.0.1:42356/msi/token/?resource=https://storage-account-name.dfs.core.windows.net/&api-version=2019-08-01"

The Java code above replicates this behavior. I found this link useful for understanding MSI: https://dev.to/stratiteq/managed-identity-how-it-works-behind-the-scenes-co4
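
If curl is not available in the container, the same probe can be sketched with the JDK's built-in HTTP client (Java 11+). This is an illustration, not part of hadoop-azure: the class name MsiTokenProbe and its helpers are hypothetical, and it assumes the endpoint is exposed in IDENTITY_ENDPOINT and the secret in MSI_SECRET, as in the steps above. The resource is URL-encoded here, which should be equivalent to the unencoded form used in the curl call.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class MsiTokenProbe {

    // Builds <endpoint>?resource=<encoded resource>&api-version=2019-08-01, as in the curl call.
    public static URI tokenUri(String endpoint, String resource) {
        String encoded = URLEncoder.encode(resource, StandardCharsets.UTF_8);
        return URI.create(endpoint + "?resource=" + encoded + "&api-version=2019-08-01");
    }

    // Builds a GET request carrying the MSI secret in the X-IDENTITY-HEADER header.
    public static HttpRequest tokenRequest(String endpoint, String secret, String resource) {
        return HttpRequest.newBuilder(tokenUri(endpoint, resource))
                .header("X-IDENTITY-HEADER", secret)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        String endpoint = System.getenv("IDENTITY_ENDPOINT"); // assumed variable name
        String secret = System.getenv("MSI_SECRET");
        if (endpoint == null || secret == null) {
            System.out.println("IDENTITY_ENDPOINT or MSI_SECRET is not set; run this inside the function or container app.");
            return;
        }
        // Placeholder storage account name, as in the curl example
        HttpRequest request = tokenRequest(endpoint, secret, "https://storage-account-name.dfs.core.windows.net/");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print only the status code so the token itself does not end up in logs
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

A 200 status here, next to the 400 from the question, would suggest that the endpoint itself works and that the api-version and header sent by the client are what differ.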