5.11 案例:KITTI人、车检测案例-训练、检测
学习目标
- 目标
- 无
- 应用
- 应用完成KITTI自动驾驶数据集的训练和检测过程
5.11.1 训练过程实现
- 步骤
- 训练参数设置
- 1、判断传入的是需要训练YoloV3Tiny还是YOLOV3正常版本
- 由于有实现多个版本,可以让用户选择具体版本来进行指定模型训练
- 2、获取传入参数的训练数据以及获取传入参数的验证集数据
- 通过dataset.load_tfrecord_dataset进行读取
- 4、判断是否进行迁移学习
- 在训练期间可以封装让训练自定义训练的结构,从而根据自己的数据训练模型
- 5、定义优化器以及损失计算方式
- 6、优化训练
通过argparse设置训练参数
import logging
import tensorflow as tf
import numpy as np
import cv2
import sys
import argparse
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
from tensorflow.keras.callbacks import (
ReduceLROnPlateau,
EarlyStopping,
ModelCheckpoint,
TensorBoard
)
from yolov3_tf2.models import (
YoloV3, YoloV3Tiny, YoloLoss,
yolo_anchors, yolo_anchor_masks,
yolo_tiny_anchors, yolo_tiny_anchor_masks
)
from yolov3_tf2.utils import freeze_all
import yolov3_tf2.dataset as dataset
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', type=str, default='./data/kitti_tfrecords/train.tfrecord',
help='训练数据集路径')
parser.add_argument('--val_dataset', type=str, default='./data/kitti_tfrecords/val.tfrecord',
help='验证集目录')
parser.add_argument('--tiny', type=bool, default=False, help='加载的模型类型yolov3 or yolov3-tiny')
parser.add_argument('--weights', type=str, default='./checkpoints/yolov3_train_1.tf',
help='模型预训练权重路径')
parser.add_argument('--classes', type=str, default='./data/kitti.names',
help='类别文件路径')
parser.add_argument('--mode', type=str, default='fit', choices=['fit', 'eager_tf'],
help='fit: model.fit模式, eager_tf: 自定义GradientTape模式')
parser.add_argument('--transfer', type=str, default='none', choices=['none', 'darknet',
'no_output', 'frozen',
'fine_tune'],
help='none: 全部进行训练'
'迁移学习并冻结所有, fine_tune: 迁移并只冻结darknet')
parser.add_argument('--size', type=int, default=416,
help='图片大小')
parser.add_argument('--epochs', type=int, default=2,
help='迭代次数')
parser.add_argument('--batch_size', type=int, default=8,
help='每批次大小')
parser.add_argument('--learning_rate', type=float, default=1e-3,
help='学习率')
parser.add_argument('--num_classes', type=int, default=6,
help='类别数量')
1、判断传入的是需要训练YoloV3Tiny还是YOLOV3正常版本
并且初始化YOLO各个模型的anchor大小,(在源码中有设置,计算损失时候需要用)
- anchors: 使用到的anchor的尺寸,如[10, 13, 16, 30, 33, 23, 30, 61, 62, 45, 59, 119, 116, 90, 156, 198, 373, 326]
- anchor_mask: 每个层级上使用的anchor的掩码,[[6, 7, 8], [3, 4, 5], [0, 1, 2]]
- anchor box的索引数组,3个1组倒序排序:
- 如6,7,8对应13x13特征图取(116, 90), (156, 198), (373, 326)
- anchor box的索引数组,3个1组倒序排序:
yolo_anchors = np.array([(10, 13), (16, 30), (33, 23), (30, 61), (62, 45), (59, 119), (116, 90), (156, 198), (373, 326)], np.float32) / 416 yolo_anchor_masks = np.array([[6, 7, 8], [3, 4, 5], [0, 1, 2]]) yolo_tiny_anchors = np.array([(10, 14), (23, 27), (37, 58), (81, 82), (135, 169), (344, 319)], np.float32) / 416 yolo_tiny_anchor_masks = np.array([[3, 4, 5], [0, 1, 2]])
# 1、判断传入的是需要训练YoloV3Tiny还是YOLOV3正常版本
if args.tiny:
model = YoloV3Tiny(args.size, training=True,
classes=args.num_classes)
anchors = yolo_tiny_anchors
anchor_masks = yolo_tiny_anchor_masks
else:
model = YoloV3(args.size, training=True, classes=args.num_classes)
anchors = yolo_anchors
anchor_masks = yolo_anchor_masks
2、获取传入参数的训练数据以及获取传入参数的验证集数据
- 通过dataset.load_tfrecord_dataset进行读取
# 2、获取传入参数的训练数据
if args.dataset:
train_dataset = dataset.load_tfrecord_dataset(
args.dataset, args.classes)
train_dataset = train_dataset.shuffle(buffer_size=1024)
train_dataset = train_dataset.batch(args.batch_size)
train_dataset = train_dataset.map(lambda x, y: (
dataset.transform_images(x, args.size),
dataset.transform_targets(y, anchors, anchor_masks, 6)))
train_dataset = train_dataset.prefetch(
buffer_size=tf.data.experimental.AUTOTUNE)
# 3、获取传入参数的验证集数据
if args.val_dataset:
val_dataset = dataset.load_tfrecord_dataset(
args.val_dataset, args.classes)
val_dataset = val_dataset.batch(args.batch_size)
val_dataset = val_dataset.map(lambda x, y: (
dataset.transform_images(x, args.size),
dataset.transform_targets(y, anchors, anchor_masks, 6)))
3、判断训练是否进行迁移学习,指定结构冻结
- 1、如果迁移学习:
- 加载与训练权重,可从官网下载
- 如果判断微调的话:加载yolo_darknet:x_36, x_61, x = Darknet(name='yolo_darknet')(x),冻结这些层
- 如果用户传入frozen:冻结所有层
- 如果是其他:根据YOLO类型,初始化模型
- 若只对darknet迁移,然后对剩余其他层进行冻结
- 如果是no_output,吧out_put部分进行随机初始化,然后冻结其他加载过的模型权重
- 加载与训练权重,可从官网下载
# 4、判断是否进行迁移学习
if args.transfer != 'none':
# 加载与训练模型'./data/yolov3.weights'
model.load_weights(args.weights)
if args.transfer == 'fine_tune':
# 冻结darknet
darknet = model.get_layer('yolo_darknet')
freeze_all(darknet)
elif args.transfer == 'frozen':
# 冻结所有层
freeze_all(model)
else:
# 重置网络后端结构
if args.tiny:
init_model = YoloV3Tiny(
args.size, training=True, classes=args.num_classes)
else:
init_model = YoloV3(
args.size, training=True, classes=args.num_classes)
# 如果迁移指的是darknet
if args.transfer == 'darknet':
# 获取网络的权重
for l in model.layers:
if l.name != 'yolo_darknet' and l.name.startswith('yolo_'):
l.set_weights(init_model.get_layer(
l.name).get_weights())
else:
freeze_all(l)
elif args.transfer == 'no_output':
for l in model.layers:
if l.name.startswith('yolo_output'):
l.set_weights(init_model.get_layer(
l.name).get_weights())
else:
freeze_all(l)
# 需要从模型文件中导入utils中的freeze_all函数
def freeze_all(model, frozen=True):
model.trainable = not frozen
if isinstance(model, tf.keras.Model):
for l in model.layers:
freeze_all(l, frozen)
5、定义优化器以及损失计算方式
optimizer = tf.keras.optimizers.Adam(lr=args.learning_rate)
# 返回yolo_loss(y_true, y_pred)的函数
loss = [YoloLoss(anchors[mask], classes=args.num_classes)
for mask in anchor_masks]
注:其中YOLOLoss的计算过程源码当中
6、训练优化过程,训练指定模式
- 用eager的梯度调试模式进行训练易于调试
- keras model的fit模式简单易用
if args.mode == 'eager_tf':
# 1、定义评估方式
avg_loss = tf.keras.metrics.Mean('loss', dtype=tf.float32)
avg_val_loss = tf.keras.metrics.Mean('val_loss', dtype=tf.float32)
# 2、迭代优化
for epoch in range(1, args.epochs + 1):
for batch, (images, labels) in enumerate(train_dataset):
with tf.GradientTape() as tape:
# 1、计算模型输出和损失
outputs = model(images, training=True)
regularization_loss = tf.reduce_sum(model.losses)
pred_loss = []
for output, label, loss_fn in zip(outputs, labels, loss):
# 根据输出和标签计算出损失
pred_loss.append(loss_fn(label, output))
# 计算总损失 = 平均损失 + regularization_loss
total_loss = tf.reduce_sum(pred_loss) + regularization_loss
# 计算梯度以及更新梯度
grads = tape.gradient(total_loss, model.trainable_variables)
optimizer.apply_gradients(
zip(grads, model.trainable_variables))
# 打印日志
logging.info("{}_train_{}, {}, {}".format(
epoch, batch, total_loss.numpy(),
list(map(lambda x: np.sum(x.numpy()), pred_loss))))
avg_loss.update_state(total_loss)
# 验证数据集验证输出计算
for batch, (images, labels) in enumerate(val_dataset):
outputs = model(images)
# 求损失
regularization_loss = tf.reduce_sum(model.losses)
pred_loss = []
# 输出结果和标签计算损失
for output, label, loss_fn in zip(outputs, labels, loss):
pred_loss.append(loss_fn(label, output))
total_loss = tf.reduce_sum(pred_loss) + regularization_loss
# 打印总损失
logging.info("{}_val_{}, {}, {}".format(
epoch, batch, total_loss.numpy(),
list(map(lambda x: np.sum(x.numpy()), pred_loss))))
avg_val_loss.update_state(total_loss)
logging.info("{}, train: {}, val: {}".format(
epoch,
avg_loss.result().numpy(),
avg_val_loss.result().numpy()))
# 保存模型位置
avg_loss.reset_states()
avg_val_loss.reset_states()
model.save_weights(
'checkpoints/yolov3_train_{}.tf'.format(epoch))
else:
# 指定相关回调函数,自定义需求,对于检测来讲不需要太多的优化算法方式
model.compile(optimizer=optimizer, loss=loss)
callbacks = [
EarlyStopping(patience=3, verbose=1),
ModelCheckpoint('checkpoints/yolov3_train_{epoch}.tf',
verbose=1, save_weights_only=True),
TensorBoard(log_dir='logs')
]
history = model.fit(train_dataset,
epochs=args.epochs,
callbacks=callbacks,
validation_data=val_dataset)
1、EarlyStopping
- keras.callbacks.EarlyStopping(monitor='val_loss', patience=0, verbose=0, mode='auto')
当监测值不再改善时,该回调函数将中止训练
- 参数
- monitor:需要监视的量
- patience:当early stop被激活(如发现loss相比上一个epoch训练没有下降),则经过
patience个epoch后停止训练。 - verbose:信息展示模式
- mode:‘auto’,‘min’,‘max’之一,在
min模式下,如果检测值停止下降则中止训练。在max模式下,当检测值不再上升则停止训练。
5.11.2 测试过程实现
- 步骤
- 1、初始化模型并加载权重
- 2、加载图片处理图片并使用模型进行预测
- 3、将图片框画在图片中并进行保存
导入包并制定相关参数
import time
import logging
import cv2
import numpy as np
import tensorflow as tf
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
from yolov3_tf2.models import (
YoloV3, YoloV3Tiny
)
from yolov3_tf2.dataset import transform_images
from yolov3_tf2.utils import draw_outputs
import argparse
import sys
parser = argparse.ArgumentParser()
parser.add_argument('--classes', type=str, default='./data/kitti.names',
help='类别配置路径')
parser.add_argument('--weights', type=str, default='./checkpoints/yolov3_train_1.tf',
help='训练好的模型位置')
parser.add_argument('--tiny', type=bool, default=False, help='加载的模型类型yolov3 or yolov3-tiny')
parser.add_argument('--size', type=int, default=416,
help='图片大小')
parser.add_argument('--image', type=str, default='./data/kitti/data_object_image_2/testing/image_2/000008.png',
help='输入预测图片的位置')
parser.add_argument('--output', type=str, default='./output.jpg',
help='输出图片结果的位置')
parser.add_argument('--num_classes', type=int, default=6,
help='总共类别数量')
整体过程实现逻辑:
def main(args):
# 1、初始化模型并加载权重
if args.tiny:
yolo = YoloV3Tiny(classes=args.num_classes)
else:
yolo = YoloV3(classes=args.num_classes)
yolo.load_weights(args.weights)
logging.info('加载模型权重weights')
# 加载目标类型
class_names = [c.strip() for c in open(args.classes).readlines()]
# 2、加载图片处理图片并使用模型进行预测
img = tf.image.decode_image(open(args.image, 'rb').read(), channels=3)
img = tf.expand_dims(img, 0)
img = transform_images(img, args.size)
# 记录时间
t1 = time.time()
boxes, scores, classes, nums = yolo(img)
t2 = time.time()
logging.info('耗时: {}'.format(t2 - t1))
logging.info('检测结果:')
print(boxes, scores, classes, nums)
for i in range(nums[0]):
logging.info('\t{}, {}, {}'.format(class_names[int(classes[0][i])],
np.array(scores[0][i]),
np.array(boxes[0][i])))
# 3、显示图片并将图片框画出
img = cv2.imread(args.image)
img = draw_outputs(img, (boxes, scores, classes, nums), class_names)
cv2.imwrite(args.output, img)
logging.info('output saved to: {}'.format(args.output))
if __name__ == '__main__':
args = parser.parse_args(sys.argv[1:])
main(args)
其中用到几个YOLO源码实现的处理函数(无需实现,具体参考源代码)
from yolov3_tf2.dataset import transform_images
from yolov3_tf2.utils import draw_outputs
- 预处理大小和归一化函数
def transform_images(x_train, size):
x_train = tf.image.resize(x_train, (size, size))
x_train = x_train / 255
return x_train
- draw_outputs
def draw_outputs(img, outputs, class_names):
boxes, objectness, classes, nums = outputs
boxes, objectness, classes, nums = boxes[0], objectness[0], classes[0], nums[0]
wh = np.flip(img.shape[0:2])
for i in range(nums):
x1y1 = tuple((np.array(boxes[i][0:2]) * wh).astype(np.int32))
x2y2 = tuple((np.array(boxes[i][2:4]) * wh).astype(np.int32))
img = cv2.rectangle(img, x1y1, x2y2, (255, 0, 0), 2)
img = cv2.putText(img, '{} {:.4f}'.format(
class_names[int(classes[i])], objectness[i]),
x1y1, cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2)
return img
如果我们看不到结果,说明预测的结果都不好,所以可以这样在模型的代码中修改iou和分数的阈值来进行调整(根据实际需求)
def yolo_nms(outputs, anchors, masks, classes):
# boxes, conf, type
b, c, t = [], [], []
for o in outputs:
b.append(tf.reshape(o[0], (tf.shape(o[0])[0], -1, tf.shape(o[0])[-1])))
c.append(tf.reshape(o[1], (tf.shape(o[1])[0], -1, tf.shape(o[1])[-1])))
t.append(tf.reshape(o[2], (tf.shape(o[2])[0], -1, tf.shape(o[2])[-1])))
bbox = tf.concat(b, axis=1)
confidence = tf.concat(c, axis=1)
class_probs = tf.concat(t, axis=1)
scores = confidence * class_probs
boxes, scores, classes, valid_detections = tf.image.combined_non_max_suppression(
boxes=tf.reshape(bbox, (tf.shape(bbox)[0], -1, 1, 4)),
scores=tf.reshape(
scores, (tf.shape(scores)[0], -1, tf.shape(scores)[-1])),
max_output_size_per_class=100,
max_total_size=100,
iou_threshold=0.5,
score_threshold=0.5
)
return boxes, scores, classes, valid_detections
5.11.3 小结
- 完成KITTI自动驾驶数据集的训练和检测过程