Converting Hive SQL to Spark SQL



I want to convert my Hive SQL to Spark SQL to test the query's performance. Here is my Hive SQL. Can anyone suggest how to convert this Hive SQL to Spark SQL?

SELECT split(DTD.TRAN_RMKS,'/')[0] AS TRAB_RMK1,
split(DTD.TRAN_RMKS,'/')[1] AS ATM_ID,
DTD.ACID,
G.FORACID,
DTD.REF_NUM,
DTD.TRAN_ID,
DTD.TRAN_DATE,
DTD.VALUE_DATE,
DTD.TRAN_PARTICULAR,
DTD.TRAN_RMKS,
DTD.TRAN_AMT,
SYSDATE_ORA(),
DTD.PSTD_DATE,
DTD.PSTD_FLG,
G.CUSTID,
NULL AS PROC_FLG,
DTD.PSTD_USER_ID,
DTD.ENTRY_USER_ID,
G.schemecode as SCODE
FROM DAILY_TRAN_DETAIL_TABLE2 DTD
JOIN ods_gam G
ON DTD.ACID = G.ACID
where substr(DTD.TRAN_PARTICULAR,1,3) rlike '(PUR|POS).*'
AND DTD.PART_TRAN_TYPE = 'D'
AND DTD.DEL_FLG <> 'Y'
AND DTD.PSTD_FLG = 'Y'
AND G.schemecode IN ('SBPRV','SBPRS','WSSTF','BGFRN','NREPV','NROPV','BSNRE','BSNRO')
AND  (SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,6) IN ('405997','406228','406229','415527','415528','417917','417918','418210','421539','421572','432198','435736','450502','450503','450504','468805','469190','469191','469192','474856','478286','478287','486292','490222','490223','490254','512932','512932','514833','522346','522352','524458','526106','526701','527114','527479','529608','529615','529616','532731','532734','533102','534680','536132','536610','536621','539149','539158','549751','557654','607118','607407','607445','607529','652189','652190','652157') OR   SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,8)  IN ('53270200','53270201','53270202','60757401','60757402') )
limit 50;

The query above is long, so I won't try to write out the full code here, but I will describe the DataFrames approach.

You can implement the query above quite flexibly with DataFrame and Column operations, such as filter, withColumn (if you want to convert/apply a Hive UDF as a Scala function/UDF), cast for converting data types, and so on. I did this exercise recently and it worked out well. Below is pseudo-code in Scala:

   val df1 = hivecontext.sql("select * from ods_gam").as("G")
   val df2 = hivecontext.sql("select * from DAILY_TRAN_DETAIL_TABLE2").as("DTD")
    

Now do the join using the DataFrames:

val joinedDF = df1.join(df2, df1("G.ACID") === df2("DTD.ACID"), "inner")
// now apply your string functions here...
// joinedDF.withColumn(...), filter(...), when/otherwise, etc. go here

Note: I don't think you need UDFs in your case; simple string functions should be enough.
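For illustration, here is a minimal sketch of how a few of the Hive expressions from the query could be expressed with DataFrame column functions. It assumes df1, df2 and joinedDF from the snippets above, and the IN-lists are abbreviated stand-ins for the full lists in the original query:

import org.apache.spark.sql.functions.{split, substring}

// Abbreviated stand-ins for the full 6-digit and 8-digit prefix lists
val prefixes6 = Seq("405997", "406228", "406229")
val prefixes8 = Seq("53270200", "53270201")

// The repeated split(TRAN_RMKS, '/') expression from the query
val rmkCol = split(df2("TRAN_RMKS"), "/")

val result = joinedDF
  .filter(substring(df2("TRAN_PARTICULAR"), 1, 3).rlike("(PUR|POS).*"))
  .filter(df2("PART_TRAN_TYPE") === "D")
  .filter(df2("DEL_FLG") !== "Y")
  .filter(df2("PSTD_FLG") === "Y")
  .filter(df1("schemecode").isin("SBPRV", "SBPRS", "WSSTF"))   // abbreviated
  .filter(substring(rmkCol.getItem(0), 1, 6).isin(prefixes6: _*)
       || substring(rmkCol.getItem(0), 1, 8).isin(prefixes8: _*))
  .withColumn("TRAB_RMK1", rmkCol.getItem(0))
  .withColumn("ATM_ID", rmkCol.getItem(1))
  .limit(50)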

Also take a look at DataFrameJoinSuite.scala; it may be quite useful to you...

For more details, refer to the docs.

Spark 1.5:

  • DataFrame.html

  • All DataFrame Column operations: Column.html

If you are looking for sample UDF code, here is a snippet.

Construct dummy data

import util.Random
import org.apache.spark.sql.Row
implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}
val students = Seq("John", "Mike","Matt")
val subjects = Seq("Math", "Sci", "Geography", "History")
val random = new Random(1)
val data = (students cross subjects).map { x => Row(x._1, x._2, random.nextInt(100)) }.toSeq
 
// Create Schema Object
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val schema = StructType(Array(
            StructField("student", StringType, nullable=false),
            StructField("subject", StringType, nullable=false),
            StructField("score", IntegerType, nullable=false)
    ))
 
// Create DataFrame (sqlContext can be a SQLContext or a HiveContext)
val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, schema)
 
// Define udf
import org.apache.spark.sql.functions.udf
val udfScoreToCategory = udf((score: Int) => score match {
  case t if t >= 80 => "A"
  case t if t >= 60 => "B"
  case t if t >= 35 => "C"
  case _ => "D"
})
df.withColumn("category", udfScoreToCategory(df("score"))).show(10)
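
If you also want to call the same logic from a SQL string (as in the original Hive query), here is a minimal sketch; the function and temp-table names ("scoreToCategory", "scores") are just illustrative choices:

// Register the same scoring logic for use inside SQL strings
sqlContext.udf.register("scoreToCategory", (score: Int) =>
  if (score >= 80) "A" else if (score >= 60) "B" else if (score >= 35) "C" else "D")

df.registerTempTable("scores")
sqlContext.sql("SELECT student, subject, scoreToCategory(score) AS category FROM scores").show(10)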

Try using the query as-is first. If you have been running it with Hive on MapReduce, you should see an immediate benefit. From there, if you still need better results, you can analyze the query plan and optimize further, for example with partitioning. Spark makes heavier use of memory and is generally faster than MapReduce beyond simple transformations, and Spark SQL also uses the Catalyst optimizer, which your query benefits from as well.
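For instance, here is a minimal sketch of how you might inspect the query plan and experiment with partitioned storage; it assumes joinedDF and df2 from the snippets above, and the partition column and output path are illustrative choices rather than anything from the original query:

// Print the logical and physical plans the Catalyst optimizer produced,
// e.g. to spot full table scans that partitioning could avoid
joinedDF.explain(true)

// One follow-up optimization: store the large detail table partitioned by a
// column you usually filter on, so later reads can prune data
// (PART_TRAN_TYPE and the output path below are illustrative choices only)
df2.write.partitionBy("PART_TRAN_TYPE").parquet("/tmp/daily_tran_detail_partitioned")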

Regarding your comment about "using Spark functions like map, filter, etc.": map() only transforms the data, and since all you have here are string functions, I don't think you would gain anything by rewriting them with .map(...); Spark will do that transformation for you. As for filter(), if you can filter the input data, you could rewrite the query with subqueries and other SQL features.
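As an illustration of filtering the inputs early, here is a minimal sketch assuming df1 and df2 from the snippets above (the scheme-code list is abbreviated):

// Filter each side before joining so less data is shuffled. Catalyst will
// often push these predicates down anyway, but writing them explicitly keeps
// the intent clear. The scheme-code list is abbreviated here.
val dtdFiltered = df2
  .filter(df2("PART_TRAN_TYPE") === "D")
  .filter(df2("DEL_FLG") !== "Y")
  .filter(df2("PSTD_FLG") === "Y")

val gamFiltered = df1.filter(df1("schemecode").isin("SBPRV", "SBPRS", "WSSTF"))

val joinedEarly = gamFiltered.join(dtdFiltered, gamFiltered("ACID") === dtdFiltered("ACID"), "inner")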
