How to correctly access dbutils in Scala when using Databricks Connect



I'm using Databricks Connect to run code in an Azure Databricks cluster locally from IntelliJ IDEA (Scala).

Everything works fine. I can connect, debug, and inspect locally in the IDE.

I created a Databricks job to run my custom app JAR, but it fails with the following exception:

19/08/17 19:20:26 ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: com/databricks/service/DBUtils$
at Main$.<init>(Main.scala:30)
at Main$.<clinit>(Main.scala)

Line 30 of my Main.scala class is:

val dbutils: DBUtils.type = com.databricks.service.DBUtils

just like described on this documentation page.

That page shows a way to access DBUtils that works both locally and in the cluster. But the example only shows Python, and I'm using Scala.

What's the right way to access it so that it works both locally using databricks-connect and in a Databricks job running the JAR?

UPDATE

There seem to be two ways of using DBUtils.

1) The DbUtils class described here. Quoting the docs, this library allows you to build and compile the project, but not to run it. So this doesn't let you run your local code on the cluster.

2) Databricks Connect, described here. This one does allow you to run your local Spark code in a Databricks cluster.

The problem is that these two methods have different setups and package names. There doesn't seem to be a way to use Databricks Connect locally (it isn't available in the cluster) while also adding the jar application that uses the DbUtils class via sbt/maven so that the cluster has access to it.

I don't know why the docs you mentioned don't work. Maybe you're using a different dependency?

These docs have an example application you can download. It's a project with very minimal tests, so it doesn't create jobs or try running them on the cluster, but it's a start. Also, note that it uses the older 0.0.1 version of dbutils-api.

So, to fix the current issue, instead of using com.databricks.service.DBUtils, try importing dbutils from a different location:

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

Or, if you prefer:

import com.databricks.dbutils_v1.{DBUtilsV1, DBUtilsHolder}
type DBUtils = DBUtilsV1
val dbutils: DBUtils = DBUtilsHolder.dbutils

Additionally, make sure you have the following dependency in SBT (maybe try to play with the version if 0.0.3 doesn't work; the latest one is 0.0.4):

libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.3"
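For context, a minimal build.sbt for such a job JAR might look something like this; the Scala, Spark, and library versions here are illustrative, so match them to your cluster's runtime:

```scala
// build.sbt -- minimal sketch, versions are illustrative
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.databricks" % "dbutils-api_2.11" % "0.0.3",
  // The cluster provides Spark at runtime, so don't bundle it into the JAR
  "org.apache.spark" %% "spark-sql" % "2.4.3" % Provided
)
```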

This question and answer pointed me in the right direction. The answer contains a link to a working Github repo which uses dbutils: waimak. I hope that repo could help you with further questions about Databricks configuration and dependencies.

Good luck!


UPDATE

I see, so we have two similar but not identical APIs, and no good way to switch between the local and the backend version (though Databricks Connect promises it should work anyway). Let me propose a workaround.

Good thing that Scala is convenient for writing adapters. Here's a code snippet that should work as a bridge: it defines a DBUtils object that provides a sufficient abstraction over both versions of the API, the Databricks Connect one on com.databricks.service.DBUtils and the backend com.databricks.dbutils_v1.DBUtilsHolder.dbutils API. We achieve this by loading com.databricks.service.DBUtils through reflection and only then using it; there are no hard-coded imports of it.

package com.example.my.proxy.adapter

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams

import scala.util.Try
import scala.language.implicitConversions
import scala.language.reflectiveCalls

trait DBUtilsApi {
    type FSUtils
    type FileInfo
    type SecretUtils
    type SecretMetadata
    type SecretScope

    val fs: FSUtils
    val secrets: SecretUtils
}

trait DBUtils extends DBUtilsApi {
    trait FSUtils {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean = false): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean = false): Boolean
        def mv(from: String, to: String, recurse: Boolean = false): Boolean
        def head(file: String, maxBytes: Int = 65536): String
        def put(file: String, contents: String, overwrite: Boolean = false): Boolean
    }

    case class FileInfo(path: String, name: String, size: Long)

    trait SecretUtils {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    case class SecretMetadata(key: String) extends DefinedByConstructorParams
    case class SecretScope(name: String) extends DefinedByConstructorParams
}

object DBUtils extends DBUtils {
    import Adapters._

    override lazy val (fs, secrets): (FSUtils, SecretUtils) = Try[(FSUtils, SecretUtils)](
        (ReflectiveDBUtils.fs, ReflectiveDBUtils.secrets)    // try to use the Databricks Connect API
    ).getOrElse(
        (BackendDBUtils.fs, BackendDBUtils.secrets)    // if it's not available, use com.databricks.dbutils_v1.DBUtilsHolder
    )

    private object Adapters {
        // The apparent code duplication here is for performance -- the `ReflectiveDBUtils` adapters use reflection,
        // while the `BackendDBUtils` ones call the underlying functions directly.

        implicit class FSUtilsFromBackend(underlying: BackendDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class FSUtilsFromReflective(underlying: ReflectiveDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class SecretUtilsFromBackend(underlying: BackendDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }

        implicit class SecretUtilsFromReflective(underlying: ReflectiveDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }
    }
}

object BackendDBUtils extends DBUtilsApi {
    import com.databricks.dbutils_v1

    private lazy val dbutils: DBUtils = dbutils_v1.DBUtilsHolder.dbutils
    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = dbutils_v1.DBUtilsV1
    type FSUtils = dbutils_v1.DbfsUtils
    type FileInfo = com.databricks.backend.daemon.dbutils.FileInfo
    type SecretUtils = dbutils_v1.SecretUtils
    type SecretMetadata = dbutils_v1.SecretMetadata
    type SecretScope = dbutils_v1.SecretScope
}

object ReflectiveDBUtils extends DBUtilsApi {
    // This throws a ClassNotFoundException when the Databricks Connect API isn't available -- it's much better than
    // the NoClassDefFoundError, which we would get with a hard-coded import of com.databricks.service.DBUtils.
    // As we're just using reflection, we're able to recover when it's not found.
    private lazy val dbutils: DBUtils =
        Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get(null).asInstanceOf[DBUtils]

    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = AnyRef {
        val fs: FSUtils
        val secrets: SecretUtils
    }

    type FSUtils = AnyRef {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean): Boolean
        def mv(from: String, to: String, recurse: Boolean): Boolean
        def head(file: String, maxBytes: Int): String
        def put(file: String, contents: String, overwrite: Boolean): Boolean
    }

    type FileInfo = AnyRef {
        val path: String
        val name: String
        val size: Long
    }

    type SecretUtils = AnyRef {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    type SecretMetadata = DefinedByConstructorParams { val key: String }
    type SecretScope = DefinedByConstructorParams { val name: String }
}
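The core trick above can be seen in isolation. This self-contained sketch (all names are hypothetical stand-ins, not the real Databricks classes) shows why the adapter goes through reflection: Class.forName fails with a catchable ClassNotFoundException, while a hard-coded reference to a missing class dies with NoClassDefFoundError, a fatal LinkageError that scala.util.Try deliberately does not catch.

```scala
import scala.util.Try

// Hypothetical stand-in for an FS-like API surface.
trait Files { def ls(dir: String): Seq[String] }

// Stands in for the dbutils_v1 backend, which is always on the job's classpath.
object BackendFiles extends Files {
  def ls(dir: String): Seq[String] = Seq(s"$dir/from-backend")
}

// Stands in for the Databricks Connect side. Off-cluster, Class.forName throws
// ClassNotFoundException (a plain Exception), which Try can recover from.
object ConnectFiles {
  def load(): Files = {
    Class.forName("com.databricks.service.DBUtils$") // not on this classpath
    sys.error("unreachable when the class is missing")
  }
}

// Same shape as `object DBUtils` above: probe the Connect side, fall back.
object FilesBridge {
  lazy val files: Files = Try(ConnectFiles.load()).getOrElse(BackendFiles)
}
```

Run anywhere without the Databricks jars, FilesBridge.files quietly resolves to BackendFiles.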

If you replace the val dbutils: DBUtils.type = com.databricks.service.DBUtils mentioned in your Main with val dbutils: DBUtils.type = com.example.my.proxy.adapter.DBUtils, everything should work as a drop-in replacement, both locally and remotely.

If you run into some new NoClassDefFoundErrors, try adding specific dependencies to the JAR job, or try rearranging them, changing versions, or marking dependencies as provided.
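For example, marking the dbutils-api dependency as provided in sbt would look like the following; it compiles against the API locally but keeps it out of the assembled job JAR, and the version here is just a guess:

```scala
// build.sbt: compile against dbutils-api, but leave it out of the job JAR --
// the Databricks runtime supplies its own implementation on the cluster.
libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.4" % Provided
```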

This adapter isn't pretty, and it uses reflection, but it should be good enough as a workaround, I hope. Good luck :)

To access the dbutils.fs and dbutils.secrets Databricks Utilities, you use the dbutils module.

Example: accessing DBUtils in Scala looks like this:

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

Reference: Databricks - Accessing DBUtils.

Hope this helps.