我有一个spark代码,它读取.mf4文件并以.txt文件的形式写入。还在databricks中运行此代码。
你能提出建议吗?即使增加了集群中数据块中的执行器内存,仍然存在问题。
pip install asammdf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from asammdf import MDF
import io
import os
import sys
from pyspark.sql.functions import col
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession
.builder
.appName("mdf4")
.getOrCreate()
sc = spark.sparkContext
def decodeBinary(val):
file_stream = io.BytesIO(val)
mdf = MDF(file_stream)
location = mdf.whereis(test_1)
return location
print("1")
input_hdfs_path_to_mdf4 = "dbfs:/FileStore/inputmfd4/"
channel_name = "test_1"
local_or_hdfs_output_path = "dbfs:/FileStore/outputmfd4/opp4.txt"
print("2")
raw_binary = sc.binaryFiles(input_hdfs_path_to_mdf4)
print("3")
decoded_binary = raw_binary.map(lambda r: r[1]).map(decodeBinary)
print("4")
decoded_binary.saveAsTextFile(local_or_hdfs_output_path)
print("5")
print(decoded_binary)
我在databricks中运行这段代码,我有5Gb mf4文件作为输入。当我尝试运行小文件时,没有问题。但当我使用这个5GB mf4文件获得
Caused by org.apache.spark.api.python.PythonException: asammdf.blocks.utils.MdfException: <_io.BytesIO object at 0x7efed84482c0> is not a valid ASAM MDF file: magic header is bxffxd8xffxe1Hxe6Ex from command-2705692180399242 line 24 Full traceback below
Traceback (most recent call last)
您能打印文件中的前100个字节吗?它很可能不是有效的MDF文件
编辑:看到iPhone JPG图像有非标准的魔术字节ff d8 ff e1吗?