如何将一个会生成 X、y 以及一个额外数组的自定义数据生成器传入 tensorflow.keras 的 model.fit



我正在使用 CNN 来解决分类问题。我有患者的 3D 图像（CT 扫描），我正试图根据这些图像预测二元结果。我还有一些临床数据，并希望将其纳入 CNN 模型。我有一个自定义的数据生成器（基于 keras.utils.Sequence），它可以生成 X、y 以及一组临床数据。

X、y 会贯穿整个模型使用；我希望在倒数第二个密集层（即输出层之前的那一层）中加入临床数据。

我的数据生成器的代码

class DataGenerator(Sequence):
    """Generates batches of 3D CT volumes (and optionally clinical features) for Keras.

    Each batch yields (X, y), or (X, y, clinical) when isClinicalData is True.
    NOTE(review): Keras `model.fit` interprets a 3-tuple yielded by a Sequence as
    (inputs, targets, sample_weights), so returning (X, y, clinical) makes Keras
    treat the clinical array as per-sample weights; a two-input model needs
    ((X, clinical), y) instead — confirm against the training code.
    """

    def __init__(self, list_IDs, labels, clinfeat, batch_size=32, dim=(64, 64, 64),
                 n_channels=1, n_classes=1, shuffle=True, isTestData=False,
                 images_per_id=1, isClinicalData=False,
                 base_train_image_path='finaldata/AllNumpyImages/',
                 base_test_images_path='testdata/'):
        """Initialization.

        Args:
            list_IDs: image file names; each is repeated images_per_id times so
                several augmented copies of the same scan appear per epoch.
            labels: mapping from ID to its integer class label.
            clinfeat: mapping from ID to its clinical features (presumably a
                pandas Series of 19 values — `.values` is read later; verify).
            batch_size: samples per batch.
            dim: spatial size of one volume.
            n_channels: channels per volume.
            n_classes: number of output classes (kept for bookkeeping only).
            shuffle: reshuffle sample order at the end of every epoch.
            isTestData: load volumes from the test directory, no augmentation.
            images_per_id: how many times each ID is repeated per epoch.
            isClinicalData: additionally return the clinical feature batch.
            base_train_image_path: directory holding training .npy volumes.
            base_test_images_path: directory holding test .npy volumes.
        """
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.clinfeat = clinfeat
        self.list_repeated_ids = self.__get_repeated_list_ids(list_IDs, images_per_id)
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.isTestData = isTestData
        self.isClinicalData = isClinicalData
        self.on_epoch_end()  # build the initial index order
        self.base_train_images_path = base_train_image_path
        self.base_test_images_path = base_test_images_path

    def __len__(self):
        """Number of batches per epoch (a trailing partial batch is dropped)."""
        return len(self.list_repeated_ids) // self.batch_size

    def __getitem__(self, index):
        """Generate one batch of data."""
        # Positions of the samples that belong to this batch.
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        list_ids_one_batch = [self.list_repeated_ids[k] for k in indexes]
        if self.isClinicalData:
            X, y, clinical = self.__data_generation(list_ids_one_batch)
            return X, y, clinical
        X, y = self.__data_generation(list_ids_one_batch)
        return X, y

    def on_epoch_end(self):
        """Rebuild (and optionally shuffle) the sample index order."""
        self.indexes = np.arange(len(self.list_repeated_ids))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_ids_one_batch):
        """Load one batch: X of shape (batch_size, *dim, n_channels), labels y
        and, when requested, the (batch_size, 19) clinical feature matrix."""
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty(self.batch_size, dtype=int)
        clinical = np.empty(shape=(self.batch_size, 19), dtype=float)
        for i, ID in enumerate(list_ids_one_batch):
            if self.isTestData:
                # Test volumes are used as-is (no augmentation).
                X[i,] = np.load(os.path.join(self.base_test_images_path, ID)).reshape(64, 64, 64, 1)
            else:
                # Each training volume gets a fresh random augmentation.
                tmp_img = np.load(os.path.join(self.base_train_images_path, ID))
                aug_img = image_gen.random_transform(tmp_img)  # module-level augmenter
                X[i,] = aug_img.reshape(64, 64, 64, 1)
            y[i] = self.labels[ID]
            if self.isClinicalData:
                clinical[i] = self.clinfeat[ID].values
        if self.isClinicalData:
            return X, y, clinical
        return X, y

    def __get_repeated_list_ids(self, list_ids, images_per_id):
        """Return a new list where each ID is repeated images_per_id times."""
        # FIX: the original loop variable shadowed the builtin `id`.
        repeated = []
        for sample_id in list_ids:
            repeated.extend([sample_id] * images_per_id)
        return repeated

这是我的模型。我正在使用tensorboard记录我的指标和超参数

def create_model(hparams):
    """Build, compile and train the 3D-CNN classifier for one hyperparameter set.

    Args:
        hparams: dict keyed by the HP_* hparam objects: dropout rate, dense
            units, number of dense layers, optimizer name and learning rate.

    Returns:
        Tuple of (final validation accuracy, final training accuracy).

    Raises:
        ValueError: if the optimizer name is not 'adam', 'sgd' or 'rmsprop'.

    NOTE(review): relies on module-level globals image_shape,
    training_generator, validation_generator and log_dir — confirm they are
    defined before this is called.
    """
    l2 = tf.keras.regularizers.l2  # every layer shares the same weight decay
    dropout = hparams[HP_DROPOUT]
    model = Sequential()

    def add_dense_block(units):
        """Append a Dense -> Dropout -> BatchNorm stack to the model."""
        model.add(Dense(units, activation='relu', kernel_regularizer=l2(0.000001)))
        model.add(Dropout(dropout))
        model.add(BatchNormalization())

    # --- 3D convolutional feature extractor ---
    model.add(Conv3D(filters=64, kernel_size=(5, 5, 5), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001), input_shape=image_shape))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=128, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(MaxPool3D(pool_size=(3, 3, 3), strides=(3, 3, 3), padding='valid'))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=256, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=512, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(MaxPool3D(pool_size=(3, 3, 3), strides=(3, 3, 3), padding='valid'))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Flatten())

    # --- dense classification head; depth/width selected by hparams ---
    add_dense_block(hparams[HP_NUM_UNITS])
    DL = hparams[HP_NUM_DLAYER]
    DU = hparams[HP_NUM_UNITS]
    if DL == 2 and DU == 512:
        add_dense_block(256)
    elif DL == 3 and DU == 512:
        add_dense_block(256)
        add_dense_block(128)
    elif DL == 2 and DU == 1024:
        add_dense_block(512)
    else:
        add_dense_block(512)
        add_dense_block(256)

    model.add(Dense(1, activation='sigmoid'))  # binary output

    # --- optimizer selection ---
    # FIX: the original `else` branch raised with an undefined name
    # `optimizer_name` (the string had been stored in `optimizer`), which
    # would have turned the ValueError into a NameError.
    optimizer_name = hparams[HP_OPTIMIZER]
    learning_rate = hparams[HP_LEARNING_RATE]
    if optimizer_name == 'adam':
        optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
    elif optimizer_name == 'sgd':
        optimizer = tf.optimizers.SGD(learning_rate=learning_rate)
    elif optimizer_name == 'rmsprop':
        optimizer = tf.optimizers.RMSprop(learning_rate=learning_rate)
    else:
        raise ValueError("unexpected optimizer name: %r" % (optimizer_name,))

    # compile the model
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

    # --- training with TensorBoard + hparams logging and early stopping ---
    early_stop = EarlyStopping(monitor='val_accuracy', patience=10)
    history = model.fit(x=training_generator, validation_data=validation_generator,
                        epochs=50,
                        callbacks=[
                            tf.keras.callbacks.TensorBoard(log_dir),
                            hp.KerasCallback(log_dir, hparams),
                            early_stop,
                        ])
    return history.history['val_accuracy'][-1], history.history['accuracy'][-1]

DataGenerator生成一批X、y、临床

是否可以使用X和y作为初始输入,并将临床连接到输出层之前的倒数第二个密集层。

使用函数式 API（Functional API）：

def create_model(hparams):
    """Functional-API variant: the Sequential CNN processes the image input and
    the clinical feature vector is concatenated just before the sigmoid output.

    Args:
        hparams: dict keyed by the HP_* hparam objects (dropout, units, layers).

    Returns:
        A tf.keras.Model with two inputs (image, clinical) and one sigmoid output.
    """
    l2 = tf.keras.regularizers.l2  # every layer shares the same weight decay
    dropout = hparams[HP_DROPOUT]
    model = Sequential()

    def add_dense_block(units):
        """Append a Dense -> Dropout -> BatchNorm stack to the model."""
        model.add(Dense(units, activation='relu', kernel_regularizer=l2(0.000001)))
        model.add(Dropout(dropout))
        model.add(BatchNormalization())

    # --- 3D convolutional feature extractor ---
    model.add(Conv3D(filters=64, kernel_size=(5, 5, 5), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001), input_shape=image_shape))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=128, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(MaxPool3D(pool_size=(3, 3, 3), strides=(3, 3, 3), padding='valid'))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=256, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Conv3D(filters=512, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                     padding='valid', activation='relu',
                     kernel_regularizer=l2(0.000001)))
    model.add(MaxPool3D(pool_size=(3, 3, 3), strides=(3, 3, 3), padding='valid'))
    model.add(Dropout(dropout))
    model.add(BatchNormalization())

    model.add(Flatten())

    # --- dense head (no final sigmoid here; it is added after the concat) ---
    add_dense_block(hparams[HP_NUM_UNITS])
    DL = hparams[HP_NUM_DLAYER]
    DU = hparams[HP_NUM_UNITS]
    if DL == 2 and DU == 512:
        add_dense_block(256)
    elif DL == 3 and DU == 512:
        add_dense_block(256)
        add_dense_block(128)
    elif DL == 2 and DU == 1024:
        add_dense_block(512)
    else:
        add_dense_block(512)
        add_dense_block(256)

    # Two inputs: the 3D image and the clinical feature vector.
    # Change the shapes to your actual data, e.g. (64, 64, 64, 1) and (19,).
    inputs = (
        tf.keras.layers.Input(shape=(None,), dtype=tf.float32),
        tf.keras.layers.Input(shape=(None,), dtype=tf.float32),
    )
    x = model(inputs[0])
    # FIX: concatenate along the feature axis (axis=1), not the batch axis
    # (axis=0) — axis=0 would stack samples instead of appending features.
    x = tf.concat([x, inputs[1]], axis=1)
    # FIX: removed the stray ')' that made the original line a syntax error.
    x = Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs=inputs, outputs=x)
    return model  # FIX: the original never returned the built model

不要忘记更改输入形状。

你还必须相应地修改你的生成器，使其返回 (输入元组, 标签) 的形式：

if self.isClinicalData:
return (X, clinical), y

相关内容

最新更新