使用包含成对图像的 TFRecords(孪生网络 / Siamese 模型)



我正在更新我的代码,以便使用 tfrecords 来加快(希望如此)孪生(Siamese)模型的训练。基于一个包含成对图像的 .csv 文件,我用以下代码创建了 tfrecords:

from data_generation.tf_records_helper import image_feature, int64_feature
import tensorflow as tf
import numpy as np
import pandas as pd
import math

def create_example(image1: np.ndarray, image2: np.ndarray, label: int) -> tf.train.Example:
    """
    Packs one image pair and its label into a tf.train.Example.

    Args:
        image1: first image in the pair.
        image2: second image in the pair.
        label: label of the example.
    Returns:
        Example holding the features "image1", "image2" and "label".
    """
    # The keys below are the record schema; the reader side must use the same names.
    features = tf.train.Features(
        feature={
            "image1": image_feature(image1),
            "image2": image_feature(image2),
            "label": int64_feature(label),
        }
    )
    return tf.train.Example(features=features)

def read_data(training_df_path: str, max_samples: int = 2006) -> pd.DataFrame:
    """
    Reads a data frame containing positive/negative pairs of images.

    A class-balanced subset is drawn: the same number of rows is sampled
    from the "positive" and the "negative" class, labels are converted to
    1/0 and the rows are shuffled.

    Args:
        training_df_path: path to the split file (CSV with a "label" column
            whose values are "positive" or "negative").
        max_samples: upper bound on the number of rows returned; half of it
            is drawn from each class. The per-class count is reduced to the
            size of the smaller class when the file does not hold enough rows.
    Returns:
        Shuffled data frame with training data information and integer labels.
    """
    label_map = {"positive": 1, "negative": 0}
    df = pd.read_csv(training_df_path)
    positives = df[df["label"] == "positive"]
    negatives = df[df["label"] == "negative"]
    # region testing
    # Clamp to what is actually available so small files do not make
    # DataFrame.sample raise, while keeping both classes the same size.
    per_class = max(0, min(max_samples // 2, len(positives), len(negatives)))
    df = pd.concat(
        [
            positives.sample(n=per_class, random_state=42),
            negatives.sample(n=per_class, random_state=42),
        ],
        axis=0,
    )
    # endregion
    df["label"] = df["label"].map(label_map)
    # Shuffle so the frame is not ordered by pos/neg samples.
    return df.sample(frac=1)

def df_to_tf_records(training_df_path: str, num_samples: int = 1000, tfrecords_dir: str = "/dataset") -> None:
    """
    Writes the image pairs listed in a CSV file to sharded TFRecord files.

    Each shard is named "file_<shard index>-<number of examples>.tfrec" and
    holds at most `num_samples` examples.

    Args:
        training_df_path: path to the split file, forwarded to `read_data`.
        num_samples: maximum number of examples per TFRecord file.
        tfrecords_dir: directory the TFRecord files are written to; it is
            created if it does not exist.
    Raises:
        ValueError: if `num_samples` is not positive.
    """
    if num_samples <= 0:
        raise ValueError("num_samples must be a positive integer.")
    df = read_data(training_df_path)
    # Make sure the output directory exists before any writer is opened.
    tf.io.gfile.makedirs(tfrecords_dir)
    num_tf_records = math.ceil(len(df) / num_samples)
    print(f"# tf records = {num_tf_records}.")
    for tfrec_num in range(num_tf_records):
        samples = df.iloc[tfrec_num * num_samples : (tfrec_num + 1) * num_samples]
        shard_path = tfrecords_dir + "/file_%.2i-%i.tfrec" % (tfrec_num, len(samples))
        with tf.io.TFRecordWriter(shard_path) as writer:
            for _, sample in samples.iterrows():
                image1 = tf.io.decode_jpeg(tf.io.read_file(sample["img_path_x"]))
                image2 = tf.io.decode_jpeg(tf.io.read_file(sample["img_path_y"]))
                example = create_example(image1, image2, sample["label"])
                writer.write(example.SerializeToString())

我的生成器看起来像这样:

"""
Custom data generator for Siamese model with contrastive loss.
"""

from typing import Optional
import tensorflow as tf

# Shorthand for the tf.data sentinel used as num_parallel_reads, num_parallel_calls and prefetch size.
AUTOTUNE = tf.data.AUTOTUNE

class CustomDataset:
    """
    Builds a tf.data pipeline of image pairs read from TFRecord files.

    Attributes:
        train_filenames: TFRecord files matching "<tf_records_dir>/*.tfrec".
        batch_size: batch size.
        input_size: network input size.
        input_names: names of the two model inputs the images are fed to.
    """

    def __init__(
        self,
        tf_records_dir: str = "/dataset",
        batch_size: Optional[int] = 32,
        input_size: Optional[tuple[int, int, int]] = (224, 224, 3),
        input_names: tuple[str, str] = ("inp_1", "inp_2"),
    ):
        """
        Args:
            tf_records_dir: path to the tf records.
            batch_size: batch size.
            input_size: network input size.
            input_names: names of the model's two input layers, in the order
                (first image, second image).
        """
        self.train_filenames = tf.io.gfile.glob(f"{tf_records_dir}/*.tfrec")
        self.input_size: tuple[int, int, int] = input_size
        self.batch_size = batch_size
        self.input_names: tuple[str, str] = input_names

    @staticmethod
    def _parse_tfrecord_fn(example):
        """
        Parses one serialized example and decodes both JPEG images.

        Args:
            example: serialized tf.train.Example.
        Returns:
            Dict with the decoded 3-channel images under "image1"/"image2"
            and the integer label under "label".
        """
        feature_description = {
            "image1": tf.io.FixedLenFeature([], tf.string),
            "image2": tf.io.FixedLenFeature([], tf.string),
            "label": tf.io.FixedLenFeature([], tf.int64),
        }
        example = tf.io.parse_single_example(example, feature_description)
        example["image1"] = tf.io.decode_jpeg(example["image1"], channels=3)
        example["image2"] = tf.io.decode_jpeg(example["image2"], channels=3)
        return example

    def _prepare_sample(self, features):
        """
        Resizes both images and arranges them as (inputs, label).

        Keras treats a 3-tuple dataset element as
        (inputs, targets, sample_weights), so a flat (image1, image2, label)
        would hand only image1 to a two-input model. The images are therefore
        grouped in a dict keyed by the model's input names.

        Args:
            features: parsed example as returned by `_parse_tfrecord_fn`.
        Returns:
            Tuple of ({input name: image}, label).
        """
        target_size = self.input_size[:2]
        first_name, second_name = self.input_names
        images = {
            first_name: tf.image.resize(features["image1"], size=target_size),
            second_name: tf.image.resize(features["image2"], size=target_size),
        }
        return images, features["label"]

    def get_dataset(self) -> tf.data.Dataset:
        """
        Creates a data set from positive/negative pairs of images.

        Returns:
            Shuffled, batched and prefetched data set of (inputs, label).
        """
        options = tf.data.Options()
        # NOTE(review): newer TensorFlow releases expose this option as
        # `options.deterministic` - confirm against the installed version.
        options.experimental_deterministic = False
        dataset = (
            tf.data.TFRecordDataset(self.train_filenames, num_parallel_reads=AUTOTUNE)
            .map(self._parse_tfrecord_fn, num_parallel_calls=AUTOTUNE)
            .map(self._prepare_sample, num_parallel_calls=AUTOTUNE)
            .shuffle(self.batch_size * 10)
            .batch(self.batch_size)
            .prefetch(AUTOTUNE)
        )
        dataset = dataset.with_options(options)
        return dataset

当训练模型时,我得到:

/usr/local/lib/python3.9/dist-packages/keras/engine/input_spec.py:199 assert_input_compatibility
raise ValueError('Layer ' + layer_name + ' expects ' +
ValueError: Layer model expects 2 input(s), but it received 1 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 224, 224, 3) dtype=float32>]

我在生成器中看到的最大变化是我没有使用

# Earlier pipeline (no longer used): zips three separate datasets into one dataset of 3-tuples.
dataset = tf.data.Dataset.zip(
(positive_dataset, negative_dataset, label)
)

这段代码了。我认为这个问题可能与此有关,因为现在我只有一个张量,而模型需要两个输入(?),但我不确定如何将这些数据正确地传递给模型。模型如下所示:

def create_model(
    target_shape: Optional[tuple[int, int, int]] = (224, 224, 3),
    path: Optional[str] = None,
) -> Model:
    """
    Creates the siamese model.

    A single embedding network (augmentation -> EfficientNet preprocessing ->
    EfficientNetB0 -> dense head) is applied to both inputs; the two
    embeddings are merged by `l1_distance`, batch-normalized and mapped to a
    single sigmoid unit.

    Args:
        target_shape: image dimensions.
        path: path to best weights; when given, the weights are loaded into
            the siamese model before it is returned.
    Returns:
        Siamese model with the two inputs "inp_1" and "inp_2".
    """
    input_1 = layers.Input(shape=target_shape, name="inp_1")
    input_2 = layers.Input(shape=target_shape, name="inp_2")

    # Shared embedding network.
    img_augmentation = get_augmentation_layer()
    # Named `embedding_input` so the builtin `input` is not shadowed.
    embedding_input = layers.Input(shape=target_shape, name="input")
    preprocessed = layers.Lambda(
        lambda image: tf.keras.applications.efficientnet.preprocess_input(image),
        name="pre_process",
    )(img_augmentation(embedding_input))
    base_cnn = EfficientNetB0(
        weights="imagenet",
        input_tensor=preprocessed,
        input_shape=target_shape,
        include_top=False,
    )

    # Head: BatchNorm -> Dense(ReLU) -> Dropout, twice, then a linear 512-d output.
    pool = layers.MaxPooling2D(pool_size=(2, 2))(base_cnn.output)
    flatten = layers.Flatten(name="base_output_flatten")(pool)
    dense1 = layers.BatchNormalization(name="dense1_norm")(flatten)
    dense1 = layers.Dense(4096, activation="relu", name="dense1")(dense1)
    dense1 = layers.Dropout(0.3, name="dense1_dropout")(dense1)
    dense2 = layers.BatchNormalization(name="dense2_norm")(dense1)
    dense2 = layers.Dense(1024, activation="relu", name="dense2")(dense2)
    dense2 = layers.Dropout(0.2, name="dense2_dropout")(dense2)
    output = layers.Dense(512, name="dense_output")(dense2)
    embedding = Model(embedding_input, output, name="Embedding")

    # Both towers reuse the same embedding model, i.e. they share weights.
    tower_1 = embedding(input_1)
    tower_2 = embedding(input_2)
    merge_layer = layers.Lambda(l1_distance, name="l1")([tower_1, tower_2])
    normal_layer = layers.BatchNormalization(name="l1_norm")(merge_layer)
    comparison_layer = layers.Dense(
        1,
        activation="sigmoid",
        name="final_layer",
    )(normal_layer)
    siamese = Model(inputs=[input_1, input_2], outputs=comparison_layer)
    if path is not None:
        siamese.load_weights(path)
    return siamese

如果这对其他人有帮助:我通过替换以下代码解决了这个问题:

# Version being replaced: resizes both images and returns a flat 3-tuple (image1, image2, label).
def _prepare_sample(self, features):
image1 = tf.image.resize(features["image1"], size=self.input_size[:2])
image2 = tf.image.resize(features["image2"], size=self.input_size[:2])
return image1, image2, features["label"]

替换为:

def _prepare_sample(self, features):
    """
    Resizes both images and returns them as (inputs, label).

    Args:
        features: dict holding the decoded images under "image1"/"image2"
            and the label under "label".
    Returns:
        Tuple of ({"inp_1": image1, "inp_2": image2}, label).
    """
    # The dict keys are the names given to the model's two input layers.
    first = tf.image.resize(features["image1"], size=self.input_size[:2])
    second = tf.image.resize(features["image2"], size=self.input_size[:2])
    return {"inp_1": first, "inp_2": second}, features["label"]

也就是用模型两个输入层的名称(inp_1 和 inp_2)作为字典的键,把两张图像作为一个整体的输入传给模型。

相关内容

  • 没有找到相关文章

最新更新