下面的spark代码用于创建数据管道。
package Test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
object myjson {def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Readfile")
.config("spark.driver.memory", "2g")
.master("local[*]")
//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val df = spark.read.option("multiLine", true).json("D:mypathTest/myfile.json")
df.printSchema()
val newdf = ds.withColumn("upTime",regexp_replace(col("upTime"),"[a-zA-Z]","")).
}
}
有没有任何方法可以在spark编写的Scala中创建日志记录和警报机制。或如何实现错误处理,如if文件不在路径错误。请帮帮我。
对于错误处理,可以使用try/catch
语句。https://alvinalexander.com/scala/scala-try-catch-finally-syntax-examples-exceptions-wildcard/
对于日志记录,您可以使用log4j。https://logging.apache.org/log4j/2.x/manual/scala-api.html
Spark-throw InvalidInputException当您在Spark-read API中传递无效源时。。。
你可以有类似下面的scala代码
try{
// reading through spark
}catch{
case filenotfound : InvalidInputException => {log.error("please check input ",filenotfound)
handleException()
}
case others : Exception => handleException()
}
def handleException() = {
// have a notification system like AWS SES or some other alerting systems here
}
对于日志记录,您可以使用log4j框架创建日志对象并使用它记录错误。