CNNs appear most often in image-classification settings, which can leave the impression that they only work on images. In fact, a CNN can also be used to classify text.
Convolution is simply one way of extracting features; it is not restricted to images. Any input from which convolution can extract useful features is fair game.
The figure below shows the overall idea of applying convolution to text classification:
1. Tokenize the text --> map it to vectors: convert the text (strings) into numbers, i.e. encode it. In the figure above, each sentence's encoding is stored as a 7*5 matrix.
2. Apply three different convolution window sizes, with 2 filters per size, yielding 6 feature maps. (For example, a 2*5 window means "look at two adjacent words at a time.")
3. Pooling: reduce the 6 feature maps to the same size.
4. Concatenate the pooled feature maps.
5. Use the resulting feature vector for binary classification.

(A minimal, shape-only sketch of this pipeline follows the data description below.)

The training set is a .csv file that stores name-to-gender mappings, 351,791 records in total. We will train a model that predicts whether a given name belongs to a "male" or a "female".
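To make the shapes in the figure concrete, here is a minimal NumPy sketch of one convolution window sliding over a sentence matrix. It only illustrates the idea: the 7*5 matrix and the 2*5 window come from the figure's example, and the random values are placeholders, not anything from the training code below.

import numpy as np

np.random.seed(0)

# A "sentence" of 7 words, each encoded as a 5-dimensional vector (the 7*5 matrix in the figure).
sentence = np.random.rand(7, 5)

# One 2*5 convolution window: it looks at two adjacent words at a time.
window = np.random.rand(2, 5)

# "Valid" convolution along the word axis: 7 - 2 + 1 = 6 output values, i.e. one feature map.
feature_map = np.array([np.sum(sentence[i:i + 2] * window) for i in range(7 - 2 + 1)])
print(feature_map.shape)  # (6,)

# Max pooling collapses each feature map to a single number, so maps produced by
# windows of different sizes end up the same size.
pooled = feature_map.max()

# With 3 window sizes and 2 filters per size, the 6 pooled values are concatenated
# and fed to a small classifier for the binary (male/female) decision.
print(pooled)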
Training program:
main.py
# coding:utf-8
import os
import csv

import numpy as np
import tensorflow as tf

name_dataset = 'name.csv'

train_x = []
train_y = []
with open(name_dataset, 'r', encoding='utf-8') as csvfile:
    read = csv.reader(csvfile)  # read the CSV file row by row
    for sample in read:
        # keep only labeled rows: (name, gender)
        if len(sample) == 2:
            train_x.append(sample[0])
            if sample[1] == '男':
                train_y.append([0, 1])  # male, one-hot [0, 1]
            else:
                train_y.append([1, 0])  # female, one-hot [1, 0]

# Fix a maximum name length: longer names are truncated, shorter ones padded.
max_name_length = max([len(name) for name in train_x])
print("Longest name (in characters):", max_name_length)
max_name_length = 8

counter = 0
# vocabulary: character -> frequency
vocabulary = {}
for name in train_x:
    counter += 1
    tokens = [word for word in name]
    # count how often each character occurs
    for word in tokens:
        if word in vocabulary:
            vocabulary[word] += 1
        else:
            vocabulary[word] = 1

# sort characters by frequency (most frequent first); index 0 is reserved for padding
vocabulary_list = [' '] + sorted(vocabulary, key=vocabulary.get, reverse=True)
print(len(vocabulary_list))

# encode characters: every character gets a unique integer id
vocab = dict([(x, y) for (y, x) in enumerate(vocabulary_list)])

train_x_vec = []
for name in train_x:
    name_vec = []
    # map each character of the name to its id
    for word in name:
        name_vec.append(vocab.get(word))
    # truncate names longer than the maximum, pad shorter ones with 0
    name_vec = name_vec[:max_name_length]
    while len(name_vec) < max_name_length:
        name_vec.append(0)
    train_x_vec.append(name_vec)

#######################################

input_size = max_name_length
num_classes = 2

batch_size = 64
num_batch = len(train_x_vec) // batch_size

X = tf.placeholder(tf.int32, [None, input_size])
Y = tf.placeholder(tf.float32, [None, num_classes])

dropout_keep_prob = tf.placeholder(tf.float32)


# vocabulary_size: number of characters in the vocabulary;
# embedding_size: each character is mapped to a 128-dimensional vector
def neural_network(vocabulary_size, embedding_size=128, num_filters=128):
    # embedding layer
    with tf.name_scope("embedding"):
        W = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
        # map the name to vectors: (?, 8, 128)
        embedded_chars = tf.nn.embedding_lookup(W, X)
        # add a channel dimension of size 1 so the 3-D tensor becomes 4-D for conv2d: (?, 8, 128, 1)
        embedded_chars_expanded = tf.expand_dims(embedded_chars, -1)

    # convolution + maxpool layers
    # different filter_sizes extract different features
    filter_sizes = [3, 4, 5]
    pooled_outputs = []
    for i, filter_size in enumerate(filter_sizes):
        with tf.name_scope("conv-maxpool-%s" % filter_size):
            filter_shape = [filter_size, embedding_size, 1, num_filters]
            W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1))
            b = tf.Variable(tf.constant(0.1, shape=[num_filters]))
            conv = tf.nn.conv2d(embedded_chars_expanded, W, strides=[1, 1, 1, 1], padding="VALID")
            h = tf.nn.relu(tf.nn.bias_add(conv, b))
            pooled = tf.nn.max_pool(h, ksize=[1, input_size - filter_size + 1, 1, 1],
                                    strides=[1, 1, 1, 1], padding="VALID")
            pooled_outputs.append(pooled)

    # 128 filters * 3 window sizes = 384 features
    num_filters_total = num_filters * len(filter_sizes)
    # concatenate the pooled outputs
    h_pool = tf.concat(pooled_outputs, 3)
    # flatten to a 384-dimensional feature vector
    h_pool_flat = tf.reshape(h_pool, [-1, num_filters_total])

    with tf.name_scope("dropout"):
        h_drop = tf.nn.dropout(h_pool_flat, dropout_keep_prob)

    with tf.name_scope("output"):
        # fully connected layer: 384 -> 2
        W = tf.get_variable("W", shape=[num_filters_total, num_classes],
                            initializer=tf.contrib.layers.xavier_initializer())
        b = tf.Variable(tf.constant(0.1, shape=[num_classes]))
        output = tf.nn.xw_plus_b(h_drop, W, b)

    return output


def train_neural_network():
    output = neural_network(len(vocabulary_list))

    optimizer = tf.train.AdamOptimizer(1e-3)
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=output, labels=Y))
    # compute_gradients plus apply_gradients is equivalent to minimize():
    # the former computes the gradients, the latter applies them to update the variables
    grads_and_vars = optimizer.compute_gradients(loss)
    train_op = optimizer.apply_gradients(grads_and_vars)

    saver = tf.train.Saver(tf.global_variables())
    os.makedirs("./model", exist_ok=True)  # make sure the checkpoint directory exists

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        # train for 201 epochs (0 through 200)
        for e in range(201):
            # iterate over the batches
            for i in range(num_batch):
                batch_x = train_x_vec[i * batch_size: (i + 1) * batch_size]
                batch_y = train_y[i * batch_size: (i + 1) * batch_size]
                _, loss_ = sess.run([train_op, loss],
                                    feed_dict={X: batch_x, Y: batch_y, dropout_keep_prob: 0.5})
                if i % 1000 == 0:
                    print('epoch:', e, 'iter:', i, 'loss:', loss_)
            if e % 100 == 0:
                # .meta stores the graph definition; .data stores the current weights
                saver.save(sess, "./model/name2sex", global_step=e)


train_neural_network()
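A quick note on the training op above: calling compute_gradients followed by apply_gradients is just the expanded form of the optimizer's minimize(); the split is useful when the gradients need to be inspected or modified before they are applied. A minimal sketch of both forms, assuming the optimizer and loss defined in main.py (the clipping range is only an example, not part of the original code):

# One-step form, equivalent to the two-step update used in train_neural_network():
train_op = optimizer.minimize(loss)

# Two-step form, handy when the gradients need processing first, e.g. clipping:
grads_and_vars = optimizer.compute_gradients(loss)
clipped = [(tf.clip_by_value(g, -5.0, 5.0), v) for g, v in grads_and_vars if g is not None]
train_op = optimizer.apply_gradients(clipped)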
Test program:

test.py
# coding:utf-8
import csv

import tensorflow as tf

name_dataset = 'name.csv'

train_x = []
train_y = []
with open(name_dataset, 'r', encoding='utf-8') as csvfile:
    read = csv.reader(csvfile)
    for sample in read:
        if len(sample) == 2:
            train_x.append(sample[0])
            if sample[1] == '男':
                train_y.append([0, 1])  # male
            else:
                train_y.append([1, 0])  # female

max_name_length = max([len(name) for name in train_x])
print("Longest name (in characters):", max_name_length)
max_name_length = 8

counter = 0
vocabulary = {}
for name in train_x:
    counter += 1
    tokens = [word for word in name]
    for word in tokens:
        if word in vocabulary:
            vocabulary[word] += 1
        else:
            vocabulary[word] = 1

# the vocabulary must be rebuilt exactly as in main.py so the character ids match
vocabulary_list = [' '] + sorted(vocabulary, key=vocabulary.get, reverse=True)
print(len(vocabulary_list))

vocab = dict([(x, y) for (y, x) in enumerate(vocabulary_list)])

train_x_vec = []
for name in train_x:
    name_vec = []
    for word in name:
        name_vec.append(vocab.get(word))
    name_vec = name_vec[:max_name_length]
    while len(name_vec) < max_name_length:
        name_vec.append(0)
    train_x_vec.append(name_vec)

input_size = max_name_length
num_classes = 2

batch_size = 64
num_batch = len(train_x_vec) // batch_size

X = tf.placeholder(tf.int32, [None, input_size])
Y = tf.placeholder(tf.float32, [None, num_classes])

dropout_keep_prob = tf.placeholder(tf.float32)


def neural_network(vocabulary_size, embedding_size=128, num_filters=128):
    # embedding layer
    with tf.name_scope("embedding"):
        W = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
        embedded_chars = tf.nn.embedding_lookup(W, X)
        embedded_chars_expanded = tf.expand_dims(embedded_chars, -1)

    # convolution + maxpool layers
    filter_sizes = [3, 4, 5]
    pooled_outputs = []
    for i, filter_size in enumerate(filter_sizes):
        with tf.name_scope("conv-maxpool-%s" % filter_size):
            filter_shape = [filter_size, embedding_size, 1, num_filters]
            W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1))
            b = tf.Variable(tf.constant(0.1, shape=[num_filters]))
            conv = tf.nn.conv2d(embedded_chars_expanded, W, strides=[1, 1, 1, 1], padding="VALID")
            h = tf.nn.relu(tf.nn.bias_add(conv, b))
            pooled = tf.nn.max_pool(h, ksize=[1, input_size - filter_size + 1, 1, 1],
                                    strides=[1, 1, 1, 1], padding="VALID")
            pooled_outputs.append(pooled)

    num_filters_total = num_filters * len(filter_sizes)
    h_pool = tf.concat(pooled_outputs, 3)
    h_pool_flat = tf.reshape(h_pool, [-1, num_filters_total])

    with tf.name_scope("dropout"):
        h_drop = tf.nn.dropout(h_pool_flat, dropout_keep_prob)

    with tf.name_scope("output"):
        W = tf.get_variable("W", shape=[num_filters_total, num_classes],
                            initializer=tf.contrib.layers.xavier_initializer())
        b = tf.Variable(tf.constant(0.1, shape=[num_classes]))
        output = tf.nn.xw_plus_b(h_drop, W, b)

    return output


def detect_sex(name_list):
    x = []
    for name in name_list:
        name_vec = []
        for word in name:
            # unseen characters fall back to the padding id 0
            name_vec.append(vocab.get(word, 0))
        name_vec = name_vec[:max_name_length]
        while len(name_vec) < max_name_length:
            name_vec.append(0)
        x.append(name_vec)

    output = neural_network(len(vocabulary_list))

    saver = tf.train.Saver(tf.global_variables())
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        # restore a previous training run
        '''
        ckpt = tf.train.get_checkpoint_state('.')
        if ckpt != None:
            print(ckpt.model_checkpoint_path)
        '''
        # load the trained model (checkpoint saved at epoch 200)
        saver.restore(sess, './model/name2sex-200')

        predictions = tf.argmax(output, 1)
        res = sess.run(predictions, {X: x, dropout_keep_prob: 1.0})

        i = 0
        for name in name_list:
            print(name, '女' if res[i] == 0 else '男')
            i += 1


detect_sex(["张金龙", "段玉刚", "金华花"])
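Note that test.py rebuilds the vocabulary from name.csv exactly the way main.py does; if the two ever diverged (different file, different sort order), the character ids fed to the restored model would no longer match the embedding learned during training. One way to avoid re-reading the whole CSV at prediction time is to save the vocabulary once after training. A minimal sketch, assuming a hypothetical file name vocab.pkl (not part of the original code):

import pickle

# In main.py, after building `vocab`, save it once:
with open('vocab.pkl', 'wb') as f:
    pickle.dump(vocab, f)

# In test.py, load it instead of rebuilding it from name.csv:
with open('vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)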
Test results:

张金龙 男
段玉刚 男
金华花 女