当前位置:首页 > 技术分析 > 正文内容

小学生写作业姿势检测模型训练评估

ruisui883个月前 (03-16)技术分析34

自从这个deepseek大火之后,一直想学习模型训练,根据我的需求使用grok3、deepseek、gmini三个大模型,写出了基于MobileNetV2 CNN卷积神经网络的视觉识别高级坐姿识别训练系统,直接放代码,需要1000张+以上正确和非正确坐姿进行训练,MobileNetV2这个模型有点老,2024年已经更新MobileNetV4版本,等我抽空把换成v4模型,性能更好,完整代码如下:


import argparse

import tensorflow as tf

from tensorflow.keras import layers, models, applications, callbacks

from tensorflow.keras.utils import to_categorical

import numpy as np

import os

import logging

import matplotlib.pyplot as plt

from PIL import Image

from sklearn.model_selection import train_test_split

# 配置日志记录

logging.basicConfig(level=logging.INFO,

format='%(asctime)s - %(levelname)s - %(message)s')

# 命令行参数解析(改进版)

def parse_args():

parser = argparse.ArgumentParser(description='高级坐姿识别训练系统')

parser.add_argument('--action', choices=['build', 'train', 'evaluate', 'export'], required=True,

help='执行操作: build(新建模型), train(训练), evaluate(评估), export(导出为TFLite)')

parser.add_argument('--model_path', default='posture_model_v2.h5',

help='模型保存/加载路径(默认:posture_model_v2.h5)')

parser.add_argument('--train_data',

help='必须指定训练数据路径')

parser.add_argument('--image_size', type=int, default=224,

help='输入图像尺寸(默认:224)')

parser.add_argument('--image_path', # 添加的参数

help='要检测的单张图片路径(用于单张图片检测)')

parser.add_argument('--train_labels',

help='必须指定训练标签路径')

parser.add_argument('--test_data',

help='可选测试数据路径')

parser.add_argument('--test_labels',

help='测试标签路径(需与--test_data同时使用)')

parser.add_argument('--epochs', type=int, default=30,

help='最大训练轮次(默认:30)')

parser.add_argument('--batch_size', type=int, default=16,

help='批处理大小(默认:16)')

parser.add_argument('--val_split', type=float, default=0.2,

help='验证集划分比例(默认:0.2,当未提供测试数据时生效)')

parser.add_argument('--learning_rate', type=float, default=1e-4,

help='初始学习率(默认:0.0001)')

return parser.parse_args()

# 数据加载与增强(改进版)

class DataProcessor:

def __init__(self, image_size=224):

self.image_size = image_size

self.augmentor = models.Sequential([

layers.RandomRotation(0.3), # 增强旋转幅度

layers.RandomZoom(0.3), # 增大缩放范围

layers.RandomContrast(0.2),

layers.RandomTranslation(0.1, 0.1), # 新增平移增强

layers.RandomFlip("horizontal_and_vertical"), # 增加垂直翻转

layers.GaussianNoise(0.01) # 添加噪声增强

])

def load_data(self, data_path, labels_path):

try:

data = np.load(data_path)

labels = np.load(labels_path)

self.validate_data(data, labels)

return data, labels

except FileNotFoundError as e:

logging.error(f"文件未找到:{str(e)}")

raise

except Exception as e:

logging.error(f"数据加载失败: {str(e)}")

raise

def validate_data(self, data, labels):

if data.ndim != 4 or data.shape[1:3] != (self.image_size, self.image_size):

raise ValueError(f"无效数据维度,期望(样本数, {self.image_size}, {self.image_size}, 3),实际得到 {data.shape}")

if len(labels) != data.shape[0]:

raise ValueError("数据与标签数量不匹配")

if len(np.unique(labels)) > 2:

raise ValueError("超过两个类别,本系统仅支持二分类")

def preprocess(self, data, labels, is_training=True):

# 数据标准化

#data = data.astype('float32') / 255.0


# 数据增强 (仅在训练阶段应用)

if is_training:

data = self.augmentor(data)

# 标签编码

labels = to_categorical(labels, num_classes=2)

return data, labels

# 迁移学习模型构建

class PostureModel:

def __init__(self, image_size=224, learning_rate=1e-4):

self.image_size = image_size

self.learning_rate = learning_rate

self.base_model = applications.MobileNetV2(

input_shape=(image_size, image_size, 3),

include_top=False,

weights='imagenet',

alpha=1.0 # 修改 alpha 参数为 1.0

)

# 解冻最后30%的层

self.base_model.trainable = True

freeze_up_to = int(len(self.base_model.layers) * 0.7)

for layer in self.base_model.layers[:freeze_up_to]:

layer.trainable = False

def build(self):

inputs = layers.Input(shape=(self.image_size, self.image_size, 3))

x = self.base_model(inputs)

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dropout(0.7)(x) # 增加Dropout比例

x = layers.Dense(128, activation='relu', kernel_regularizer='l2')(x)

outputs = layers.Dense(2, activation='softmax')(x)


model = models.Model(inputs, outputs)

model.compile(

optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),

loss='categorical_crossentropy',

metrics=['accuracy']

)

return model

# 训练系统

class TrainingSystem:

def __init__(self, args):

self.args = args

self.data_processor = DataProcessor(args.image_size)

self.callbacks = [

callbacks.EarlyStopping(patience=5, restore_best_weights=True),

callbacks.ModelCheckpoint(

filepath=args.model_path,

save_best_only=True,

monitor='val_accuracy'

),

callbacks.ReduceLROnPlateau(factor=0.5, patience=3),

callbacks.TensorBoard(log_dir='./logs')

]


def train(self):

logging.info("开始训练流程...")


# 加载数据

train_data, train_labels = self.data_processor.load_data(

self.args.train_data, self.args.train_labels

)


# 准备验证数据

if self.args.test_data and self.args.test_labels:

val_data, val_labels = self.data_processor.load_data(

self.args.test_data, self.args.test_labels

)

else:

train_data, val_data, train_labels, val_labels = train_test_split(

train_data, train_labels,

test_size=self.args.val_split,

stratify=train_labels

)

# 数据预处理

train_data, train_labels = self.data_processor.preprocess(train_data, train_labels, is_training=True)

val_data, val_labels = self.data_processor.preprocess(val_data, val_labels, is_training=True)

# 构建模型

self.model_builder = PostureModel( # 保存模型构建器引用

image_size=self.args.image_size,

learning_rate=self.args.learning_rate

)

model = self.model_builder.build()

logging.info("模型架构:")

model.summary()


# 添加类权重平衡

class_counts = np.bincount(np.argmax(train_labels, axis=1))

class_weights = {

0: (1 / class_counts[0]) * (len(train_labels) / 2.0),

1: (1 / class_counts[1]) * (len(train_labels) / 2.0)

}


# 初始训练阶段

history = model.fit(

train_data, train_labels,

validation_data=(val_data, val_labels),

epochs=int(self.args.epochs * 0.7), # 70% epochs用于初始训练

batch_size=self.args.batch_size,

callbacks=self.callbacks,

verbose=2

)

# 微调阶段

self.fine_tune(model, train_data, train_labels, val_data, val_labels)

return model

# 修改fine_tune方法中的fit调用,排除ReduceLROnPlateau回调

def fine_tune(self, model, train_data, train_labels, val_data, val_labels):

logging.info("进入微调阶段...")


# 获取基础模型并解冻部分层

base_model = self.model_builder.base_model

base_model.trainable = True

for layer in base_model.layers[:100]:

layer.trainable = False


# 重新编译模型

model.compile(

optimizer=tf.keras.optimizers.Adam(

learning_rate=tf.keras.optimizers.schedules.CosineDecay(

initial_learning_rate=self.args.learning_rate/10,

decay_steps=int(self.args.epochs * 0.3) * len(train_data)//self.args.batch_size

)

),

loss='categorical_crossentropy',

metrics=['accuracy']

)

# 创建不包含ReduceLROnPlateau的回调列表

callbacks_fine_tune = [

cb for cb in self.callbacks

if not isinstance(cb, callbacks.ReduceLROnPlateau)

]

# 微调训练

model.fit(

train_data, train_labels,

validation_data=(val_data, val_labels),

epochs=int(self.args.epochs * 0.3), # 30% epochs用于微调

batch_size=self.args.batch_size,

callbacks=callbacks_fine_tune,

verbose=2

)

model.summary()


def evaluate(self): # 注意这里 evaluate 方法不再需要 data 和 labels 参数

logging.info("开始评估流程...")

model = models.load_model(self.args.model_path)

if self.args.image_path: # 如果提供了 image_path 参数,则进行单张图片检测

logging.info(f"正在检测单张图片: {self.args.image_path}")

try:

# 加载单张图片 (假设您已经安装了 PIL 或 Pillow 库)

img = tf.keras.utils.load_img(self.args.image_path, target_size=(self.args.image_size, self.args.image_size))

img_array = tf.keras.utils.img_to_array(img)

img_array = np.expand_dims(img_array, axis=0) # 扩展维度以匹配模型输入 (batch_size, height, width, channels)

except Exception as e:

logging.error(f"加载图片失败: {str(e)}")

return

# 预处理单张图片

processed_image, _ =
self.data_processor.preprocess(img_array, np.array([0]), is_training=True) # 标签在这里并不重要,只是为了兼容 preprocess 函数的接口

# 进行预测

predictions = model.predict(processed_image)

predicted_class_index = np.argmax(predictions[0]) # 获取概率最高的类别索引

class_names = ['错误坐姿','正确坐姿' ] # 假设 0 代表正确坐姿,1 代表错误坐姿 (需要根据您的训练数据标签来确定)

predicted_class_name = class_names[predicted_class_index]

confidence = predictions[0][predicted_class_index] * 100 # 获取置信度百分比

logging.info(f"预测结果: {predicted_class_name} (置信度: {confidence:.2f}%)")

print(f"预测结果: {predicted_class_name} (置信度: {confidence:.2f}%)") # 打印到控制台

elif self.args.test_data and self.args.test_labels: # 如果提供了测试数据和标签,则进行批量评估 (保持原有的评估逻辑)

data, labels = self.data_processor.load_data(self.args.test_data, self.args.test_labels)


# --- **添加数据检查代码** ---

logging.info("数据检查 START:")

logging.info(f"数据形状 (Data Shape): {data.shape}")

logging.info(f"数据类型 (Data Dtype): {data.dtype}")

logging.info(f"数据最小值 (Data Min Value): {data.min()}")

logging.info(f"数据最大值 (Data Max Value): {data.max()}")

logging.info(f"标签形状 (Labels Shape): {labels.shape}")

logging.info(f"标签类型 (Labels Dtype): {labels.dtype}")

unique_labels, label_counts = np.unique(labels, return_counts=True)

logging.info(f"唯一标签 (Unique Labels): {unique_labels}")

logging.info(f"标签计数 (Label Counts): {label_counts}")

logging.info("数据检查 END:")


data, labels = self.data_processor.preprocess(data, labels)


# --- **添加图像可视化代码** ---

if 1==0:

logging.info("图像可视化 START:")

num_images_to_visualize = min(5, len(data)) # 最多可视化 5 张图像

plt.figure(figsize=(10, 2 * num_images_to_visualize))

for i in range(num_images_to_visualize):

plt.subplot(num_images_to_visualize, 1, i + 1)

plt.imshow(data[i]) # 数据已经预处理 (归一化)

label_index = np.argmax(labels[i]) # one-hot 编码转回标签索引

plt.title(f"预处理图像 {i+1}, 标签: {label_index}")

plt.axis('off')

plt.tight_layout()

plt.show()

logging.info("图像可视化 END: (请查看弹出的图像窗口)")

# --- **结束图像可视化代码** ---

loss, acc = model.evaluate(data, labels, verbose=0)

logging.info(f"测试集评估结果 - 损失: {loss:.4f}, 准确率: {acc:.2%}")

else:

logging.warning("既没有提供单张图片路径,也没有提供测试数据集,评估操作无法执行。请提供 --image_path 参数进行单张图片检测,或提供 --test_data 和 --test_labels 参数进行批量评估。")

# 主程序

def main():

args = parse_args()

system = TrainingSystem(args)

try:

if args.action == 'build':

model = PostureModel(args.image_size).build()

model.save(args.model_path)

logging.info(f"新建模型已保存至 {args.model_path}")

elif args.action == 'train':

# 验证测试数据参数完整性

if (args.test_data and not args.test_labels) or \

(args.test_labels and not args.test_data):

raise ValueError("--test_data 和 --test_labels 必须同时使用")


model = system.train()

logging.info("训练完成,最佳模型已自动保存")

elif args.action == 'evaluate':

system.evaluate() # 直接调用 system.evaluate(),不需要传递参数

elif args.action == 'export':

model = models.load_model(args.model_path)

converter = tf.lite.TFLiteConverter.from_keras_model(model)

tflite_model = converter.convert()

with open('posture_model.tflite', 'wb') as f:

f.write(tflite_model)

logging.info("TFLite模型导出成功")

except Exception as e:

logging.error(f"操作失败:{str(e)}")

logging.info("建议检查:")

logging.info("1. 文件路径是否正确")

logging.info("2. 数据文件是否为numpy数组格式")

logging.info("3. 标签是否只包含0和1")

raise

if __name__ == '__main__':

main()

识别性能可视化

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/2795.html

标签: .ndim
分享给朋友:

“小学生写作业姿势检测模型训练评估” 的相关文章

10分钟搞定gitlab-ci自动化部署

gitlab-ci 是持续集成工具/自动化部署工具,类似 jenkins。持续集成 是将代码集成到共享存储库并尽可能早地自动构建/测试每个更改的实践 - 通常一天几次。概述在编码完成时都会进行打包发布过程,如果每次都手动操作这一步骤就会浪费时间,效率低下。所以就有了持续集成。准备事项请提前安装以下软...

迁移GIT仓库并带有历史提交记录

迁移git仓库开发在很多时候,会遇到一个问题。GIT仓库的管理,特别是仓库的迁移。我需要保留已有的历史记录,而不是重新开发,重头再来。我们可以这样做:使用--mirror模式会把本地的分支都克隆。// 先用--bare克隆裸仓库 git clone git@gitee.com:xxx/testApp...

Python 幕后:Python导入import的工作原理

更多互联网精彩资讯、工作效率提升关注【飞鱼在浪屿】(日更新)Python 最容易被误解的方面其中之一是import。Python 导入系统不仅看起来很复杂。因此,即使文档非常好,它也不能让您全面了解正在发生的事情。唯一方法是研究 Python 执行 import 语句时幕后发生的事情。注意:在这篇文...

js中数组filter方法的使用和实现

定义filter() 方法创建一个新数组, 其包含通过所提供函数实现的测试的所有元素。语法var newArray = arr.filter(callback(element[, index[, selfArr]])[, thisArg])参数callback循环数组每个元素时调用的回调函数。回调函...

12种JavaScript中最常用的数组操作整理汇总

数组是最常见的数据结构之一,我们需要绝对自信地使用它。在这里,我将列出 JavaScript 中最重要的几个数组常用操作片段,包括数组长度、替换元素、去重以及许多其他内容。1、数组长度大多数人都知道可以像这样得到数组的长度:const arr = [1, 2, 3]; console.log(a...

微信外H5跳转小程序——组件(vue项目)

场景有个H5(vue项目),需要实现点击商品item跳转到小程序,微信内和微信外都要支持,这里我们只介绍一下H5在微信外的跳转。如图所示,红框内是一个商品,就是点击这里,要跳转小程序:配置微信小程序云开发(云函数)1、开通云开发然后选择免费额度2、云开发权限设置找到权限设置,把这里的「未登录用户访问...