小学生写作业姿势检测模型训练评估
自从这个DeepSeek大火之后,一直想学习模型训练,根据我的需求使用Grok 3、DeepSeek、Gemini三个大模型,写出了基于MobileNetV2 CNN卷积神经网络的视觉识别高级坐姿识别训练系统,直接放代码,需要1000张以上正确和非正确坐姿图片进行训练。MobileNetV2这个模型有点老,2024年已经更新到MobileNetV4版本,等我抽空把它换成v4模型,性能更好,完整代码如下:
import argparse
import tensorflow as tf
from tensorflow.keras import layers, models, applications, callbacks
from tensorflow.keras.utils import to_categorical
import numpy as np
import os
import logging
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split
# 配置日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# 命令行参数解析(改进版)
def parse_args():
parser = argparse.ArgumentParser(description='高级坐姿识别训练系统')
parser.add_argument('--action', choices=['build', 'train', 'evaluate', 'export'], required=True,
help='执行操作: build(新建模型), train(训练), evaluate(评估), export(导出为TFLite)')
parser.add_argument('--model_path', default='posture_model_v2.h5',
help='模型保存/加载路径(默认:posture_model_v2.h5)')
parser.add_argument('--train_data',
help='必须指定训练数据路径')
parser.add_argument('--image_size', type=int, default=224,
help='输入图像尺寸(默认:224)')
parser.add_argument('--image_path', # 添加的参数
help='要检测的单张图片路径(用于单张图片检测)')
parser.add_argument('--train_labels',
help='必须指定训练标签路径')
parser.add_argument('--test_data',
help='可选测试数据路径')
parser.add_argument('--test_labels',
help='测试标签路径(需与--test_data同时使用)')
parser.add_argument('--epochs', type=int, default=30,
help='最大训练轮次(默认:30)')
parser.add_argument('--batch_size', type=int, default=16,
help='批处理大小(默认:16)')
parser.add_argument('--val_split', type=float, default=0.2,
help='验证集划分比例(默认:0.2,当未提供测试数据时生效)')
parser.add_argument('--learning_rate', type=float, default=1e-4,
help='初始学习率(默认:0.0001)')
return parser.parse_args()
# 数据加载与增强(改进版)
class DataProcessor:
def __init__(self, image_size=224):
self.image_size = image_size
self.augmentor = models.Sequential([
layers.RandomRotation(0.3), # 增强旋转幅度
layers.RandomZoom(0.3), # 增大缩放范围
layers.RandomContrast(0.2),
layers.RandomTranslation(0.1, 0.1), # 新增平移增强
layers.RandomFlip("horizontal_and_vertical"), # 增加垂直翻转
layers.GaussianNoise(0.01) # 添加噪声增强
])
def load_data(self, data_path, labels_path):
try:
data = np.load(data_path)
labels = np.load(labels_path)
self.validate_data(data, labels)
return data, labels
except FileNotFoundError as e:
logging.error(f"文件未找到:{str(e)}")
raise
except Exception as e:
logging.error(f"数据加载失败: {str(e)}")
raise
def validate_data(self, data, labels):
if data.ndim != 4 or data.shape[1:3] != (self.image_size, self.image_size):
raise ValueError(f"无效数据维度,期望(样本数, {self.image_size}, {self.image_size}, 3),实际得到 {data.shape}")
if len(labels) != data.shape[0]:
raise ValueError("数据与标签数量不匹配")
if len(np.unique(labels)) > 2:
raise ValueError("超过两个类别,本系统仅支持二分类")
def preprocess(self, data, labels, is_training=True):
# 数据标准化
#data = data.astype('float32') / 255.0
# 数据增强 (仅在训练阶段应用)
if is_training:
data = self.augmentor(data)
# 标签编码
labels = to_categorical(labels, num_classes=2)
return data, labels
# 迁移学习模型构建
class PostureModel:
def __init__(self, image_size=224, learning_rate=1e-4):
self.image_size = image_size
self.learning_rate = learning_rate
self.base_model = applications.MobileNetV2(
input_shape=(image_size, image_size, 3),
include_top=False,
weights='imagenet',
alpha=1.0 # 修改 alpha 参数为 1.0
)
# 解冻最后30%的层
self.base_model.trainable = True
freeze_up_to = int(len(self.base_model.layers) * 0.7)
for layer in self.base_model.layers[:freeze_up_to]:
layer.trainable = False
def build(self):
inputs = layers.Input(shape=(self.image_size, self.image_size, 3))
x = self.base_model(inputs)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.7)(x) # 增加Dropout比例
x = layers.Dense(128, activation='relu', kernel_regularizer='l2')(x)
outputs = layers.Dense(2, activation='softmax')(x)
model = models.Model(inputs, outputs)
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
# 训练系统
class TrainingSystem:
def __init__(self, args):
self.args = args
self.data_processor = DataProcessor(args.image_size)
self.callbacks = [
callbacks.EarlyStopping(patience=5, restore_best_weights=True),
callbacks.ModelCheckpoint(
filepath=args.model_path,
save_best_only=True,
monitor='val_accuracy'
),
callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
callbacks.TensorBoard(log_dir='./logs')
]
def train(self):
logging.info("开始训练流程...")
# 加载数据
train_data, train_labels = self.data_processor.load_data(
self.args.train_data, self.args.train_labels
)
# 准备验证数据
if self.args.test_data and self.args.test_labels:
val_data, val_labels = self.data_processor.load_data(
self.args.test_data, self.args.test_labels
)
else:
train_data, val_data, train_labels, val_labels = train_test_split(
train_data, train_labels,
test_size=self.args.val_split,
stratify=train_labels
)
# 数据预处理
train_data, train_labels = self.data_processor.preprocess(train_data, train_labels, is_training=True)
val_data, val_labels = self.data_processor.preprocess(val_data, val_labels, is_training=True)
# 构建模型
self.model_builder = PostureModel( # 保存模型构建器引用
image_size=self.args.image_size,
learning_rate=self.args.learning_rate
)
model = self.model_builder.build()
logging.info("模型架构:")
model.summary()
# 添加类权重平衡
class_counts = np.bincount(np.argmax(train_labels, axis=1))
class_weights = {
0: (1 / class_counts[0]) * (len(train_labels) / 2.0),
1: (1 / class_counts[1]) * (len(train_labels) / 2.0)
}
# 初始训练阶段
history = model.fit(
train_data, train_labels,
validation_data=(val_data, val_labels),
epochs=int(self.args.epochs * 0.7), # 70% epochs用于初始训练
batch_size=self.args.batch_size,
callbacks=self.callbacks,
verbose=2
)
# 微调阶段
self.fine_tune(model, train_data, train_labels, val_data, val_labels)
return model
# 修改fine_tune方法中的fit调用,排除ReduceLROnPlateau回调
def fine_tune(self, model, train_data, train_labels, val_data, val_labels):
logging.info("进入微调阶段...")
# 获取基础模型并解冻部分层
base_model = self.model_builder.base_model
base_model.trainable = True
for layer in base_model.layers[:100]:
layer.trainable = False
# 重新编译模型
model.compile(
optimizer=tf.keras.optimizers.Adam(
learning_rate=tf.keras.optimizers.schedules.CosineDecay(
initial_learning_rate=self.args.learning_rate/10,
decay_steps=int(self.args.epochs * 0.3) * len(train_data)//self.args.batch_size
)
),
loss='categorical_crossentropy',
metrics=['accuracy']
)
# 创建不包含ReduceLROnPlateau的回调列表
callbacks_fine_tune = [
cb for cb in self.callbacks
if not isinstance(cb, callbacks.ReduceLROnPlateau)
]
# 微调训练
model.fit(
train_data, train_labels,
validation_data=(val_data, val_labels),
epochs=int(self.args.epochs * 0.3), # 30% epochs用于微调
batch_size=self.args.batch_size,
callbacks=callbacks_fine_tune,
verbose=2
)
model.summary()
def evaluate(self): # 注意这里 evaluate 方法不再需要 data 和 labels 参数
logging.info("开始评估流程...")
model = models.load_model(self.args.model_path)
if self.args.image_path: # 如果提供了 image_path 参数,则进行单张图片检测
logging.info(f"正在检测单张图片: {self.args.image_path}")
try:
# 加载单张图片 (假设您已经安装了 PIL 或 Pillow 库)
img = tf.keras.utils.load_img(self.args.image_path, target_size=(self.args.image_size, self.args.image_size))
img_array = tf.keras.utils.img_to_array(img)
img_array = np.expand_dims(img_array, axis=0) # 扩展维度以匹配模型输入 (batch_size, height, width, channels)
except Exception as e:
logging.error(f"加载图片失败: {str(e)}")
return
# 预处理单张图片
processed_image, _ =
self.data_processor.preprocess(img_array, np.array([0]), is_training=True) # 标签在这里并不重要,只是为了兼容 preprocess 函数的接口
# 进行预测
predictions = model.predict(processed_image)
predicted_class_index = np.argmax(predictions[0]) # 获取概率最高的类别索引
class_names = ['错误坐姿','正确坐姿' ] # 假设 0 代表正确坐姿,1 代表错误坐姿 (需要根据您的训练数据标签来确定)
predicted_class_name = class_names[predicted_class_index]
confidence = predictions[0][predicted_class_index] * 100 # 获取置信度百分比
logging.info(f"预测结果: {predicted_class_name} (置信度: {confidence:.2f}%)")
print(f"预测结果: {predicted_class_name} (置信度: {confidence:.2f}%)") # 打印到控制台
elif self.args.test_data and self.args.test_labels: # 如果提供了测试数据和标签,则进行批量评估 (保持原有的评估逻辑)
data, labels = self.data_processor.load_data(self.args.test_data, self.args.test_labels)
# --- **添加数据检查代码** ---
logging.info("数据检查 START:")
logging.info(f"数据形状 (Data Shape): {data.shape}")
logging.info(f"数据类型 (Data Dtype): {data.dtype}")
logging.info(f"数据最小值 (Data Min Value): {data.min()}")
logging.info(f"数据最大值 (Data Max Value): {data.max()}")
logging.info(f"标签形状 (Labels Shape): {labels.shape}")
logging.info(f"标签类型 (Labels Dtype): {labels.dtype}")
unique_labels, label_counts = np.unique(labels, return_counts=True)
logging.info(f"唯一标签 (Unique Labels): {unique_labels}")
logging.info(f"标签计数 (Label Counts): {label_counts}")
logging.info("数据检查 END:")
data, labels = self.data_processor.preprocess(data, labels)
# --- **添加图像可视化代码** ---
if 1==0:
logging.info("图像可视化 START:")
num_images_to_visualize = min(5, len(data)) # 最多可视化 5 张图像
plt.figure(figsize=(10, 2 * num_images_to_visualize))
for i in range(num_images_to_visualize):
plt.subplot(num_images_to_visualize, 1, i + 1)
plt.imshow(data[i]) # 数据已经预处理 (归一化)
label_index = np.argmax(labels[i]) # one-hot 编码转回标签索引
plt.title(f"预处理图像 {i+1}, 标签: {label_index}")
plt.axis('off')
plt.tight_layout()
plt.show()
logging.info("图像可视化 END: (请查看弹出的图像窗口)")
# --- **结束图像可视化代码** ---
loss, acc = model.evaluate(data, labels, verbose=0)
logging.info(f"测试集评估结果 - 损失: {loss:.4f}, 准确率: {acc:.2%}")
else:
logging.warning("既没有提供单张图片路径,也没有提供测试数据集,评估操作无法执行。请提供 --image_path 参数进行单张图片检测,或提供 --test_data 和 --test_labels 参数进行批量评估。")
# 主程序
def main():
args = parse_args()
system = TrainingSystem(args)
try:
if args.action == 'build':
model = PostureModel(args.image_size).build()
model.save(args.model_path)
logging.info(f"新建模型已保存至 {args.model_path}")
elif args.action == 'train':
# 验证测试数据参数完整性
if (args.test_data and not args.test_labels) or \
(args.test_labels and not args.test_data):
raise ValueError("--test_data 和 --test_labels 必须同时使用")
model = system.train()
logging.info("训练完成,最佳模型已自动保存")
elif args.action == 'evaluate':
system.evaluate() # 直接调用 system.evaluate(),不需要传递参数
elif args.action == 'export':
model = models.load_model(args.model_path)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('posture_model.tflite', 'wb') as f:
f.write(tflite_model)
logging.info("TFLite模型导出成功")
except Exception as e:
logging.error(f"操作失败:{str(e)}")
logging.info("建议检查:")
logging.info("1. 文件路径是否正确")
logging.info("2. 数据文件是否为numpy数组格式")
logging.info("3. 标签是否只包含0和1")
raise
if __name__ == '__main__':
main()
识别性能可视化