AWS EMR: the file exists, but the error says the file does not exist



I created an AWS EMR Spark cluster with Release label: emr-6.2.0, Hadoop distribution: Amazon, and Applications: Spark 3.0.1, Zeppelin 0.9.0, then copied all of my local files (.jar, .py, .csv, and .sas7bdat) to the cluster master node.
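For reference, copying the local files onto the master node is typically done with something like scp; the key file and the master's public DNS name below are placeholders, not the actual values:

# copy the local directory containing the SAS files into the master node's home directory
scp -i ~/my-emr-key.pem -r ./sas_data1 hadoop@<master-public-dns>:/home/hadoop/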

When I run

[hadoop@ip-172-31-22-207 ~]$ ls -al /home/hadoop/sas_data1/
total 1071812
drwxrwxr-x 2 hadoop hadoop 66 Sep 13 04:08 .
drwxr-xr-x 7 hadoop hadoop 4096 Sep 13 04:38 ..
-rw-r--r-- 1 hadoop hadoop 471990272 Sep 13 04:07 file1.sas7bdat
-rw-r--r-- 1 hadoop hadoop 625541120 Sep 13 04:08 file2.sas7bdat

the output shows that the files exist. Furthermore, in my program in /home/hadoop,

import os
from pyspark.sql import SparkSession

def process_raw_data(inputs, output):
    spark = SparkSession.builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12") \
        .enableHiveSupport().getOrCreate()
    # iterate over the SAS files in the local sas_data1 directory
    sas_dir = f'{os.getcwd()}/sas_data1'
    for filename in os.listdir(sas_dir):
        extension = os.path.splitext(filename)[1]
        print("!!!!!!!!!!", f'{sas_dir}/{filename}')
        # load each SAS file with the spark-sas7bdat reader
        df_spark = spark.read.format('com.github.saurfang.sas.spark') \
            .load(f'{sas_dir}/{filename}')
        raw_df = df_spark.select('field1', 'field2')
        raw_df.write.mode('append').parquet(output + '/raw_data_output')

I am iterating over the files in the sas_data1 directory, and the output prints the file name correctly as !!!!!!!!!! /home/hadoop/sas_data1/file1.sas7bdat, which should only be possible if the file exists. Yet I get an error saying the file does not exist. I ran the following command:

spark-submit --jars parso-2.0.11.jar,spark-sas7bdat-3.0.0-s_2.12.jar,hadoop-aws-2.7.4.jar,aws-java-sdk-1.7.4.jar --master yarn process_raw_files.py
File "/home/hadoop/process_raw_files.py", line 112, in <module>
output_bucket % 'raw_immigration_output')
File "/home/hadoop/process_raw_files.py", line 21, in process_raw_immigration
load(f'{sas_dir}/{filename}')
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 178, in load
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.io.FileNotFoundException: File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:866)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:842)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1010)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:315)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:181)
at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:73)
at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:45)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
at org.apache.hadoop.ipc.Client.call(Client.java:1491)
at org.apache.hadoop.ipc.Client.call(Client.java:1388)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
... 31 more

I stored the files on the regular local storage of the EMR cluster's master node. Why does the debug output show the file name, while the error says the file does not exist? Is this related to the worker nodes, to which I did not copy the files?

Spark looks for the files on HDFS. When the job is submitted with --master yarn, paths without a scheme are resolved against the cluster's default filesystem, which on EMR is HDFS rather than the master node's local disk. os.listdir() reads the master's local filesystem, which is why the loop finds the files and prints their names, but spark.read.load('/home/hadoop/sas_data1/...') asks the HDFS NameNode for that path, and it does not exist there, hence the FileNotFoundException. Copy the files to HDFS (or point at them with an explicit file:// URI, which only works if the files are present on every node) and rerun the job.
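A minimal sketch of the fix, assuming the files should end up under /user/hadoop/sas_data1 on HDFS (that target path is only an example):

# create a directory on HDFS and copy the local SAS files into it
hdfs dfs -mkdir -p /user/hadoop/sas_data1
hdfs dfs -put /home/hadoop/sas_data1/*.sas7bdat /user/hadoop/sas_data1/
# confirm the files are now visible to the NameNode
hdfs dfs -ls /user/hadoop/sas_data1

The script then also has to stop deriving paths from the master's local filesystem: os.listdir() only sees local files, so replace the os.getcwd()-based sas_dir with the HDFS directory (for example load('/user/hadoop/sas_data1/file1.sas7bdat'), or list the directory through the Hadoop FileSystem API instead of os.listdir).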
