本章导学

  • 学习目标:
    • 学会使用多进程的方式进行多模型训练。
    • 学会使用tensorflow-serving将模型封装成微服务,并使用多线程调用。

  • 引导分析:

    • 因为根据我们的业务特点,将使用多个模型进行类别预测以提升准确率, 所以会产生和维护大量的模型,因此如何利用现有资源提升模型的训练效率成为一个问题。而在python中,多进程是解决计算密集型任务并行化的最优方法。

    • 同样根据我们的业务特点,用户的每次请求,都有可能调用多个模型进行预测,而串行预测的方式,远远不能满足预测的性能要求。因此预测过程必须能够并行化,并很容易的整合全部的结果. 使用多线程发送请求,预测过程虽然也是计算密集型,但是这个计算过程是在模型微服务中进行的。

  • 本章小节:
    • 4.1 多模型多进程训练
      • 学习如何充分利用计算资源,提升模型训练效率, 将使用多模型多进程进行训练.
    • 4.2 多模型多线程预测
      • 学习如何以多线程的方式进行调用封装的模型微服务.



4.1 多模型多进程训练

  • 学习目标:
    • 知道使用多进程的原因。
    • 掌握多模型多进程的训练逻辑并实现它。

  • 使用多进程的原因:
    • 在python这门语言中,存在一个全局解释锁,它使系统的多个线程无法一同使用CPU资源,对于计算密集型任务,必须采用多进程方式并行化,而我们的模型训练,正是最典型的计算密集型任务,里面涵盖大量的矩阵计算,因此,我们这里使用多进程训练。

  • CPU/内存正常负载值:
    • 是指我们的CPU/内存正常工作时占用率,比这个值小,说明我们的CPU/内存工作很轻松,比这个值大,说明工作起来已经很劳累了,一般取CPU/内存占用率的55%。

  • CPU/内存危险负载值:
    • 是指我们的CPU/内存危险工作时的占用率,比这值小,系统不会挂掉或者开启自动保护。比这个值大,系统可能随时会挂掉或开启自动保护。一般取CPU/内存占用率的95%。

  • 多模型多进程的训练逻辑:
    • 开启第一个模型训练进程,进入训练状态后开始检测占用资源是否小于CPU/内存正常负载值。小于CPU/内存正常负载值,则开启第二个模型训练任务。否则,开始检测占用资源是否大于CPU/内存危险负载值,如果大于,则kill掉这个进程,否则,说明占用率处在正常负载值与危险负载值之间,此时,等待该模型训练进程结束,再自动开启下一个模型训练进程。

  • 多模型多进程训练过程的代码分析过程:
import time

# 用于开启多个进程
import subprocess

# 使用psutil进行资源监控,主要获取cpu与内存占用情况。
import psutil

# 设定CPU与内存的正常和危险占用阈值
CPU_NOR_LIMIT = MEM_NOR_LIMIT = 55
CPU_DAN_LIMIT = MEM_DAN_LIMIT = 95

# 模型训练脚本列表
model_train_list = ["python movie_model_train.py", "python beauty_model_train.py"]

# 创建subp的列表容器,用于装载子进程
subp = []

def detect_cpu_mem(): 
    """检测CPU和内存占用率"""
    print("进行mem和cpu检测:")
    # 内存检测
    mem = psutil.virtual_memory().percent
    # psutil检测cpu时间隔至少3s以上
    cpu = psutil.cpu_percent(3)
    print("当前内存占用率:" + str(mem) + "%")
    print("当前CPU占用率:" + str(cpu) + "%")
    return  mem, cpu


def single_model_train(model):
    """开启单个模型的训练"""
    p = subprocess.Popen(model, shell=True)
    # 等待3秒预估模型进入训练状态,即资源占用趋于稳定。
    time.sleep(3)
    # 进行资源检测
    mem, cpu = detect_cpu_mem()

    # 内存和CPU同时小于正常负载值,则任其继续运行,并装入列表
    if mem < MEM_NOR_LIMIT and cpu < CPU_NOR_LIMIT:
        subp.append(p)
        print("该模型进入正常训练过程,并可以开启下一模型训练!")
    else:
        # 判断是否大于危险负载值,若大于,将kill该进程,
        # 否则等待该进程结束,再进行其他训练任务。
        if mem > MEM_DAN_LIMIT or cpu > CPU_DAN_LIMIT:
            p.kill()
            print("该模型没有进入正常训练过程!")
        else:
            p.wait()
            print("该模型进入正常训练过程, 但不要开启下一个模型训练!")


def start_multiprocess_train():
    """开启多进程训练"""
    print("启动多模型训练:")

    # 遍历启动模型的命令,准备循环开启训练进程
    for i, model in enumerate(model_train_list):
        print("__________________________")
        print("正在启动第" + str(i+1) + "个模型:")
        # 启动模型训练
        single_model_train(model)
    else:
        # 所有装入列表的进程都会等待其自然结束后才会停止该函数所在的进程
        print("正在等待所有模型训练结束!")
        list(map(lambda x: x.wait(), subp))
        print("完成!")


  • 代码位置: 代码将写在/data/django-uwsgi/text_labeled/model_train/multiprocess_train.py中.

  • 函数detect_cpu_mem():

  • 输出效果:
当前内存占用率:6.7%
当前CPU占用率:0.5%


  • 函数single_model_train(model):

  • 输入实例:
# 模型训练脚本列表的第一项
# 相当于python movie_model_train.py
model = model_train_list[0] 
  • 输出效果:
# 检测内存和cpu占用率, 并打印脚本文件movie_model_train.py的执行内容.
当前内存占用率:7.5%
当前CPU占用率:25.1%
该模型进入正常训练过程, 并可以开启下一模型训练!

Epoch 3/20
5299/5299 [==============================] - 6s 1ms/step - loss: 0.4098 - acc: 0.7996 - val_loss: 1.0321 - val_acc: 0.1647
Epoch 4/20
5299/5299 [==============================] - 7s 1ms/step - loss: 0.3190 - acc: 0.8517 - val_loss: 0.8503 - val_acc: 0.3124
Epoch 5/20
5299/5299 [==============================] - 6s 1ms/step - loss: 0.2384 - acc: 0.9109 - val_loss: 0.6873 - val_acc: 0.5025
Epoch 6/20
5299/5299 [==============================] - 6s 1ms/step - loss: 0.1781 - acc: 0.9504 - val_loss: 0.6238 - val_acc: 0.5756
Epoch 7/20
5299/5299 [==============================] - 6s 1ms/step - loss: 0.1359 - acc: 0.9711 - val_loss: 0.5465 - val_acc: 0.6401


  • 函数start_multiprocess_train():

  • 输入实例:
# 模型训练脚本执行命令列表
model_train_list = ["python movie_model_train.py", "python beauty_model_train.py"]

  • 输出效果:
# 检测两个模型共同训练是的内存和cpu占用情况
当前内存占用率:9.0%
当前CPU占用率:93.1%
...
该模型进入正常训练过程, 但不要开启下一个模型训练!
正在等待所有模型训练结束!
完成!

# 交替打印两个模型训练文件的执行内容

  • 主要注释:
# 用于开启多个进程

# 使用psutil进行资源监控,主要获取cpu与内存占用情况。

# 设定CPU与内存的正常和危险占用阈值

# 模型训练脚本列表

# 创建subp的列表容器,用于装载子进程

    """检测CPU和内存占用率"""

    # 内存检测

    # psutil检测cpu时间隔至少3s以上


    """开启单个模型的训练"""

    # 等待3秒预估模型进入训练状态,即资源占用趋于稳定。

    # 进行资源检测

    # 内存和CPU同时小于正常负载值,则任其继续运行,并装入列表

        # 判断是否大于危险负载值,若大于,将kill该进程,

        # 否则等待该进程结束,再进行其他训练任务。


    """开启多进程训练"""

    # 遍历启动模型的命令,准备循环开启训练进程

        # 启动模型训练

        # 所有装入列表的进程都会等待其自然结束后才会停止该函数所在的进程


  • 练一练:
    • 请同学们将star_model_train.py和fashion_model_train.py的模型训练脚本命令放到model_train_list中, 再进行一次多进程并行训练, 看看是怎样的效果.


  • 小节总结:

    • 学习了使用多进程的原因:

      • 在python这门语言中,存在一个全局解释锁,它使系统的多个线程无法一同使用CPU资源,对于计算密集型任务,必须采用多进程方式并行化,而我们的模型训练,正是最典型的计算密集型任务,里面涵盖大量的矩阵计算,因此,我们这里使用多进程训练。

    • 学习了多模型多进程的训练逻辑:
      • CPU/内存正常负载值: 一般取CPU/内存占用率的55%。
      • CPU/内存危险负载值: 一般取CPU/内存占用率的95%。
      • 开启第一个模型训练进程,进入训练状态后开始检测占用资源是否小于CPU/内存正常负载值。小于CPU/内存正常负载值,则开启第二个模型训练任务。否则,开始检测占用资源是否大于CPU/内存危险负载值,如果大于,则kill掉这个进程,否则,说明占用率处在正常负载值与危险负载值之间,此时,等待该模型训练进程结束,再自动开启下一个模型训练进程。

    • 实现了多模型多进程的训练逻辑:
      • 函数: detect_cpu_mem
      • 函数: single_model_train
      • 函数: start_multiprocess_train



4.2 多模型多线程预测

  • 学习目标:
    • 知道进行多线程预测的原因.
    • 掌握实现多模型多线程预测的三步曲.

  • 进行多线程预测的原因:
    • 根据我们的业务特点,用户的每次请求,都有可能调用多个模型进行预测,而串行预测的方式,远远不能满足预测的性能要求. 这就需要预测过程必须能够并行化,并很容易的整合全部的结果.

  • 模型预测过程也是计算密集型, 为什么没有受到全局解释锁的影响:
    • 虽然预测过程也是计算密集型的,但是我们对这个计算过程进行了封装, 使它是在模型微服务中进行, 而我们线程只是负责调用服务并整合结果而已, 因此不会受到全局解释锁的影响.

  • 实现多模型多线程预测的三步曲:
    • 第一步: 将h5格式的模型转化成pb格式.
    • 第二步: 使用docker启动tensorflow-serving微服务.
    • 第三步: 多线程调用微服务并处理结果.

  • 将h5格式的模型转化成pb格式过程的代码:
import time
from keras import backend as K
from keras.models import load_model
from tensorflow.python.saved_model import builder as saved_model_builder
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def

def to_savedmodel(h5_model_path, pb_model_path):
  """将h5模型转化成tensorflow的pb格式模型"""

  model = load_model(h5_model_path)
  builder = saved_model_builder.SavedModelBuilder(pb_model_path)

  signature = predict_signature_def(
      inputs={'input': model.inputs[0]}, outputs={'income': model.outputs[0]})

  with K.get_session() as sess:
    builder.add_meta_graph_and_variables(
        sess=sess,
        tags=[tag_constants.SERVING],
        signature_def_map={
            signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature
        })
    builder.save()


  • 代码位置: 代码将写在/data/django-uwsgi/text_labeled/model_train/multithread_predict.py中。

  • 输入实例:
# h5格式模型保存路径
h5_model_path = "./movie/model.h5"

# pb模型写入路径(在一个时间戳的文件夹中)
time_ = str(int(time.time()))
pb_model_path = "./movie/" + time_

  • 输出效果:
# 在./movie目录下有一个时间戳文件夹, 里面是pb模型和参数文件
./movie/1571482447
    - saved_model.pb
    - ./variables


  • 练一练:
    • 请同学按照上面的方法, 将beauty, star, fashion中对应的h5模型转化为pb模型, 在对应的文件中生成保存模型的时间戳文件夹.

  • 使用docker启动tensorflow-serving微服务过程的代码分析:
MODEL_PATH="/data/django-uwsgi/text_labeled/model_train"

# 使用docker run命令启动微服务
# 启动第一个模型
docker run -t --rm -p 8501:8501 \
    -v "$MODEL_PATH/movie:/models/movie" \
        -e MODEL_NAME=movie \
            tensorflow/serving &

# 同理启动, 启动第二个模型
docker run -t --rm -p 8502:8501 \
    -v "$MODEL_PATH/beauty:/models/beauty" \
        -e MODEL_NAME=beauty \
            tensorflow/serving &

# 更多模型以此类推

  • 代码位置: 代码在终端中运行, 写在/data/django-uwsgi/目录下。

  • 请求实例:
# 使用curl命令在终端进行测试
curl -d '{"instances": [[1.0, 2.0, 5.0]]}' \
    -X POST http://localhost:8501/v1/models/movie:predict

  • 输出效果:
# 该条样本为正样本的预测概率
{
    "predictions": [[1.0]]
}

  • 多线程调用微服务并处理结果过程的代码分析:
# 导入必备的工具包
import json
import threading
import requests
from sklearn.externals import joblib

# 从任意的模型训练文件中导入add_ngram增加n-gram特征以及padding截断函数
from movie_model_train import add_ngram
from movie_model_train import padding

# 定义模型配置路径,它指向一个json文件
model_config_path = "/data/django-uwsgi/text_labeled/model_train/model_config.json"

# model_config.json形如 :
# {"影视": ["/data/django-uwsgi/text_labeled/model_train/movie/Tokenizer", 60, 2, 
#           "/data/django-uwsgi/text_labeled/model_train/movie/token_indice", 119, 
#           "http://localhost:8501/v1/models/movie:predict"],
# "美妆": ["/data/django-uwsgi/text_labeled/model_train/beauty/Tokenizer", 75, 2, 
#           "/data/django-uwsgi/text_labeled/model_train/beauty/token_indice", 119, 
#           "http://localhost:8502/v1/models/beauty:predict"]}
# json文件中是一个字典,字典中的每个key是我们标签的中文字符,每个value是一个列表,
# 列表的第一项是特征处理词汇映射器的存储地址
# 第二项是特征处理语料的截断长度
# 第三项是n-gram取得n值
# 第四项是n-gram特征中token_indice的保存路径
# 第五项是最后的最大的对齐长度
# 第六项是该模型对应的微服务地址

# 最终的模型预测结果列表
model_prediction = []


def fea_process(word_list, config_list):
    """对输入进行类似与训练前的特征处理过程"""
    # 读取设定好的配置
    tokenizer_path = config_list[0]
    cutlen = config_list[1]
    ngram_range = config_list[2]
    ti_path = config_list[3]
    maxlen = config_list[4]

    # 加载分词映射器
    t = joblib.load(tokenizer_path)
    x_train = t.texts_to_sequences([word_list])
    # 进行截断对齐
    x_train = padding(x_train, cutlen)
    # 获得n-gram映射文件
    with open(ti_path, "r") as f:
        token_indice = eval(f.read())
    # 添加n-gram特征
    x_train = add_ngram(x_train, token_indice, ngram_range)
    # 进行最大长度对齐
    x_train = padding(x_train, maxlen)
    return x_train



def pred(word_list, model):
    """向单个微服务发送预测请求"""
    # 将持久化的模型配置文件加载到内存
    model_config = json.load(open(model_config_path, "r"))
    # 根据名字选择对应的配置列表
    config_list = model_config[model]
    # 对数据进行特征处理
    x_train = fea_process(word_list, config_list)
    # 封装成tf-serving需要的数据体
    data = {"instances": x_train.tolist()}
    # 向刚刚封装的微服务发送请求
    res = requests.post(url=config_list[5], json=data)
    # 将该线程中获取的结果放到模型预测结果列表中
    model_prediction.append([model, eval(res.text)["predictions"][0][0]])
    return res.text 


def request_model_serve(word_list, model_list):
    """该函数开启多线程请求封装好的模型微服务"""
    def _start_thread(pred, x, y):
        """开启预测线程, 以线程需要执行的函数和函数的输入为参数"""
        t = threading.Thread(target=pred, args=(x, y))
        t.start()
        return t

    # 遍历model_list, 调用开启线程函数_start_thread,会获得一个所有开启后的线程列表
    t_list = list(map(lambda model: _start_thread(pred, word_list, model), model_list))
    # 线程将逐一join操作等待所有线程完成
    t_list = list(map(lambda t: t.join(), t_list))
    # 最后过滤掉所有概率预测小于0.5的类别,返回结果
    result = list(filter(lambda x: x[1] >= 0.5, model_prediction))
    return result  


  • 代码位置: 代码将写在/data/django-uwsgi/text_labeled/model_train/multithread_predict.py中.

  • 函数fea_process(word_list, config_list):

  • 输入实例:
# 分词列表
word_list = ["霸王别姬", "是一部", "非常", "值得", "看的", "电影"]
config_list = ["/data/django-uwsgi/text_labeled/model_train/movie/Tokenizer", 60, 2, "/data/django-uwsgi/text_labeled/model_train/movie/token_indice", 119, "http://localhost:8501/v1/models/movie:predict"]


  • 输出效果:
# 通过特征处理流程的矩阵
[[    0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0     0     0     0     0     0     0
      0     0     0     0     0     0 14196    23   748     1 40108]]


  • 函数pred(word_list, model):

  • 输入实例:
# 分词列表
word_list = ["霸王别姬", "是一部", "非常", "值得", "看的", "电影"]

# 预请求的模型名称
model = "影视"

  • 输出效果
# 只会产生一个影视的概率
{
    "predictions": [[0.920291483]]
}


  • 函数request_model_serve(word_list, model_list):

  • 输入实例:
# 分词列表
word_list = ["霸王别姬", "是一部", "非常", "值得", "看的", "电影"]

# 预请求的模型列表
model_list = ["影视", "美妆"]

  • 输出效果:
# 会产生多个请求模型的概率, 但小于0.5概率的会被去除
[['影视', 0.920291483]]

  • 主要注释:
# 导入必备的工具包
# 从任意的模型训练文件中导入add_ngram增加n-gram特征以及padding截断函数
# 定义模型配置路径,它指向一个json文件

# model_config.json形如 :
# {"影视": ["/data/django-uwsgi/text_labeled/model_train/movie/Tokenizer", 60, 2,
#           "/data/django-uwsgi/text_labeled/model_train/movie/token_indice", 119,
#           "http://localhost:8501/v1/models/movie:predict"],
# "美妆": ["/data/django-uwsgi/text_labeled/model_train/beauty/Tokenizer", 75, 2,
#           "/data/django-uwsgi/text_labeled/model_train/beauty/token_indice", 119,
#           "http://localhost:8502/v1/models/beauty:predict"]}
# json文件中是一个字典,字典中的每个key是我们标签的中文字符,每个value是一个列表
# 列表的第一项是特征处理时词汇映射器的存储地址
# 第二项是特征处理时语料的截断长度
# 第三项是n-gram取得n值
# 第四项是n-gram特征中token_indice的保存路径
# 第五项是最后的最大的对齐长度
# 第六项是该模型对应的微服务地址
# 最终的模型预测结果列表

    """对输入进行类似与训练前的特征处理过程"""
    # 读取设定好的配置
    # 加载分词映射器
    # 进行截断对齐
    # 获得n-gram映射文件
    # 添加n-gram特征
    # 进行最大长度对齐

    """向单个微服务发送预测请求"""
    # 将持久化的模型配置文件加载到内存
    # 根据名字选择对应的配置列表
    # 对数据进行特征处理
    # 封装成tf-serving需要的数据体
    # 向刚刚封装的微服务发送请求
    # 将该线程中获取的结果放到模型预测结果列表中

    """该函数开启多线程请求封装好的模型微服务"""
        """开启预测线程, 以线程需要执行的函数和函数的输入为参数"""

    # 遍历model_list, 调用开启线程函数_start_thread,会获得一个所有开启后的线程列表
    # 线程将逐一join操作等待所有线程完成
    # 最后过滤掉所有概率预测小于0.5的类别,返回结果


  • 小节总结:

    • 学习了进行多线程预测的原因:
      • 根据我们的业务特点,用户的每次请求,都有可能调用多个模型进行预测,而串行预测的方式,远远不能满足预测的性能要求. 这就需要预测过程必须能够并行化,并很容易的整合全部的结果.

    • 模型预测过程也是计算密集型, 为什么没有受到全局解释锁的影响?
      • 虽然预测过程也是计算密集型的,但是我们对这个计算过程进行了封装, 使它是在模型微服务中进行, 而我们线程只是负责调用服务并整合结果而已, 因此不会受到全局解释锁的影响.

    • 学习并实现了多模型多线程预测的三步曲:
      • 第一步: 将h5格式的模型转化成pb格式。
      • 第二步: 使用docker启动tensorflow-serving微服务。
      • 第三步: 多线程调用微服务并处理结果。

    • 将h5格式的模型转化成pb格式函数: to_savedmodel

    • 使用docker启动tensorflow-serving微服务: docker run命令

    • 多线程调用微服务并处理结果:
      • 函数: fea_process
      • 函数: pred
      • 函数: request_model_serve

本章总结

  • 第一小节: 多模型多进程训练

    • 学习了使用多进程的原因:

      • 在python这门语言中,存在一个全局解释锁,它使系统的多个线程无法一同使用CPU资源,对于计算密集型任务,必须采用多进程方式并行化,而我们的模型训练,正是最典型的计算密集型任务,里面涵盖大量的矩阵计算,因此,我们这里使用多进程训练。

    • 学习了多模型多进程的训练逻辑:

      • 开启第一个模型训练进程,进入训练状态后开始检测占用资源是否小于CPU/内存正常负载值。小于CPU/内存正常负载值,则开启第二个模型训练任务。否则,开始检测占用资源是否大于CPU/内存危险负载值,如果大于,则kill掉这个进程,否则,说明占用率处在正常负载值与危险负载值之间,此时,等待该模型训练进程结束,再自动开启下一个模型训练进程。

  • 第二小节: 多模型多线程预测

    • 学习并实现了多模型多线程预测的三步曲:
      • 第一步: 将h5格式的模型转化成pb格式。
      • 第二步: 使用docker启动tensorflow-serving微服务。
      • 第三步: 多线程调用微服务并处理结果。