Serendipity_Blog

Sleeping Everyday

2024.08.11

Learning Diffusers

Installation

With pip

pip install --upgrade diffusers[torch]

With conda

conda install -c conda-forge diffusers

Usage

You can directly load and sample from a pretrained model uploaded to the Hugging Face Hub via a diffusers pipeline:

import torch
from diffusers import DDPMPipeline

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the butterfly pipeline
butterfly_pipeline = DDPMPipeline.from_pretrained(
    "johnowhitaker/ddpm-butterflies-32px"
).to(device)

# Create 8 images
images = butterfly_pipeline(batch_size=8).images

# View the result (make_grid is a small viewing helper, sketched below)
make_grid(images)
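Note that make_grid here is not part of diffusers; it is just a small viewing helper. A minimal sketch of it, assuming the pipeline returns a list of PIL images, might look like this:

from PIL import Image

def make_grid(images, size=64):
    """Paste a list of PIL images side by side into a single grid image."""
    grid = Image.new("RGB", (size * len(images), size))
    for i, im in enumerate(images):
        grid.paste(im.resize((size, size)), (i * size, 0))
    return grid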

Example

Step 0: Log in and initialize some useful functions
# Login
from huggingface_hub import notebook_login
notebook_login()
# Paste your Hugging Face token in when prompted
import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from matplotlib import pyplot as plt
from PIL import Image

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def show_images(x):
    """Given a batch of images x, make a grid and convert to PIL"""
    x = x * 0.5 + 0.5  # Map from (-1, 1) back to (0, 1)
    grid = torchvision.utils.make_grid(x)
    grid_im = grid.detach().cpu().permute(1, 2, 0).clip(0, 1) * 255
    grid_im = Image.fromarray(np.array(grid_im).astype(np.uint8))
    return grid_im
Step 1: Download a training dataset

For this example, we'll use a dataset of images from the Hugging Face Hub: a collection of 1000 butterfly pictures from the Smithsonian (see the URL in the code below).

import torchvision
from datasets import load_dataset
from torchvision import transforms

# Load dataset from https://huggingface.co/datasets/huggan/smithsonian_butterflies_subset
dataset = load_dataset("huggan/smithsonian_butterflies_subset", split="train")

# Or load images from a local folder:
# dataset = load_dataset("imagefolder", data_dir="path/to/folder")

# We'll train on 32-pixel square images, but you can try larger sizes too
image_size = 32
# You can lower your batch size if you're running out of GPU memory
batch_size = 64

# Define data augmentations
preprocess = transforms.Compose(
    [
        transforms.Resize((image_size, image_size)),  # Resize
        transforms.RandomHorizontalFlip(),  # Randomly flip (data augmentation)
        transforms.ToTensor(),  # Convert to tensor (0, 1)
        transforms.Normalize([0.5], [0.5]),  # Map to (-1, 1)
    ]
)

def transform(examples):
    images = [preprocess(image.convert("RGB")) for image in examples["image"]]
    return {"images": images}

dataset.set_transform(transform)

# Create a dataloader from the dataset to serve up the transformed images in batches
train_dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=batch_size, shuffle=True
)

View the first 8 image examples in the dataset:

xb = next(iter(train_dataloader))["images"].to(device)[:8]
print("X shape:", xb.shape)
show_images(xb).resize((8 * 64, 64), resample=Image.NEAREST)
Step 2: Define the Scheduler

Our plan for training is to take these input images and add noise to them, then feed the noisy images to the model. And during inference, we will use the model predictions to iteratively remove noise. In diffusers, these processes are both handled by the scheduler.

The noise schedule determines how much noise is added at different timesteps.

from diffusers import DDPMScheduler
# Define a Scheduler
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
# Add noise to the batch and visualize the noising process
# The core method is add_noise()
timesteps = torch.linspace(0, 999, 8).long().to(device)
noise = torch.randn_like(xb)  # Sample noise from a standard Gaussian N(0, I)
noisy_xb = noise_scheduler.add_noise(xb, noise, timesteps)
print("Noisy X shape:", noisy_xb.shape)
show_images(noisy_xb).resize((8 * 64, 64), resample=Image.NEAREST)
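If you want to see how much noise is added at each timestep, you can plot the mixing coefficients that add_noise uses; the scheduler stores the cumulative alpha products as alphas_cumprod:

# add_noise mixes image and noise as:
#   noisy = sqrt(alpha_bar_t) * clean + sqrt(1 - alpha_bar_t) * noise
alphas_cumprod = noise_scheduler.alphas_cumprod.cpu()
plt.plot(alphas_cumprod.sqrt(), label="sqrt(alpha_bar_t) (image coefficient)")
plt.plot((1 - alphas_cumprod).sqrt(), label="sqrt(1 - alpha_bar_t) (noise coefficient)")
plt.xlabel("Timestep t")
plt.legend()
plt.show()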
Step 3: Define the Model

Most diffusion models use architectures that are some variant of a U-Net and that’s what we’ll use here.

from diffusers import UNet2DModel

# Create a model
model = UNet2DModel(
    sample_size=image_size,  # the target image resolution
    in_channels=3,  # the number of input channels, 3 for RGB images
    out_channels=3,  # the number of output channels
    layers_per_block=2,  # how many ResNet layers to use per UNet block
    block_out_channels=(64, 128, 128, 256),  # More channels -> more parameters
    down_block_types=(
        "DownBlock2D",  # a regular ResNet downsampling block
        "DownBlock2D",
        "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
        "AttnDownBlock2D",
    ),
    up_block_types=(
        "AttnUpBlock2D",
        "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
        "UpBlock2D",
        "UpBlock2D",  # a regular ResNet upsampling block
    ),
)
model.to(device)

When dealing with higher-resolution inputs you may want to use more down and up-blocks, and keep the attention layers only at the lowest resolution (bottom) layers to reduce memory usage.
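For example, a sketch of a configuration for 128-pixel inputs with attention only near the bottleneck might look like the following (the variable name, channel counts and block layout here are illustrative assumptions; this model is not used in the rest of the example):

# Illustrative sketch only (not used below): a deeper UNet for 128px inputs,
# keeping attention at the lowest-resolution blocks to reduce memory usage
model_128 = UNet2DModel(
    sample_size=128,
    in_channels=3,
    out_channels=3,
    layers_per_block=2,
    block_out_channels=(64, 128, 256, 256, 512),
    down_block_types=(
        "DownBlock2D",
        "DownBlock2D",
        "DownBlock2D",
        "AttnDownBlock2D",  # attention only at the lower resolutions
        "AttnDownBlock2D",
    ),
    up_block_types=(
        "AttnUpBlock2D",
        "AttnUpBlock2D",
        "UpBlock2D",
        "UpBlock2D",
        "UpBlock2D",
    ),
)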

# Check that passing in a batch of data and some random timesteps produces an output the same shape as the input data:
with torch.no_grad():
    model_prediction = model(noisy_xb, timesteps).sample
model_prediction.shape
Step 4: Create a Training Loop

For each batch of data, we:

  • Sample some random timesteps
  • Noise the data accordingly
  • Feed the noisy data through the model
  • Compare the model predictions with the target (i.e. the noise in this case) using mean squared error as our loss function
  • Update the model parameters via loss.backward() and optimizer.step()
# Set the noise scheduler
noise_scheduler = DDPMScheduler(
    num_train_timesteps=1000, beta_schedule="squaredcos_cap_v2"
)

# Training loop
optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4)

losses = []

# Loop through the training epoch
for epoch in range(30):
    # Loop through all data
    for step, batch in enumerate(train_dataloader):
        # Load a batch of clean images
        clean_images = batch["images"].to(device)

        # Sample noise to add to the images
        noise = torch.randn(clean_images.shape).to(clean_images.device)
        bs = clean_images.shape[0]

        # Sample a random timestep for each image
        timesteps = torch.randint(
            0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device
        ).long()

        # Add noise to the clean images according to the noise magnitude at each timestep
        noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

        # Input the noisy_images to the model and Get the model prediction
        noise_pred = model(noisy_images, timesteps, return_dict=False)[0]

        # Calculate the loss
        loss = F.mse_loss(noise_pred, noise)
        loss.backward()
        losses.append(loss.item())

        # Update the model parameters with the optimizer
        optimizer.step()
        optimizer.zero_grad()

    if (epoch + 1) % 5 == 0:
        loss_last_epoch = sum(losses[-len(train_dataloader) :]) / len(train_dataloader)
        print(f"Epoch:{epoch+1}, loss: {loss_last_epoch}")
# Plot the loss
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].plot(losses)
axs[1].plot(np.log(losses))
plt.show()
Step 5: Generate Images
  • Method 1: Create a pipeline
from diffusers import DDPMPipeline
image_pipe = DDPMPipeline(unet=model, scheduler=noise_scheduler)
pipeline_output = image_pipe()
pipeline_output.images[0]
# Save the pipeline to a local folder like so:
image_pipe.save_pretrained("my_pipeline")
# Inspect the folder contents (from a notebook cell):
!ls my_pipeline/
# Output: model_index.json  scheduler  unet
# The `scheduler` and `unet` subfolders contain everything needed to re-create those components. For example, inside the `unet` folder you'll find the model weights (`diffusion_pytorch_model.bin`) alongside a config file which specifies the UNet architecture. 
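The saved folder can later be passed straight back to from_pretrained to re-create the pipeline, for example:

# Reload the saved pipeline from the local folder and sample from it
from diffusers import DDPMPipeline

reloaded_pipe = DDPMPipeline.from_pretrained("my_pipeline").to(device)
reloaded_pipe(batch_size=4).images[0]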
  • Method 2: Writing a Sampling Loop
# Random starting point (8 random images):
sample = torch.randn(8, 3, 32, 32).to(device)

# Denoising loop: iteratively remove noise to generate images
for i, t in enumerate(noise_scheduler.timesteps):
    # Get model pred
    with torch.no_grad():
        residual = model(sample, t).sample

    # Update sample with step
    sample = noise_scheduler.step(residual, t, sample).prev_sample

show_images(sample)