Spark and Drools integration (reading rules from a drl file)



I am writing a Spark program that takes its input from an RDD and runs some Drools rules (read from a drl file) on it.

In the drl file I have made a rule that whenever the hz attribute of an object is 0, it should increment the object's counter attribute by 1.

I can't figure out why it isn't working: it gives an output of 0 for all the data in the stream (and yes, there is data with the hz attribute equal to 0, and yes, I can print all the attributes and verify that even for those the counter is 0).

I am using the KieSessionFactory class that I found in a GitHub project: https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java

But I am quite sure this part is not where the problem lies; it only reads from the drl file and applies the rules.
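(For reference, a minimal sketch of what such a factory boils down to, not the actual code from the linked project, assuming kie-internal's KieHelper is available on the classpath:)

// Sketch only: build a StatelessKieSession straight from a drl file's text
// (assumes org.kie:kie-internal is a dependency; not the code of the linked KieSessionFactory)
import org.kie.api.io.ResourceType
import org.kie.api.runtime.StatelessKieSession
import org.kie.internal.utils.KieHelper

import scala.io.Source

object DrlSessionSketch {
  def fromDrlFile(path: String): StatelessKieSession = {
    val drl = Source.fromFile(path).mkString      // read the rule text
    new KieHelper()
      .addContent(drl, ResourceType.DRL)          // register it as a DRL resource
      .build()                                    // compile into a KieBase
      .newStatelessKieSession()
  }
}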

Below is my Scala code (I have marked the part that I think is causing the problem, but please first look at the drl file, which comes right after the Scala code):

package com.streams.Scala_Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream }
import org.apache.spark.streaming.kafka.v09.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.functions.avg
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.producer._
import org.apache.kafka.common.serialization.{ Deserializer, Serializer }
import org.apache.kafka.common.serialization.StringSerializer
import org.kie.api.runtime.StatelessKieSession
//import KieSessionFactory.getKieSession;
//import Sensor
object scala_consumer extends Serializable {
// schema for sensor data   
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable {
    var resid = resid_1
    var date = date_1
    var time = time_1
    var hz = hz_1
    var disp = disp_1
    var flo = flo_1
    var sedPPM = sedPPM_1
    var psi = psi_1
    var chlPPM = chlPPM_1
    var counter = counter_1
    def IncrementCounter(param: Int) = {
        counter = counter + param
    }
}
// function to parse line of sensor data into Sensor class
def parseSensor(str: String): Sensor = {
    val p = str.split(",")
    //println("printing p: " + p)
    new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0)
}
var counter = 0
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals
def main(args: Array[String]): Unit = {
    val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka
    val groupId = "testgroup"
    val offsetReset = "latest"
    val batchInterval = "2"
    val pollTimeout = "1000"
    val topics = "/user/vipulrajan/streaming/original:sensor"
    val topica = "/user/vipulrajan/streaming/fail:test"
    val xlsFileName = "./src/main/Rules.drl"
    val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]")
                                   .set("spark.testing.memory", "536870912")
                                   .set("spark.streaming.backpressure.enabled", "true")
                                   .set("spark.streaming.receiver.maxRate", Integer.toString(2000000))
                                   .set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000))
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
        ConsumerConfig.GROUP_ID_CONFIG -> groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
        "spark.kafka.poll.time" -> pollTimeout
    )
    val producerConf = new ProducerConf(
        bootstrapServers = brokers.split(",").toList
    )
    val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)
    val values: DStream[String] = messages.map(_._2)
    println("message values received")
    //values.print(10)
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
    values.foreachRDD(x => try {
        print("did 1\n")     // markers for manual and minor debugging
        val myData = x.mapPartitions(s => { s.map(sens => { parseSensor(sens) }) })
        //myData.collect().foreach(println)
        //println(youData.date)
        print("did 2\n")
        val evalData = myData.mapPartitions(s => {
            val ksession = KieSessionFactory.getKieSession(xlsFileName)
            val retData = s.map(sens => { ksession.execute(sens); sens })
            retData
        })
        evalData.foreach(t => { println(t.counter) })
        print("did 3\n")
    }
    catch { case e1: ArrayIndexOutOfBoundsException => println("exception in line ") })
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    println("filtered alert messages ")
    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
}
}

package droolsexample
import com.streams.Scala_Consumer.Sensor;
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder
                                            //and code lies in src/main/scala 
// declare any global variables here
dialect "java"
rule "Counter Incrementer"
when
    sens : Sensor (hz == 0)
then
    sens.IncrementCounter(1);
end
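(The rule itself can be checked locally, without Spark, by building one session on the driver and executing a single Sensor with hz == 0. A minimal sketch, assuming it is placed inside the scala_consumer object above so that Sensor, parseSensor and KieSessionFactory are all in scope; the sample CSV line is made up:)

// Hypothetical local sanity check (no Spark): fire the rule on one Sensor with hz == 0
def sanityCheck(): Unit = {
    val ksession = KieSessionFactory.getKieSession("./src/main/Rules.drl")
    val sens = parseSensor("res1,2016-06-27,12:00:00,0.0,1.1,2.2,3.3,4.4,5.5") // hz = 0.0
    ksession.execute(sens)
    println(sens.counter)   // expect 1 if the "Counter Incrementer" rule fired
}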

I have tried using an xls file instead of the drl file, I have tried creating the class in Java and the object in Scala, and I have tried a lot of other things, but all I get in the output is this warning:

6/06/27 16:38:30.462 Executor task launch worker-0 WARN AbstractKieModule: No files found for KieBase defaultKieBase

and when I print the counter values, I get all zeros. Anybody to the rescue?

When you do a spark-submit and pass your JAR for execution, make sure the other dependency JARs, such as the KIE ones, are also included inside that same JAR, and then run it with spark-submit.
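For example, with an sbt build and the sbt-assembly plugin (an assumption; the question does not show which build tool is used), a merge strategy along these lines bundles the KIE jars into one fat jar while keeping KIE's META-INF descriptors, which Drools uses to discover its implementations, instead of discarding them (the exact resource names can vary with the Drools version):

// build.sbt sketch (hypothetical): keep KIE's descriptors when building the fat jar
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "kie.conf")    => MergeStrategy.concat   // concatenate KIE service descriptors from the individual jars
  case PathList("META-INF", "kmodule.xml") => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}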

The other option is to have two separate projects, one being your Spark program and the other your KIE project, so that you have two jars, and then you run it something like this:

nohup spark-submit --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/log4j.properties" \
    --queue abc \
    --master yarn \
    --deploy-mode cluster \
    --jars drools-kie-project-0.0.1-SNAPSHOT.jar \
    --class com.abc.DroolsSparkJob SparkcallingDrools-0.0.1-SNAPSHOT.jar \
    -inputfile /user/hive/warehouse/abc/* -output /user/hive/warehouse/drools-Op > app.log &
