3_3_快速开始#

作者: ZhouLong
创建日期: 2026 年 02 月 07 日
版本: 1.0

浏览量:

这是一个基于PyTorch框架的机器学习入门教程,主要展示了如何使用PyTorch构建、训练和部署一个神经网络模型来完成FashionMNIST图像分类任务。本篇完整展示了PyTorch的基本流程,适合初学者理解PyTorch核心概念。

1 处理数据#

PyTorch 提供了两个处理数据的基本工具:torch.utils.data.DataLoadertorch.utils.data.DatasetDataset 存储样本及其对应的标签,而 DataLoaderDataset 外部包装了一个可迭代对象。

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

PyTorch 提供了特定领域的库,如 TorchText、TorchVision 和 TorchAudio,它们都包含数据集。在本教程中,我们将使用一个 TorchVision 数据集。

torchvision.datasets 模块包含许多真实世界视觉数据的 Dataset 对象,如 CIFAR、COCO(完整列表在此)。在本教程中,我们使用 FashionMNIST 数据集。每个 TorchVision Dataset 都包含两个参数:transformtarget_transform,分别用于修改样本和标签。

# 从开放数据集下载训练数据。
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# 从开放数据集下载测试数据。
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

(下载进度输出示例,此处略去)

我们将 Dataset 作为参数传递给 DataLoader。这会将我们的数据集包装成一个可迭代对象,并支持自动批处理、采样、打乱顺序和多进程数据加载。这里我们定义批量大小为 64,即 dataloader 可迭代对象中的每个元素将返回一个包含 64 个特征和标签的批次。

batch_size = 64

# 创建数据加载器。
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

输出:

Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64

2 创建模型#

要在 PyTorch 中定义神经网络,我们创建一个继承自 nn.Module 的类。我们在 __init__ 函数中定义网络的层,并在 forward 函数中指定数据将如何通过网络。为了加速神经网络中的运算,我们将其移动到加速器(如 CUDA、MPS、MTIA 或 XPU)。如果当前有可用的加速器,我们将使用它;否则,使用 CPU。

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

# 定义模型
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

输出示例:

Using cuda device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=10, bias=True)
  )
)

3 优化模型参数#

要训练模型,我们需要一个损失函数和一个优化器。

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

在单个训练循环中,模型对训练数据集(以批次形式输入)进行预测,并通过反向传播预测误差来调整模型的参数。

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # 计算预测误差
        pred = model(X)
        loss = loss_fn(pred, y)

        # 反向传播
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

我们还会根据测试数据集检查模型的性能,以确保它正在学习。

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

训练过程会经过多次迭代(轮次)。在每一轮中,模型学习参数以做出更好的预测。我们在每个轮次打印模型的准确率和损失;我们希望看到准确率增加,损失随着每个轮次减少。

epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

输出示例:

Epoch 1
-------------------------------
loss: 2.299333  [   64/60000]
loss: 2.288771  [ 6464/60000]
loss: 2.264925  [12864/60000]
...
Test Error:
 Accuracy: 64.9%, Avg loss: 1.080188

Done!

4 保存模型#

保存模型的常见方法是序列化内部状态字典(包含模型参数)。

torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")

输出:

Saved PyTorch Model State to model.pth

5 加载模型#

加载模型的过程包括重新创建模型结构并将状态字典加载到其中。

model = NeuralNetwork().to(device)
model.load_state_dict(torch.load("model.pth", weights_only=True))

输出:

<All keys matched successfully>

现在可以使用此模型进行预测。

classes = [
    "T恤/上衣",
    "裤子",
    "套头衫",
    "连衣裙",
    "外套",
    "凉鞋",
    "衬衫",
    "运动鞋",
    "包",
    "短靴",
]

model.eval()
x, y = test_data[0][0], test_data[0][1]
with torch.no_grad():
    x = x.to(device)
    pred = model(x)
    predicted, actual = classes[pred[0].argmax(0)], classes[y]
    print(f'Predicted: "{predicted}", Actual: "{actual}"')

输出示例:

Predicted: "短靴", Actual: "短靴"

6 完整代码流程#

可以复制下述完整代码来运行

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# ==================== 1. 数据准备阶段 ====================
print("=== 数据准备阶段 ===")

# 设置设备
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")

# 下载数据集
transform = transforms.ToTensor()
train_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=transform
)
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=transform
)

print(f"训练集大小: {len(train_data)}")
print(f"测试集大小: {len(test_data)}")

# 创建数据加载器
batch_size = 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# 查看一个批次的数据
for X, y in test_loader:
    print(f"批次数据形状: X: {X.shape}, y: {y.shape}")
    break

# ==================== 2. 模型构建阶段 ====================
print("\n=== 模型构建阶段 ===")

class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.network = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 10)
        )
    
    def forward(self, x):
        x = self.flatten(x)
        return self.network(x)

# 创建模型
model = SimpleNet().to(device)
print(f"模型结构:\n{model}")
print(f"总参数数量: {sum(p.numel() for p in model.parameters()):,}")

# ==================== 3. 训练配置阶段 ====================
print("\n=== 训练配置阶段 ===")

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# ==================== 4. 训练循环阶段 ====================
print("\n=== 开始训练 ===")

def train_one_epoch(epoch, dataloader, model, loss_fn, optimizer):
    """训练一个epoch"""
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        
        # 前向传播
        pred = model(X)
        loss = loss_fn(pred, y)
        total_loss += loss.item()
        
        # 计算准确率
        correct += (pred.argmax(1) == y).sum().item()
        total += len(y)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 每100个批次打印一次
        if batch % 100 == 0:
            current = batch * len(X)
            size = len(dataloader.dataset)
            print(f"Epoch {epoch}: [{current:>5d}/{size:>5d}]")
    
    avg_loss = total_loss / len(dataloader)
    accuracy = 100 * correct / total
    return avg_loss, accuracy

def test(dataloader, model, loss_fn):
    """测试模型"""
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            total_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).sum().item()
            total += len(y)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = 100 * correct / total
    return avg_loss, accuracy

# 训练多个epoch
epochs = 5
train_losses = []
test_losses = []
train_accs = []
test_accs = []

for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}/{epochs}")
    print("-" * 40)
    
    # 训练
    train_loss, train_acc = train_one_epoch(epoch+1, train_loader, model, loss_fn, optimizer)
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    
    # 测试
    test_loss, test_acc = test(test_loader, model, loss_fn)
    test_losses.append(test_loss)
    test_accs.append(test_acc)
    
    print(f"训练损失: {train_loss:.4f}, 训练准确率: {train_acc:.2f}%")
    print(f"测试损失: {test_loss:.4f}, 测试准确率: {test_acc:.2f}%")

# ==================== 5. 模型保存阶段 ====================
print("\n=== 模型保存阶段 ===")

# 保存模型
torch.save(model.state_dict(), "model.pth")
print(" 模型已保存为 model.pth")

# ==================== 6. 模型加载和预测阶段 ====================
print("\n=== 模型加载和预测阶段 ===")

# 加载模型
loaded_model = SimpleNet().to(device)
loaded_model.load_state_dict(torch.load("model.pth", weights_only=True))
loaded_model.eval()
print(" 模型加载成功")

# 类别标签
classes = [
    "T恤/上衣", "裤子", "套头衫", "连衣裙", "外套",
    "凉鞋", "衬衫", "运动鞋", "包", "短靴"
]

# 进行预测
def predict_single_sample(model, sample, true_label):
    """预测单个样本"""
    with torch.no_grad():
        sample = sample.to(device).unsqueeze(0)  # 增加批次维度
        pred = model(sample)
        predicted_class = pred.argmax(1).item()
        
    predicted_label = classes[predicted_class]
    true_label_name = classes[true_label]
    
    return predicted_label, true_label_name

# 预测测试集中的几个样本
print("\n预测结果:")
print("-" * 40)

for i in range(5):
    sample, label = test_data[i]
    pred_label, true_label = predict_single_sample(loaded_model, sample, label)
    result = "✅" if pred_label == true_label else "❌"
    print(f"样本 {i+1}: 预测={pred_label:12s} | 实际={true_label:12s} {result}")


# ==================== 7. 最终评估 ====================
print("\n=== 最终模型评估 ===")

final_test_loss, final_test_acc = test(test_loader, loaded_model, loss_fn)
print(f"最终测试准确率: {final_test_acc:.2f}%")
print(f"最终测试损失: {final_test_loss:.4f}")
print("\n" + "="*50)
print("🎉 完整流程执行完毕!")
print("="*50)