

from datetime import datetime
import pandas as pd
# Three consecutive hourly timestamps on 2022-01-01 (hours 0, 1 and 2).
ts = [datetime(2022, 1, 1, hour) for hour in range(3)]

# Small pandas frame with one text, one float and one integer value column.
pandas_df = pd.DataFrame(
    {
        "Timestamp": ts,
        "column_a": ["a", "b", "c"],
        "column_b": [1.1, 2.2, 3.3],
        "column_c": [5, 6, 7],
    }
)

# Hand the pandas frame to Spark, then cast column_c to "int" so that the
# per-type unpivot further down finds a column under the "int" dtype.
df = spark.createDataFrame(pandas_df)
df = df.withColumn("column_c", df["column_c"].cast("int"))


Timestamp   column_a    column_b    column_c
0   2022-01-01 00:00:00 a   1.1 5
1   2022-01-01 01:00:00 b   2.2 6
2   2022-01-01 02:00:00 c   3.3 7


Timestamp   Columns string  double  int
2022-01-01T02:00:00.000+0000    column_c    null    null    7
2022-01-01T02:00:00.000+0000    column_b    null    3.3 null
2022-01-01T02:00:00.000+0000    column_a    c   null    null
2022-01-01T01:00:00.000+0000    column_c    null    null    6
2022-01-01T01:00:00.000+0000    column_b    null    2.2 null
2022-01-01T01:00:00.000+0000    column_a    b   null    null
2022-01-01T00:00:00.000+0000    column_c    null    null    5
2022-01-01T00:00:00.000+0000    column_b    null    1.1 null
2022-01-01T00:00:00.000+0000    column_a    a   null    null


import pyspark.sql.functions as func
# Unpivot `df` one data type at a time and merge the pieces, so the result has
# one row per (Timestamp, original column name) and one value column per data
# type ("string", "double", "int").
final_df = None
for data_type in ["string", "double", "int"]:
    # Names of the columns in `df` whose dtype string equals `data_type`.
    dtype_cols = [name for name, dtype in df.dtypes if dtype == data_type]
    if not dtype_cols:
        # Without columns the expression below would be the malformed
        # "stack(0, )", so there is nothing to unpivot for this type.
        continue

    # stack(n, 'name1', name1, 'name2', name2, ...) yields one
    # (Columns, <data_type>) row per listed column.
    stack_str = ",".join([f"'{item}', {item}" for item in dtype_cols])
    expression = f"stack({len(dtype_cols)}, {stack_str}) as (Columns, {data_type})"
    untransposed_df = df.select("Timestamp", func.expr(expression))

    if final_df is None:
        # First type processed: it seeds the result. Joining it with itself
        # here would duplicate its value column.
        final_df = untransposed_df
    else:
        # Later types add their own value column; the outer join keeps rows
        # that exist on only one side, leaving the other value columns null.
        final_df = final_df.join(untransposed_df, ["Timestamp", "Columns"], how="outer")




from pyspark.sql.functions import array, col, explode, lit, struct

# Every field of `df` except the timestamp; each one becomes its own output
# row for every input row.
no_timestamp_columns = [field for field in df.schema.fields if field.name != 'timestamp']

# Distinct data types among those fields, in first-seen order. One output
# column is produced per type, so two source columns that share a type do not
# create duplicate struct field names.
value_types = list(dict.fromkeys(field.dataType for field in no_timestamp_columns))

df.select(
    col("timestamp"),
    explode(  # make many rows from this array
        array(
            *[
                struct(  # group the values that end up as columns of one row
                    lit(column.name).alias("column"),  # name of the source column
                    *[
                        # Standard set of value columns: the real value goes in
                        # the column for this field's own type, a typed null
                        # goes in every other one.
                        (col(column.name) if column.dataType == data_type else lit(None))
                        .cast(data_type)
                        .alias(str(data_type))
                        for data_type in value_types
                    ]
                )
                for column in no_timestamp_columns
            ]
        )
    ).alias("data"),
).select(
    col("timestamp"),
    col("data.*"),  # flatten the struct into top-level columns
).show()
|           timestamp|column|StringType|LongType|
|2022-01-28 23:32:...|  Col1|      str1|    null|
|2022-01-28 23:32:...|  Col2|      null|     100|
|2022-02-28 23:02:...|  Col1|      str2|    null|
|2022-02-28 23:02:...|  Col2|      null|     202|
|2022-02-28 17:22:...|  Col1|      str3|    null|
|2022-02-28 17:22:...|  Col2|      null|     102|
|2022-02-28 23:19:...|  Col1|      str4|    null|
|2022-02-28 23:19:...|  Col2|      null|     102|
|2022-03-29 17:32:...|  Col1|      str5|    null|
|2022-03-29 17:32:...|  Col2|      null|     102|
|2022-01-28 23:32:...|  Col1|      str6|    null|
|2022-01-28 23:32:...|  Col2|      null|     101|
|2022-02-28 17:28:...|  Col1|      str7|    null|
|2022-02-28 17:28:...|  Col2|      null|     201|
|2022-03-28 23:59:...|  Col1|      str8|    null|
|2022-03-28 23:59:...|  Col2|      null|     100|
|2022-02-28 21:02:...|  Col1|      str9|    null|
|2022-02-28 21:02:...|  Col2|      null|     100|



from  pyspark.sql.functions import col, first, array, struct
# Sample rows: (timestamp text, string value, integer value).
data = [
    ("2022-01-28 23:32:52.0", "str1", 100),
    ("2022-02-28 23:02:40.0", "str2", 202),
    ("2022-02-28 17:22:45.0", "str3", 102),
    ("2022-02-28 23:19:37.0", "str4", 102),
    ("2022-03-29 17:32:02.0", "str5", 102),
    ("2022-01-28 23:32:40.0", "str6", 101),
    ("2022-02-28 17:28:09.0", "str7", 201),
    ("2022-03-28 23:59:54.0", "str8", 100),
    ("2022-02-28 21:02:40.0", "str9", 100),
]

# Build the Spark frame from the tuples and give its three columns their names.
df = spark.createDataFrame(data).toDF("timestamp", "Col1", "Col2")
df.show()
|           timestamp|Col1|Col2|
|2022-01-28 23:32:...|str1| 100|
|2022-02-28 23:02:...|str2| 202|
|2022-02-28 17:22:...|str3| 102|
|2022-02-28 23:19:...|str4| 102|
|2022-03-29 17:32:...|str5| 102|
|2022-01-28 23:32:...|str6| 101|
|2022-02-28 17:28:...|str7| 201|
|2022-03-28 23:59:...|str8| 100|
|2022-02-28 21:02:...|str9| 100|
from pyspark.sql.functions import array, col, explode, lit, struct

# Every field of `df` except the timestamp.
value_fields = [field for field in df.schema.fields if field.name != 'timestamp']

# Pair each input row with every value-column name. The struct is not nulled
# per name here, so each exploded row still carries all of the row's values.
df.select(
    col("timestamp"),
    explode(
        array(  # build array of column names
            *[lit(field.name) for field in value_fields]
        )
    ).alias("col"),
    struct(  # build struct of the row's values, each named after its data type
        *[col(field.name).alias(str(field.dataType)) for field in value_fields]
    ).alias("data"),
).select(
    col("timestamp"),
    col("col"),
    col("data.*"),  # flatten the struct into top-level columns
).show()
|           timestamp| col|StringType|LongType|
|2022-01-28 23:32:...|Col1|      str1|     100|
|2022-01-28 23:32:...|Col2|      str1|     100|
|2022-02-28 23:02:...|Col1|      str2|     202|
|2022-02-28 23:02:...|Col2|      str2|     202|
|2022-02-28 17:22:...|Col1|      str3|     102|
|2022-02-28 17:22:...|Col2|      str3|     102|
|2022-02-28 23:19:...|Col1|      str4|     102|
|2022-02-28 23:19:...|Col2|      str4|     102|
|2022-03-29 17:32:...|Col1|      str5|     102|
|2022-03-29 17:32:...|Col2|      str5|     102|
|2022-01-28 23:32:...|Col1|      str6|     101|
|2022-01-28 23:32:...|Col2|      str6|     101|
|2022-02-28 17:28:...|Col1|      str7|     201|
|2022-02-28 17:28:...|Col2|      str7|     201|
|2022-03-28 23:59:...|Col1|      str8|     100|
|2022-03-28 23:59:...|Col2|      str8|     100|
|2022-02-28 21:02:...|Col1|      str9|     100|
|2022-02-28 21:02:...|Col2|      str9|     100|
