How to read a nested JSON object field into a Scala case class with Spark



I have a tweets JSON file whose structure is shown below (this is a sample of a single tweet from my tweets file). I need to read it as JSON with Spark and convert it into a case class with the Scala code below. I need to read specific fields of the nested JSON; in particular, I want to read the hashtags array nested inside the tweet's "entities" structure. However, I have not found a working solution. Exploding "entities" gives me the error:

*Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`entities`)' due to data type mismatch: input to function explode should be array or map type, not StructType(StructField(hashtags,ArrayType(StructType(StructField(indices,ArrayType(LongType,true),true), StructField(text,StringType,true)),true),true), StructField(media,ArrayType(StructType(StructField(display_url,StringType,true), StructField(expanded_url,StringType,true), StructField(id,LongType,true), StructField(id_str,StringType,true), StructField(indices,ArrayType(LongType,true),true), StructField(media_url,StringType,true), StructField(media_url_https,StringType,true), StructField(sizes,StructType(StructField(large,StructType(StructField(h,LongType,true), StructField(resize,StringType,true), StructField(w,LongType,true)),true), StructField(medium,StructType(StructField(h,LongType,true), StructField(resize,StringType,true), StructField(w,LongType,true)),true), StructField(small,StructType(StructField(h,LongType,true), StructField(resize,StringType,true), StructField(w,LongType,true)),true), StructField(thumb,StructType(StructField(h,LongType,true), StructField(resize,StringType,true), StructField(w,LongType,true)),true)),true), StructField(source_status_id,LongType,true), StructField(source_status_id_str,StringType,true), StructField(source_user_id,LongType,true), StructField(source_user_id_str,StringType,true), StructField(type,StringType,true), StructField(url,StringType,true)),true),true), StructField(symbols,ArrayType(StringType,true),true), StructField(urls,ArrayType(StructType(StructField(display_url,StringType,true), StructField(expanded_url,StringType,true), StructField(indices,ArrayType(LongType,true),true), StructField(url,StringType,true)),true),true), StructField(user_mentions,ArrayType(StructType(StructField(id,LongType,true), StructField(id_str,StringType,true), StructField(indices,ArrayType(LongType,true),true), StructField(name,StringType,true), StructField(screen_name,StringType,true)),true),true));*
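The error message itself names the cause: explode only accepts columns of array or map type, while entities is a StructType. What can be exploded is the array nested inside it, entities.hashtags. A minimal sketch of that fix (assuming the DataFrame df loaded in the Scala code further below):

import org.apache.spark.sql.functions.{col, explode}

// entities is a struct, so explode(df("entities")) fails;
// entities.hashtags is ArrayType(StructType(indices, text)) and can be exploded
val hashtagTexts = df
  .select(explode(col("entities.hashtags")).as("hashtag")) // one row per hashtag struct
  .select(col("hashtag.text").as("tag"))                   // keep only the tag string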

Tweet JSON file

{"created_at":"Mon Sep 04 12:34:09 +0000 2017","id":904684005001269248,"id_str":"904684005001269248","text":"u63a8u3057u304cu53f3u624bu3067u30ddu30fcu30bau3092u53d6u308au304cu3061u3002 https://t.co/bNmQSC2Xog","display_text_range":[0,15],"source":"u003ca href="http://twitter.com/download/iphone" rel="nofollow"u003eTwitter for iPhoneu003c/au003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":2930557759,"id_str":"2930557759","name":"ud83dude08u3042u3086u2693ufe0f","screen_name":"CR7_AYU","location":"u6e05u6d41","url":"http://ameblo.jp/shuka-saito/","description":"u9022u7530u68a8u9999u5b50u3055u3093u3001u6589u85e4u6731u590fu3055u3093u3001u5c0fu6797u611bu9999u3055u3093u3092u5fdcu63f4u3057u3066u3044u307eu3059u3002","translator_type":"none","protected":false,"verified":false,"followers_count":564,"friends_count":644,"listed_count":54,"favourites_count":142433,"statuses_count":138352,"created_at":"Mon Dec 15 05:22:02 +0000 2014","utc_offset":32400,"time_zone":"Tokyo","geo_enabled":false,"lang":"ja","contributors_enabled":false,"is_translator":false,"profile_background_color":"00BFFF","profile_background_image_url":"http://pbs.twimg.com/profile_background_images/605381334823890944/qdEfh3qD.jpg","profile_background_image_url_https":"https://pbs.twimg.com/profile_background_images/605381334823890944/qdEfh3qD.jpg","profile_background_tile":true,"profile_link_color":"4D5AAF","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"000000","profile_text_color":"000000","profile_use_background_image":false,"profile_image_url":"http://pbs.twimg.com/profile_images/858259320038768640/7tqZv7WS_normal.jpg","profile_image_url_https":"https://pbs.twimg.com/profile_images/858259320038768640/7tqZv7WS_normal.jpg","profile_banner_url":"https://pbs.twimg.com/profile_banners/2930557759/1503961622","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"quote_count":0,"reply_count":0,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":904683986949070848,"id_str":"904683986949070848","indices":[16,39],"media_url":"http://pbs.twimg.com/media/DI4VSvwVYAAQxsd.jpg","media_url_https":"https://pbs.twimg.com/media/DI4VSvwVYAAQxsd.jpg","url":"https://t.co/bNmQSC2Xog","display_url":"pic.twitter.com/bNmQSC2Xog","expanded_url":"https://twitter.com/CR7_AYU/status/904684005001269248/photo/1","type":"photo","sizes":{"medium":{"w":1200,"h":1200,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":680,"h":680,"resize":"fit"},"large":{"w":2048,"h":2048,"resize":"fit"}}}]},"extended_entities":{"media":[{"id":904683986949070848,"id_str":"904683986949070848","indices":[16,39],"media_url":"http://pbs.twimg.com/media/DI4VSvwVYAAQxsd.jpg","media_url_https":"https://pbs.twimg.com/media/DI4VSvwVYAAQxsd.jpg","url":"https://t.co/bNmQSC2Xog","display_url":"pic.twitter.com/bNmQSC2Xog","expanded_url":"https://twitter.com/CR7_AYU/status/904684005001269248/photo/1","type":"photo","sizes":{"medium":{"w":1200,"h":1200,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"small":{"w":680,"h":680,"resize":"fit"},"large":{"w":2048,"h":2048,"resize":"fit"}}}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"ja","timestamp_ms":"1
504528449665"}

Basically, I want to pull out a few nested fields, and explode does not seem to work for me. What am I doing wrong?

Scala code

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.functions._

object TwitterAnalytics {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("TwitterAnalytics")
    conf.setMaster("local[2]")
    val sc = new SparkContext(conf)
    println(sc)
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val df = spark.read.json("/home/gakuo/Downloads/TwitterAnalytics/tweets")
    // Exploding entities throws the AnalysisException quoted above:
    // explode expects an array or map column, and entities is a struct.
    println(df.select(explode(df("entities"))))
    println(df.select(explode(df("retweeted_status"))))

    type Tag = String
    type Likes = Int
    case class Tweet(id: BigInt,
                     text: String,
                     hashTags: Array[Tag],
                     likes: Likes)
    def parseTweet(tweet: DataFrame): Tweet = ???    
  }
}

Redefine (and move to the outer scope):

type Tag = String
type Likes = Long   // Integer doesn't have required precision
case class Tweet(id: Long,
                 text: String,
                 hashTags: Array[Tag],
                 likes: Likes)

Import the implicits:

import spark.implicits._

select (assuming you want favorite_count as likes):

// needs: import org.apache.spark.sql.Dataset
val tweets: Dataset[Tweet] = df
  .select($"id", $"text", $"entities.hashtags.text" as "hashTags", $"favorite_count" as "likes")
  .as[Tweet]
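Note that entities.hashtags.text projects the text field out of each hashtag struct, yielding array&lt;string&gt;, which the encoder can map to Array[Tag]; selecting entities.hashtags directly would produce an array of structs and fail the conversion. Put together, a minimal end-to-end sketch (the file path and the favorite_count-to-likes mapping are assumptions carried over from the question):

import org.apache.spark.sql.{Dataset, SparkSession}

object TwitterAnalytics {
  type Tag = String
  type Likes = Long

  case class Tweet(id: Long,
                   text: String,
                   hashTags: Array[Tag],
                   likes: Likes)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TwitterAnalytics")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val df = spark.read.json("/home/gakuo/Downloads/TwitterAnalytics/tweets")

    // project the nested fields and convert to a typed Dataset
    val tweets: Dataset[Tweet] = df
      .select($"id", $"text", $"entities.hashtags.text" as "hashTags",
              $"favorite_count" as "likes")
      .as[Tweet]

    tweets.show(truncate = false)
    spark.stop()
  }
}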
