Implementing the Python multiprocessing module for real-time speaker identification



I am developing an algorithm for real-time speaker identification. My idea is to use the multiprocessing module to run three tasks in parallel: writeAudio(), detectionBlock(), and identificationBlock().

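In practice, writeAudio() uses PyAudio to capture a continuous recording and saves 0.5-second audio files to a local directory. detectionBlock() processes the two oldest 0.5-second files in that directory and uses a voice activity detection (VAD) model to decide whether the audio is speech or noise. identificationBlock() processes separate 3-second audio files (saved to a different directory from the 0.5-second segments) and uses a voice recognition (VR) model to determine the speaker's identity.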

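I was hoping to apply multiprocessing here to get around the global interpreter lock (GIL) and run the three functions simultaneously as Process objects. At present, the program does not start running detectionBlock() and identificationBlock() until writeAudio() has finished recording.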

Here is the current code, implemented with multiprocessing:

import time
from multiprocessing import Process

# Perform Parallel Processing with the Multiprocessing Module
def parallelProcessing(self):

    # Define Individual Functions as Process() Objects
    rec = Process(target=self.writeAudio())          # Cog 1
    vad = Process(target=self.detectionBlock())      # Cog 2
    si = Process(target=self.identificationBlock())  # Cog 3

    cogs = [rec, vad, si]   # List of Process objects

    # Run All Three Cogs in Parallel
    rec.start()             # Start Cog 1

    time.sleep(3)           # Wait 3 sec to start speech detection & identification

    vad.start()             # Start Cog 2
    si.start()              # Start Cog 3

    for cog in cogs:
        cog.join()          # Wait for processes to complete before continuing
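
A likely cause of the blocking described above: `Process(target=self.writeAudio())` calls `writeAudio()` immediately, in the parent process, and hands its return value (`None`) to `Process`, so the child has nothing left to run. Passing the function object without parentheses defers the call until `start()`. A minimal sketch of the difference, using hypothetical stand-in functions `record` and `detect` rather than the real methods:

```python
import multiprocessing as mp
import os

call_log = []  # filled only by calls made in the current process


def record():
    """Hypothetical stand-in for writeAudio()."""
    call_log.append("record")
    print(f"record running in pid {os.getpid()}")


def detect():
    """Hypothetical stand-in for detectionBlock()."""
    call_log.append("detect")
    print(f"detect running in pid {os.getpid()}")


# Wrong: record() executes right here, in the parent, and its return
# value (None) becomes the target, so the child has nothing to do.
blocked = mp.Process(target=record())

# Right: pass the function object; it is called in the child after start().
rec = mp.Process(target=record)
vad = mp.Process(target=detect)

if __name__ == "__main__":
    print(f"parent pid {os.getpid()}")
    for proc in (rec, vad):
        proc.start()
    for proc in (rec, vad):
        proc.join()
```

With the `spawn` start method (the default on Windows and macOS), the target and its arguments are pickled, so a bound method such as `self.writeAudio` only works as a target if the instance itself can be pickled.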

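I have never used multiprocessing before, so I am wondering whether this is feasible with a different implementation approach. Thank you for your help.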

Edit:

I have added simplified versions of the functions below for clarity.

# Speech Detection Sequence
def detectionBlock(self):

    # Create VoiceActivityDetectionModel() Class Object
    vad = VoiceActivityDetectionModel()

    # Run Speech Detection on Oldest Audio Segments in Directory
    files = self.getListDir()   # List of audiofiles
    index = 0                   # First file in list

    path_1 = os.path.join(self.VAD_audio_path, files[index])
    path_2 = os.path.join(self.VAD_audio_path, files[index+1])

    label_1, _, _ = vad.detectFromAudiofile(path_1)  # VAD classifier for first segment
    label_2, _, _ = vad.detectFromAudiofile(path_2)  # VAD classifier for second segment

    if (label_1 == 'speech') and (label_2 == 'speech'):
        self.combineAudio(index)    # Generate 3-sec recording for SI if
                                    # speech is detected in both audiofiles
    else:
        self.deleteAudio()          # Remove oldest audio segment

# Speaker Identification Sequence
def identificationBlock(self):

    # Create EnsemblePredictions() Class Object
    ep = EnsemblePredictions()

    # Run Speaker Identification on Oldest Audio Segment in Directory
    files = self.getListDir(audio_type='SI')
    index = 0                   # First file in list

    if files:
        filepath = os.path.join(self.SI_audio_path, files[index])

        speaker, prob_list = ep.predict(filepath, first_session=False)  # SI classifier
        time_stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())  # Time of identification

        self.speakerDiarization(speaker=speaker, prob_list=prob_list, time_stamp=time_stamp)  # Save results

        # Remove 3-Second Audio Segment from Directory
        self.deleteAudio(audio_type='SI')

# Audio Recording Sequence
def writeAudio(self):

    # Instantiate Recording System Variables
    FORMAT = pyaudio.paFloat32                  # 32 bits per sample
    CHANNELS = 1                                # Mono
    RATE = 16000                                # Sampling Rate
    CHUNK = int(self.VAD_audio_length*RATE)     # Frames to read from microphone per segment

    # Initialize Recording
    p = pyaudio.PyAudio()                       # Create interface to PortAudio
    input('Press ENTER to Begin Recording')     # Wait for keypress to record
    if keyboard.is_pressed('Enter'):
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        frames_per_buffer=CHUNK,
                        input=True)

    print()
    print('Hold SPACE to Finish Recording')

    while(True):
        # End Process with Manual User Interrupt
        if keyboard.is_pressed('Space'):
            break

        # Generate Audio Recording
        data = stream.read(CHUNK)                       # Read 0.5-second segment from audio stream
        data = np.frombuffer(data, dtype=np.float32)    # Convert to NumPy array

        filename = 'VAD_segment_' + str(self.VAD_audio_count) + '.wav'

        sf.write(os.path.join(self.VAD_audio_path, filename), data, RATE)

        # Adjust Segment Count
        self.VAD_audio_count = self.VAD_audio_count + 1  # Increment

    # Stop & Close Stream
    stream.stop_stream()
    stream.close()

    # Terminate PortAudio Interface
    p.terminate()
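
`getListDir()` is not shown, so how it orders the files is unknown. If it relies on a plain `sorted(os.listdir(...))`, a name such as `VAD_segment_10.wav` sorts before `VAD_segment_2.wav`, and "the two oldest files" would be the wrong pair once the segment count passes 9. A minimal sketch of ordering by the numeric suffix instead; `oldest_segments` is a hypothetical helper and not part of the original class:

```python
import os
import re
import tempfile

# Matches the filenames that writeAudio() produces, e.g. VAD_segment_12.wav
SEGMENT_RE = re.compile(r"^VAD_segment_(\d+)\.wav$")


def oldest_segments(directory, count=2):
    """Return the `count` lowest-numbered VAD segment filenames in `directory`."""
    numbered = []
    for name in os.listdir(directory):
        match = SEGMENT_RE.match(name)
        if match:  # skip anything that is not a VAD segment
            numbered.append((int(match.group(1)), name))
    numbered.sort()  # numeric order, so 2 comes before 10
    return [name for _, name in numbered[:count]]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for i in (10, 2, 3):
            open(os.path.join(tmp, f"VAD_segment_{i}.wav"), "w").close()
        print(sorted(os.listdir(tmp))[:2])  # lexicographic order
        print(oldest_segments(tmp))         # numeric order
```

Once recording and detection run as separate processes, the listing may also include a segment that is still being written, so the detection step may need to skip the newest file or retry on a read error.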

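Below is an example of what I mentioned in the comments. I have not actually run all of its components, so treat it as pseudocode, but I believe it should be a good starting point. The main improvement is the simplification that pastream brings: it claims essentially GIL-free iteration over PortAudio. The benefit is lower overhead and an easier way to feed data into at least the first stage of the audio-detection pipeline. You may need some extra complexity to drop frames if processing falls behind, but if I understand the pastream documentation correctly, this structure should generally work.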

import multiprocessing as mp
from queue import Empty  # Python 3 module name (`Queue` was the Python 2 name)

import pastream


class ExitFlag:
    """Sentinel placed on the queue to tell the child process to stop."""


def voice_identification(rx_q: mp.Queue):
    while True:
        try:
            received = rx_q.get(timeout=1)
            # If voice_identification is too slow you may want to `get` until
            # the queue is empty to drop all but the most recent frame. This way
            # you won't have an infinitely growing queue.
        except Empty:
            continue  # nothing arrived within the timeout; poll again
        if isinstance(received, ExitFlag):
            break
        else:
            print(identify(received))  # identify audio (placeholder for the SI model)
    print("identifier process exiting")


if __name__ == "__main__":
    tx_q = mp.Queue()
    identifier_p = mp.Process(target=voice_identification, args=(tx_q,))
    identifier_p.start()

    samplerate = 44100
    hop = samplerate // 2  # half a second, as an integer frame count
    stream = pastream.InputStream()
    # 3-second chunks every half second, assuming `overlap` is the number of
    # frames shared by consecutive chunks (2.5 seconds here)
    for chunk in stream.chunks(chunksize=6 * hop, overlap=5 * hop):
        if detect_audio(chunk):  # detect audio (placeholder for the VAD model)
            tx_q.put(chunk)
        if exit_key_down():  # however you want to detect this; it's good to ensure smooth shutdown of the child
            tx_q.put(ExitFlag())
            identifier_p.join()
            break
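
The comment in `voice_identification` suggests draining the queue when identification falls behind. A minimal sketch of that idea follows; `get_latest` is a hypothetical helper name, and it relies only on `get` and `get_nowait`, which both `multiprocessing.Queue` and `queue.Queue` provide. It assumes that a shutdown sentinel such as `ExitFlag` is always the last item put on the queue, since anything queued before the newest item is discarded.

```python
import queue


def get_latest(rx_q, timeout=1.0):
    """Block for one item, then discard it in favour of any newer queued items.

    Returns (latest_item, number_of_items_dropped).
    Raises queue.Empty if nothing arrives within `timeout` seconds.
    """
    latest = rx_q.get(timeout=timeout)
    dropped = 0
    while True:
        try:
            latest = rx_q.get_nowait()  # a newer item replaces the older one
            dropped += 1
        except queue.Empty:
            return latest, dropped


if __name__ == "__main__":
    q = queue.Queue()  # thread-safe stand-in; mp.Queue exposes the same calls
    for frame in ("chunk-1", "chunk-2", "chunk-3"):
        q.put(frame)
    print(get_latest(q))
```

Dropping stale chunks keeps the queue bounded at the cost of skipping audio, which may or may not be acceptable depending on how often the identification results are needed.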
