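I want to write unit tests with PySpark. The tests themselves pass, but for every test I get
- ResourceWarning: unclosed <socket.socket [...]>
- ResourceWarning: unclosed file <_io.BufferedWriter [...]>
warnings, as well as
- DeprecationWarning
messages about invalid escape sequences.
I would like to understand why this happens and how to fix it, so that these warnings do not clutter my unittest output.
Here is an MWE: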
# filename: pyspark_unittesting.py
# -*- coding: utf-8 -*-

import unittest


def insert_and_collect(val_in):
    from pyspark.sql import SparkSession
    with SparkSession.builder.getOrCreate() as spark:
        col = 'column_x'
        df = spark.createDataFrame([(val_in,)], [col])

        print('one')
        print(df.count())
        print('two')
        collected = df.collect()
        print('three')
        return collected[0][col]


class MyTest(unittest.TestCase):
    def test(self):
        val = 1
        self.assertEqual(insert_and_collect(val), val)
        print('four')


if __name__ == '__main__':
    val = 1
    print('inserted and collected is equal to original: {}'
          .format(insert_and_collect(val) == val))
    print('five')
If I call this with python pyspark_unittesting.py, the output is:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
one
1
two
three
inserted and collected is equal to original: True
five
However, if I call it with python -m unittest pyspark_unittesting, the output is:
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:1890: DeprecationWarning: invalid escape sequence \*
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:1890: DeprecationWarning: invalid escape sequence \*
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:398: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:759: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:398: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:759: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/streaming.py:618: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/streaming.py:618: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/functions.py:1519: DeprecationWarning: invalid escape sequence \d
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/functions.py:1519: DeprecationWarning: invalid escape sequence \d
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/lib/python3.6/subprocess.py:766: ResourceWarning: subprocess 10219 is still running
ResourceWarning, source=self)
/usr/lib/python3.6/importlib/_bootstrap.py:219: ImportWarning: can't resolve package from __spec__ or __package__, falling back on __name__ and __path__
return f(*args, **kwds)
one
1
two
/usr/lib/python3.6/socket.py:657: ResourceWarning: unclosed <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 49330), raddr=('127.0.0.1', 44169)>
self._sock = None
three
four
.
----------------------------------------------------------------------
Ran 1 test in 7.394s
OK
sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=5>
Edit 2018-03-29
Regarding @acue's answer, I tried calling the script with subprocess.Popen, much like it is done within the unittest module:
In [1]: import pathlib
: import subprocess
: import sys
:
: here = pathlib.Path('.').absolute()
: args = [sys.executable, str(here / 'pyspark_unittesting.py')]
: opts = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd='/tmp')
: p = subprocess.Popen(args, **opts)
: out, err = [b.splitlines() for b in p.communicate()]
: print(out)
: print(err)
:
:
[b'one',
b'1',
b'two',
b'three',
b'inserted and collected is equal to original: True',
b'five']
[b"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
b'Setting default log level to "WARN".',
b'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
b'',
b'[Stage 0:> (0 + 0) / 8]',
b'[Stage 0:> (0 + 8) / 8]',
b' ']
The ResourceWarnings do not show up here...
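One likely reason for the difference: a regular (non-debug) interpreter ignores ResourceWarning by default, whereas the unittest runner switches such warnings on unless told otherwise, so the subprocess run stays quiet. The sketch below illustrates this without Spark. `leaky_stand_in` and `recorded_warnings` are hypothetical helpers made up for this illustration, and the warning is emitted by hand rather than by a real leaked socket.

```python
import warnings


def leaky_stand_in():
    """Hypothetical stand-in for the Spark call: emits the kind of warning seen above."""
    warnings.warn("unclosed <socket.socket [...]>", ResourceWarning)
    return 1


def recorded_warnings(action):
    """Run leaky_stand_in() under a single filter action and return the recorded categories."""
    with warnings.catch_warnings(record=True) as caught:
        # Install one catch-all filter; it is removed again when the block exits.
        warnings.simplefilter(action)
        leaky_stand_in()
    return [w.category for w in caught]


# "ignore" mirrors how a plain interpreter treats ResourceWarning by default.
print(recorded_warnings("ignore"))
# "always" stands in for a runner that turns these warnings on.
print(recorded_warnings("always"))
```

The same call is silent under one filter and reported under the other, which matches the two outputs above: the code did not change, only the active warning filters did.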
These warnings disappear if you add instructions to ignore them in the test's setUp method:
import unittest
import warnings


class MyTest(unittest.TestCase):
    def test(self):
        val = 1
        self.assertEqual(insert_and_collect(val), val)
        print('four')

    def setUp(self):
        # Silence the two noisy categories before each test runs.
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)
Running python3 -m unittest pyspark_unittesting.py now outputs:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
one
1
two
three
four
.
----------------------------------------------------------------------
Ran 1 test in 11.124s
OK
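Filters installed in setUp stay active for the rest of the test run, so they can also hide warnings coming from your own code. If that matters, a narrower variant might look like the sketch below, which ignores the two categories only around the noisy call. `spark_warnings_ignored`, `noisy_stand_in`, `ScopedTest` and `run_scoped_test` are hypothetical names for this illustration, and `noisy_stand_in` merely emits a warning by hand instead of starting Spark.

```python
import io
import unittest
import warnings
from contextlib import contextmanager


@contextmanager
def spark_warnings_ignored():
    """Ignore ResourceWarning and DeprecationWarning only inside the with-block."""
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        yield


def noisy_stand_in(val_in):
    # Hypothetical replacement for insert_and_collect(); it only emits a warning.
    warnings.warn("unclosed file <_io.BufferedWriter [...]>", ResourceWarning)
    return val_in


class ScopedTest(unittest.TestCase):
    def test_quiet_inside_block(self):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            with spark_warnings_ignored():
                self.assertEqual(noisy_stand_in(1), 1)
        self.assertEqual(caught, [])

    def test_still_visible_outside_block(self):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            with spark_warnings_ignored():
                pass
            # The filters from the block are gone again, so this one is recorded.
            noisy_stand_in(1)
        self.assertEqual(len(caught), 1)


def run_scoped_test():
    """Run ScopedTest programmatically and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ScopedTest)
    return unittest.TextTestRunner(stream=io.StringIO()).run(suite)
```

In the real test, the body of the with-block would be the call to insert_and_collect(val). Whether the narrower scope is worth the extra helper depends on how much you rely on warnings from the rest of the suite.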