How to get per-utterance results from the Google Speech API and save each utterance's audio as a separate WAV file



I'm using the Python script below to get live predictions from the Google Speech API from streaming microphone input.

The problem is that I need the Google Speech API's prediction for each utterance, and I also need to save each utterance's audio to disk.

I'm not sure how to modify the script so that it saves the live audio of each utterance and prints a result per utterance, instead of one continuous prediction.

#!/usr/bin/env python
import os
import re
import sys
import time

from google.cloud import speech
import pyaudio
from six.moves import queue

# Audio recording parameters
STREAMING_LIMIT = 240000  # 4 minutes
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10)  # 100ms

api_key = r'path_to_json_file\google.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = api_key

RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'


def get_current_time():
    """Return Current Time in MS."""
    return int(round(time.time() * 1000))


class ResumableMicrophoneStream:
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk_size):
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        self._buff = queue.Queue()
        self.closed = True
        self.start_time = get_current_time()
        self.restart_counter = 0
        self.audio_input = []
        self.last_audio_input = []
        self.result_end_time = 0
        self.is_final_end_time = 0
        self.final_request_end_time = 0
        self.bridging_offset = 0
        self.last_transcript_was_final = False
        self.new_stream = True
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )

    def __enter__(self):
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, *args, **kwargs):
        """Continuously collect data from the audio stream, into the buffer."""
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        """Stream Audio from microphone to API and to local buffer"""
        while not self.closed:
            data = []

            if self.new_stream and self.last_audio_input:
                chunk_time = STREAMING_LIMIT / len(self.last_audio_input)
                if chunk_time != 0:
                    if self.bridging_offset < 0:
                        self.bridging_offset = 0
                    if self.bridging_offset > self.final_request_end_time:
                        self.bridging_offset = self.final_request_end_time
                    chunks_from_ms = round((self.final_request_end_time -
                                            self.bridging_offset) / chunk_time)
                    self.bridging_offset = (round((
                        len(self.last_audio_input) - chunks_from_ms)
                        * chunk_time))
                    for i in range(chunks_from_ms, len(self.last_audio_input)):
                        data.append(self.last_audio_input[i])
                self.new_stream = False

            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self._buff.get()
            self.audio_input.append(chunk)
            if chunk is None:
                return
            data.append(chunk)
            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                    self.audio_input.append(chunk)
                except queue.Empty:
                    break

            yield b''.join(data)


def listen_print_loop(responses, stream):
    """Iterates through server responses and prints them.

    The responses passed is a generator that will block until a response
    is provided by the server.

    Each response may contain multiple results, and each result may contain
    multiple alternatives; here we print only the transcription for the top
    alternative of the top result.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a carriage return at the end of it, to
    allow the next result to overwrite it, until the response is a final one.
    For the final one, print a newline to preserve the finalized transcription.
    """
    for response in responses:
        if get_current_time() - stream.start_time > STREAMING_LIMIT:
            stream.start_time = get_current_time()
            break

        if not response.results:
            continue
        result = response.results[0]
        if not result.alternatives:
            continue
        transcript = result.alternatives[0].transcript

        result_seconds = 0
        result_nanos = 0
        if result.result_end_time.seconds:
            result_seconds = result.result_end_time.seconds
        if result.result_end_time.nanos:
            result_nanos = result.result_end_time.nanos
        stream.result_end_time = int((result_seconds * 1000)
                                     + (result_nanos / 1000000))
        corrected_time = (stream.result_end_time - stream.bridging_offset
                          + (STREAMING_LIMIT * stream.restart_counter))

        # Display interim results, but with a carriage return at the end of
        # the line, so subsequent lines will overwrite them.
        if result.is_final:
            sys.stdout.write(GREEN)
            sys.stdout.write('\033[K')
            sys.stdout.write(str(corrected_time) + ': ' + transcript + '\n')
            stream.is_final_end_time = stream.result_end_time
            stream.last_transcript_was_final = True
            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                sys.stdout.write(YELLOW)
                sys.stdout.write('Exiting...\n')
                stream.closed = True
                break
        else:
            sys.stdout.write(RED)
            sys.stdout.write('\033[K')
            sys.stdout.write(str(corrected_time) + ': ' + transcript + '\r')
            stream.last_transcript_was_final = False


def main():
    """start bidirectional streaming from microphone input to speech API"""
    client = speech.SpeechClient()
    config = speech.types.RecognitionConfig(
        encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=SAMPLE_RATE,
        language_code='en-US',
        max_alternatives=1)
    streaming_config = speech.types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)
    mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
    print(mic_manager.chunk_size)
    sys.stdout.write(YELLOW)
    sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
    sys.stdout.write('End (ms)       Transcript Results/Status\n')
    sys.stdout.write('=====================================================\n')

    with mic_manager as stream:
        while not stream.closed:
            sys.stdout.write(YELLOW)
            sys.stdout.write('\n' + str(
                STREAMING_LIMIT * stream.restart_counter) + ': NEW REQUEST\n')
            stream.audio_input = []
            audio_generator = stream.generator()
            requests = (speech.types.StreamingRecognizeRequest(
                audio_content=content) for content in audio_generator)
            responses = client.streaming_recognize(streaming_config,
                                                   requests)
            # Now, put the transcription responses to use.
            listen_print_loop(responses, stream)
            if stream.result_end_time > 0:
                stream.final_request_end_time = stream.is_final_end_time
            stream.result_end_time = 0
            stream.last_audio_input = []
            stream.last_audio_input = stream.audio_input
            stream.audio_input = []
            stream.restart_counter = stream.restart_counter + 1
            if not stream.last_transcript_was_final:
                sys.stdout.write('\n')
            stream.new_stream = True


if __name__ == '__main__':
    main()

I'm having a hard time following everything that happens in this code, and I don't want to pay for a license to try it out, but here are some ideas. Maybe others will find them useful and can help you further.

Detecting the end of a sentence

First, a big problem with separating sentences out of speech is that not everyone leaves the same pause between sentences. Some people wait longer, while others plow straight into the next one. Some people also pause in the middle of a sentence. This makes detecting the end of a sentence from the audio data difficult if you do it in a relatively naive way, such as trying to detect pauses.

The best approach I can think of is to use the transcriptions you get back from the Google Speech API and split on ending punctuation (!?.). Your problem then reduces to associating the returned responses with the specific chunks of audio data.
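As a rough illustration of that splitting step (the regex and the example sentence here are just an assumption about how you might carve up a final transcript):

import re

def split_into_sentences(transcript):
    """Split a transcript on ending punctuation, keeping the marks."""
    # A capturing group makes re.split keep the delimiters; pair each
    # sentence body back up with the mark that ended it.
    parts = re.split(r'([.!?])', transcript)
    pairs = [''.join(p).strip() for p in zip(parts[0::2], parts[1::2])]
    return [s for s in pairs if s]

print(split_into_sentences('Hello there. How are you? Fine!'))
# ['Hello there.', 'How are you?', 'Fine!']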

It looks like you can simply pass None back to the generator and it will end gracefully, so that part shouldn't be too bad. When you decide that a sentence has ended, you'll want to save whichever chunks of audio data produced that transcript.

This may be tricky, because as more audio is received, the Google Speech API can retroactively decide that a complete-looking sentence wasn't actually complete but was part of a longer sentence, so watch out for that as well.
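Putting those ideas together, here is a minimal sketch of what the bookkeeping inside listen_print_loop might look like. It is only an assumption about one way to do it: it treats everything recorded since the last final result as one utterance, ignores the script's 4-minute restart logic, and save_wav is a hypothetical helper (one version is sketched in the next section):

utterance_index = 0
chunks_consumed = 0  # chunks of stream.audio_input already written out

for response in responses:
    if not response.results:
        continue
    result = response.results[0]
    if result.is_final:
        # Treat everything captured since the previous final result as
        # one utterance and write it to its own file.
        utterance = stream.audio_input[chunks_consumed:]
        save_wav('utterance_%03d.wav' % utterance_index, utterance)
        chunks_consumed = len(stream.audio_input)
        utterance_index += 1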

Saving the audio data

As for saving the raw audio data, once you know which chunks apply to which transcription, just append them in order to a list (e.g. list_of_chunks) and use wave:

import wave

# 'audio' here refers to the pyaudio.PyAudio() instance
# (self._audio_interface in the class above).
with wave.open("foo.wav", 'wb') as f:
    f.setnchannels(self._num_channels)
    f.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
    f.setframerate(self._rate)
    f.writeframes(b''.join(list_of_chunks))

Of course, if you do this outside the ResumableMicrophoneStream class, you'll have to make num_channels and rate accessible.
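One way to make them accessible is simply to take them as parameters. A sketch of that, where save_wav is the same hypothetical helper referenced above and sample_width=2 corresponds to pyaudio.paInt16:

import wave

def save_wav(path, chunks, num_channels=1, rate=16000, sample_width=2):
    """Write a list of raw LINEAR16 audio chunks out as a WAV file."""
    with wave.open(path, 'wb') as f:
        f.setnchannels(num_channels)
        f.setsampwidth(sample_width)  # 2 bytes = 16-bit samples (paInt16)
        f.setframerate(rate)
        f.writeframes(b''.join(chunks))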

You can detect individual utterances with the single_utterance option of StreamingRecognitionConfig. The API stops and returns a result as soon as it detects the first pause/silence, which is useful for short commands. Apart from that one option, I haven't seen anything similar for detecting multiple sentences.

https://cloud.google.com/speech-to-text/docs/basics
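In the script above, that would be a one-line change to the streaming config (note that the stream then ends after the first utterance, so you would reopen it for each new utterance):

streaming_config = speech.types.StreamingRecognitionConfig(
    config=config,
    interim_results=True,
    single_utterance=True)  # stop and return after the first detected pause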

The settings below will give you punctuation and timing information for the recognized words. Perhaps you can use them to implement @matthew salvatore viglione's suggestion (i.e., split the sentences by punctuation, then use the list of word times to identify the corresponding sections of the audio file). If you don't use streaming recognition, you also shouldn't have to worry about the retroactive-recognition problem.

{"enableWordTimeOffsets":布尔值,"enableAutomatic标点符号":布尔值,。。。。。}

https://cloud.google.com/speech-to-text/docs/reference/rest/v1/RecognitionConfig
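In the Python client used in the script above, those REST fields map to snake_case arguments on RecognitionConfig. A sketch of setting them and reading the per-word times from a final result (the actual slicing of the audio by these times is left out):

config = speech.types.RecognitionConfig(
    encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=SAMPLE_RATE,
    language_code='en-US',
    enable_word_time_offsets=True,
    enable_automatic_punctuation=True)

# Each word in a final result then carries start/end offsets:
for word in result.alternatives[0].words:
    start = word.start_time.seconds + word.start_time.nanos * 1e-9
    end = word.end_time.seconds + word.end_time.nanos * 1e-9
    print('%s: %.2fs - %.2fs' % (word.word, start, end))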

Before going deeper with the Google speech recognition API, I'd suggest also looking at other speech recognition services to see whether they offer the sentence-detection behavior you want (an utterance is not the same thing as a sentence).
