如何读取CSV到Dataframe,并操作它



我对pyspark很陌生,我试图用它来处理一个保存为csv文件的大型数据集。我想读CSV文件到spark数据框架,删除一些列,并添加新的列。我该怎么做呢?

我在将此数据放入数据框架时遇到麻烦。这是我目前为止的一个精简版本:

def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema)
if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)
    ...
    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
                .reduce(lambda a, b: a.union(b))
    big_frame.write 
        .format("com.databricks.spark.redshift") 
        .option("url", "jdbc:redshift://<...>") 
        .option("dbtable", "my_table_copy") 
        .option("tempdir", "s3n://path/for/temp/data") 
        .mode("append") 
        .save()
    sc.stop()

这会在reduce步骤产生一个错误TypeError: 'JavaPackage' object is not callable

这是可能的吗?reduce到一个数据框架的想法是能够将结果数据写入数据库(Redshift,使用spark-redshift包)。

我也试过使用unionAll()map()partial(),但不能让它工作。

我在Amazon的EMR上运行这个程序,使用spark-redshift_2.10:2.0.0和Amazon的JDBC驱动程序RedshiftJDBC41-1.1.17.1017.jar

更新-在评论中回答您的问题:

从CSV读取数据到dataframe:似乎您只尝试将CSV文件读取到spark数据框架中。

如果是,我的答案是:https://stackoverflow.com/a/37640154/5088142覆盖这个

下面的代码应该将CSV读入spark-data-frame
import pyspark
sc = pyspark.SparkContext()
sql = SQLContext(sc)
df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("/path/to_csv.csv"))
// these lines are equivalent in Spark 2.0 - using [SparkSession][1]
from pyspark.sql import SparkSession
spark = SparkSession 
    .builder 
    .appName("Python Spark SQL basic example") 
    .config("spark.some.config.option", "some-value") 
    .getOrCreate()
spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv")

drop column

你可以使用"drop(col)"来删除列https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

下降(col)

Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.
>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
<<p> 添加列/strong>你可以使用withColumnhttps://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn (colName坳)

Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters: 
    colName – string, name of the new column.
    col – a Column expression for the new column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]

注意:spark有很多其他的功能可以使用(例如,你可以使用"select"而不是"drop")

相关内容

  • 没有找到相关文章

最新更新