How to stream data between 3 subprocesses (using pipes) with asyncio and consume the resulting data



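I have 3 scripts that need to be combined to process data in a pipeline. The scripts run forever, until the user interrupts execution. This is how they are run in a terminal: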

script1_producer.sh | script2_processor.sh | script3_processor.sh

script1_producer.sh produces the data to be processed (for example, it just prints incrementing numbers):

#!/bin/bash
i=1
while true; do
    echo $i
    i=$(($i+1))
    sleep 1
done

script2_processor.sh consumes the data from script1 and computes a new data stream (each number multiplied by 2):

#!/bin/bash
while read -r line
do
    echo "$(($line*2))"
done < "${1:-/dev/stdin}"

script3_processor.sh consumes the data from script2 and computes a new data stream (prepending a letter to each number):

#!/bin/bash
while read -r line
do
    echo "A$(($line))"
done < "${1:-/dev/stdin}"

Resulting output when running script1_producer.sh | script2_processor.sh | script3_processor.sh:

A2
A4
A6
...
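For reference, here is a pure-Python model of what the three stages compute. It is useful for checking what the subprocess version should eventually print. This is only a sketch of the data flow. The generator names are hypothetical, and it does not replace running the scripts.

```python
import itertools


def producer():
    # Models script1_producer.sh: 1, 2, 3, ... forever (without the 1 s sleep)
    for i in itertools.count(1):
        yield str(i)


def double(lines):
    # Models script2_processor.sh: multiply each number by 2
    for line in lines:
        yield str(int(line) * 2)


def prefix_a(lines):
    # Models script3_processor.sh: prepend the letter "A"
    for line in lines:
        yield "A" + line


# Generators chain lazily, like the stages of a shell pipeline
pipeline = prefix_a(double(producer()))
first_three = list(itertools.islice(pipeline, 3))
print(first_three)  # ['A2', 'A4', 'A6']
```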

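Now I want these scripts to be controlled by Python subprocesses connected with pipes. In the end, I need to process the output of script3_processor.sh and do something with each line. I'm trying to do this with asyncio, although a solution without asyncio would also be fine if that's possible.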

Here is my very naive attempt, process_pipes.py:

import asyncio
import subprocess
import os

async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )
    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)

asyncio.run(async_receive())

Unfortunately, when I run this script I get the following exception:

Traceback (most recent call last):
  File "process_pipes.py", line 28, in <module>
    asyncio.run(async_receive())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "process_pipes.py", line 12, in async_receive
    p2 = await asyncio.create_subprocess_exec(
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
  File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
    self._proc = subprocess.Popen(
  File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

I've read some examples on Stack Overflow and elsewhere that tell me to handle the pipes differently, but I couldn't get them to work in my scenario.

How can I emulate running script1_producer.sh | script2_processor.sh | script3_processor.sh and process the output of script3 in Python?

I found another solution, guided by this question:

  1. Connecting two processes started with asyncio.subprocess.create_subprocess_exec()

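Before that, one thing to note: the scripts had syntax errors, because lines like echo "$(($line*2))" need more spaces, as in echo "$(( $line * 2 ))". Bash is a bit picky about whitespace. Other than that, everything is fine.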

Keep in mind that a pipe has two ends, one for reading and one for writing. For the first process, it looks something like this sketch:

  • write end (WE)
  • read end (RE)

p0 ---> | pipe 1 | ---> p1
        WE       RE
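Here is a minimal illustration of the two ends. It uses only os.pipe() inside a single process, with no scripts involved. Bytes written to the write end come out of the read end, and the reader sees EOF only after the write end has been closed.

```python
import os

# os.pipe() returns two file descriptors: (read end, write end)
read_end, write_end = os.pipe()

# Whatever is written to the write end (WE)...
os.write(write_end, b"1\n2\n")
os.close(write_end)  # closing the last write end lets the reader hit EOF

# ...comes out of the read end (RE)
with os.fdopen(read_end, "rb") as reader:
    data = reader.read()

print(data)  # b'1\n2\n'
```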

Use pipes from os (os.pipe()), as described in the question linked above. This part should look like this:

read1, write1 = os.pipe()
p0 = await asyncio.create_subprocess_exec(
    "./script1_producer.sh",
    stdout=write1,
)
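One detail is worth adding at this step. After the child has started, the parent still holds its own copy of write1, and the reader never sees EOF while any copy of a write end stays open. The sketch below uses a short Python one-liner (via sys.executable) as a stand-in for ./script1_producer.sh, which is an assumption made so that the example terminates. The function name is hypothetical.

```python
import asyncio
import os
import sys


async def produce_into_pipe():
    read1, write1 = os.pipe()
    # Stand-in producer (assumption): prints 1, 2, 3 and exits
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "for i in range(1, 4): print(i)",
        stdout=write1,
    )
    # The child has its own copy of the write end; drop ours so that
    # the read end reports EOF once the child exits.
    os.close(write1)
    await proc.wait()
    # The output is tiny, so it fits in the pipe buffer until we read it
    with os.fdopen(read1) as reader:
        return reader.read()


output = asyncio.run(produce_into_pipe())
print(output, end="")
```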

Here stdout is the WE of pipe 1. For p1 we have:

| pipe 1 | ---> p1 -------> | pipe 2|
WE       RE=stdin  stdout=WE   

stdin is the RE of the first pipe and stdout is the WE of the second pipe, something like this:

read2, write2 = os.pipe()
p2 = await asyncio.create_subprocess_exec(
    "./script2_processor.sh",
    stdin=read1,
    stdout=write2,
)

For the third process:

| pipe 2 | ---> p3 -------> | asyncio PIPE|
WE       RE=stdin  stdout=WE   

Putting it all together, we have:

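import asyncio
import os

async def async_receive():
    # Pipe 1: script1 writes into write1, script2 reads from read1
    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1,
    )
    os.close(write1)  # the child keeps its own copy

    # Pipe 2: script2 writes into write2, script3 reads from read2
    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )
    os.close(read1)
    os.close(write2)

    # script3 writes into an asyncio pipe that we can read asynchronously
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=read2,
        stdout=asyncio.subprocess.PIPE,
    )
    os.close(read2)

    # Process script3's output line by line
    while True:
        data = await p3.stdout.readline()
        if not data:  # EOF: the pipeline has ended
            break
        data = data.decode('ascii').rstrip()
        print(data)
        print("Sleeping 1 sec...")
        await asyncio.sleep(1)

asyncio.run(async_receive())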
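The same wiring can be generalised to any number of commands. This is a sketch, not a drop-in replacement. run_pipeline is a hypothetical helper, and the three Python one-liners are assumed stand-ins for the shell scripts so that the example terminates. With the real scripts you would pass ["./script1_producer.sh"], ["./script2_processor.sh"] and ["./script3_processor.sh"] instead.

```python
import asyncio
import os
import sys


async def run_pipeline(*commands):
    """Hypothetical helper: start each argv list in `commands`, connecting the
    stdout of one process to the stdin of the next with os.pipe(). The last
    process writes into an asyncio pipe, so its .stdout is a StreamReader."""
    procs = []
    stdin = None  # the first process inherits our stdin
    for index, argv in enumerate(commands):
        is_last = index == len(commands) - 1
        if is_last:
            read_fd = write_fd = None
            stdout = asyncio.subprocess.PIPE
        else:
            read_fd, write_fd = os.pipe()
            stdout = write_fd
        proc = await asyncio.create_subprocess_exec(*argv, stdin=stdin, stdout=stdout)
        procs.append(proc)
        # The child now has its own copies; close ours so EOF can propagate
        if stdin is not None:
            os.close(stdin)
        if write_fd is not None:
            os.close(write_fd)
        stdin = read_fd  # the next process reads from this pipe
    return procs


async def main():
    py = sys.executable
    procs = await run_pipeline(
        [py, "-c", "for i in range(1, 4): print(i)"],
        [py, "-c", "import sys\nfor l in sys.stdin: print(int(l) * 2)"],
        [py, "-c", "import sys\nfor l in sys.stdin: print('A' + l.strip())"],
    )
    lines = []
    while True:
        data = await procs[-1].stdout.readline()
        if not data:  # EOF: every stage has finished
            break
        lines.append(data.decode().rstrip())
    for proc in procs:
        await proc.wait()
    return lines


result = asyncio.run(main())
print(result)  # ['A2', 'A4', 'A6']
```

If you replace the stand-ins with long-running Python programs, run them with -u (unbuffered). Otherwise their output reaches the next stage in large blocks rather than line by line. The bash scripts' echo does not have this problem.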

This way you can still use asyncio.

Here is how to solve the problem without asyncio: just use Popen with shell=True and put the pipeline in the command:

import subprocess

def receive():
    # The shell builds the pipeline; we only read the last stage's stdout
    p = subprocess.Popen(
        "./script1_producer.sh "
        "| ./script2_processor.sh "
        "| ./script3_processor.sh",
        stdout=subprocess.PIPE, shell=True)
    while True:
        line = p.stdout.readline()
        if not line:  # EOF: the pipeline has exited
            break
        print(line.decode().strip())

if __name__ == '__main__':
    receive()
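The traceback in the question happens because asyncio's p1.stdout is a StreamReader, not a real file with a fileno(). With the blocking subprocess module, Popen.stdout is a real file object. That means the stages can also be chained without shell=True, following the "replacing shell pipeline" pattern from the subprocess documentation. As above, the Python one-liners are assumed stand-ins for the three scripts.

```python
import subprocess
import sys

py = sys.executable
p1 = subprocess.Popen([py, "-c", "for i in range(1, 4): print(i)"],
                      stdout=subprocess.PIPE)
p2 = subprocess.Popen([py, "-c", "import sys\nfor l in sys.stdin: print(int(l) * 2)"],
                      stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # allow p1 to receive SIGPIPE if p2 exits first
p3 = subprocess.Popen([py, "-c", "import sys\nfor l in sys.stdin: print('A' + l.strip())"],
                      stdin=p2.stdout, stdout=subprocess.PIPE)
p2.stdout.close()  # same for p2 and p3

# Read the last stage line by line until EOF
lines = [raw.decode().rstrip() for raw in p3.stdout]
p3.stdout.close()
for p in (p1, p2, p3):
    p.wait()
print(lines)  # ['A2', 'A4', 'A6']
```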
