3_3_快速开始#
作者: ZhouLong
创建日期: 2026 年 02 月 07 日
版本: 1.0
浏览量:
这是一个基于PyTorch框架的机器学习入门教程,主要展示了如何使用PyTorch构建、训练和部署一个神经网络模型来完成FashionMNIST图像分类任务。本篇完整展示了PyTorch的基本流程,适合初学者理解PyTorch核心概念。
1 处理数据#
PyTorch 提供了两个处理数据的基本工具:torch.utils.data.DataLoader 和 torch.utils.data.Dataset。Dataset 存储样本及其对应的标签,而 DataLoader 在 Dataset 外部包装了一个可迭代对象。
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
PyTorch 提供了特定领域的库,如 TorchText、TorchVision 和 TorchAudio,它们都包含数据集。在本教程中,我们将使用一个 TorchVision 数据集。
torchvision.datasets 模块包含许多真实世界视觉数据的 Dataset 对象,如 CIFAR、COCO(完整列表在此)。在本教程中,我们使用 FashionMNIST 数据集。每个 TorchVision Dataset 都包含两个参数:transform 和 target_transform,分别用于修改样本和标签。
# 从开放数据集下载训练数据。
training_data = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=ToTensor(),
)
# 从开放数据集下载测试数据。
test_data = datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=ToTensor(),
)
(下载进度输出示例,此处略去)
我们将 Dataset 作为参数传递给 DataLoader。这会将我们的数据集包装成一个可迭代对象,并支持自动批处理、采样、打乱顺序和多进程数据加载。这里我们定义批量大小为 64,即 dataloader 可迭代对象中的每个元素将返回一个包含 64 个特征和标签的批次。
batch_size = 64
# 创建数据加载器。
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)
for X, y in test_dataloader:
print(f"Shape of X [N, C, H, W]: {X.shape}")
print(f"Shape of y: {y.shape} {y.dtype}")
break
输出:
Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64
2 创建模型#
要在 PyTorch 中定义神经网络,我们创建一个继承自 nn.Module 的类。我们在 __init__ 函数中定义网络的层,并在 forward 函数中指定数据将如何通过网络。为了加速神经网络中的运算,我们将其移动到加速器(如 CUDA、MPS、MTIA 或 XPU)。如果当前有可用的加速器,我们将使用它;否则,使用 CPU。
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")
# 定义模型
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28*28, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
model = NeuralNetwork().to(device)
print(model)
输出示例:
Using cuda device
NeuralNetwork(
(flatten): Flatten(start_dim=1, end_dim=-1)
(linear_relu_stack): Sequential(
(0): Linear(in_features=784, out_features=512, bias=True)
(1): ReLU()
(2): Linear(in_features=512, out_features=512, bias=True)
(3): ReLU()
(4): Linear(in_features=512, out_features=10, bias=True)
)
)
3 优化模型参数#
要训练模型,我们需要一个损失函数和一个优化器。
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
在单个训练循环中,模型对训练数据集(以批次形式输入)进行预测,并通过反向传播预测误差来调整模型的参数。
def train(dataloader, model, loss_fn, optimizer):
size = len(dataloader.dataset)
model.train()
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
# 计算预测误差
pred = model(X)
loss = loss_fn(pred, y)
# 反向传播
loss.backward()
optimizer.step()
optimizer.zero_grad()
if batch % 100 == 0:
loss, current = loss.item(), (batch + 1) * len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
我们还会根据测试数据集检查模型的性能,以确保它正在学习。
def test(dataloader, model, loss_fn):
size = len(dataloader.dataset)
num_batches = len(dataloader)
model.eval()
test_loss, correct = 0, 0
with torch.no_grad():
for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred = model(X)
test_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
训练过程会经过多次迭代(轮次)。在每一轮中,模型学习参数以做出更好的预测。我们在每个轮次打印模型的准确率和损失;我们希望看到准确率增加,损失随着每个轮次减少。
epochs = 5
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train(train_dataloader, model, loss_fn, optimizer)
test(test_dataloader, model, loss_fn)
print("Done!")
输出示例:
Epoch 1
-------------------------------
loss: 2.299333 [ 64/60000]
loss: 2.288771 [ 6464/60000]
loss: 2.264925 [12864/60000]
...
Test Error:
Accuracy: 64.9%, Avg loss: 1.080188
Done!
4 保存模型#
保存模型的常见方法是序列化内部状态字典(包含模型参数)。
torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")
输出:
Saved PyTorch Model State to model.pth
5 加载模型#
加载模型的过程包括重新创建模型结构并将状态字典加载到其中。
model = NeuralNetwork().to(device)
model.load_state_dict(torch.load("model.pth", weights_only=True))
输出:
<All keys matched successfully>
现在可以使用此模型进行预测。
classes = [
"T恤/上衣",
"裤子",
"套头衫",
"连衣裙",
"外套",
"凉鞋",
"衬衫",
"运动鞋",
"包",
"短靴",
]
model.eval()
x, y = test_data[0][0], test_data[0][1]
with torch.no_grad():
x = x.to(device)
pred = model(x)
predicted, actual = classes[pred[0].argmax(0)], classes[y]
print(f'Predicted: "{predicted}", Actual: "{actual}"')
输出示例:
Predicted: "短靴", Actual: "短靴"
6 完整代码流程#
可以复制下述完整代码来运行
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# ==================== 1. 数据准备阶段 ====================
print("=== 数据准备阶段 ===")
# 设置设备
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")
# 下载数据集
transform = transforms.ToTensor()
train_data = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=transform
)
test_data = datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=transform
)
print(f"训练集大小: {len(train_data)}")
print(f"测试集大小: {len(test_data)}")
# 创建数据加载器
batch_size = 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
# 查看一个批次的数据
for X, y in test_loader:
print(f"批次数据形状: X: {X.shape}, y: {y.shape}")
break
# ==================== 2. 模型构建阶段 ====================
print("\n=== 模型构建阶段 ===")
class SimpleNet(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.network = nn.Sequential(
nn.Linear(28*28, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 10)
)
def forward(self, x):
x = self.flatten(x)
return self.network(x)
# 创建模型
model = SimpleNet().to(device)
print(f"模型结构:\n{model}")
print(f"总参数数量: {sum(p.numel() for p in model.parameters()):,}")
# ==================== 3. 训练配置阶段 ====================
print("\n=== 训练配置阶段 ===")
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# ==================== 4. 训练循环阶段 ====================
print("\n=== 开始训练 ===")
def train_one_epoch(epoch, dataloader, model, loss_fn, optimizer):
"""训练一个epoch"""
model.train()
total_loss = 0
correct = 0
total = 0
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
# 前向传播
pred = model(X)
loss = loss_fn(pred, y)
total_loss += loss.item()
# 计算准确率
correct += (pred.argmax(1) == y).sum().item()
total += len(y)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 每100个批次打印一次
if batch % 100 == 0:
current = batch * len(X)
size = len(dataloader.dataset)
print(f"Epoch {epoch}: [{current:>5d}/{size:>5d}]")
avg_loss = total_loss / len(dataloader)
accuracy = 100 * correct / total
return avg_loss, accuracy
def test(dataloader, model, loss_fn):
"""测试模型"""
model.eval()
total_loss = 0
correct = 0
total = 0
with torch.no_grad():
for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred = model(X)
total_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).sum().item()
total += len(y)
avg_loss = total_loss / len(dataloader)
accuracy = 100 * correct / total
return avg_loss, accuracy
# 训练多个epoch
epochs = 5
train_losses = []
test_losses = []
train_accs = []
test_accs = []
for epoch in range(epochs):
print(f"\nEpoch {epoch+1}/{epochs}")
print("-" * 40)
# 训练
train_loss, train_acc = train_one_epoch(epoch+1, train_loader, model, loss_fn, optimizer)
train_losses.append(train_loss)
train_accs.append(train_acc)
# 测试
test_loss, test_acc = test(test_loader, model, loss_fn)
test_losses.append(test_loss)
test_accs.append(test_acc)
print(f"训练损失: {train_loss:.4f}, 训练准确率: {train_acc:.2f}%")
print(f"测试损失: {test_loss:.4f}, 测试准确率: {test_acc:.2f}%")
# ==================== 5. 模型保存阶段 ====================
print("\n=== 模型保存阶段 ===")
# 保存模型
torch.save(model.state_dict(), "model.pth")
print(" 模型已保存为 model.pth")
# ==================== 6. 模型加载和预测阶段 ====================
print("\n=== 模型加载和预测阶段 ===")
# 加载模型
loaded_model = SimpleNet().to(device)
loaded_model.load_state_dict(torch.load("model.pth", weights_only=True))
loaded_model.eval()
print(" 模型加载成功")
# 类别标签
classes = [
"T恤/上衣", "裤子", "套头衫", "连衣裙", "外套",
"凉鞋", "衬衫", "运动鞋", "包", "短靴"
]
# 进行预测
def predict_single_sample(model, sample, true_label):
"""预测单个样本"""
with torch.no_grad():
sample = sample.to(device).unsqueeze(0) # 增加批次维度
pred = model(sample)
predicted_class = pred.argmax(1).item()
predicted_label = classes[predicted_class]
true_label_name = classes[true_label]
return predicted_label, true_label_name
# 预测测试集中的几个样本
print("\n预测结果:")
print("-" * 40)
for i in range(5):
sample, label = test_data[i]
pred_label, true_label = predict_single_sample(loaded_model, sample, label)
result = "✅" if pred_label == true_label else "❌"
print(f"样本 {i+1}: 预测={pred_label:12s} | 实际={true_label:12s} {result}")
# ==================== 7. 最终评估 ====================
print("\n=== 最终模型评估 ===")
final_test_loss, final_test_acc = test(test_loader, loaded_model, loss_fn)
print(f"最终测试准确率: {final_test_acc:.2f}%")
print(f"最终测试损失: {final_test_loss:.4f}")
print("\n" + "="*50)
print("🎉 完整流程执行完毕!")
print("="*50)