最近我想编写一个简单的程序,从标准输入读取数据并将其馈送给子进程。我以前写过一次这样的程序,而且它工作正常,但这次的性能却不尽如人意。注意到 CPU 几乎没有被占用之后,我运行了 `python -m profile svfeed2.py`,发现大部分时间都花在 `acquire()` 函数上。起初我怀疑是 GIL 的问题,但我认为自己已经把所有全局变量都本地化了,结果仍然没有帮助。下面是代码,最后附上性能分析器的输出:
#!/usr/bin/env python
import sys
import time
import locale
import subprocess
import re
import threading
import Queue
import copy
from svfeed_config import PG_SERVICEMATCH_CMD, PG_WAIT_TIMEOUT
# Column names for a match result; presumably the database columns the
# results are inserted into (not referenced by the visible code) - confirm.
INSERT_COLUMNS = ('service', 'product', 'version', 'info', 'cpe', 'os',
                  'hostname', 'devicetype')

# Fixed prefix of every fingerprint sent to the matcher. The two literals are
# concatenated inside the parentheses; handle_record() appends
# '(<probe>,<len>,"<reply>");' after the trailing "%r".
FP_START = ("SF-Port110-TCP:V=6.40%I=7%D=1/20%Time=52DD2F2C%"
            "P=x86_64-redhat-linux-gnu%r")

# Regex for a "MATCHED" response line from the matcher. Each optional
# " x|...|" group captures one field into the like-named group. Raw strings
# keep the regex escapes (\d, \(, \|) explicit instead of relying on Python
# passing unknown string escapes through unchanged.
MATCH_PATTERN = (r'^MATCHED [^ :]+?:(?P<lineno>\d+)'
                 r'( \(FALLBACK: [^ ]+\))?'
                 r' svc (?P<service>[^ ]+)'
                 r'( p\|(?P<product>[^\|]+)\|)?'
                 r'( v\|(?P<version>[^\|]+)\|)?'
                 r'( i\|(?P<info>[^\|]+)\|)?'
                 r'( h\|(?P<hostname>[^\|]+)\|)?'
                 r'( o\|(?P<os>[^\|]+)\|)?'
                 r'( d\|(?P<devicetype>[^\|]+)\|)?'
                 r'( (?P<cpe>.*?))?$')
def print_stderr(s):
    """Write *s* to stderr verbatim (no newline is added) and flush at once.

    *s* is wrapped in a 1-tuple before ``%s`` formatting so that a tuple
    argument is printed whole instead of being consumed as format arguments.
    """
    sys.stderr.write("%s" % (s,))
    sys.stderr.flush()
def process_line(match_pattern, p, line):
    """Parse one response line from the matcher subprocess.

    Args:
        match_pattern: regex (string or compiled) used to parse "MATCHED" lines.
        p: the subprocess.Popen running the matcher; polled to detect whether
            it has exited when an unexpected line shows up.
        line: one response line with its line terminator already stripped.

    Returns:
        For a "MATCHED" line, a one-element list holding a dict of the regex's
        named groups (values escaped via repr(), None for groups that did not
        participate). For any other line, an empty list.

    Raises:
        SystemExit: the line is unexpected and the subprocess has exited.
        ValueError: a "MATCHED" line does not match *match_pattern*.
    """
    if not line.startswith(("FAILED", "MATCHED", "SOFT MATCH", "WARNING")):
        sys.stderr.write("WARNING: UNEXPECTED LINE: %s\n" % line)
        # poll() returns the exit status once the child has exited, and that
        # status may be 0; compare against None so a clean exit is caught too.
        if p.poll() is not None:
            sys.exit("Process died.")
    if not line.startswith("MATCHED"):
        return []
    result = re.match(match_pattern, line)
    if result is None:
        # Explicit check instead of `assert`, which is stripped under -O.
        raise ValueError("MATCHED line does not match pattern: %r" % (line,))
    # repr() escapes non-printable characters; [1:-1] drops its quotes.
    return [{key: (None if value is None else repr(value)[1:-1])
             for key, value in result.groupdict().items()}]
def read_response(match_pattern, p):
    """Read the matcher's reply to one fingerprint, up to its "DONE" line.

    Args:
        match_pattern: regex passed through to process_line().
        p: the subprocess.Popen running the matcher.

    Returns:
        The concatenation of the process_line() results for every line read
        before "DONE".

    Raises:
        IOError: the subprocess closed its stdout before sending "DONE".
    """
    matches = []
    while True:
        raw = p.stdout.readline()
        if not raw:
            # readline() returns '' only at EOF; without this check the loop
            # would spin forever once the child process has gone away.
            raise IOError("matcher closed its stdout before sending DONE")
        # Strip the CR/LF terminator (not the letters 'r' and 'n').
        line = raw.rstrip("\r\n")
        if line == "DONE":
            return matches
        matches.extend(process_line(match_pattern, p, line))
def handle_record(match_pattern, fp_start, fp_reply, fp_md5, probe_type, p):
    """Send one fingerprint to the matcher and collect its reply.

    Args:
        match_pattern: regex passed through to read_response().
        fp_start: fingerprint prefix placed before the r(...) entry.
        fp_reply: reply text from the input record. Backslashes and double
            quotes are hex-escaped and '=' is rewritten to '\\x' (presumably
            the input encodes hex escapes as '=HH' - confirm with the
            producer of stdin).
        fp_md5: checksum field from the input record; currently unused.
        probe_type: probe name written into the r(...) entry.
        p: the subprocess.Popen running the matcher.

    Returns:
        The list of match dicts parsed from the matcher's reply.
    """
    # Escape existing backslashes first, so the '\x' sequences produced by
    # the '=' rewrite below are not themselves escaped.
    fp_reply = fp_reply.replace('\\', '\\x5c')
    fp_reply = fp_reply.replace('=', '\\x')
    fp_reply = fp_reply.replace('"', '\\x22')
    # The prefix goes in exactly once, through the leading %s.
    # NOTE(review): the length is of the escaped text and written in
    # decimal - confirm this is what the matcher expects.
    fp = '%s(%s,%d,"%s");' % (fp_start, probe_type, len(fp_reply), fp_reply)
    p.stdin.write(fp)
    # Two newlines follow the fingerprint; presumably the matcher's record
    # terminator - TODO confirm.
    p.stdin.write("\n\n")
    p.stdin.flush()
    return read_response(match_pattern, p)
def worker(q):
    """Consume records from *q* and feed each one to a private matcher process.

    Starts PG_SERVICEMATCH_CMD through the shell and discards its first
    output line. It then repeatedly takes a (fingerprint, md5, probe, count)
    record from *q* and passes it to handle_record(). The worker returns once
    no record arrives within PG_WAIT_TIMEOUT seconds, or once the pipe to the
    process fails with IOError. The process is always shut down on the way out.
    """
    # Strings and numbers are immutable, and copy.copy() returns the very same
    # object for them, so copying the module constants bought nothing.
    # Binding them to locals is enough.
    fp_start = FP_START
    match_pattern = MATCH_PATTERN
    timeout = PG_WAIT_TIMEOUT
    p = subprocess.Popen(PG_SERVICEMATCH_CMD,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         bufsize=0,
                         shell=True)
    try:
        p.stdout.readline()  # skip the "hello" message
        while True:
            fingerprint, fingerprint_md5, probe, _count = q.get(timeout=timeout)
            try:
                handle_record(match_pattern, fp_start, fingerprint,
                              fingerprint_md5, probe, p)
            finally:
                # Always account for the item, even if handling it failed.
                q.task_done()
    except Queue.Empty:
        pass  # nothing queued for `timeout` seconds: treat the input as done
    except IOError:  # broken pipe due to CTRL+C, or the matcher went away
        pass
    finally:
        p.stdin.close()
        # The child may already have exited and been reaped by poll(); only
        # signal it while it is still running.
        if p.poll() is None:
            p.terminate()
def main(num_workers=20, queue_size=40):
    """Start worker threads and feed them the records read from stdin.

    Every non-blank stdin line must hold four whitespace-separated fields:
    fingerprint, fingerprint MD5, probe name and count.

    Args:
        num_workers: number of worker threads to start. Each one launches its
            own matcher subprocess.
        queue_size: capacity of the work queue (0 means unbounded).
    """
    q = Queue.Queue(maxsize=queue_size)
    for _ in range(num_workers):
        threading.Thread(target=worker, args=(q,)).start()
    for line in sys.stdin:
        if not line.strip():
            continue  # tolerate blank lines instead of failing to unpack them
        fingerprint, fingerprint_md5, probe, count = line.split()
        # put() blocks while the queue is full. A profile of this (main)
        # thread therefore reports the time spent waiting for the workers
        # as lock acquire() time inside Queue.put.
        q.put((fingerprint, fingerprint_md5, probe, count))
# Script entry point. Only the main thread sees KeyboardInterrupt. The worker
# threads are not daemons, so the interpreter still waits for them; they end
# when their queue get() times out or their pipe raises IOError.
try:
    main()
except KeyboardInterrupt:
    print_stderr("Caught a KeyboardInterrupt.\n")
    sys.exit(1)
下面是性能分析器(profiler)的输出:
29623 function calls (29620 primitive calls) in 2.134 CPU seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
4171 1.536 0.000 1.536 0.000 :0(acquire)
76 0.001 0.000 0.001 0.000 :0(allocate_lock)
2173 0.015 0.000 0.015 0.000 :0(append)
3 0.000 0.000 0.000 0.000 :0(compile)
1 0.000 0.000 0.000 0.000 :0(dir)
1 0.001 0.001 2.134 2.134 :0(execfile)
4 0.000 0.000 0.000 0.000 :0(extend)
120 0.000 0.000 0.000 0.000 :0(get)
22 0.000 0.000 0.000 0.000 :0(get_ident)
1 0.000 0.000 0.000 0.000 :0(getattr)
12 0.000 0.000 0.000 0.000 :0(getlower)
1 0.000 0.000 0.000 0.000 :0(hasattr)
1 0.000 0.000 0.000 0.000 :0(insert)
19 0.000 0.000 0.000 0.000 :0(isinstance)
1 0.000 0.000 0.000 0.000 :0(issubclass)
3 0.000 0.000 0.000 0.000 :0(items)
2146/2145 0.012 0.000 0.012 0.000 :0(len)
118 0.000 0.000 0.000 0.000 :0(match)
8 0.000 0.000 0.000 0.000 :0(min)
28 0.000 0.000 0.000 0.000 :0(ord)
5 0.000 0.000 0.000 0.000 :0(range)
3849 0.074 0.000 0.074 0.000 :0(release)
1775 0.014 0.000 0.014 0.000 :0(remove)
1 0.000 0.000 0.000 0.000 :0(setprofile)
2 0.000 0.000 0.000 0.000 :0(setter)
2000 0.032 0.000 0.032 0.000 :0(split)
20 0.009 0.000 0.009 0.000 :0(start_new_thread)
1 0.000 0.000 0.000 0.000 :0(sysconf)
1 0.000 0.000 2.134 2.134 <string>:1(<module>)
1 0.000 0.000 0.001 0.001 Queue.py:1(<module>)
2000 0.095 0.000 1.889 0.001 Queue.py:107(put)
1 0.000 0.000 0.000 0.000 Queue.py:13(Full)
1 0.000 0.000 0.000 0.000 Queue.py:17(Queue)
1 0.000 0.000 0.000 0.000 Queue.py:197(_init)
2012 0.030 0.000 0.042 0.000 Queue.py:200(_qsize)
2000 0.022 0.000 0.036 0.000 Queue.py:204(_put)
1 0.000 0.000 0.000 0.000 Queue.py:212(PriorityQueue)
1 0.000 0.000 0.000 0.000 Queue.py:22(__init__)
1 0.000 0.000 0.000 0.000 Queue.py:231(LifoQueue)
1 0.000 0.000 0.000 0.000 Queue.py:9(Empty)
1 0.001 0.001 0.001 0.001 bisect.py:1(<module>)
1 0.001 0.001 0.001 0.001 collections.py:1(<module>)
1 0.000 0.000 0.000 0.000 collections.py:19(OrderedDict)
1 0.000 0.000 0.001 0.001 heapq.py:31(<module>)
1 0.000 0.000 0.000 0.000 keyword.py:11(<module>)
1 0.000 0.000 0.000 0.000 pickle.py:1253(_EmptyClass)
1 0.000 0.000 0.000 0.000 pickle.py:171(Pickler)
1 0.001 0.001 0.003 0.003 pickle.py:25(<module>)
1 0.000 0.000 0.000 0.000 pickle.py:58(PickleError)
1 0.000 0.000 0.000 0.000 pickle.py:62(PicklingError)
1 0.000 0.000 0.000 0.000 pickle.py:69(UnpicklingError)
1 0.000 0.000 0.000 0.000 pickle.py:82(_Stop)
1 0.000 0.000 0.000 0.000 pickle.py:827(Unpickler)
1 0.000 0.000 2.134 2.134 profile:0(execfile('svfeed2.py'))
0 0.000 0.000 profile:0(profiler)
118 0.000 0.000 0.002 0.000 re.py:134(match)
2 0.000 0.000 0.002 0.001 re.py:188(compile)
120 0.000 0.000 0.004 0.000 re.py:229(_compile)
3 0.000 0.000 0.000 0.000 sre_compile.py:184(_compile_charset)
3 0.000 0.000 0.000 0.000 sre_compile.py:213(_optimize_charset)
13 0.000 0.000 0.000 0.000 sre_compile.py:24(_identityfunction)
1 0.000 0.000 0.000 0.000 sre_compile.py:264(_mk_bitmap)
1 0.000 0.000 0.000 0.000 sre_compile.py:360(_simple)
3 0.001 0.000 0.001 0.000 sre_compile.py:367(_compile_info)
4/3 0.000 0.000 0.001 0.000 sre_compile.py:38(_compile)
6 0.001 0.000 0.001 0.000 sre_compile.py:480(isstring)
3 0.000 0.000 0.002 0.001 sre_compile.py:486(_code)
3 0.000 0.000 0.004 0.001 sre_compile.py:501(compile)
4 0.000 0.000 0.000 0.000 sre_parse.py:132(__len__)
6 0.000 0.000 0.000 0.000 sre_parse.py:136(__getitem__)
1 0.000 0.000 0.000 0.000 sre_parse.py:140(__setitem__)
25 0.000 0.000 0.000 0.000 sre_parse.py:144(append)
5/4 0.000 0.000 0.000 0.000 sre_parse.py:146(getwidth)
3 0.000 0.000 0.000 0.000 sre_parse.py:184(__init__)
47 0.001 0.000 0.001 0.000 sre_parse.py:188(__next)
10 0.000 0.000 0.000 0.000 sre_parse.py:201(match)
41 0.000 0.000 0.001 0.000 sre_parse.py:207(get)
3 0.000 0.000 0.001 0.000 sre_parse.py:307(_parse_sub)
3 0.000 0.000 0.001 0.000 sre_parse.py:385(_parse)
3 0.000 0.000 0.001 0.000 sre_parse.py:669(parse)
3 0.000 0.000 0.000 0.000 sre_parse.py:73(__init__)
4 0.000 0.000 0.000 0.000 sre_parse.py:96(__init__)
1 0.001 0.001 0.004 0.004 subprocess.py:377(<module>)
1 0.000 0.000 0.000 0.000 subprocess.py:391(CalledProcessError)
1 0.000 0.000 0.000 0.000 subprocess.py:402(TimeoutExpired)
1 0.000 0.000 0.000 0.000 subprocess.py:579(Popen)
1 0.149 0.149 2.122 2.122 svfeed2.py:121(main)
1 0.002 0.002 2.133 2.133 svfeed2.py:3(<module>)
1 0.000 0.000 0.000 0.000 svfeed_config.py:1(<module>)
1 0.000 0.000 0.004 0.004 threading.py:1(<module>)
45 0.000 0.000 0.003 0.000 threading.py:176(Condition)
1 0.000 0.000 0.000 0.000 threading.py:179(_Condition)
45 0.003 0.000 0.003 0.000 threading.py:181(__init__)
32 0.001 0.000 0.002 0.000 threading.py:215(_release_save)
32 0.000 0.000 0.002 0.000 threading.py:218(_acquire_restore)
2033 0.031 0.000 0.126 0.000 threading.py:221(_is_owned)
32 0.005 0.000 0.048 0.001 threading.py:230(wait)
2001 0.074 0.000 0.237 0.000 threading.py:272(notify)
1 0.000 0.000 0.000 0.000 threading.py:290(notifyAll)
1 0.000 0.000 0.000 0.000 threading.py:299(_Semaphore)
1 0.000 0.000 0.000 0.000 threading.py:347(_BoundedSemaphore)
21 0.000 0.000 0.001 0.000 threading.py:359(Event)
1 0.000 0.000 0.000 0.000 threading.py:362(_Event)
21 0.000 0.000 0.001 0.000 threading.py:366(__init__)
20 0.000 0.000 0.000 0.000 threading.py:371(isSet)
1 0.000 0.000 0.000 0.000 threading.py:376(set)
20 0.000 0.000 0.031 0.002 threading.py:391(wait)
20 0.001 0.000 0.001 0.000 threading.py:401(_newname)
1 0.000 0.000 0.000 0.000 threading.py:414(Thread)
21 0.002 0.000 0.008 0.000 threading.py:426(__init__)
20 0.000 0.000 0.001 0.000 threading.py:446(_set_daemon)
20 0.001 0.000 0.044 0.002 threading.py:463(start)
1 0.000 0.000 0.000 0.000 threading.py:510(_set_ident)
1 0.000 0.000 0.000 0.000 threading.py:57(_Verbose)
87 0.001 0.000 0.001 0.000 threading.py:59(__init__)
2053 0.014 0.000 0.014 0.000 threading.py:64(_note)
20 0.000 0.000 0.000 0.000 threading.py:683(daemon)
1 0.000 0.000 0.000 0.000 threading.py:713(_Timer)
1 0.001 0.001 0.001 0.001 threading.py:742(_MainThread)
1 0.000 0.000 0.000 0.000 threading.py:744(__init__)
1 0.000 0.000 0.000 0.000 threading.py:752(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:783(_DummyThread)
20 0.001 0.000 0.001 0.000 threading.py:808(currentThread)
1 0.000 0.000 0.000 0.000 threading.py:99(_RLock)
1 0.000 0.000 0.000 0.000 traceback.py:1(<module>)
1 0.000 0.000 0.002 0.002 warnings.py:45(filterwarnings)
有谁知道是什么导致 `acquire()` 被如此频繁地调用吗?
我认为这是由队列造成的:
q = Queue.Queue(maxsize=40)
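查看其实现可以发现,队列内部使用了锁。也许你可以换一种数据结构来规避这一点:为每个工作线程维护一个列表,用 `append` 和 `pop(0)` 把它当作队列使用(注意 `list.pop(0)` 是 O(n) 的,`collections.deque` 的 `popleft()` 是 O(1) 的,更适合这种用法)。另外值得注意的是,`profile` 只统计运行它的主线程,而输出中 `acquire()` 的耗时几乎都包含在 `Queue.put` 的累计时间(1.889 秒)里。这说明主线程主要是在等待已满的队列腾出空位,真正的瓶颈在于工作线程及其子进程的处理速度,而不是锁本身的开销。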