如何在Spark中检索数据框的别名



我正在使用Spark 2.0.2。我有一个具有别名的数据框,我希望能够检索它。一个简化的例子,说明我为什么要在下面。

def check(ds: DataFrame) = {
   assert(ds.count > 0, s"${df.getAlias} has zero rows!")    
}

上面的代码当然会失败,因为数据框架没有 getalias 函数。有办法做到吗?

您可以尝试这样的事情,但我不会远远宣称它得到了支持:

  • spark<2.1:

    import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
    import org.apache.spark.sql.Dataset
    def getAlias(ds: Dataset[_]) = ds.queryExecution.analyzed match {
      case SubqueryAlias(alias, _) => Some(alias)
      case _ => None
    }
    
  • 火花2.1 :

    def getAlias(ds: Dataset[_]) = ds.queryExecution.analyzed match {
      case SubqueryAlias(alias, _, _) => Some(alias)
      case _ => None
    }
    

示例用法:

val plain = Seq((1, "foo")).toDF
getAlias(plain)
Option[String] = None
val aliased = plain.alias("a dataset")
getAlias(aliased)
Option[String] = Some(a dataset)

免责声明:如上所述,此代码依赖于无证件的API。它的起作用为Spark 2.3。

大量挖掘了大多数无证件的火花方法之后,这是绘制字段列表的完整代码,以及pyspark中dataframe的表别名:

def schema_from_plan(df):
    plan = df._jdf.queryExecution().analyzed()
    all_fields = _schema_from_plan(plan)
    iterator = plan.output().iterator()
    output_fields = {}
    while iterator.hasNext():
        field = iterator.next()
        queryfield = all_fields.get(field.exprId().id(),{})
        if not queryfield=={}:
            tablealias = queryfield["tablealias"]
        else:
            tablealias = ""
        output_fields[field.exprId().id()] = {
            "tablealias": tablealias,
            "dataType": field.dataType().typeName(),
            "name": field.name()
        }
    return list(output_fields.values())
def _schema_from_plan(root,tablealias=None,fields={}):
    iterator = root.children().iterator()
    while iterator.hasNext():
        node = iterator.next()
        nodeClass = node.getClass().getSimpleName()
        if (nodeClass=="SubqueryAlias"):
            # get the alias and process the subnodes with this alias
            _schema_from_plan(node,node.alias(),fields)
        else:
            if tablealias:
                # add all the fields, along with the unique IDs, and a new tablealias field            
                iterator = node.output().iterator()
                while iterator.hasNext():
                    field = iterator.next()
                    fields[field.exprId().id()] = {
                        "tablealias": tablealias,
                        "dataType": field.dataType().typeName(),
                        "name": field.name()
                    }
            _schema_from_plan(node,tablealias,fields)
    return fields
# example: fields = schema_from_plan(df)

for Java

正如@veinhorn所述,也可以在 java 中获得别名。这是一个实用方法示例:

public static <T> Optional<String> getAlias(Dataset<T> dataset){
    final LogicalPlan analyzed = dataset.queryExecution().analyzed();
    if(analyzed instanceof SubqueryAlias) {
        SubqueryAlias subqueryAlias = (SubqueryAlias) analyzed;
        return Optional.of(subqueryAlias.alias());
    }
    return Optional.empty();
}

相关内容

  • 没有找到相关文章

最新更新