Increasing the number of hash tables in MinHashLSH lowers accuracy and F1



I have been using MinHashLSH with Scala and Spark 2.4 to find edges of a network, i.e. link prediction based on document similarity. My problem is that while I increase the number of hash tables in MinHashLSH, my accuracy and F1 score are decreasing. Everything I have read about this algorithm suggests that I have a problem somewhere.

I have tried different numbers of hash tables and different Jaccard similarity thresholds, but I keep hitting the same problem: accuracy drops rapidly. I have also tried different samplings of the dataset, but nothing changed. My workflow is the following: I concatenate all the text columns of the dataframe (title, authors, journal and abstract), then I tokenize the concatenated column into words. Then I use a CountVectorizer to turn this "bag of words" into vectors. Next, I feed this column to MinHashLSH with some number of hash tables, and finally I do an approximate similarity join to find similar "papers" under my given threshold. My implementation is the following.

import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import UnsupervisedLinkPrediction.BroutForce.join
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf, when}
import org.apache.spark.sql.types._

object lsh {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) // show only errors
//    val cores=args(0).toInt
//    val partitions=args(1).toInt
//    val hashTables=args(2).toInt
//    val limit = args(3).toInt
//    val threshold = args(4).toDouble
    val cores="*"
    val partitions=1
    val hashTables=16
    val limit = 1000
    val jaccardDistance = 0.89
    val master = "local["+cores+"]"
    val ss = SparkSession.builder().master(master).appName("MinHashLSH").getOrCreate()
    val sc = ss.sparkContext
    val inputFile = "resources/data/node_information.csv"
    println("reading from input file: " + inputFile)
    println()
    val schemaStruct = StructType(
      StructField("id", IntegerType) ::
        StructField("pubYear", StringType) ::
        StructField("title", StringType) ::
        StructField("authors", StringType) ::
        StructField("journal", StringType) ::
        StructField("abstract", StringType) :: Nil
    )
    // Read the contents of the csv file in a dataframe. The csv file contains a header.
    var papers = ss.read.option("header", "false").schema(schemaStruct).csv(inputFile).limit(limit).cache()
    papers = papers.repartition(partitions)
    println("papers.rdd.getNumPartitions: " + papers.rdd.getNumPartitions)
    import ss.implicits._
    // Read the original graph edges (ground truth)
    val originalGraphDF = sc.textFile("resources/data/Cit-HepTh.txt").map(line => {
      val fields = line.split("\t")
      (fields(0), fields(1))
    }).toDF("nodeA_id", "nodeB_id").cache()
    val originalGraphCount = originalGraphDF.count()
    println("Ground truth count: " + originalGraphCount )
    val nullAuthor = ""
    val nullJournal = ""
    val nullAbstract = ""
    papers = papers.na.fill(nullAuthor, Seq("authors"))
    papers = papers.na.fill(nullJournal, Seq("journal"))
    papers = papers.na.fill(nullAbstract, Seq("abstract"))
    papers = papers.withColumn("nonNullAbstract", when(col("abstract") === nullAbstract, col("title")).otherwise(col("abstract")))
    papers = papers.drop("abstract").withColumnRenamed("nonNullAbstract", "abstract")
    papers.show(false)
    val filteredGt = originalGraphDF.as("g").join(papers.as("p"),
      ($"g.nodeA_id" === $"p.id") || ($"g.nodeB_id" === $"p.id")
    ).select("g.nodeA_id", "g.nodeB_id").distinct().cache()
    filteredGt.show()
    val filteredGtCount = filteredGt.count()
    println("Filtered GroundTruth count: "+ filteredGtCount)
    //TOKENIZE
    val tokPubYear = new Tokenizer().setInputCol("pubYear").setOutputCol("pubYear_words")
    val tokTitle = new Tokenizer().setInputCol("title").setOutputCol("title_words")
    val tokAuthors = new RegexTokenizer().setInputCol("authors").setOutputCol("authors_words").setPattern(",")
    val tokJournal = new Tokenizer().setInputCol("journal").setOutputCol("journal_words")
    val tokAbstract = new Tokenizer().setInputCol("abstract").setOutputCol("abstract_words")
    println("Setting pipeline stages...")
    val stages = Array(
      tokPubYear, tokTitle, tokAuthors, tokJournal, tokAbstract
      //      rTitle, rAuthors, rJournal, rAbstract
    )
    val pipeline = new Pipeline()
    pipeline.setStages(stages)
    println("Transforming dataframen")
    val model = pipeline.fit(papers)
    papers = model.transform(papers)
    println(papers.count())
    papers.show(false)
    papers.printSchema()
    val udf_join_cols = udf(join(_: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String]))
    val joinedDf = papers.withColumn(
      "paper_data",
      udf_join_cols(
        papers("pubYear_words"),
        papers("title_words"),
        papers("authors_words"),
        papers("journal_words"),
        papers("abstract_words")
      )
    ).select("id", "paper_data").cache()
    joinedDf.show(5,false)
    val vocabSize = 1000000
    val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("paper_data").setOutputCol("features").setVocabSize(vocabSize).setMinDF(10).fit(joinedDf)
    val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)
    val vectorizedDf = cvModel.transform(joinedDf).filter(isNoneZeroVector(col("features"))).select(col("id"), col("features"))
    vectorizedDf.show()
    val mh = new MinHashLSH().setNumHashTables(hashTables)
      .setInputCol("features").setOutputCol("hashValues")
    val mhModel = mh.fit(vectorizedDf)
    mhModel.transform(vectorizedDf).show()
    vectorizedDf.createOrReplaceTempView("vecDf")
    println("MinHashLSH.getHashTables: "+mh.getNumHashTables)
    val dfA = ss.sqlContext.sql("select id as nodeA_id, features from vecDf").cache()
    dfA.show(false)
    val dfB = ss.sqlContext.sql("select id as nodeB_id, features from vecDf").cache()
    dfB.show(false)
    val predictionsDF = mhModel.approxSimilarityJoin(dfA, dfB, jaccardDistance, "JaccardDistance").cache()
    println("Predictions:")
    val predictionsCount = predictionsDF.count()
    predictionsDF.show()
    println("Predictions count: "+predictionsCount)
    predictionsDF.createOrReplaceTempView("predictions")
    val pairs = ss.sqlContext.sql("select datasetA.nodeA_id, datasetB.nodeB_id, JaccardDistance from predictions").cache()
    pairs.show(false)
    val totalPredictions = pairs.count()
    println("Properties:\n")
    println("Jaccard distance threshold: " + jaccardDistance + "\n")
    println("Hash tables: " + hashTables + "\n")
    println("Ground truth: " + filteredGtCount)
    println("Total edges found: " + totalPredictions + "\n")

    println("EVALUATION PROCESS STARTS\n")
    println("Calculating true positives...\n")
    val truePositives = filteredGt.as("g").join(pairs.as("p"),
      ($"g.nodeA_id" === $"p.nodeA_id" && $"g.nodeB_id" === $"p.nodeB_id") || ($"g.nodeA_id" === $"p.nodeB_id" && $"g.nodeB_id" === $"p.nodeA_id")
    ).cache().count()
    println("True Positives: " + truePositives + "\n")
    println("Calculating false positives...\n")
    val falsePositives = predictionsCount - truePositives
    println("False Positives: " + falsePositives + "\n")
    println("Calculating true negatives...\n")
    val pairsPerTwoCount = (limit * (limit - 1)) / 2
    val trueNegatives = (pairsPerTwoCount - truePositives) - falsePositives
    println("True Negatives: " + trueNegatives + "\n")
    val falseNegatives = filteredGtCount - truePositives
    println("False Negatives: " + falseNegatives)
    val truePN = (truePositives + trueNegatives).toFloat
    println("TP + TN sum: " + truePN + "\n")
    val sum = (truePN + falseNegatives + falsePositives).toFloat
    println("TP + TN + FP + FN sum: " + sum + "\n")
    val accuracy = (truePN / sum).toFloat
    println("Accuracy: " + accuracy + "\n")
    val precision = truePositives.toFloat / (truePositives + falsePositives).toFloat
    val recall = truePositives.toFloat / (truePositives + falseNegatives).toFloat
    val f1Score = 2 * (recall * precision) / (recall + precision).toFloat
    println("F1 score: " + f1Score + "\n")
    ss.stop()
  }
}

I forgot to mention that I am running this code on a cluster with 40 cores and 64 GB of RAM. Note that the approximate similarity join (Spark's implementation) works with Jaccard distance, not the Jaccard index. So the value I pass as the similarity threshold is a Jaccard distance, which in my case is jaccardDistance = 1 - threshold (where threshold is the Jaccard index).
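To make the conversion explicit, here is a minimal sketch of the Jaccard index / Jaccard distance relation described above (the names jaccardIndex, jaccardIndexThreshold and jaccardDistanceThreshold are mine, not from the code):

// Illustrative only: Jaccard index (similarity) of two token sets, and the
// corresponding Jaccard distance that approxSimilarityJoin takes as its threshold.
def jaccardIndex(a: Set[String], b: Set[String]): Double =
  a.intersect(b).size.toDouble / a.union(b).size.toDouble

val jaccardIndexThreshold = 0.11                         // "threshold" in the terms used above
val jaccardDistanceThreshold = 1 - jaccardIndexThreshold // = 0.89, the value passed to approxSimilarityJoin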

I expected to get higher accuracy and F1 scores as I increased the number of hash tables. Do you have any idea what is wrong?

Thanks in advance, everyone!

There are multiple visible problems here, and possibly more hidden ones, so just to enumerate a few:

  • LSH is not really a classifier, and trying to evaluate it as one makes little sense, even if you assume that text similarity is somehow a proxy for citation (which is a big if).
  • If the problem were to be framed as a classification problem, it should be treated as multi-label classification (each paper can cite or be cited by multiple sources), not multi-class classification, so simple accuracy is not meaningful.
  • Even if it were a classification problem and could be evaluated as one, your calculation doesn't include the actual negatives, i.e. the pairs that don't meet the approxSimilarityJoin threshold.
  • Also, setting the threshold to 1 restricts the join to exact matches or hash-collision cases, hence the preference towards LSH configurations with higher collision rates (see the sketch after this list).
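To make the collision-rate point concrete: with MinHash, two documents collide in a single hash table with probability roughly equal to their Jaccard similarity s, so with b independent tables (OR-amplification) the chance of ever becoming a candidate pair is about 1 - (1 - s)^b, which grows as tables are added. A minimal sketch of this textbook estimate, assuming numHashTables acts as plain OR-amplification (the function and values below are illustrative, not taken from Spark's internals or from the question's code):

// Rough estimate: probability that a pair with Jaccard similarity s collides
// in at least one of the hash tables, i.e. becomes a candidate for the join.
def candidateProbability(s: Double, numHashTables: Int): Double =
  1.0 - math.pow(1.0 - s, numHashTables)

// Even a weakly similar pair (s = 0.1) is surfaced as a candidate more and more often:
Seq(1, 4, 16, 64).foreach { b =>
  println(f"tables = $b%2d -> P(candidate) = ${candidateProbability(0.1, b)}%.3f")
}

Under the loose distance threshold used in the question (0.89, i.e. a Jaccard index of at least 0.11), most of those extra candidates also survive the final distance filter, which is one plausible reason the number of predicted edges, and with it the false-positive count, grows with the number of hash tables.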

Additionally:

  • The text-processing approach you've taken is rather pedestrian and favors non-specific features (keep in mind that you are not optimizing for your actual goal, but for textual similarity).
  • Such an approach, especially one that treats everything as equal, discards most of the useful information in the data, including, but not limited to, temporal relationships.
