用Sparksession创建广播变量?火花2.0



是否可以使用SparkSession提供的SparkContext创建广播变量?我一直在sc.broadcast下遇到错误,但是在使用org.apache.spark.sparkcontext的SparkContext时,我没有问题。

import org.apache.spark.sql.SparkSession

object MyApp {
 def main(args: Array[String]){
  val spark = SparkSession.builder()
       .appName("My App")
       .master("local[*]")
       .getOrCreate()
  val sc = spark.sparkContext
        .setLogLevel("ERROR")
  val path = "C:\Boxes\github-archive\2015-03-01-0.json"
  val ghLog = spark.read.json(path)

  val pushes = ghLog.filter("type = 'PushEvent'")
  pushes.printSchema()
  println("All events: "+ ghLog.count)
  println("Only pushes: "+pushes.count)
  pushes.show(5)

  val grouped = pushes.groupBy("actor.login").count()
  grouped.show(5)

  val ordered = grouped.orderBy(grouped("count").desc)
  ordered.show(5)
  import scala.io.Source.fromFile
  val fileName= "ghEmployees.txt"
  val employees = Set() ++ ( 
    for { 
      line <- fromFile(fileName).getLines()
    } yield line.trim
    )

  val bcEmployees = sc.broadcast(employees)
 }
}

还是使用set()而不是seq对象的问题?

感谢您的任何帮助

编辑:

我一直在Intellij

中继续获得"无法解决符号广播"错误味精

遵守后,我会发现一个错误:错误:(47,28)价值广播不是单位的成员 val bcemployees = sc.broadcast(员工) ^

您的sc变量具有Unit类型,因为根据文档,setLogLevel具有返回类型Unit。而是这样做:

val sc: SparkContext = spark.sparkContext
sc.setLogLevel("ERROR")

重要的是跟踪更早捕获错误的变量类型。

相关内容

  • 没有找到相关文章

最新更新