如何使用Keras创建BERT层



我正在尝试使用一个BERT层来将文本评论分为正面或负面:

# similar to tutorial:
# https://towardsdatascience.com/bert-in-keras-with-tensorflow-hub-76bcbc9417b
# ensure you are running TensorFlow 2.0 Google Colab
try:
%tensorflow_version 2.x
except Exception:
pass
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
import bert
# import dependencies
import pandas as pd
import numpy as np
import csv
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.preprocessing import LabelEncoder
from collections import defaultdict
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import model_selection, naive_bayes, svm
from sklearn.metrics import accuracy_score
# Seed every random number generator used below. np.random.seed alone does
# not affect the stdlib `random` module that draws the balanced sample and
# shuffles it, so both are seeded to make the run reproducible.
import random

np.random.seed(42)
random.seed(42)

# Load the corpus: every CSV row after the header becomes a tuple whose
# first field is the text and whose second field is the label.
with open('labelled_data.csv', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip header
    labelled_data = [tuple(row) for row in reader]

# Generate a balanced data set: separate positive and negative comments.
# csv.reader yields an empty row for a blank line; the length guard skips
# those instead of failing on tup[1].
positives = [tup for tup in labelled_data if len(tup) > 1 and tup[1] == "positive"]
negatives = [tup for tup in labelled_data if len(tup) > 1 and tup[1] == "negative"]

# Base the number of samples on the lesser of the two category counts.
sample_count = min(len(positives), len(negatives))
balanced_positives = random.sample(positives, sample_count)
balanced_negatives = random.sample(negatives, sample_count)
balanced_data = balanced_positives + balanced_negatives
# shuffle the data
random.shuffle(balanced_data)

# Convert to a dataframe and label the columns.
Corpus = pd.DataFrame(balanced_data, columns=['text', 'label'])

# Step - a : Remove rows without text. Calling dropna(inplace=True) on the
# column Series would leave the DataFrame untouched, so drop on the frame and
# rebuild the 0..n-1 index.
Corpus = Corpus.dropna(subset=['text']).reset_index(drop=True)

# Step - b / c : Lower-case every entry ('dog' and 'DOG' are different
# strings) and tokenize it into a list of words.
Corpus['text'] = [word_tokenize(entry.lower()) for entry in Corpus['text']]

# Step - d : Remove stop words and non-alphabetic tokens, then lemmatize.
# WordNetLemmatizer needs a POS tag to know whether the word is a noun, verb,
# adjective, ...; anything not mapped below defaults to noun.
tag_map = defaultdict(lambda: wn.NOUN)
tag_map['J'] = wn.ADJ
tag_map['V'] = wn.VERB
tag_map['R'] = wn.ADV

# Loop invariants: build the stop-word set and the lemmatizer once instead of
# once per word / per entry.
stop_words = set(stopwords.words('english'))
word_Lemmatized = WordNetLemmatizer()

final_texts = []
for entry in Corpus['text']:
    # pos_tag provides the tag for each word; its first letter selects the
    # WordNet part of speech through tag_map.
    Final_words = [
        word_Lemmatized.lemmatize(word, tag_map[tag[0]])
        for word, tag in pos_tag(entry)
        if word not in stop_words and word.isalpha()
    ]
    final_texts.append(str(Final_words))
# The processed words of every entry are stored in 'text_final'.
Corpus['text_final'] = final_texts

# create 80/20 training/test split of data
Train_X, Test_X, Train_Y, Test_Y = model_selection.train_test_split(
    Corpus['text_final'], Corpus['label'], test_size=0.2
)

# Encode the labels as integers. The encoder is fitted on the training labels
# only and then re-used, so both splits share one label -> integer mapping.
Encoder = LabelEncoder()
Train_Y = Encoder.fit_transform(Train_Y)
Test_Y = Encoder.transform(Test_Y)

# Create term-frequency / inverse-document-frequency matrices to find
# important features/words.
# NOTE(review): the vectorizer is fitted on the whole corpus, test rows
# included - confirm this is intended.
Tfidf_vect = TfidfVectorizer(max_features=5000)
Tfidf_vect.fit(Corpus['text_final'])
Train_X_Tfidf = Tfidf_vect.transform(Train_X)
Test_X_Tfidf = Tfidf_vect.transform(Test_X)
# to view vectorized data in format of (row, uniqueId, importance_score)
# create a bert layer class to use in the model
class BertLayer(tf.keras.layers.Layer):
    """Keras layer that wraps a TF-Hub BERT module and returns its pooled output.

    Args:
        n_fine_tune_layers: number of module variables, counted from the end
            of the module's variable list (after dropping the "/cls/" ones),
            that are registered as trainable weights.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, n_fine_tune_layers=10, **kwargs):
        self.n_fine_tune_layers = n_fine_tune_layers
        self.trainable = True
        # No trailing commas on the next two assignments: `768,` and
        # `"https://...",` build one-element tuples, and hub.Module then
        # fails with "Unknown module spec type: <class 'tuple'>".
        self.output_size = 768
        self.bert_path = "https://tfhub.dev/google/bert_uncased_L-12_H-768_A-12/1"
        super(BertLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Load the hub module and split its variables into trainable / frozen."""
        # NOTE(review): hub.Module is the TF1-style hub API; confirm it is
        # usable under the TensorFlow version this notebook runs on.
        self.bert = hub.Module(
            self.bert_path,
            trainable=self.trainable,
            name="{}_module".format(self.name)
        )
        trainable_vars = self.bert.variables
        # Remove unused layers
        trainable_vars = [var for var in trainable_vars if "/cls/" not in var.name]
        # Select how many layers to fine tune
        trainable_vars = trainable_vars[-self.n_fine_tune_layers:]
        # Add to trainable weights
        for var in trainable_vars:
            self._trainable_weights.append(var)
        # Add non-trainable weights
        for var in self.bert.variables:
            if var not in self._trainable_weights:
                self._non_trainable_weights.append(var)
        super(BertLayer, self).build(input_shape)

    def call(self, inputs):
        """Run BERT on ``[input_ids, input_mask, segment_ids]`` and return "pooled_output"."""
        # tf.cast is used because no Keras backend alias `K` is imported at
        # this point of the script.
        inputs = [tf.cast(x, tf.int32) for x in inputs]
        input_ids, input_mask, segment_ids = inputs
        bert_inputs = dict(
            input_ids=input_ids, input_mask=input_mask, segment_ids=segment_ids
        )
        result = self.bert(inputs=bert_inputs, signature="tokens", as_dict=True)[
            "pooled_output"
        ]
        return result

    def compute_output_shape(self, input_shape):
        """Return ``(input_shape[0], output_size)``."""
        return (input_shape[0], self.output_size)
# Build model
# The Input shape is the token sequence length of ONE example. The original
# code used Train_X.shape[0], which is the number of training rows (563 in
# the reported run), not a sequence length.
# NOTE(review): BERT-base is published with 512 position embeddings, so the
# sequence length has to stay at or below that - confirm for this module.
max_seq_length = 256
in_id = tf.keras.layers.Input(shape=(max_seq_length,), name="input_ids")
in_mask = tf.keras.layers.Input(shape=(max_seq_length,), name="input_masks")
in_segment = tf.keras.layers.Input(shape=(max_seq_length,), name="segment_ids")
bert_inputs = [in_id, in_mask, in_segment]
# Here the error occurs
bert_output = BertLayer(n_fine_tune_layers=10)(bert_inputs)

错误为:Unknown module spec type: <class 'tuple'>

其中 bert_inputs 是 shape=(None, 563) 的张量。

以下是我最终如何集成BERT层:

import tensorflow as tf
import pandas as pd
import tensorflow_hub as hub
import os
import re
import numpy as np
from bert.tokenization import FullTokenizer
from tqdm import tqdm
from tensorflow.keras import backend as K
# Initialize session
sess = tf.Session()

# Load all files from a directory in a DataFrame.
def load_directory_data(directory):
    """Load every review file of ``directory`` into a DataFrame.

    File names are expected to look like ``<digits>_<digits>.txt``; the second
    group of digits is stored as the sentiment value (kept as a string).

    Args:
        directory: path of the directory whose files are read.

    Returns:
        A DataFrame with the columns "sentence" (file content) and
        "sentiment" (digits captured from the file name).
    """
    # Raw string with escaped digits/dot: the unescaped pattern
    # "d+_(d+).txt" looks for literal 'd' characters, never matches a name
    # such as "123_7.txt", and made `.group(1)` fail on None.
    name_pattern = re.compile(r"\d+_(\d+)\.txt")
    data = {"sentence": [], "sentiment": []}
    for file_path in os.listdir(directory):
        match = name_pattern.match(file_path)
        if match is None:
            # Not a review file (unexpected name): skip it rather than crash.
            continue
        with tf.gfile.GFile(os.path.join(directory, file_path), "r") as f:
            data["sentence"].append(f.read())
        data["sentiment"].append(match.group(1))
    return pd.DataFrame.from_dict(data)

# Merge positive and negative examples, add a polarity column and shuffle.
def load_dataset(directory):
    """Merge the "pos" and "neg" sub-directories into one shuffled DataFrame.

    Each sub-directory is loaded with ``load_directory_data``; a "polarity"
    column is added (1 for "pos", 0 for "neg"), the two frames are
    concatenated, shuffled with ``sample(frac=1)`` and re-indexed.
    """
    frames = []
    for subdir, polarity in (("pos", 1), ("neg", 0)):
        frame = load_directory_data(os.path.join(directory, subdir))
        frame["polarity"] = polarity
        frames.append(frame)
    combined = pd.concat(frames)
    shuffled = combined.sample(frac=1)
    return shuffled.reset_index(drop=True)

# Download and process the dataset files.
def download_and_load_datasets(force_download=False):
    """Fetch the aclImdb archive and return ``(train_df, test_df)``.

    The archive is obtained through ``tf.keras.utils.get_file`` with
    ``extract=True``; the "train" and "test" folders of the "aclImdb"
    directory next to the downloaded file are then loaded with
    ``load_dataset``.

    Args:
        force_download: accepted for interface compatibility; the body does
            not read it.
    """
    archive_path = tf.keras.utils.get_file(
        fname="aclImdb.tar.gz",
        origin="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
        extract=True,
    )
    corpus_root = os.path.join(os.path.dirname(archive_path), "aclImdb")
    train_df = load_dataset(os.path.join(corpus_root, "train"))
    test_df = load_dataset(os.path.join(corpus_root, "test"))
    return train_df, test_df

class PaddingInputExample:
    """Placeholder example used to pad the input to a multiple of the batch size.

    Eval/predict on a TPU requires a fixed batch size, so the number of
    examples has to be padded up to a multiple of it. Dropping the last batch
    instead would mean that part of the output is never generated.

    A dedicated class is used rather than ``None`` because treating ``None``
    as a padding batch could cause silent errors.
    """

class InputExample:
    """One training/test example for simple sequence classification."""

    def __init__(self, guid, text_a, text_b=None, label=None):
        """Store the fields of a single example.

        Args:
            guid: unique id of the example.
            text_a: untokenized text of the first sequence; the only text
                needed for single-sequence tasks.
            text_b: optional untokenized text of the second sequence, only
                needed for sequence-pair tasks.
            label: optional label; expected for train and dev examples, not
                for test examples.
        """
        self.label = label
        self.text_b = text_b
        self.text_a = text_a
        self.guid = guid

def create_tokenizer_from_hub_module(bert_path):
    """Build a ``FullTokenizer`` from the vocab file and casing flag of a Hub module.

    The module's "tokenization_info" signature is evaluated in the
    module-level session ``sess`` to obtain both values.
    """
    module = hub.Module(bert_path)
    info = module(signature="tokenization_info", as_dict=True)
    fetches = [info["vocab_file"], info["do_lower_case"]]
    vocab_file, do_lower_case = sess.run(fetches)
    return FullTokenizer(vocab_file=vocab_file, do_lower_case=do_lower_case)

def convert_single_example(tokenizer, example, max_seq_length=256):
    """Turn one ``InputExample`` into fixed-length BERT input lists.

    Args:
        tokenizer: object providing ``tokenize`` and ``convert_tokens_to_ids``.
        example: an ``InputExample`` or a ``PaddingInputExample``.
        max_seq_length: length of every returned list.

    Returns:
        ``(input_ids, input_mask, segment_ids, label)``. A padding example
        yields all-zero lists and the label 0.
    """
    if isinstance(example, PaddingInputExample):
        return (
            [0] * max_seq_length,
            [0] * max_seq_length,
            [0] * max_seq_length,
            0,
        )

    # Two positions are reserved for the [CLS] and [SEP] markers.
    budget = max_seq_length - 2
    word_pieces = tokenizer.tokenize(example.text_a)
    if len(word_pieces) > budget:
        word_pieces = word_pieces[0:budget]

    tokens = ["[CLS]"]
    tokens.extend(word_pieces)
    tokens.append("[SEP]")
    # Single-sequence input: every token belongs to segment 0.
    segment_ids = [0] * len(tokens)

    input_ids = tokenizer.convert_tokens_to_ids(tokens)
    # 1 marks a real token, 0 marks padding; only real tokens are attended to.
    input_mask = [1] * len(input_ids)

    # Zero-pad all three lists up to the sequence length (no-op when the
    # lists are already long enough, since a non-positive count gives []).
    padding = [0] * (max_seq_length - len(input_ids))
    input_ids.extend(padding)
    input_mask.extend(padding)
    segment_ids.extend(padding)

    assert len(input_ids) == max_seq_length
    assert len(input_mask) == max_seq_length
    assert len(segment_ids) == max_seq_length
    return input_ids, input_mask, segment_ids, example.label

def convert_examples_to_features(tokenizer, examples, max_seq_length=256):
    """Convert a collection of examples into four NumPy arrays.

    Every example goes through ``convert_single_example`` (with a tqdm
    progress bar).

    Returns:
        ``(input_ids, input_masks, segment_ids, labels)`` as arrays; the
        labels are reshaped into a single column.
    """
    rows = []
    for example in tqdm(examples, desc="Converting examples to features"):
        ids, mask, segments, label = convert_single_example(
            tokenizer, example, max_seq_length
        )
        rows.append((ids, mask, segments, label))

    input_ids = np.array([row[0] for row in rows])
    input_masks = np.array([row[1] for row in rows])
    segment_ids = np.array([row[2] for row in rows])
    labels = np.array([row[3] for row in rows]).reshape(-1, 1)
    return input_ids, input_masks, segment_ids, labels

def convert_text_to_examples(texts, labels):
    """Create one ``InputExample`` per (text, label) pair.

    Args:
        texts: iterable of texts. Each text is either a plain string, used
            as-is, or a sequence of string pieces joined with single spaces.
        labels: iterable of labels, paired with ``texts`` by position.

    Returns:
        A list of ``InputExample`` objects with ``guid=None`` and
        ``text_b=None``.
    """
    InputExamples = []
    for text, label in zip(texts, labels):
        # A plain string is already a sentence: " ".join(...) on it would
        # insert a space between every character.
        sentence = text if isinstance(text, str) else " ".join(text)
        InputExamples.append(
            InputExample(guid=None, text_a=sentence, text_b=None, label=label)
        )
    return InputExamples

class BertLayer(tf.keras.layers.Layer):
    """Keras layer around a TF-Hub BERT module with selectable pooling.

    Args:
        n_fine_tune_layers: how many encoder layers, counted down from
            "encoder/layer_11", have their variables registered as trainable.
        pooling: "first" returns the module's "pooled_output"; "mean" averages
            "sequence_output" over the positions where the input mask is set.
        bert_path: handle of the hub module to load.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.

    Raises:
        NameError: if ``pooling`` is neither "first" nor "mean".
    """

    def __init__(
        self,
        n_fine_tune_layers=10,
        pooling="mean",
        bert_path="https://tfhub.dev/google/bert_uncased_L-12_H-768_A-12/1",
        **kwargs,
    ):
        self.n_fine_tune_layers = n_fine_tune_layers
        self.trainable = True
        self.output_size = 768
        self.pooling = pooling
        self.bert_path = bert_path
        if self.pooling not in ["first", "mean"]:
            raise NameError(
                f"Undefined pooling type (must be either first or mean, but is {self.pooling}"
            )
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Load the hub module and register its trainable / frozen variables."""
        self.bert = hub.Module(
            self.bert_path, trainable=self.trainable, name=f"{self.name}_module"
        )

        # Drop the variables that the chosen pooling never uses and start the
        # list of name fragments identifying the layers to fine-tune.
        module_vars = self.bert.variables
        if self.pooling == "first":
            candidates = [v for v in module_vars if "/cls/" not in v.name]
            layer_tags = ["pooler/dense"]
        elif self.pooling == "mean":
            candidates = [
                v
                for v in module_vars
                if "/cls/" not in v.name and "/pooler/" not in v.name
            ]
            layer_tags = []
        else:
            raise NameError(
                f"Undefined pooling type (must be either first or mean, but is {self.pooling}"
            )

        # Fine-tune the last n encoder layers: layer_11, layer_10, ...
        for offset in range(self.n_fine_tune_layers):
            layer_tags.append(f"encoder/layer_{str(11 - offset)}")

        # Keep only the variables whose name contains one of the fragments.
        selected = [
            v for v in candidates if any(tag in v.name for tag in layer_tags)
        ]

        # Selected variables are trainable; all other module variables are not.
        for v in selected:
            self._trainable_weights.append(v)
        for v in self.bert.variables:
            if v not in self._trainable_weights:
                self._non_trainable_weights.append(v)

        super().build(input_shape)

    def call(self, inputs):
        """Run BERT on ``[input_ids, input_mask, segment_ids]`` and pool the result."""
        input_ids, input_mask, segment_ids = [
            K.cast(tensor, dtype="int32") for tensor in inputs
        ]
        bert_inputs = {
            "input_ids": input_ids,
            "input_mask": input_mask,
            "segment_ids": segment_ids,
        }

        if self.pooling == "first":
            outputs = self.bert(inputs=bert_inputs, signature="tokens", as_dict=True)
            return outputs["pooled_output"]

        if self.pooling == "mean":
            outputs = self.bert(inputs=bert_inputs, signature="tokens", as_dict=True)
            sequence_output = outputs["sequence_output"]
            mask = tf.cast(input_mask, tf.float32)
            # Zero out padded positions, sum over the sequence axis and divide
            # by the number of real tokens (epsilon avoids division by zero).
            masked_sum = tf.reduce_sum(
                sequence_output * tf.expand_dims(mask, axis=-1), axis=1
            )
            token_count = tf.reduce_sum(mask, axis=1, keepdims=True) + 1e-10
            return masked_sum / token_count

        raise NameError(f"Undefined pooling type (must be either first or mean, but is {self.pooling}")

    def compute_output_shape(self, input_shape):
        """Return ``(input_shape[0], output_size)``."""
        return (input_shape[0], self.output_size)

# Build model
def build_model(max_seq_length):
    """Assemble and compile the binary classifier on top of ``BertLayer``.

    Three integer inputs of length ``max_seq_length`` (ids, masks, segment
    ids) feed a ``BertLayer`` with 3 fine-tuned layers, followed by a
    256-unit ReLU layer and a single sigmoid unit. The model is compiled with
    binary cross-entropy and Adam, its summary is printed, and it is returned.
    """
    input_names = ("input_ids", "input_masks", "segment_ids")
    bert_inputs = [
        tf.keras.layers.Input(shape=(max_seq_length,), name=input_name)
        for input_name in input_names
    ]

    bert_output = BertLayer(n_fine_tune_layers=3)(bert_inputs)
    hidden = tf.keras.layers.Dense(256, activation="relu")(bert_output)
    pred = tf.keras.layers.Dense(1, activation="sigmoid")(hidden)

    model = tf.keras.models.Model(inputs=bert_inputs, outputs=pred)
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
    model.summary()
    return model

def initialize_vars(sess):
    """Run the local, global and table initializers in ``sess`` and hand it to Keras."""
    initializer_factories = (
        tf.local_variables_initializer,
        tf.global_variables_initializer,
        tf.tables_initializer,
    )
    for make_init_op in initializer_factories:
        sess.run(make_init_op())
    K.set_session(sess)

def main(train_text, train_label, test_text, test_label):
    """Tokenize the data, build the BERT classifier and train it for one epoch.

    Args:
        train_text, train_label: training texts and their labels.
        test_text, test_label: texts and labels used as validation data.
    """
    # Params for bert model and tokenization
    bert_path = "https://tfhub.dev/google/bert_uncased_L-12_H-768_A-12/1"
    max_seq_length = 256

    tokenizer = create_tokenizer_from_hub_module(bert_path)

    # Raw text -> InputExample objects -> id / mask / segment arrays.
    train_examples = convert_text_to_examples(train_text, train_label)
    test_examples = convert_text_to_examples(test_text, test_label)
    train_features = convert_examples_to_features(
        tokenizer, train_examples, max_seq_length=max_seq_length
    )
    test_features = convert_examples_to_features(
        tokenizer, test_examples, max_seq_length=max_seq_length
    )
    # The first three arrays are the model inputs, the last one the labels.
    *train_inputs, train_labels = train_features
    *test_inputs, test_labels = test_features

    model = build_model(max_seq_length)
    # Variables can only be initialized once the model has created them.
    initialize_vars(sess)

    model.fit(
        train_inputs,
        train_labels,
        validation_data=(test_inputs, test_labels),
        epochs=1,
        batch_size=32,
    )
# get the data
# import dependencies
import pandas as pd
import numpy as np
import csv
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.preprocessing import LabelEncoder
from collections import defaultdict
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import model_selection, naive_bayes, svm
from sklearn.metrics import accuracy_score
# Seed every random number generator used below. np.random.seed alone does
# not affect the stdlib `random` module that draws the balanced sample and
# shuffles it, so both are seeded to make the run reproducible.
import random

np.random.seed(42)
random.seed(42)

# Load the corpus: every CSV row after the header becomes a tuple whose
# first field is the text and whose second field is the label.
with open('labelled_data.csv', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip header
    labelled_data = [tuple(row) for row in reader]

# Generate a balanced data set: separate positive and negative comments.
# csv.reader yields an empty row for a blank line; the length guard skips
# those instead of failing on tup[1].
positives = [tup for tup in labelled_data if len(tup) > 1 and tup[1] == "positive"]
negatives = [tup for tup in labelled_data if len(tup) > 1 and tup[1] == "negative"]

# Base the number of samples on the lesser of the two category counts.
sample_count = min(len(positives), len(negatives))
balanced_positives = random.sample(positives, sample_count)
balanced_negatives = random.sample(negatives, sample_count)
balanced_data = balanced_positives + balanced_negatives
# shuffle the data
random.shuffle(balanced_data)

# Convert to a dataframe and label the columns.
Corpus = pd.DataFrame(balanced_data, columns=['text', 'label'])

# Step - a : Remove rows without text. Calling dropna(inplace=True) on the
# column Series would leave the DataFrame untouched, so drop on the frame and
# rebuild the 0..n-1 index.
Corpus = Corpus.dropna(subset=['text']).reset_index(drop=True)

# Step - b / c : Lower-case every entry ('dog' and 'DOG' are different
# strings) and tokenize it into a list of words.
Corpus['text'] = [word_tokenize(entry.lower()) for entry in Corpus['text']]

# Step - d : Remove stop words and non-alphabetic tokens, then lemmatize.
# WordNetLemmatizer needs a POS tag to know whether the word is a noun, verb,
# adjective, ...; anything not mapped below defaults to noun.
tag_map = defaultdict(lambda: wn.NOUN)
tag_map['J'] = wn.ADJ
tag_map['V'] = wn.VERB
tag_map['R'] = wn.ADV

# Loop invariants: build the stop-word set and the lemmatizer once instead of
# once per word / per entry.
stop_words = set(stopwords.words('english'))
word_Lemmatized = WordNetLemmatizer()

final_texts = []
for entry in Corpus['text']:
    # pos_tag provides the tag for each word; its first letter selects the
    # WordNet part of speech through tag_map.
    Final_words = [
        word_Lemmatized.lemmatize(word, tag_map[tag[0]])
        for word, tag in pos_tag(entry)
        if word not in stop_words and word.isalpha()
    ]
    # Store a plain space-separated sentence. str(Final_words) would store
    # the Python list repr (brackets, quotes, commas), which is noise for the
    # BERT tokenizer.
    final_texts.append(" ".join(Final_words))
# The processed text of every entry is stored in 'text_final'.
Corpus['text_final'] = final_texts

# create 80/20 training/test split of data
train_text, test_text, train_label, test_label = model_selection.train_test_split(
    Corpus['text_final'], Corpus['label'], test_size=0.2
)

# Encode the labels as integers. The encoder is fitted on the training labels
# only and then re-used, so both splits share one label -> integer mapping.
Encoder = LabelEncoder()
Train_Y = Encoder.fit_transform(train_label)
Test_Y = Encoder.transform(test_label)

# The text-to-example conversion joins the pieces of each text with " ", so
# every text is handed over as a list of words; a plain string would end up
# with a space between every character.
Train_X = [sentence.split() for sentence in train_text]
Test_X = [sentence.split() for sentence in test_text]
main(Train_X, Train_Y, Test_X, Test_Y)

最新更新