如何向keras模型实时提供实时音频



我正在尝试创建一个声控球门喇叭。我创建并训练了一个CNN模型,但我不知道如何使用它对实时数据进行预测。

我想基于来自内置麦克风的音频的最后一秒来进行实时课堂预测,以将最近的音频分类为";YES_ GOAL"YES_ WIN";,或";NO_GOAL";。

我的项目的最终目标是每次我尖叫时都能在iTunes上播放进球号角;进球">

当我尝试运行我的代码时,我会得到

ValueError:密集层的输入0与层不兼容:预期输入形状的轴-1的值为2200,但收到的输入具有形状[32,1]

到目前为止,这是我的代码:

import pyaudio
import librosa
import numpy as np
import time
import subprocess
import os
import sys
#import kbHitMod
import tensorflow.keras as keras
MODEL_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/Goal_Model.model"
GOAL_TRACK = "1 New York Islanders Overtime Goal and Win Horn || NYCB Live: Home of the Nassau Veterans Memorial Coliseum"
WIN_TRACK = "2 New York Islanders Win Horn || NYCB Live: Home of the Nassau Veterans Memorial Coliseum"
OT_GOAL_TRACK = "3 New York Islanders Goal Horn || NYCB Live Home of the Nassau Veterans Memorial Coliseum"
QUIET_TRACK = "4 pure silence"
PAUSE_COMMAND = "osascript -e 'tell application "iTunes" to pause'"
class RingBuffer:
""" class that implements a not-yet-full buffer """
def __init__(self,size_max):
self.max = size_max
self.data = []
class __Full:
""" class that implements a full buffer """
def append(self, x):
""" Append an element overwriting the oldest one. """
self.data[self.cur] = x
self.cur = (self.cur+1) % self.max
def get(self):
""" return list of elements in correct order """
return self.data[self.cur:]+self.data[:self.cur]
def append(self,x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
""" Return a list of elements from the oldest to the newest. """
return self.data
# ring buffer will keep the last 1 second worth of audio
ringBuffer = RingBuffer(1 * 22050)
overtime = False
print("nOvertime mode: offn")
def play(track_name):
subprocess.getoutput("osascript -e 'tell application "iTunes" to play (first track of playlist "Library" whose name is "4 pure silence")'")
subprocess.getoutput("osascript -e 'tell application "iTunes" to play (first track of playlist "Library" whose name is "" + track_name + "")'")
def callback(in_data, frame_count, time_info, flag):

state = subprocess.getoutput("osascript -e 'tell application "iTunes" to player state as string'")
model = keras.models.load_model(MODEL_PATH, compile=True)
audio_data = np.fromstring(in_data, dtype=np.float32)

# we trained on audio with a sample rate of 22050 so we need to convert it
audio_data = librosa.resample(audio_data, 44100, 22050)
ringBuffer.append(audio_data)
# machine learning model takes live audio as input and
# decides if the last 1 second of audio contains a goal
if model.predict_classes(ringBuffer.get()) == "YES_GOAL" and state == "paused":
# GOAL!! 
if overtime:
play(GOAL_TRACK)
else:
play(OT_GOAL_TRACK)

# decides if the last 1 second of audio contains a win
elif model.predict_classes(ringBuffer.get()) == "YES_WIN" and state == "paused":
play(WIN_TRACK)
return (in_data, pyaudio.paContinue)
pa = pyaudio.PyAudio()
stream = pa.open(format = pyaudio.paFloat32,
channels = 1,
rate = 44100,
output = False,
input = True,
stream_callback=callback)
# start the stream
stream.start_stream()
i = 0 # This is just an alternative to breaking the loop with kbHitMod
while stream.is_active():
time.sleep(0.25)

"""
kb = kbHitMod.KBHit() # detects if a key has been pressed
ot = kb.getch()
if ot == "o":
if overtime == False:
overtime = True
print("Overtime mode: ONn")
else:
overtime = False
print("Overtime mode: offn")
elif ot == "q":
print("Quitting... Goodbye!n")
break
"""
i += 1
if i >= 100:
break

stream.close()
pa.terminate()
play(QUIET_TRACK)
subprocess.getoutput(PAUSE_COMMAND)
print("Program terminated. n")

我的型号:

import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
DATA_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/data.json"
MODEL_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/Goal_Model.model"
def load_data(data_path):
"""Loads training dataset from json file.
:param data_path (str): Path to json file containing data
:return X (ndarray): Inputs
:return y (ndarray): Targets
"""
with open(data_path, "r") as fp:
data = json.load(fp)
X = np.array(data["mfcc"])
y = np.array(data["labels"])
return X, y
def prepare_datasets(test_size, validation_size):
# load data
X, y = load_data(DATA_PATH)
# create train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)
# create train/validation split
X_train, X_validation, y_train, y_validation = train_test_split(X_train, y_train, test_size=validation_size)
# 3d array -> (130, 50, 1)
X_train = X_train[..., np.newaxis] # 4d array -> (num_samples, 130, 50, 1) (I don't know where these numbers are coming from. They might not be right)
X_validation = X_validation[..., np.newaxis]
X_test = X_test[..., np.newaxis]
return X_train, X_validation, X_test, y_train, y_validation, y_test
def build_model(input_shape):
# create model
model = keras.Sequential()
# 1st conv layer
model.add(keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# 2nd conv layer
model.add(keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# 3rd conv layer
model.add(keras.layers.Conv2D(32, (2, 2), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((2, 2), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# flatten the output and feed it into dense layer
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(64, activation="relu"))
model.add(keras.layers.Dropout(0.3))
# output layer
model.add(keras.layers.Dense(3, activation="softmax"))
return model
def predict(model, X, y):
X = X[np.newaxis, ...]
# prediction = [ [0.1, 0.2, ...] ]
prediction = model.predict(X) # X -> (1, 130, 50, 1)
# extract index with max_value
predicted_index = np.argmax(prediction, axis=-1) # [4]
print("Expected index: {}, Predicted index: {}".format(y, predicted_index))

if __name__ == "__main__":
# create train, validation and test sets
X_train, X_validation, X_test, y_train, y_validation, y_test = prepare_datasets(0.25, 0.2)
# build the CNN net
input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3])
model = build_model(input_shape)
# compile the network
optimizer = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer,
loss="sparse_categorical_crossentropy",
metrics=["accuracy"])
# train the CNN
model.fit(X_train, y_train, validation_data=(X_validation, y_validation), batch_size=32, epochs=30)
# evaluate the CNN on the test set
test_error, test_accuracy = model.evaluate(X_test, y_test, verbose=1)
print("Accuracy on test set is: {}".format(test_accuracy))
# make prediction on a sample
X = X_test[2]
y = y_test[2]
print(X_test.shape)
predict(model, X, y)
model.save(MODEL_PATH)

所以这很简单。

使用模型路径,我所要做的就是将数据输入到我的模型中,就像函数的参数一样:


MODEL_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/Goal_Model.model"
model = keras.load_model(MODEL_PATH, compile=True)
# Generate mfcc 
Prediction = model(mfcc)

最新更新