🔥 深度学习实战教程:YOLO目标检测 + 孪生网络实现验证码匹配

本教程涵盖两大经典任务:基于 YOLO 的目标检测(定位图中的 label、title、char 等元素)和基于孪生网络的相似度学习(判断两张验证码图片是否为同一字符)。代码均已测试,可直接用于工业项目。


📦 环境准备(通用)

在运行任何脚本前,请确保安装以下依赖:

pip install ultralytics torch torchvision opencv-python numpy onnxruntime onnx
# 可选:导出 OpenVINO 时需安装 openvino-dev
pip install openvino-dev

GPU 检查(强烈建议使用 NVIDIA GPU):

import torch
print(torch.cuda.is_available())  # 输出 True 表示可用

模块一:YOLO 目标检测(训练 → 导出 → 预测)

本模块使用 Ultralytics YOLO 框架(支持 YOLOv5/v8/v26 等)训练自定义检测器,并演示导出 ONNX 模型及 CPU/GPU 推理。

1. 准备数据集

目录结构

datasets/
└── bilbil/
    ├── train/
    │   ├── images/      # 训练图片 (.jpg, .png)
    │   └── labels/      # 对应 .txt 标签
    └── detect/
        ├── images/      # 验证/测试图片
        └── labels/      # 对应标签

标签格式(YOLO 格式)

每个图片同名 .txt 文件,每行格式:
<类别ID> <x_center> <y_center> <width> <height>
(所有坐标归一化到 0~1 之间)

数据集配置文件 bilbil.yaml

path: D:\captcha\yolo_img_bilbil   # 数据集根目录
train: train/images                # 训练图片相对路径
val: detect/images                 # 验证图片相对路径

names:
  0: label
  1: title
  2: char

2. 训练脚本(关键参数详解)

参数 / 默认值 / 说明:
  • imgsz = 640 — 输入尺寸,若目标很小可增至 800/1024(增加显存占用)
  • optimizer = 'MuSGD' — YOLO 新优化器,收敛平滑;若 loss 出现 NaN 可改 AdamW
  • close_mosaic = 10 — 最后 10 个 epoch 关闭 mosaic 增强,提升精度
  • patience = 50 — 验证指标连续 50 轮不提升则早停
  • batch = 16 — 根据显存调整(16 适合 8GB 显存)

完整训练脚本 train.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ultralytics import YOLO
import torch

def main():
    """Fine-tune a pretrained YOLO checkpoint, validate it, and export ONNX."""
    # All training hyper-parameters gathered in one place so the
    # model.train() call below stays compact and easy to tweak.
    train_cfg = dict(
        data=r"D:\captcha\yolov5\data\bilbil.yaml",
        epochs=100,
        imgsz=640,
        batch=16,
        workers=8,
        lr0=0.01,
        lrf=0.01,
        optimizer='MuSGD',
        momentum=0.937,
        weight_decay=0.0005,
        warmup_epochs=3,
        close_mosaic=10,
        patience=50,
        device=0 if torch.cuda.is_available() else 'cpu',
        plots=True,
        save=True,
    )
    pretrained_model = "yolo26n.pt"   # pick n/s/m/l/x by capacity needs

    # Load the pretrained checkpoint and fine-tune on the custom dataset.
    model = YOLO(pretrained_model)
    results = model.train(**train_cfg)

    # Evaluate the best checkpoint on the validation split.
    val_results = model.val(data=train_cfg['data'], imgsz=train_cfg['imgsz'], batch=train_cfg['batch'])
    print(f"mAP50: {val_results.box.map50:.4f}, mAP50-95: {val_results.box.map:.4f}")

    # Export for deployment.
    model.export(format="onnx", imgsz=train_cfg['imgsz'], dynamic=False)
    print("✅ 训练完成,模型已导出为 ONNX")

if __name__ == "__main__":
    main()

训练输出保存在 runs/detect/train*/,重点文件:

  • weights/best.pt – 最佳权重
  • results.png – 训练曲线
  • confusion_matrix.png – 混淆矩阵

3. 导出模型(ONNX / OpenVINO / TensorRT)

独立的导出脚本 export.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ultralytics import YOLO

def main():
    """Export a trained YOLO checkpoint to deployment formats."""
    weights = r"weights\best.pt"      # trained weights
    img_size = 640
    calib_yaml = r"bilbil.yaml"       # only needed for INT8 calibration

    detector = YOLO(weights)

    # ONNX export (CPU, fixed batch of 1)
    detector.export(format="onnx", imgsz=img_size, batch=1, device="cpu", opset=14)

    # OpenVINO FP32
    # detector.export(format="openvino", imgsz=img_size, half=False)

    # OpenVINO INT8 (uses calib_yaml as calibration data)
    # detector.export(format="openvino", imgsz=img_size, int8=True, data=calib_yaml)

    # TensorRT (GPU required)
    # detector.export(format="engine", imgsz=img_size)

if __name__ == "__main__":
    main()

4. ONNX 推理(纯 CPU/GPU 示例)

import cv2
import numpy as np
import onnxruntime as ort

class YOLO26ONNX:
    """ONNX Runtime wrapper for an exported YOLO detector.

    Assumes the model output is shaped (1, 300, 6) with rows of
    (x1, y1, x2, y2, conf, cls_id) — i.e. an export with NMS fused in;
    confirm against your own export settings.  Handles letterbox
    preprocessing and maps detections back to original image coordinates.
    """
    def __init__(self, model_path, conf_threshold=0.5):
        # CPU-only session; add 'CUDAExecutionProvider' for GPU inference.
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.conf_threshold = conf_threshold
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape          # [1,3,640,640] (NCHW)
        self.output_name = self.session.get_outputs()[0].name

    def letterbox(self, image, target_size=(640,640)):
        """Resize keeping aspect ratio, then center on a gray (114) canvas.

        target_size is (width, height).  Returns
        (padded_image, scale, (dx, dy, new_w, new_h)).
        """
        h, w = image.shape[:2]
        scale = min(target_size[0]/w, target_size[1]/h)
        new_w, new_h = int(w*scale), int(h*scale)
        resized = cv2.resize(image, (new_w, new_h))
        # Canvas is allocated as (height, width, channels).
        canvas = np.full((target_size[1], target_size[0], 3), 114, dtype=np.uint8)
        dw, dh = (target_size[0]-new_w)//2, (target_size[1]-new_h)//2
        canvas[dh:dh+new_h, dw:dw+new_w] = resized
        return canvas, scale, (dw, dh, new_w, new_h)

    def preprocess(self, image):
        """BGR -> RGB, letterbox to model input size, scale to [0,1], NCHW."""
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        padded, scale, (dw, dh, _, _) = self.letterbox(image_rgb, (self.input_shape[3], self.input_shape[2]))
        padded = padded.astype(np.float32) / 255.0
        tensor = np.transpose(padded, (2,0,1))[None]   # NCHW
        return tensor, scale, (dw, dh)

    def inference(self, image):
        """Run detection on a BGR image (as loaded by cv2.imread).

        Returns a list of [x1, y1, x2, y2, conf, cls_id] boxes in original
        image coordinates, filtered by conf_threshold.
        """
        tensor, scale, (dw, dh) = self.preprocess(image)
        outputs = self.session.run([self.output_name], {self.input_name: tensor})[0]  # (1,300,6)
        detections = []
        for det in outputs[0]:
            x1,y1,x2,y2,conf,cls_id = det.tolist()
            if conf < self.conf_threshold:
                continue
            # Undo the letterbox: subtract padding offset, divide by scale,
            # then clamp to the original image bounds.
            x1 = max(0, (x1 - dw) / scale)
            y1 = max(0, (y1 - dh) / scale)
            x2 = min(image.shape[1], (x2 - dw) / scale)
            y2 = min(image.shape[0], (y2 - dh) / scale)
            detections.append([int(x1), int(y1), int(x2), int(y2), conf, int(cls_id)])
        return detections

if __name__ == "__main__":
    # Smoke test: run the detector on one image and print each detection.
    yolo = YOLO26ONNX("best.onnx", conf_threshold=0.5)
    img = cv2.imread("test.jpg")
    results = yolo.inference(img)
    for r in results:
        print(f"类别{r[5]}, 置信度{r[4]:.2f}, 坐标{r[:4]}")

模块二:孪生网络(验证码文字匹配)

适用于点选验证码文字匹配场景:给定两张图片(提示图 + 候选图),输出它们是否为同一字符。

数据集结构

数据集根目录/                     
├── 验证码A/                  # 文件夹名任意,代表一个字符/类别
│   ├── char001.jpg              # 包含 'char' 字样,数字编号 001
│   ├── plan001.jpg              # 包含 'plan' 字样,数字编号 001
│   ├── char002.png
│   ├── plan002.png
│   └── ...                      # 可以有多个不同编号的配对
├── 验证码B/
│   ├── char001.jpeg
│   ├── plan001.jpeg
│   └── ...
└── ...

1. 模型结构:SiameseMobileNetV4

采用双塔共享权重 + 多特征融合设计:

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
# File       : SiameseMobileNetV4.py
# Time       :2026/4/30 17:37
# Author     :yujia
# version    :python 3.6
# Description:
"""
import torch
import torch.nn as nn
import timm

class SiameseMobileNetV4(nn.Module):
    """Siamese network with a shared MobileNetV4 backbone.

    Pooled feature dims by timm backbone variant (num_classes=0):
      mobilenetv4_conv_small      960
      mobilenetv4_conv_medium    1280  (the one used here)
      mobilenetv4_hybrid_medium  1280  (ONNX export currently unsupported)
    """
    def __init__(self, pretrained=True):
        super().__init__()
        # Pure-convolution MobileNetV4 — suited to mobile deployment
        # and straightforward ONNX export.
        self.backbone = timm.create_model('mobilenetv4_conv_medium', pretrained=pretrained, num_classes=0)
        self.feature_dim = 1280   # pooled output size of conv_medium
        self.dropout = nn.Dropout(0.2)

        # Similarity head over the 4-way fused feature vector; emits raw
        # logits (no Sigmoid — pair with BCEWithLogitsLoss).
        self.fusion_head = nn.Sequential(
            nn.Linear(self.feature_dim * 4, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 1)        # logits (no Sigmoid; use BCEWithLogitsLoss)
        )

        # Minimal single-linear head kept for reference:
        # self.fusion_head = nn.Sequential(
        #     nn.Linear(self.feature_dim * 4, 1)
        # )

    def extract_feature(self, x):
        # Shared-weight tower: backbone embedding + dropout.
        return self.dropout(self.backbone(x))   # [B, 1280]

    def forward(self, x1, x2):
        v1 = self.extract_feature(x1)
        v2 = self.extract_feature(x2)
        # Feature fusion: raw vectors, absolute difference (captures
        # dissimilarity) and elementwise product (captures agreement).
        fused = torch.cat([v1, v2, torch.abs(v1 - v2), v1 * v2], dim=1)   # [B, 1280*4]
        x = self.fusion_head(fused)
        return x                                 # logits [B, 1]


if __name__ == '__main__':
    # Smoke test: forward one random batch through the model.
    # Fixed: the guard was previously nested twice ("if __name__" inside
    # "if __name__"), and the banner wrongly said "SiameseEfficientNet".
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print("Testing SiameseMobileNetV4...")
    model1 = SiameseMobileNetV4(pretrained=True).to(device)
    img1 = torch.rand([4, 3, 112, 112]).to(device)
    img2 = torch.rand([4, 3, 112, 112]).to(device)
    out1 = model1(img1, img2)
    print(f"Output shape: {out1.shape}, values: {out1.squeeze().tolist()}")

设计亮点

  • 使用 torch.abs(v1 - v2) 直接捕捉差异,v1 * v2 捕捉共性。
  • MobileNetV4 纯卷积结构,便于导出 ONNX/OpenVINO。
  • 判别头中加入 BatchNorm 和 Dropout 防止过拟合。

2. 数据加载与正负样本构建

数据集按字符文件夹划分训练/验证,防止数据泄露。每一对样本生成规则:

  • 正样本(标签=1):同一字符文件夹内,提示图与候选图编号相同。
  • 负样本(标签=0):同一文件夹内不同编号 或 不同文件夹的随机组合。

核心处理流程:

import os
import re
import random
import numpy as np
import cv2
import torch
from torch.utils.data.dataset import Dataset

# ---------------------------------------------------#
# 图像预处理工具函数(完全保留)
# ---------------------------------------------------#
def cvtColor(image_np):
    """Normalize a cv2-loaded image (BGR / BGRA / grayscale) to 3-channel RGB."""
    is_color = len(image_np.shape) == 3
    if is_color and image_np.shape[2] == 4:
        # BGRA: drop the alpha channel first, then swap channel order.
        without_alpha = cv2.cvtColor(image_np, cv2.COLOR_BGRA2BGR)
        return cv2.cvtColor(without_alpha, cv2.COLOR_BGR2RGB)
    if is_color and image_np.shape[2] == 3:
        return cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
    return cv2.cvtColor(image_np, cv2.COLOR_GRAY2RGB)

def letterbox_image(image_np, target_size):
    """Aspect-preserving resize centered on a gray (128) canvas of target_size (h, w)."""
    out_h, out_w = target_size
    src_h, src_w = image_np.shape[:2]
    ratio = min(out_w / src_w, out_h / src_h)
    fit_w, fit_h = int(src_w * ratio), int(src_h * ratio)
    fitted = cv2.resize(image_np, (fit_w, fit_h), interpolation=cv2.INTER_CUBIC)
    canvas = np.full((out_h, out_w, 3), 128, dtype=np.uint8)
    off_x = (out_w - fit_w) // 2
    off_y = (out_h - fit_h) // 2
    canvas[off_y:off_y + fit_h, off_x:off_x + fit_w] = fitted
    return canvas

def preprocess_input(x):
    """Convert pixel values to float32 and scale into [0, 1]."""
    as_float = x.astype(np.float32)
    return as_float / 255.0

def rand(a=0, b=1):
    """Uniform random float in [a, b) drawn from numpy's global RNG."""
    return np.random.rand() * (b - a) + a

# ---------------------------------------------------#
# 新的 load_dataset:构建正负样本对并划分
# ---------------------------------------------------#
def load_dataset(dataset_path, train_ratio=0.8):
    """Scan dataset_path and build (img1_path, img2_path, label) pairs.

    Folder layout: each character folder holds 'charNNN.*' prompt images
    and 'planNNN.*' candidate images; matching numeric IDs form a positive
    pair.  Label 1 = same character & same ID; 0 = negative (same folder
    different ID, or a pair from another folder).

    Folders are split between train/val as whole units so validation
    characters are never seen during training.

    Returns:
        (train_samples, val_samples)

    Fixed: the promised cross-folder negative fallback is now implemented;
    previously folders holding a single pair produced no negatives at all.
    """
    # 1. Collect every character folder together with its ID-matched pairs.
    folder_pairs = []          # entries: (folder_path, [(char_path, plan_path), ...])
    for root, dirs, files in os.walk(dataset_path):
        char_files = [f for f in files if 'char' in f.lower()]
        plan_files = [f for f in files if 'plan' in f.lower()]
        if not char_files or not plan_files:
            continue

        # Map numeric ID -> path for both roles; the first number in the
        # filename is the pairing key.
        char_dict = {}
        plan_dict = {}
        for f in char_files:
            nums = re.findall(r'\d+', f)
            if nums:
                char_dict[nums[0]] = os.path.join(root, f)
        for f in plan_files:
            nums = re.findall(r'\d+', f)
            if nums:
                plan_dict[nums[0]] = os.path.join(root, f)

        pairs = [(char_path, plan_dict[num])
                 for num, char_path in char_dict.items() if num in plan_dict]
        if pairs:
            folder_pairs.append((root, pairs))

    print(f"共找到 {sum(len(p) for _, p in folder_pairs)} 个有效图像对,文件夹总数: {len(folder_pairs)}")

    # 2. Split train/val at folder granularity (fixed seed for reproducibility).
    random.seed(42)
    random.shuffle(folder_pairs)
    num_train_folders = int(len(folder_pairs) * train_ratio)
    train_folders = folder_pairs[:num_train_folders]
    val_folders = folder_pairs[num_train_folders:]

    # 3. Build positive/negative samples within each split.
    def build_samples(folders):
        # Flatten to (char_path, plan_path, folder_idx); folder_idx lets us
        # distinguish same-folder from cross-folder negatives.
        all_pairs = []
        for idx, (folder_path, pairs) in enumerate(folders):
            for char_path, plan_path in pairs:
                all_pairs.append((char_path, plan_path, idx))

        # Group global pair indices by folder for same-folder negatives.
        folder_to_indices = {}
        for i, (_, _, fidx) in enumerate(all_pairs):
            folder_to_indices.setdefault(fidx, []).append(i)

        samples = []
        for idx, (char_path, plan_path, fidx) in enumerate(all_pairs):
            # Positive: the matching plan for this char.
            samples.append((char_path, plan_path, 1))

            # Negative: prefer a different-ID pair from the same folder;
            # otherwise fall back to a pair from another folder.
            same_folder = folder_to_indices[fidx]
            if len(same_folder) > 1:
                while True:
                    neg_idx = random.choice(same_folder)
                    if neg_idx != idx:
                        break
                samples.append((char_path, all_pairs[neg_idx][1], 0))
            else:
                cross_folder = [i for i in range(len(all_pairs))
                                if all_pairs[i][2] != fidx]
                if cross_folder:
                    neg_idx = random.choice(cross_folder)
                    samples.append((char_path, all_pairs[neg_idx][1], 0))

        return samples

    train_samples = build_samples(train_folders)
    val_samples = build_samples(val_folders)

    print(f"训练集样本数: {len(train_samples)} (其中正: {sum(l for _,_,l in train_samples)}, 负: {sum(1 for _,_,l in train_samples if l==0)})")
    print(f"验证集样本数: {len(val_samples)} (其中正: {sum(l for _,_,l in val_samples)}, 负: {sum(1 for _,_,l in val_samples if l==0)})")
    return train_samples, val_samples


# ---------------------------------------------------#
# 全新的 SiameseDataset:只负责图像读取和增强
# ---------------------------------------------------#
class SiameseDataset(Dataset):
    """Pair dataset for Siamese training.

    Each item is ([img1, img2], label): images are float32 CHW arrays in
    [0, 1], label is 1.0 (same character) or 0.0 (different).

    Args:
        samples: list of (img1_path, img2_path, label) tuples.
        input_shape: target (h, w) after letterboxing.
        random: apply augmentation when True (training mode).
    """
    def __init__(self, samples, input_shape=(112,112), random=True):
        self.samples = samples    # list of (img1_path, img2_path, label)
        self.input_shape = input_shape
        self.random = random      # NOTE: parameter shadows the stdlib module name

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        img1_path, img2_path, label = self.samples[index]

        img1 = self._load_and_preprocess(img1_path)
        img2 = self._load_and_preprocess(img2_path)

        return [img1, img2], np.float32(label)

    def _load_and_preprocess(self, img_path):
        """Read one image and return a float32 CHW array in [0, 1]."""
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None on missing/corrupt files instead of
            # raising.  Fixed: previously raised FileNotFoundError(...)
            # with a literal Ellipsis and no useful message.
            raise FileNotFoundError(f"Failed to read image: {img_path}")
        image = cvtColor(image)
        image = letterbox_image(image, self.input_shape)

        # Augment while still uint8 (avoids float color-range overflow).
        if self.random:
            image = self._apply_augment(image)  # image stays uint8 here

        # Normalize last, then HWC -> CHW.
        image = preprocess_input(image)  # float32 in [0, 1]
        image = np.transpose(image, (2, 0, 1))
        return image

    def _apply_augment(self, image):
        """Light augmentation: horizontal flip, small rotation, HSV jitter.
        Swap in albumentations here for a stronger pipeline."""
        h, w = image.shape[:2]
        if rand() < 0.5:
            image = cv2.flip(image, 1)
        if rand() < 0.5:
            angle = np.random.randint(-15, 15)
            center = (w//2, h//2)
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            image = cv2.warpAffine(image, M, (w, h), borderValue=(128,128,128))
        if rand() < 0.5:
            hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV).astype(np.float32)
            h_shift = rand(-0.1, 0.1) * 180
            s_scale = rand(1-0.7, 1+0.7)
            v_scale = rand(1-0.3, 1+0.3)
            hsv[:,:,0] = (hsv[:,:,0] + h_shift) % 180
            hsv[:,:,1] = np.clip(hsv[:,:,1] * s_scale, 0, 255)
            hsv[:,:,2] = np.clip(hsv[:,:,2] * v_scale, 0, 255)
            image = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2RGB)
        return image


# ---------------------------------------------------#
# 新的 collate_fn,与训练代码接口完全兼容
# ---------------------------------------------------#
def dataset_collate(batch):
    """Collate ([img1, img2], label) items into ((2, B, C, H, W), (B, 1)) tensors."""
    lefts, rights, labels = [], [], []
    for (img_a, img_b), lab in batch:
        lefts.append(img_a)    # each a (C, H, W) numpy array
        rights.append(img_b)
        labels.append(lab)

    left_tensor = torch.from_numpy(np.array(lefts)).float()
    right_tensor = torch.from_numpy(np.array(rights)).float()
    labels_tensor = torch.from_numpy(np.array(labels)).float().view(-1, 1)

    # Axis 0 separates the two branches of the Siamese pair.
    images = torch.stack([left_tensor, right_tensor], dim=0)  # (2, B, C, H, W)
    return images, labels_tensor


# ---------------------------------------------------#
# 测试:显示几对样本供人工检查
# ---------------------------------------------------#
if __name__ == '__main__':
    # Visual sanity check: show a few left/right pairs with their labels.
    data_path = r"D:\captcha\Siamese\data\jiyan"
    train_samples, val_samples = load_dataset(data_path)
    train_dataset = SiameseDataset(train_samples, input_shape=(112,112), random=True)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=4,
        shuffle=True,
        collate_fn=dataset_collate
    )

    import matplotlib.pyplot as plt
    for batch_idx, (images, labels) in enumerate(train_loader):
        x1, x2 = images  # (2, B, C, H, W) -> two (B, C, H, W) tensors
        labs = labels.squeeze().numpy()
        def to_display(tensor):
            # CHW float [0,1] -> HWC uint8 for matplotlib display.
            img = tensor.numpy().transpose(1,2,0)
            img = (img * 255).clip(0,255).astype(np.uint8)
            return img
        num = min(4, len(x1))
        fig, axes = plt.subplots(num, 2, figsize=(8, 4*num))
        for i in range(num):
            axes[i,0].imshow(to_display(x1[i]))
            axes[i,1].imshow(to_display(x2[i]))
            axes[i,0].set_title(f"Left (label={labs[i]})")
            axes[i,1].set_title(f"Right (label={labs[i]})")
            for ax in axes[i]:
                ax.axis('off')
        plt.tight_layout()
        plt.show()
        break  # one batch is enough for inspection

3. 训练脚本(分层学习率 + Focal Loss)

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
# File       : train.py
# Time       :2026/4/30 16:48
# Author     :yujia
# version    :python 3.6
# Description:
"""
import torch.nn.functional as F
import os
import time
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, roc_auc_score
from torch.optim.lr_scheduler import CosineAnnealingLR

# 导入你的数据集相关函数(确保它们在同一目录或被正确导入)
from dataloader import load_dataset, SiameseDataset, dataset_collate
from SiameseEfficientNet import SiameseEfficientNet
from SiameseEdgeNeXt import SiameseEdgeNeXt
from SiameseMobileNetV4 import SiameseMobileNetV4
# ---------------------------- Configuration ----------------------------
DATA_PATH = r"D:\captcha\Siamese\data\vercode1117"
MODEL_TYPE = "MobileNetV4"        # one of: "edgenext", "efficientnet", "MobileNetV4"
PRETRAINED = True
INPUT_SIZE  = (112, 112)
BATCH_SIZE  = 32               # pairs per batch; each pair holds 2 images -> 64 images per step
EPOCHS      = 80
LR_BACKBONE = 1e-4             # smaller LR for the pretrained backbone
LR_HEAD     = 1e-3             # larger LR for the freshly-initialized fusion head
WEIGHT_DECAY = 1e-4
DEVICE      = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Checkpoint directory
SAVE_DIR = "./checkpoints"
os.makedirs(SAVE_DIR, exist_ok=True)



# ---------------------------- 工具函数 ----------------------------
def compute_metrics(labels, logits):
    """Compute binary accuracy and ROC-AUC from raw logits.

    Args:
        labels: iterable of 0/1 ground-truth labels.
        logits: iterable of raw model outputs (pre-sigmoid).

    Returns:
        (acc, auc, preds, probs); auc falls back to 0.5 when undefined.
    """
    probs = torch.sigmoid(torch.tensor(logits)).numpy()
    preds = (probs >= 0.5).astype(int).flatten()
    labels = np.array(labels).flatten()
    acc = accuracy_score(labels, preds)
    try:
        auc = roc_auc_score(labels, probs.flatten())
    except ValueError:
        # roc_auc_score raises ValueError when only one class is present.
        # Fixed: the previous bare `except:` silently swallowed everything,
        # including KeyboardInterrupt.
        auc = 0.5
    return acc, auc, preds, probs

def train_one_epoch(model, loader, criterion, optimizer, device, epoch, total_epochs):
    """Run one training epoch; returns (avg_loss, accuracy, auc).

    NOTE(review): `criterion` is accepted but unused — the loss is computed
    by the module-level focal_bce_loss instead; confirm which is intended.
    """
    model.train()
    total_loss, all_labels, all_logits = [], [], []
    pbar = tqdm(loader, desc=f"Train Epoch {epoch}/{total_epochs}", leave=False)
    for images, labels_tensor in pbar:
        # images is (2, B, C, H, W): index 0/1 selects the pair branch.
        x1 = images[0].to(device)
        x2 = images[1].to(device)
        targets = labels_tensor.to(device).float().view(-1, 1)

        logits = model(x1, x2)
        loss = focal_bce_loss(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss.append(loss.item())
        all_labels.extend(targets.cpu().tolist())
        all_logits.extend(logits.detach().cpu().tolist())

        # Live loss readout on the progress bar.
        pbar.set_postfix({'loss': f"{loss.item():.4f}"})

    avg_loss = np.mean(total_loss)
    acc, auc, preds, probs = compute_metrics(all_labels, all_logits)
    return avg_loss, acc, auc

def validate(model, loader, criterion, device):
    """Evaluate on the validation loader; returns (avg_loss, accuracy, auc).

    NOTE(review): `criterion` is accepted but unused — loss comes from
    focal_bce_loss, mirroring train_one_epoch; confirm which is intended.
    """
    model.eval()
    total_loss, all_labels, all_logits = [], [], []
    pbar = tqdm(loader, desc="Validation", leave=False)
    with torch.no_grad():
        for images, labels_tensor in pbar:
            # images is (2, B, C, H, W): index 0/1 selects the pair branch.
            x1 = images[0].to(device)
            x2 = images[1].to(device)
            targets = labels_tensor.to(device).float().view(-1, 1)

            logits = model(x1, x2)
            loss = focal_bce_loss(logits, targets)

            total_loss.append(loss.item())
            all_labels.extend(targets.cpu().tolist())
            all_logits.extend(logits.cpu().tolist())

            pbar.set_postfix({'loss': f"{loss.item():.4f}"})

    avg_loss = np.mean(total_loss)
    acc, auc, preds, probs = compute_metrics(all_labels, all_logits)
    return avg_loss, acc, auc


def focal_bce_loss(logits, targets, gamma=2.0, alpha=0.25, smoothing=0.1):
    """Focal BCE-with-logits loss with label smoothing.

    Targets are smoothed toward 0.5, then the standard focal weighting
    alpha * (1 - p_t)^gamma scales the per-element BCE before averaging.
    """
    smoothed = targets * (1.0 - smoothing) + 0.5 * smoothing
    per_elem = F.binary_cross_entropy_with_logits(logits, smoothed, reduction='none')
    prob_correct = torch.exp(-per_elem)   # p_t, the probability of the (smoothed) target
    weighted = alpha * (1.0 - prob_correct) ** gamma * per_elem
    return weighted.mean()

def train():
    """End-to-end training: data loading, model build, layered-LR AdamW,
    cosine schedule, checkpointing, and early stopping on validation accuracy."""
    # ---------------------------- Data ----------------------------
    print("Loading dataset...")
    train_samples, val_samples = load_dataset(DATA_PATH, train_ratio=0.8)
    train_dataset = SiameseDataset(train_samples, input_shape=INPUT_SIZE, random=True)
    val_dataset = SiameseDataset(val_samples, input_shape=INPUT_SIZE, random=False)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              collate_fn=dataset_collate, num_workers=0, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                            collate_fn=dataset_collate, num_workers=0, pin_memory=True)

    # ---------------------------- Model ----------------------------
    if MODEL_TYPE == "edgenext":
        model = SiameseEdgeNeXt(pretrained=PRETRAINED)
    elif MODEL_TYPE == "efficientnet":
        model = SiameseEfficientNet(pretrained=PRETRAINED)
    elif MODEL_TYPE == "MobileNetV4":
        model = SiameseMobileNetV4(pretrained=PRETRAINED)
    else:
        # Fixed: message previously omitted the 'MobileNetV4' option.
        raise ValueError("MODEL_TYPE must be 'edgenext', 'efficientnet' or 'MobileNetV4'")

    model = model.to(DEVICE)

    # ------------------ Loss, optimizer, scheduler ------------------
    criterion = nn.BCEWithLogitsLoss()  # takes logits, 0/1 targets
    # NOTE(review): the epoch helpers currently use focal_bce_loss instead
    # of this criterion; it is passed along for interface compatibility.

    # Layered learning rates: small for the pretrained backbone,
    # larger for the freshly-initialized fusion head.
    optimizer = optim.AdamW([
        {'params': model.backbone.parameters(), 'lr': LR_BACKBONE},
        {'params': model.fusion_head.parameters(), 'lr': LR_HEAD},
    ], weight_decay=WEIGHT_DECAY)

    scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS)

    # Early-stopping state
    best_val_acc = 0
    patience = 15
    early_stop_counter = 0

    # ---------------------------- Training ----------------------------
    print("\nStart training...")
    for epoch in range(EPOCHS):
        start_time = time.time()
        # epoch and EPOCHS only feed the progress-bar description.
        train_loss, train_acc, train_auc = train_one_epoch(
            model, train_loader, criterion, optimizer, DEVICE, epoch + 1, EPOCHS
        )
        val_loss, val_acc, val_auc = validate(model, val_loader, criterion, DEVICE)

        scheduler.step()

        lr_backbone = optimizer.param_groups[0]['lr']
        lr_head = optimizer.param_groups[1]['lr']

        print(f"\nEpoch {epoch + 1:03d}/{EPOCHS} | Time: {time.time() - start_time:.1f}s | "
              f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} AUC: {train_auc:.4f} | "
              f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f} AUC: {val_auc:.4f} | "
              f"LR: backbone={lr_backbone:.2e}, head={lr_head:.2e}")
        # Always keep the latest weights for resuming.
        torch.save(model.state_dict(), os.path.join(SAVE_DIR, f"last_{MODEL_TYPE}.pth"))
        # Keep the best weights by validation accuracy.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            early_stop_counter = 0
            torch.save(model.state_dict(), os.path.join(SAVE_DIR, f"best_{MODEL_TYPE}.pth"))
            print(f"\n  => Best model saved (val_acc={val_acc:.4f})")
        else:
            early_stop_counter += 1

        if early_stop_counter >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    # Fixed: message previously said "Best val loss" while printing accuracy.
    print("Training finished. Best val acc: {:.4f}".format(best_val_acc))

训练监控指标:验证集准确率(Val Acc)和 AUC。若训练准确率远高于验证准确率,需增加 Dropout 或减小模型容量。

4. 推理与部署

训练完成后导出 ONNX:

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
# File       : export.py.py
# Time       :2026/4/30 17:53
# Author     :yujia
# version    :python 3.6
# Description:
"""
import os
import torch
import torch.nn as nn
import timm

from SiameseMobileNetV4 import SiameseMobileNetV4



def export_onnx(model, onnx_path, input_size=(112, 112)):
    """Export a two-input Siamese model to ONNX with a dynamic batch axis."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    # Dummy input pair used only for tracing.
    dummy_left = torch.randn(1, 3, *input_size, device=device)
    dummy_right = torch.randn(1, 3, *input_size, device=device)

    # Declare the batch dimension as dynamic on both inputs.
    batch_axes = {
        "input1": {0: "batch"},
        "input2": {0: "batch"},
        # "logits": {0: "batch"},       # output could be declared dynamic too
    }

    torch.onnx.export(
        model,
        (dummy_left, dummy_right),
        onnx_path,
        export_params=True,
        opset_version=14,            # stable and supports dynamic axes
        do_constant_folding=True,
        input_names=["input1", "input2"],
        output_names=["logits"],
        dynamic_axes=batch_axes,
        dynamo=False                  # key: force the classic TorchScript export path
    )
    print(f"✅ ONNX exported to: {onnx_path}")


def validate_onnx(onnx_path, input_size=(112, 112)):
    """Sanity-check the exported model with a batch of 2 (exercises the dynamic axis)."""
    import onnxruntime
    import numpy as np

    session = onnxruntime.InferenceSession(onnx_path)
    left = np.random.randn(2, 3, *input_size).astype(np.float32)
    right = np.random.randn(2, 3, *input_size).astype(np.float32)

    model_inputs = session.get_inputs()
    feed = {model_inputs[0].name: left, model_inputs[1].name: right}
    outputs = session.run(None, feed)
    print(f"✅ Validate OK. Output shape: {outputs[0].shape}")


if __name__ == "__main__":
    # Export the best checkpoint and verify the ONNX file accepts dynamic batch.
    WEIGHT_PATH = "checkpoints/best_MobileNetV4.pth"
    ONNX_PATH   = "onnx/siamese_mobilenetv4_hybrid_medium.onnx"

    os.makedirs("onnx", exist_ok=True)

    # Build the model skeleton and load trained weights (CPU map for portability).
    model = SiameseMobileNetV4(pretrained=False)
    state_dict = torch.load(WEIGHT_PATH, map_location="cpu")
    model.load_state_dict(state_dict, strict=True)

    # Export to ONNX
    export_onnx(model, ONNX_PATH, input_size=(112, 112))

    # Verify the exported model with a batch of 2
    validate_onnx(ONNX_PATH, input_size=(112, 112))

使用 ONNX Runtime 推理:

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
# File       : val_onnx.py
# Time       :2026/4/30 17:54
# Author     :yujia
# version    :python 3.6
# Description:
"""
import os
import cv2
import numpy as np
import onnxruntime as ort


def cvtColor(image_np):
    """Ensure the image is 3-channel RGB regardless of its source format."""
    channels = image_np.shape[2] if image_np.ndim == 3 else 1
    if channels == 3:
        return cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
    if channels == 4:
        # BGRA: drop alpha first, then swap channel order.
        bgr = cv2.cvtColor(image_np, cv2.COLOR_BGRA2BGR)
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    return cv2.cvtColor(image_np, cv2.COLOR_GRAY2RGB)

def letterbox_image(image_np, target_size):
    """Aspect-preserving resize, centered on a gray (128) canvas of target_size (h, w)."""
    dst_h, dst_w = target_size
    src_h, src_w = image_np.shape[:2]
    factor = min(dst_w / src_w, dst_h / src_h)
    res_w = int(src_w * factor)
    res_h = int(src_h * factor)
    shrunk = cv2.resize(image_np, (res_w, res_h), interpolation=cv2.INTER_CUBIC)
    out = np.full((dst_h, dst_w, 3), 128, dtype=np.uint8)
    pad_x = (dst_w - res_w) // 2
    pad_y = (dst_h - res_h) // 2
    out[pad_y:pad_y + res_h, pad_x:pad_x + res_w] = shrunk
    return out

def preprocess_input(x):
    """Cast pixel values to float32 and scale into [0, 1]."""
    floats = x.astype(np.float32)
    return floats / 255.0

def preprocess_image(img: np.ndarray, input_size=(112, 112)) -> np.ndarray:
    """Preprocess an already-decoded image array (not a file path):
    convert to RGB, letterbox, scale to [0, 1], and return a
    [1, 3, H, W] float32 array ready for ONNX inference."""
    img = cvtColor(img)
    img = letterbox_image(img, input_size)
    img = preprocess_input(img)                     # normalize to [0, 1]
    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # HWC -> CHW
    return np.expand_dims(img, axis=0)              # [1, 3, H, W]


# ===================== ONNX 推理接口 =====================
class ONNXInference:
    """ONNX Runtime wrapper around the exported Siamese matcher."""

    def __init__(self, onnx_path: str, device: str = 'cpu', input_size=(112, 112)):
        """
        onnx_path : path to the ONNX model file
        device    : 'cpu' or 'cuda' (the latter needs onnxruntime-gpu)
        input_size: input image size; must match the export settings
        """
        self.input_size = input_size

        # Execution providers, preferred first.
        providers = ['CPUExecutionProvider']
        if device == 'cuda':
            providers.insert(0, 'CUDAExecutionProvider')

        self.session = ort.InferenceSession(onnx_path, providers=providers)
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.output_names = [out.name for out in self.session.get_outputs()]
        print(f"ONNX model loaded. Inputs: {self.input_names}, Outputs: {self.output_names}")

    def predict_pair(self, img1_path: np.ndarray, img2_path: np.ndarray) -> float:
        """Compare two decoded images and return similarity probability in [0, 1].

        Fixed: despite the historical parameter names, the arguments are
        decoded image arrays (BGR, as from cv2.imread), not file paths —
        preprocess_image expects an ndarray and callers pass images.
        """
        img1 = preprocess_image(img1_path, self.input_size)
        img2 = preprocess_image(img2_path, self.input_size)

        # Input names must match those used at export time
        # (defaults: 'input1', 'input2').
        ort_inputs = {
            self.input_names[0]: img1,
            self.input_names[1]: img2
        }
        logits = self.session.run(self.output_names, ort_inputs)[0]   # shape: [1, 1]
        prob = 1.0 / (1.0 + np.exp(-logits))  # sigmoid
        return float(prob[0, 0])

    def reason_all_batch(self, image_1_list, image_2_list):
        """
        Score every combination between two groups of images in one run.
        :param image_1_list: list of decoded images, length N
        :param image_2_list: list of decoded images, length M
        :return: scores[N][M] where scores[i][j] is the similarity
                 probability of image_1[i] vs image_2[j]
        """
        N = len(image_1_list)
        M = len(image_2_list)
        # 1. Preprocess each image once (fixed: now honors self.input_size
        #    instead of silently using the default (112, 112)).
        processed_1 = [preprocess_image(img, self.input_size) for img in image_1_list]
        processed_2 = [preprocess_image(img, self.input_size) for img in image_2_list]

        # 2. Build the Cartesian-product batch.
        x1_list = []
        x2_list = []
        for p1 in processed_1:
            x1_list.extend([p1] * M)     # repeat each left image M times
            x2_list.extend(processed_2)  # pair it with every right image
        # Concatenate along the batch axis.
        x1_batch = np.concatenate(x1_list, axis=0)  # (N*M, C, H, W)
        x2_batch = np.concatenate(x2_list, axis=0)

        # 3. Single inference call (input names must match the export).
        ort_inputs = {self.input_names[0]: x1_batch, self.input_names[1]: x2_batch}
        logits = self.session.run(self.output_names, ort_inputs)[0]  # (N*M, 1)

        # 4. Sigmoid to probabilities.
        probs = 1.0 / (1.0 + np.exp(-logits))
        probs = probs.flatten().tolist()

        # 5. Reshape into the N x M score matrix.
        scores = [probs[i * M: (i + 1) * M] for i in range(N)]
        return scores



# ===================== 使用示例 =====================
if __name__ == '__main__':
    # Configuration
    ONNX_PATH = "onnx/siamese_mobilenetv4_hybrid_medium.onnx"   # path to the ONNX model
    DEVICE = "cpu"                                 # or "cuda"

    # Initialize the ONNX inference wrapper
    infer = ONNXInference(ONNX_PATH, device=DEVICE)
    # NOTE: decoded images are passed to predict_pair, not file paths.
    char_1 = cv2.imread("char_1.jpg")
    plan_1 = cv2.imread("plan_1.jpg")
    plan_2 = cv2.imread("plan_2.jpg")

    # Single-pair similarity prediction
    prob = infer.predict_pair(char_1, plan_1)
    print(f"两图相似概率: {prob:.4f}")

    prob = infer.predict_pair(char_1, plan_2)
    print(f"两图相似概率: {prob:.4f}")


✅ 总结

模块 / 适用场景 / 关键技术:
  • YOLO 检测 — 定位验证码中的多个目标(字符、标题等)— Mosaic 增强、MuSGD 优化器、ONNX 部署
  • 孪生网络 — 点选验证码文字匹配、相似度判断 — 特征差异融合、分层学习率、Focal Loss

两份代码均支持 GPU 加速和轻量化部署,可直接集成到生产环境。如有疑问,欢迎根据代码中的注释进行调整。