Spark SQL: TypeError("StructType can not accept object in type %s" % type(obj))



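I am currently pulling data from SQL Server with PyODBC and trying to insert it into a Hive table in a near-real-time (NRT) fashion.

I fetched a single row from the source, converted it to a list of strings, and built the schema programmatically, but Spark throws a StructType error when I create the DataFrame.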

>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>
>>> row = aj.fetchone()
>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaString = " ".join([row.column_name for row in aj.columns(table='tjob')])
>>> schemaString
u'ID ExternalID Name Description Notes Type Lot SubLot ParentJobID ProductID PlannedStartDateTime PlannedDurationSeconds Capture01 Capture02 Capture03 Capture04 Capture05 Capture06 Capture07 Capture08 Capture09 Capture10 Capture11 Capture12 Capture13 Capture14 Capture15 Capture16 Capture17 Capture18 Capture19 Capture20 User UserState ModifiedDateTime UploadedDateTime'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> [f.dataType for f in schema.fields]
[StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType]
>>> myrdd = sc.parallelize(rowstr)
>>> myrdd.collect()
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
    _verify_type(row, schema)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/types.py", line 1132, in _verify_type
    raise TypeError("StructType can not accept object in type %s" % type(obj))
TypeError: StructType can not accept object in type <type 'str'>

Here is the reason for the error message:

>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None' ... ]   
#rowstr is a list of str
>>> myrdd = sc.parallelize(rowstr)
#myrdd is a rdd of str
>>> schema = StructType(fields)
#schema is StructType([StringType, StringType, ....])
>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
#myrdd should have been RDD([StringType, StringType,...]) but is RDD(str)

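To fix this, build an RDD of the correct type, where each element is a whole row (a list of field values) rather than a single string: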

>>> myrdd = sc.parallelize([rowstr])
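
The shape requirement can be checked in plain Python before anything is handed to Spark. The following is only a sketch under stated assumptions: the sample row and field names are shortened stand-ins for the real `tjob` data, and `to_record` and `check_records` are hypothetical helpers, not part of PySpark or pyodbc. It also keeps `None` as `None` instead of the text 'None', which should be acceptable here because the schema above declares every field nullable.

```python
# Sketch only: plain Python, no Spark needed to follow the shape logic.
# `row` stands in for the tuple returned by cursor.fetchone(); the values
# and field names are shortened, hypothetical samples.
row = (1127, '', '8196660', None, 35)
field_names = ["ID", "ExternalID", "Name", "ParentJobID", "ProductID"]


def to_record(row):
    """Stringify every value but keep None as None, so NULLs are not
    stored as the literal text 'None'."""
    return [None if value is None else str(value) for value in row]


def check_records(records, field_names):
    """Raise early if any record is not a list/tuple with one value per field."""
    for record in records:
        if not isinstance(record, (list, tuple)):
            raise TypeError("record must be a list or tuple, got %s" % type(record))
        if len(record) != len(field_names):
            raise ValueError("expected %d values, got %d"
                             % (len(field_names), len(record)))
    return records


# One record per row: note the extra pair of brackets around the row.
records = check_records([to_record(row)], field_names)

# With a live SparkContext this would then be used as in the answer above:
#   myrdd = sc.parallelize(records)
#   df = sqlContext.createDataFrame(myrdd, schema)
```

Passing `to_record(row)` without the outer brackets makes `check_records` raise a `TypeError`, which mirrors the mistake in the question: each element would be a single field value instead of a full row.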

I just ran into a similar error!

TypeError: StructType can not accept object '_id' in type <class 'str'>

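This is how I solved it.

I am working with heavily nested JSON files for scheduling; each file consists of lists of dictionaries that contain further lists, and so on.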

For example: ['1127', {time: '_id', '8196660', '', '', '0', '', '', 'None' ...}, {busstops: {_id, name} ]

In my case, _id was repeated many times inside the other dictionaries, and I solved the problem by specifying the dictionary key.

kl= spark.createDataFrame(obj_day, schema=test()) #: I get the error

But I solved it with:

kl= spark.createDataFrame(obj_day["busstops"], schema=test())
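
One plausible reading of why selecting the key helps, sketched in plain Python. This assumes `obj_day` is a dict at the top level; the post does not show its real structure, so every key and value below is hypothetical. If the top-level object is iterated to obtain rows, a dict yields its keys, which are plain strings such as '_id', whereas the nested list yields one dict per row.

```python
# Hypothetical stand-in for one parsed JSON document; the real obj_day is
# not shown in the post, so every key and value here is an assumption.
obj_day = {
    "_id": "1127",
    "time": {"_id": "t1", "start": "08:00"},
    "busstops": [
        {"_id": "s1", "name": "Central"},
        {"_id": "s2", "name": "Harbour"},
    ],
}

# Iterating a dict yields its keys, so treating obj_day itself as the
# collection of rows produces plain strings, not row-like objects.
rows_from_dict = list(obj_day)

# Selecting the nested list gives one dict per row instead.
rows_from_key = obj_day["busstops"]

print(rows_from_dict)
print(rows_from_key)
```

Under that assumption, the string '_id' in the error message would be the first top-level key rather than one of the nested `_id` values.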
