
Overview

Proper data loading is crucial for distributed training. Each GPU process must:
  1. Load different data (to avoid redundant computation)
  2. Synchronize data downloads (to avoid race conditions)
  3. Shuffle data correctly (for reproducible training)
This guide covers all aspects of data loading for both DDP and FSDP.

The Three Essential Components

1. @dataDownload Decorator

Ensures datasets are downloaded only once per node:
from sf_tensor.persist import dataDownload
from torchvision import datasets

@dataDownload
def download_dataset():
    """Download dataset - runs only on LOCAL_RANK=0"""
    datasets.CIFAR10(root='./data', train=True, download=True)
    datasets.CIFAR10(root='./data', train=False, download=True)
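If you need the same behavior without the decorator, a rough equivalent is to guard the download on LOCAL_RANK (a sketch, assuming the standard torchrun LOCAL_RANK environment variable; the decorator's actual implementation may differ, and the function name here is only illustrative):

import os
from torchvision import datasets

def download_dataset_manually():
    """Only the first process on each node downloads; everyone else waits at the barrier below."""
    if int(os.environ.get("LOCAL_RANK", "0")) == 0:
        datasets.CIFAR10(root='./data', train=True, download=True)
        datasets.CIFAR10(root='./data', train=False, download=True)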

2. Barrier Synchronization

Wait for downloads to complete before all processes access data:
import torch.distributed as dist

# Download data
download_dataset()

# Wait for download to complete
if dist.is_initialized():
    dist.barrier()

# Now all processes can safely load data
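Once the barrier has passed, every rank can build its dataset objects with download=False. A minimal sketch (the ToTensor transform is just a placeholder):

from torchvision import datasets, transforms

transform = transforms.ToTensor()
train_dataset = datasets.CIFAR10(root='./data', train=True,
                                 download=False, transform=transform)
val_dataset = datasets.CIFAR10(root='./data', train=False,
                               download=False, transform=transform)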

3. DistributedSampler

Partition data across GPUs so each sees different samples:
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(
    dataset,
    num_replicas=dist.get_world_size() if dist.is_initialized() else 1,
    rank=dist.get_rank() if dist.is_initialized() else 0,
    shuffle=True
)

loader = DataLoader(dataset, sampler=sampler)
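As a quick sanity check, you can print the indices each rank will iterate over; every rank should see a disjoint slice of roughly len(dataset) / world_size samples. A sketch (run under torchrun; sampler.rank is an internal attribute of DistributedSampler):

indices = list(sampler)
print(f"rank {sampler.rank}: {len(indices)} samples, first indices {indices[:4]}")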

DistributedSampler in Detail

Why Use DistributedSampler?

Without DistributedSampler, all GPUs would see the same data, wasting compute:
Without DistributedSampler (Wrong!):
GPU 0: [Sample 0, 1, 2, 3, ...]  ← Same data!
GPU 1: [Sample 0, 1, 2, 3, ...]  ← Same data!
GPU 2: [Sample 0, 1, 2, 3, ...]  ← Same data!
GPU 3: [Sample 0, 1, 2, 3, ...]  ← Same data!
With DistributedSampler, each GPU sees different data:
With DistributedSampler (Correct!):
GPU 0: [Sample 0, 4, 8, 12, ...]   ← Different!
GPU 1: [Sample 1, 5, 9, 13, ...]   ← Different!
GPU 2: [Sample 2, 6, 10, 14, ...]  ← Different!
GPU 3: [Sample 3, 7, 11, 15, ...]  ← Different!
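With shuffle=False the split is essentially this round-robin interleaving; the small sketch below reproduces the diagram (16 samples and 4 GPUs assumed, dataset size evenly divisible by world_size):

# Simplified view of the partition when shuffle=False.
dataset_len, world_size = 16, 4
for rank in range(world_size):
    shard = list(range(dataset_len))[rank::world_size]
    print(f"GPU {rank}: {shard}")
# GPU 0: [0, 4, 8, 12]
# GPU 1: [1, 5, 9, 13]
# ...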

Key Parameters

from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(
    dataset,
    num_replicas=8,      # Total number of processes
    rank=0,              # Rank of current process
    shuffle=True,        # Shuffle data
    seed=42,             # Random seed for shuffling
    drop_last=False      # Whether to drop the last incomplete batch
)
num_replicas: Total number of processes (GPUs) in the distributed job.
num_replicas=dist.get_world_size() if dist.is_initialized() else 1
  • 8 GPUs → num_replicas=8
  • Single GPU → num_replicas=1

rank: Rank of the current process (which GPU this is).
rank=dist.get_rank() if dist.is_initialized() else 0
  • GPU 0 → rank=0
  • GPU 1 → rank=1
  • etc.

shuffle: Whether to shuffle the data at each epoch.
shuffle=True  # For training
shuffle=False # For validation
Important: You must call sampler.set_epoch(epoch) each epoch for proper shuffling!

seed: Random seed for reproducible shuffling.
seed=42  # All GPUs must use the same seed so they produce the same shuffle order

drop_last: Whether to drop the last incomplete batch when the dataset size is not divisible by world_size.
drop_last=False  # Keep the incomplete batch (default)
drop_last=True   # Drop it (recommended for training)
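Putting these parameters together, a typical train/validation pair might look like the sketch below. When torch.distributed is already initialized, num_replicas and rank default to the process group's values, so only the behavioral flags need to be spelled out (train_dataset and val_dataset are assumed to exist):

train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)
val_sampler = DistributedSampler(val_dataset, shuffle=False, drop_last=False)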

Setting Epoch for Shuffling

Critical: Call set_epoch() at the start of each epoch for proper shuffling:
for epoch in range(num_epochs):
    # Set epoch for shuffling
    train_sampler.set_epoch(epoch)

    for batch in train_loader:
        # Training code...
        pass
Why? The sampler seeds its shuffle with seed + epoch, ensuring:
  1. A different shuffle order each epoch
  2. The same shuffle order on every GPU, so all ranks partition one consistent permutation with no overlap or gaps
Forgetting set_epoch() means the sampler reuses the same shuffle order every epoch, which hurts model generalization.
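Under the hood, the shuffle is a permutation seeded with seed + epoch, which is why every rank produces the same order and why the order changes whenever set_epoch() is called. A simplified sketch of that logic (not the actual PyTorch source):

import torch

def shuffled_indices(dataset_len, seed, epoch):
    # Mirrors the seeding scheme DistributedSampler uses when shuffle=True (simplified).
    g = torch.Generator()
    g.manual_seed(seed + epoch)   # set_epoch(epoch) changes this sum each epoch
    return torch.randperm(dataset_len, generator=g).tolist()

print(shuffled_indices(8, seed=42, epoch=0))  # identical on every rank
print(shuffled_indices(8, seed=42, epoch=1))  # a new permutation next epoch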

DataLoader Configuration

Good Settings

train_loader = DataLoader(
    dataset,
    batch_size=128,           # Per-GPU batch size. Precise number isn't vital.
    sampler=train_sampler,    # Use sampler, NOT shuffle
    num_workers=4,            # Number of worker processes
    pin_memory=True,          # Faster CPU→GPU transfer
    drop_last=True,           # Drop incomplete batch
    persistent_workers=True   # Keep workers alive between epochs
)
batch_size: Batch size per GPU, not the total batch size.
# 8 GPUs, batch_size=128 per GPU
# Effective global batch size = 8 × 128 = 1024
batch_size=128  # Per GPU

sampler: Use the sampler for distributed training, not shuffle.
# Correct
DataLoader(dataset, sampler=sampler)

# Wrong (will error!)
DataLoader(dataset, sampler=sampler, shuffle=True)

You cannot pass shuffle=True together with a sampler; the sampler handles shuffling.

num_workers: Number of worker processes for data loading.
num_workers=4  # Good starting point; tune for your application
  • Too low: CPU bottleneck, GPU underutilized
  • Too high: memory overhead, slower startup

pin_memory: Pin memory for faster CPU→GPU transfer.
pin_memory=True  # Always use for GPU training
Enables faster data transfer by using pinned (page-locked) memory.
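Pinned memory pays off mainly when the host-to-device copy is made non-blocking so it can overlap with GPU work; a usage sketch:

for images, labels in train_loader:
    # With pin_memory=True, non_blocking=True lets the copy overlap with compute.
    images = images.cuda(non_blocking=True)
    labels = labels.cuda(non_blocking=True)
    # forward / backward pass here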
drop_last: Drop the last incomplete batch.
drop_last=True  # Recommended for training
drop_last=False # For validation (don't drop data)
Ensures all GPUs process the same number of batches.

persistent_workers: Keep worker processes alive between epochs (PyTorch 1.7+).
persistent_workers=True  # Faster epoch transitions
Avoids the overhead of restarting workers each epoch.
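For completeness, a matching validation loader might look like this (a sketch; val_dataset and val_sampler as defined earlier, batch size is just an example):

val_loader = DataLoader(
    val_dataset,
    batch_size=256,           # Often larger than the training batch size
    sampler=val_sampler,      # DistributedSampler(..., shuffle=False)
    num_workers=4,
    pin_memory=True,
    drop_last=False,          # Keep every validation sample
    persistent_workers=True
)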

Best Practices

Always use the @dataDownload decorator for dataset downloads:
@dataDownload
def download_data():
    # Download logic
    ...
Always synchronize after downloading:
download_data()
if dist.is_initialized():
    dist.barrier()
Always set epoch for proper shuffling:
for epoch in range(num_epochs):
    train_sampler.set_epoch(epoch)
Always enable pinned memory for GPU training:
DataLoader(..., pin_memory=True)
Drop incomplete batches in training:
DataLoader(..., drop_last=True)
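Putting the pieces together, a minimal end-to-end setup might look like the sketch below (CIFAR-10, the hyperparameters, and the epoch count are placeholders):

import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms
from sf_tensor.persist import dataDownload

@dataDownload
def download_data():
    datasets.CIFAR10(root='./data', train=True, download=True)

download_data()
if dist.is_initialized():
    dist.barrier()                          # Wait for the download to finish

train_dataset = datasets.CIFAR10(root='./data', train=True, download=False,
                                 transform=transforms.ToTensor())
train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)
train_loader = DataLoader(train_dataset, batch_size=128, sampler=train_sampler,
                          num_workers=4, pin_memory=True, drop_last=True,
                          persistent_workers=True)

for epoch in range(10):
    train_sampler.set_epoch(epoch)          # New shuffle order each epoch
    for images, labels in train_loader:
        pass                                # Training step goes here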

Summary

Proper data loading for distributed training requires:
  ✅ @dataDownload decorator - Download once per node
  ✅ Barrier synchronization - Wait for downloads
  ✅ DistributedSampler - Partition data across GPUs
  ✅ set_epoch() - Proper shuffling each epoch
  ✅ Optimal DataLoader settings - pin_memory, num_workers, etc.
Follow these patterns for both DDP and FSDP!