扩散模型问世至今,因其训练过程的稳定性和生成样本的多样性,受到了广泛的关注和应用,相应的开源社区贡献的工具链也趋向于更易用,HuggingFace的diffusers库便是其中之一。

diffusers提供了非常简洁和直观的API接口,能够让研究人员和开发者快速实现扩散模型的训练和推理。即便是对扩散模型不太熟悉的用户,也能通过少量的代码实现高效的图像生成任务。

本文基于diffusers,包含如下内容:

1.通过一个简易demo来直观感受使用diffusers中的快速生图的方法

2.使用smithsonian_butterflies_subset数据集,通过diffusers来搭建一个完整的图像生成小项目,包括数据集准备、模型训练、模型推理等步骤

3.介绍如何基于已有的预训练权重,通过微调和引导技术,来控制生成图片整体的细节走向,如颜色偏好,内容偏好等

4.介绍火出圈的StableDiffusion

5.介绍DDIM反转,用于控制图像的局部区域生成细节走向,该技术极大地提高了扩散模型的可玩性

零. 准备工具函数

首先,导入常用的库,并编写后续将被重复使用的工具函数,用于图像可视化。

1
2
3
4
5
6
7
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from PIL import Image
import torchvision
from tqdm import tqdm
1
2
3
4
5
6
7
8
9
10
11
def show_images(x):
x = x * 0.5 + 0.5# 将(-1,1)区间映射回(0,1)区间
grid = torchvision.utils.make_grid(x)
grid_im = grid.detach().cpu().permute(1, 2, 0).clip(0, 1) *255
grid_im =Image.fromarray(np.array(grid_im).astype(np.uint8))
return grid_im
def make_grid(images, size=64):
output_im = Image.new("RGB", (size * len(images), size))
for i,im in enumerate(images):
output_im.paste(im.resize((size, size)), (i * size, 0))
return output_im

一. 文生图demo的快速实现

本节以DreamBooth为例,使用diffusers快速实现文生图。

其中涉及到的具体模型会在后续展开介绍,这里暂时无需深究,只需动手体验一下文生图即可。

1
2
3
4
5
from diffusers import StableDiffusionPipeline
import torch

model_id = "sd-dreambooth-library/mr-potato-head"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16).to('cuda')

运行上述代码,就搭建好了一个文生图的管线(注:在diffusers中,每个模型的推理函数都被封装成一个pipline,本文称之为管线),接下来调用该管线,就可以生成图像了:

1
2
3
prompt = "an abstract sky of universe bypicasso"
image = pipe(prompt, num_inference_steps=50,guidance_scale=7.5).images[0]
image

二.使用diffusers动手做一个完整的图像生成项目

本节要做的项目在diffusers中已有可直接使用的管线,这里先使用已有管线进行运行测试,然后来学习如何一步一步从头实现这个管线。

1
2
3
4
from diffusers import DDPMPipeline
butterfly_pipeline =DDPMPipeline.from_pretrained("johnowhitaker/ddpm-butterflies-32px").to('cuda')
images = butterfly_pipeline(batch_size=8).images
make_grid(images)

2.1 数据集准备

首先下载训练数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import torchvision
!pip install -q datasets
from datasets import load_dataset
from torchvision import transforms

# 加载数据集
dataset = load_dataset("huggan/smithsonian_butterflies_subset",split="train")

# 定义数据集超参数
image_size = 32
batch_size = 64

# 定义数据增强过程
preprocess = transforms.Compose(
[
transforms.Resize((image_size, image_size)), # 调整大小
transforms.RandomHorizontalFlip(), # 随机翻转
transforms.ToTensor(), # 将张量映射到(0,1)区间
transforms.Normalize([0.5], [0.5]), # 映射到(-1, 1)区间
]
)

def transform(examples):
images = [preprocess(image.convert("RGB")) for image in examples["image"]]
return {"images": images}

dataset.set_transform(transform)

train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

查看一下数据集中的部分图片:

1
2
3
xb = next(iter(train_dataloader))["images"].to('cuda')[:8]
print("X shape:", xb.shape)
show_images(xb).resize((8 * 64, 64), resample=Image.NEAREST)

2.2 调度器

这里使用经典的DDPM调度器:

1
2
3
from diffusers import DDPMScheduler

noise_scheduler = DDPMScheduler(num_train_timesteps=1000, beta_start=0.001, beta_end=0.004)

定义好调度器之后,来尝试使用该调度器对图片进行加噪:

1
2
3
4
5
noise = torch.randn_like(xb)
timesteps = torch.linspace(0, 999, 8).long().to('cuda')# xb的batchsize是8,因此也需要8个对应的timesteps(shape:torch.Size([8]))
noisy_xb = noise_scheduler.add_noise(xb, noise, timesteps)# 加噪
print("Noisy X shape", noisy_xb.shape)
show_images(noisy_xb).resize((8 * 64, 64),resample=Image.NEAREST)


随着迭代次数的增加,清晰的图像被逐步加噪。

2.3 定义扩散模型

用于预测噪声的模型是UNet,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from diffusers import UNet2DModel

# 创建模型
model = UNet2DModel(
sample_size=image_size, # 目标图像分辨率
in_channels=3, # 输入通道数,对于RGB图像来说,通道数为3
out_channels=3, # 输出通道数
layers_per_block=2, # 每个UNet块使用的ResNet层数
block_out_channels=(64, 128, 128, 256), # 更多的通道→更多的参数
down_block_types=(
"DownBlock2D", # 一个常规的ResNet下采样模块
"DownBlock2D",
"AttnDownBlock2D",# 一个带有空间自注意力的ResNet下采样模块
"AttnDownBlock2D",
),
up_block_types=(
"AttnUpBlock2D",
"AttnUpBlock2D", # 一个带有空间自注意力的ResNet上采样模块
"UpBlock2D",
"UpBlock2D", # 一个常规的ResNet上采样模块
),
)
model.to('cuda')

测试一下模型:

1
2
3
with torch.no_grad():
model_prediction = model(noisy_xb, timesteps).sample# noisy_xb:[8,3,32,32], timesteps: [8,1]
model_prediction.shape# torch.Size([8, 3, 32, 32])

输出:

1
torch.Size([8, 3, 32, 32])

2.4 创建扩散模型训练循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 设定噪声调度器
noise_scheduler = DDPMScheduler(
num_train_timesteps=1000,
beta_schedule="squaredcos_cap_v2")

optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4)

losses = []
for epoch in range(30):
for step, batch in enumerate(train_dataloader):
clean_images = batch["images"].to('cuda')
# 为图片添加采样噪声
noise =torch.randn(clean_images.shape).to('cuda')
bs = clean_images.shape[0]
# 为每张图片随机采样一个时间步
timesteps = torch.randint(0, noise_scheduler.num_train_timesteps, (bs,),device='cuda').long()
# 根据每个时间步的噪声幅度,向清晰的图片中添加噪声
noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)
# 获得模型的预测结果
noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
# 计算损失
loss = F.mse_loss(noise_pred, noise)
loss.backward(loss)
losses.append(loss.item())
# 迭代模型参数
optimizer.step()
optimizer.zero_grad()
if (epoch + 1) % 5 == 0:
loss_last_epoch = sum(losses[-len(train_dataloader) :])/len(train_dataloader)
print(f"Epoch:{epoch+1}, loss: {loss_last_epoch}")

训练日志:

1
2
3
4
5
6
Epoch:5, loss: 0.15131539199501276
Epoch:10, loss: 0.11000193934887648
Epoch:15, loss: 0.09896771190688014
Epoch:20, loss: 0.0816401862539351
Epoch:25, loss: 0.07495242427103221
Epoch:30, loss: 0.07060358510352671

检查一下训练损失:

1
2
3
4
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].plot(losses)
axs[1].plot(np.log(losses))
plt.show()

这样,一个图像生成模型就训练完成了。

2.5 图像的生成

在完成模型训练后,需要本小节来实现模型的推理,即图像生成。

方法1:建立一个diffusers管线

1
2
3
4
5
from diffusers import DDPMPipeline

image_pipe = DDPMPipeline(unet=model,scheduler=noise_scheduler)# 将训练好的model传入管线
pipeline_output = image_pipe()# 开始生成
pipeline_output.images[0]

方法2:自定义一个采样循环

首先来随机初始化8张图片:

1
2
sample = torch.randn(8, 3, 32, 32).to('cuda')
show_images(sample)

可以看到,这些图片上全是噪声。

接下来,定义采样循环来迭代的消除噪声,就完成了图像的生成:

1
2
3
4
5
6
7
8
9
10
# 假设 noise_scheduler.timesteps 是一个包含时间步的序列
for i, t in tqdm(enumerate(noise_scheduler.timesteps), total=len(noise_scheduler.timesteps), desc="Diffusion Steps"):# DDPM 迭代1000步
with torch.no_grad():
residual = model(sample, t).sample # 获取模型预测的噪声

# 根据预测结果更新图像
sample = noise_scheduler.step(residual, t, sample).prev_sample

# 查看生成的图片
show_images(sample)

至此,图像生成项目已全部完成。

三. 微调和引导

现在,你已经学会了如何借助diffusers库,在自定义数据集上训练图像生成模型并完成推理。

但是,生成的图像是和训练集近似分布的,如何引导模型的生成结果向定制化的需求靠近呢?本节将介绍相关的实现技术。

不过在开始之前,还需要优化一个问题。如果仔细观察就会发现,上面所使用的DDPM调度器,在推理时需要迭代1000步,这是一个相当慢的循环操作。

解决方法就是将调度器由DDPM换成DDIM。相较于DDPM,DDIM可以用单步迭代来模拟DDPM中的多步迭代,因此极大地降低了总的迭代次数。

3.1 环境准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
!pip install -qq diffusers datasets accelerate wandb open-clip-torch

import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from datasets import load_dataset
from diffusers import DDIMScheduler, DDPMPipeline
from matplotlib import pyplot as plt
from PIL import Image
from torchvision import transforms
from tqdm.auto import tqdm

device='cuda' if torch.cuda.is_available() else 'cpu'

3.2 使用DDIM-更快的调度器

首先定义一个DDPM的管线:

1
2
3
4
5
image_pipe = DDPMPipeline.from_pretrained("google/ddpm-celebahq-256")
image_pipe.to(device)

images = image_pipe().images
images[0]

然后,创建一个更快的DDIM调度器并设置推理迭代次数为40:

1
2
scheduler = DDIMScheduler.from_pretrained("google/ddpm-celebahq-256")
scheduler.set_timesteps(num_inference_steps=40)

现在,将上面的DDPMPipeline的调度器由DDPM换成DDIM,使用和2.5小节中类似的自定义采样循环来实现图像生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 从随机噪声开始
x = torch.randn(4, 3, 256, 256).to(device)
for i, t in tqdm(enumerate(scheduler.timesteps)):
# 准备模型输入:给“带噪”图像加上时间步信息
model_input = scheduler.scale_model_input(x, t)# DDIM 调度器会对噪声图像 x 进行适当缩放,使其符合扩散模型的输入规范。这种缩放通常基于时间步 t 的权重参数
# 预测噪声
with torch.no_grad():
noise_pred = image_pipe.unet(model_input, t)["sample"]
# 使用调度器计算更新后的样本应该是什么样子
scheduler_output = scheduler.step(noise_pred, t, x)
# 更新输入图像
x = scheduler_output.prev_sample

# 时不时看一下输入图像和预测的“去噪”图像
if i % 10 == 0 or i == len(scheduler.timesteps) - 1:
fig, axs = plt.subplots(1, 2, figsize=(12, 5))
grid = torchvision.utils.make_grid(x,nrow=4).permute(1, 2, 0)
axs[0].imshow(grid.cpu().clip(-1, 1) * 0.5 + 0.5)
axs[0].set_title(f"Current x (step {i})")
pred_x0 = (
scheduler_output.pred_original_sample
)
grid = torchvision.utils.make_grid(pred_x0, nrow=4).permute(1, 2, 0)
axs[1].imshow(grid.cpu().clip(-1, 1) * 0.5 + 0.5)
axs[1].set_title(f"Predicted denoised images (step{i})")
plt.show()





现在,使用基于DDIM执行图像生成操作,可以看到,只需40次迭代就可以很快得到生成结果:

1
2
3
image_pipe.scheduler = scheduler
images = image_pipe(num_inference_steps=40).images
images[0]

3.3 微调扩散模型

首先准备数据集,仍使用之前的蝴蝶数据集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
dataset_name ="huggan/smithsonian_butterflies_subset"
dataset = load_dataset(dataset_name, split="train")
image_size =256
batch_size =4

preprocess = transforms.Compose(
[
transforms.Resize((image_size, image_size)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5]),
]
)

def transform(examples):
images = [preprocess(image.convert("RGB")) for image in examples["image"]]
return {"images": images}

dataset.set_transform(transform)
train_dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=True)

查看数据集示例:

1
2
3
4
print("Previewing batch:")
batch =next(iter(train_dataloader))
grid = torchvision.utils.make_grid(batch["images"], nrow=4)
plt.imshow(grid.permute(1, 2, 0).cpu().clip(-1, 1) *0.5+0.5)

现在,基于上面的celebahq人脸数据集上训练的权重,在蝴蝶数据集上进行微调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
num_epochs = 2
lr = 1e-5
grad_accumulation_steps = 2
optimizer = torch.optim.AdamW(image_pipe.unet.parameters(),lr=lr)
losses = []

for epoch in range(num_epochs):
for step, batch in tqdm(enumerate(train_dataloader),total=len(train_dataloader)):
clean_images = batch["images"].to(device)
# 随机生成一个噪声,稍后加到图像上
noise =torch.randn(clean_images.shape).to(clean_images.device)
bs = clean_images.shape[0]
# 随机选取一个时间步
timesteps = torch.randint(0,image_pipe.scheduler.num_train_timesteps,(bs,),device=clean_images.device,).long()

# 根据选中的时间步和确定的幅值,在干净图像上添加噪声
# 此处为前向扩散过程
noisy_images =image_pipe.scheduler.add_noise(clean_images,noise,timesteps)
# 使用“带噪”图像进行网络预测
noise_pred = image_pipe.unet(noisy_images, timesteps, return_dict=False)[0]
# 对真正的噪声和预测的结果进行比较,注意这里是预测噪声
loss = F.mse_loss(noise_pred, noise)
# 保存损失值
losses.append(loss.item())
# 根据损失值更新梯度
loss.backward()
# 进行梯度累积,在累积到一定步数后更新模型参数
if (step + 1) % grad_accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
print(f"Epoch {epoch} average loss: {sum(losses[-len(train_dataloader):])/len(train_dataloader)}")

模型训练结束,执行推理:

1
2
3
4
5
6
7
8
9
# 使用在蝴蝶数据集上基于人脸生成模型的权重微调得到的模型进行推理
x = torch.randn(8, 3, 256, 256).to(device)
for i, t in tqdm(enumerate(image_pipe.scheduler.timesteps)):
model_input = scheduler.scale_model_input(x, t)
with torch.no_grad():
noise_pred = image_pipe.unet(model_input, t)["sample"]
x = image_pipe.scheduler.step(noise_pred, t, x).prev_sample
grid = torchvision.utils.make_grid(x, nrow=4)
plt.imshow(grid.permute(1, 2, 0).cpu().clip(-1, 1) * 0.5 + 0.5)


可以看到,生成结果已经由人脸趋向于蝴蝶的外观了。

3.4 扩散模型之引导

引导,指的是在模型训练完成后,在执行模型推理时,使用一些技术手段来使得生成结果的内容更偏向于自定义的风格。

本届介绍两种引导手段:

  • 颜色引导
  • CLIP引导

在开始之前,首先来准备一个预训练好的管线,并将调度器更换成速度更快的DDIM:

1
2
pipeline_name ="johnowhitaker/sd-class-wikiart-from-bedrooms"
image_pipe =DDPMPipeline.from_pretrained(pipeline_name).to(device)
1
2
3
4
5
6
7
8
9
10
11
12
13
# 使用DDIM调度器,仅用40步生成一些图片
scheduler = DDIMScheduler.from_pretrained(pipeline_name)
scheduler.set_timesteps(num_inference_steps=40)

# 将随机噪声作为出发点
x = torch.randn(8, 3, 256, 256).to(device)

# 使用一个最简单的采样循环
for i, t in tqdm(enumerate(scheduler.timesteps)):
model_input = scheduler.scale_model_input(x, t)
with torch.no_grad():
noise_pred = image_pipe.unet(model_input, t)["sample"]
x = scheduler.step(noise_pred, t, x).prev_sample
1
2
3
# 查看生成结果
grid = torchvision.utils.make_grid(x, nrow=4)
plt.imshow(grid.permute(1, 2, 0).cpu().clip(-1, 1) *0.5+0.5)

3.4.1 颜色引导

颜色引导(Color Guidance) 是一种在扩散模型中生成特定颜色风格或目标图像时,增加对颜色约束的方法。这种方法可以确保生成的图像在颜色空间上满足特定的要求,比如突出某种颜色、控制亮度对比,或者匹配参考图像的配色。

首先,定义颜色损失函数,并设置引导的强度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def color_loss(images, target_color=(0.9, 0.9, 0.9)):
"""给定一个RGB值,返回一个损失值,用于衡量图片的像素值与目标颜色相差多少"""
target = (
torch.tensor(target_color).to(images.device) * 2 - 1
)
# 首先对target_color进行归一化,使它的取值区间为(-1, 1)
target = target[None, :, None, None]
# 将所生成目标张量的形状改为(b, c, h, w),以适配输入图像images的张量形状
error = torch.abs(
images - target
).mean() # 计算图片的像素值以及目标颜色的均方误差
return error

# guidance_loss_scale用于决定引导的强度有多大
guidance_loss_scale = 40# 可设定为5~100的任意数字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
x = torch.randn(8, 3, 256, 256).to(device)

# 方法1:
for i, t in tqdm(enumerate(scheduler.timesteps)):
# 准备模型输入with DDIM scheduler
model_input = scheduler.scale_model_input(x, t)
# 预测噪声
with torch.no_grad():
noise_pred = image_pipe.unet(model_input, t)["sample"]

# 设置x.requires_grad为True
x = x.detach().requires_grad_()

# 从当前时间步得到“去噪”后的图像
x0 = scheduler.step(noise_pred, t, x).pred_original_sample

# pred_original_sample:当你希望直接获取模型在当前步骤中预测的最终图像时使用
# prev_sample:当你需要逐步生成图像,通过递归步骤从噪声到最终图像的过程时使用

# 计算损失值
loss = color_loss(x0) * guidance_loss_scale
if i % 10 == 0:
print(i, "loss:", loss.item())

# 获取梯度
cond_grad = -torch.autograd.grad(loss, x)[0]

# 使用梯度更新x
x = x.detach() + cond_grad# 为了降低颜色损失,这里执行梯度下降,使得预测的"去噪"后图片x的颜色更接近于目标颜色

# 使用调度器更新x
x = scheduler.step(noise_pred, t, x).prev_sample

输出:

1
2
3
4
0 loss: 29.88568115234375
10 loss: 3.800239324569702
20 loss: 3.4818835258483887
30 loss: 3.5615406036376953
1
2
3
4
# 查看结果
grid = torchvision.utils.make_grid(x, nrow=4)
im = grid.permute(1, 2, 0).cpu().clip(-1, 1) * 0.5 + 0.5
Image.fromarray(np.array(im * 255).astype(np.uint8))

我们预先设置的颜色是(0.9, 0.9, 0.9),这是一种接近白色的颜色,可以看到,在推理过程中加入颜色损失来引导生成过程后,生成的图像颜色整体上已经偏向我们设置的目标颜色了。

3.4.2 CLIP引导

CLIP引导的核心思想是通过CLIP模型的文本和图像相似性来优化生成的图像,使其与文本描述的语义更加一致。通常情况下,CLIP模型会计算文本和图像之间的相似度,生成的图像通过梯度下降优化,使得图像与文本描述更加匹配。

这里使用open_clip封装好的CLIP模型:

1
2
3
import open_clip
clip_model, _, preprocess =open_clip.create_model_and_transforms("ViT-B-32", pretrained="openai")
clip_model.to(device)

和颜色引导一样,这里需要定义clip引导所需的损失函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 图像变换:用于修改图像尺寸和增广数据,同时归一化数据,以使数据能够适配CLIP模型
tfms = torchvision.transforms.Compose(
[
torchvision.transforms.RandomResizedCrop(224),# 随机裁剪

torchvision.transforms.RandomAffine(5), # 随机扭曲图片
torchvision.transforms.RandomHorizontalFlip(),# 随机左右镜像,

torchvision.transforms.Normalize(
mean=(0.48145466, 0.4578275, 0.40821073),
std=(0.26862954, 0.26130258, 0.27577711),
),
]
)

# 定义一个损失函数,用于获取图片的特征,然后与提示文字的特征进行对比
def clip_loss(image, text_features):
image_features = clip_model.encode_image(
tfms(image)
) # 注意施加上面定义好的变换
input_normed = torch.nn.functional.normalize(image_features.unsqueeze(1), dim=2)
embed_normed = torch.nn.functional.normalize(text_features.unsqueeze(0), dim=2)

dists = (
input_normed.sub(embed_normed).norm(dim=2).div(2).
arcsin().pow(2).mul(2)
) # 使用Squared Great Circle Distance计算距离

return dists.mean()

定义超参数,并提取提示词的特征向量:

1
2
3
4
5
6
7
8
9
10
11
12
prompt ="A vast, tranquil ocean under a golden sunset sky. The water is crystal clear, reflecting the warm hues of the sun with gentle ripples. The horizon stretches far, meeting the sky in a soft gradient of oranges, purples, and pinks. A few distant sailboats glide across the water, while seagulls soar above. The scene should evoke a sense of peacefulness and infinity, with delicate waves and a calm, serene atmosphere."

guidance_scale = 8
n_cuts = 10

# 这里使用稍微多一些的步数
scheduler.set_timesteps(50)

# 使用CLIP从提示文字中提取特征
text = open_clip.tokenize([prompt]).to(device)
with torch.no_grad(), torch.cuda.amp.autocast():
text_features = clip_model.encode_text(text)

开始生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
x = torch.randn(4, 3, 256, 256).to(device)# 从随机噪声开始

for i, t in tqdm(enumerate(scheduler.timesteps)):
model_input = scheduler.scale_model_input(x, t)
# 预测噪声
with torch.no_grad():
noise_pred = image_pipe.unet(model_input, t)["sample"]

cond_grad = 0

for cut in range(n_cuts):
# 设置输入图像的requires_grad属性为True
x = x.detach().requires_grad_()

# 获得“去噪”后的图像
x0 = scheduler.step(noise_pred, t,x).pred_original_sample

# 计算损失值
loss = clip_loss(x0, text_features) * guidance_scale

# 获取梯度并使用n_cuts进行平均
cond_grad -= torch.autograd.grad(loss, x)[0] / n_cuts

if i % 25 == 0:
print("Step:", i, ", Guidance loss:", loss.item())

# 根据这个梯度更新x
alpha_bar = scheduler.alphas_cumprod[i]
x = (
x.detach() + cond_grad * alpha_bar.sqrt()
) # 注意这里的缩放因子

# 使用调度器更新x
x = scheduler.step(noise_pred, t, x).prev_sample

其中的n_cuts 是一个超参数,在每次计算损失后,梯度会被累加,并在每次计算中平均。这样可以减少梯度的波动,获得更稳定的优化过程。

1
2
3
grid = torchvision.utils.make_grid(x.detach(), nrow=4)
im = grid.permute(1, 2, 0).cpu().clip(-1, 1) * 0.5 + 0.5
Image.fromarray(np.array(im * 255).astype(np.uint8))


可以用看到,生成图像的内容已经朝着提示词的方向靠近了。

3.4.3 创建一个类别条件扩散模型

和先前的引导类似,类别条件扩散模型可以引导模型的生成结果更偏向于指定的“类别”。

3.4.3.1 数据集准备

1
2
3
4
5
6
7
8
import torch
import torchvision
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from diffusers import DDPMScheduler, UNet2DModel
from matplotlib import pyplot as plt
from tqdm.auto import tqdm

这里使用经典的手写数字数据集,这些图片都是单个通道的:

1
2
3
4
5
6
7
8
9
10
11
12
13
device = 'mps' if torch.backends.mps.is_available() else'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using device: {device}')

# 载入MNIST数据集
dataset = torchvision.datasets.MNIST(root="mnist/", train=True, download=True, transform=torchvision.transforms.ToTensor())

# 创建数据加载器
train_dataloader = DataLoader(dataset, batch_size=8,shuffle=True)

x, y = next(iter(train_dataloader))
print('Input shape:', x.shape)
print('Labels:', y)
plt.imshow(torchvision.utils.make_grid(x)[0], cmap='Greys')

3.4.3.2 创建一个以类别为条件的UNet模型

为了将类别信息注入模型,这里需要将类别信息映射到一个长度为class_emb_size的特征向量上,然后扩展成和输入图像高宽一样的张量,最后将其与输入图片在通道维度上进行拼接,这就是注入类别信息后模型的输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ClassConditionedUnet(nn.Module):
def __init__(self, num_classes=10, class_emb_size=4):
super().__init__()
# 这个网络层会把数字所属的类别映射到一个长度为class_emb_size的特征向量上
self.class_emb = nn.Embedding(num_classes, class_emb_size)
# self.model是一个不带生成条件的UNet模型,在这里,我们给它添加了额外的输入通道,用于接收条件信息
self.model = UNet2DModel(
sample_size=28, # 所生成图片的尺寸
in_channels=1+ class_emb_size, # 加入额外的输入通道,#用于施加生成条件
out_channels=1, # 输出结果的通道数
layers_per_block=2, # 设置一个UNet模块有多少个残差连接层
block_out_channels=(32, 64, 64),
down_block_types=(
"DownBlock2D", # 常规的ResNet下采样模块
"AttnDownBlock2D", # 含有spatial self-attention的
"AttnDownBlock2D", # ResNet下采样模块
),
up_block_types=(
"AttnUpBlock2D",
"AttnUpBlock2D", # 含有spatial self-attention的ResNet上采样模块
"UpBlock2D", # 常规的ResNet下采样模块
),
)

# 此时扩散模型的前向计算就会含有额外的类别标签作为输入了
def forward(self, x, t, class_labels):
bs, ch, w, h = x.shape
# 类别条件将会以额外通道的形式输入
class_cond = self.class_emb(class_labels)# 将类别映射为向量形式,并扩展成类似于(bs, 4, 28, 28)的张量形状
class_cond = class_cond.view(bs, class_cond.shape[1], 1, 1).expand(bs, class_cond.shape[1], w, h)
# 将原始输入和类别条件信息拼接到一起
net_input = torch.cat((x, class_cond), 1) # (bs, 5, 28, 28)
# 使用模型进行预测
return self.model(net_input, t).sample # (bs, 1, 28, 28)

3.4.3.3 训练和采样

准备好模型之后,现在来设置一些训练所需的基础方法,然后开启训练:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
model=ClassConditionedUnet()

# 创建一个调度器
noise_scheduler = DDPMScheduler(num_train_timesteps=1000, beta_schedule='squaredcos_cap_v2')
# 定义数据加载器
train_dataloader = DataLoader(dataset, batch_size=128,shuffle=True)
n_epochs = 10
net = ClassConditionedUnet().to(device)
loss_fn = nn.MSELoss()
opt = torch.optim.Adam(net.parameters(), lr=1e-3)
losses = [] # 记录损失值

# 开始训练
for epoch in range(n_epochs):
for x, y in tqdm(train_dataloader):
# 获取数据并添加噪声
x = x.to(device) * 2 - 1# 数据被归一化到区间(-1, 1)
y = y.to(device)
# print(x.shape,y.shape)# torch.Size([128, 1, 28, 28]) torch.Size([128])
noise = torch.randn_like(x)
timesteps = torch.randint(0, 999,(x.shape[0],)).long().to(device)# 随机一个时间步
# print(timesteps.shape)# torch.Size([128])
noisy_x = noise_scheduler.add_noise(x, noise,timesteps)

# 预测
pred = net(noisy_x, timesteps, y) # 注意这里也输入了类别标签y

loss = loss_fn(pred, noise) # 判断预测结果和实际的噪声有多接近
opt.zero_grad()
loss.backward()
opt.step()

losses.append(loss.item())

avg_loss = sum(losses[-100:])/100
print(f'Finished epoch {epoch}. Average of the last 100loss values: {avg_loss:05f}')

输出日志:

1
2
3
4
Finished epoch 0. Average of the last 100loss values: 0.053223
Finished epoch 1. Average of the last 100loss values: 0.046675
...
Finished epoch 9. Average of the last 100loss values: 0.038562

训练完成,现在来尝试让模型生成特定类别的手写数字图片,这里通过一个列表推导式来生成0到9的图片各8张:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 开始推理
x = torch.randn(80, 1, 28, 28).to(device)
y = torch.tensor([[i]*8for i in range(10)]).flatten().to(device)

# 采样循环
for i, t in tqdm(enumerate(noise_scheduler.timesteps)):
with torch.no_grad():
residual = net(x, t, y)
x = noise_scheduler.step(residual, t, x).prev_sample

# 查看生成的图像
fig, ax = plt.subplots(1, 1, figsize=(12, 12))
ax.imshow(torchvision.utils.make_grid(x.detach().cpu().clip(-1,1), nrow=8)[0], cmap='Greys')

四.Stable Diffision

Stable Diffusion 与传统扩散模型的最大区别在于其采用了潜在扩散(Latent Diffusion)方法,通过在低维潜在空间中进行扩散生成,而不是直接在高维图像空间进行去噪。这样做显著减少了计算复杂度和内存需求,提高了生成效率。此外,Stable Diffusion具备强大的文本到图像生成能力,能够根据自然语言描述生成图像,依托于与CLIP模型的结合,使得用户可以通过精确的文本提示控制生成的图像内容和风格。

4.1 环境&素材准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
!pip install -Uq diffusers ftfy accelerate
!pip install -Uq git+https://github.com/huggingface/transformers

import torch
import requests
from PIL import Image
from io import BytesIO
from matplotlib import pyplot as plt

from diffusers import (
StableDiffusionPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionInpaintPipeline,
StableDiffusionDepth2ImgPipeline
)
1
2
3
4
5
6
7
8
9
10
11
12
13
# 因为要用到的展示图片较多,所以我们写了一个旨在下载图片的函数
def download_image(url):
response = requests.get(url)
return Image.open(BytesIO(response.content)).convert("RGB")

# Inpainting需要用到的图片
img_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png"
mask_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png"

init_image = download_image(img_url).resize((512, 512))
mask_image = download_image(mask_url).resize((512, 512))

device = ("mps"if torch.backends.mps.is_available()else"cuda"if torch.cuda.is_available()else"cpu")

4.2 体验Stable Diffusion :从文本生成图像

通过一个封装好的管线,使用Stable Diffusion完成文生图:

1
2
3
4
5
# 载入管线
model_id = "stabilityai/stable-diffusion-2-1-base"
pipe =StableDiffusionPipeline.from_pretrained(model_id).to(device)
# 也可以载入FP16精度版本
# pipe = StableDiffusionPipeline.from_pretrained(model_id, revision="fp16", torch_dtype=torch.float16).to(device)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pipe.enable_attention_slicing()# 开启注意力切分功能,旨在通过降低速度来减少GPU显存的使用

generator = torch.Generator(device=device).manual_seed(840)

pipe_output = pipe(
prompt="Palette knife painting of an autumn cityscape",# 提示文字:哪些要生成
negative_prompt="Oversaturated, blurry, low quality",# 提示文字:哪些不要生成
height=480, width=640, # 定义所生成图片的尺寸
guidance_scale=8, # 提示文字的影响程度
num_inference_steps=35, # 定义一次生成需要多少个推理步骤
generator=generator # 设定随机种子的生成器
)

# 查看生成图片
pipe_output.images[0]

4.3 解析StableDiffusionPipeline的各个子模块

StableDiffusionPipeline 通过多个步骤将文本描述转化为图像,如下图所示。

首先,文本分词器 (tokenizer) 将用户提供的文本描述拆分为可以处理的“token”,以便后续处理。接着,文本编码器 (text_encoder) 将这些 token 转换为高维的语义向量,捕捉文本的含义。随后,U-Net 网络 (unet) 接收文本编码后的向量和初始的随机噪声图像,通过去噪的迭代过程生成潜在空间的表示。这个潜在表示会传递给变分自编码器 (VAE),通过解码器将其转换回潜在空间的图像。最后,调度器 (scheduler) 控制扩散过程中的噪声去除,确保图像在多个迭代步骤中逐步清晰化,最终生成符合描述的高质量图像。

对了,在 StableDiffusionPipeline 中,隐变量并不需要作为显式输入提供给模型,它们是在生成过程中由模型内部自动生成和使用的,比如 (1, 4, 64, 64)就是一个低分辨率的隐变量。

4.3.1 可变分自编码器

可变分自编码器(VAE)是一种生成模型,它通过学习数据的潜在分布来生成新样本。VAE 由编码器和解码器组成,编码器将输入数据映射到潜在空间的概率分布,解码器则从潜在空间中重构出与输入相似的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建取值区间为(-1, 1)的伪数据
images = torch.rand(1, 3, 512, 512).to(device) * 2 - 1
print("Input images shape:", images.shape)

# 编码到隐空间
with torch.no_grad():
latents = 0.18215 *pipe.vae.encode(images).latent_dist.mean
print("Encoded latents shape:", latents.shape)

# 再解码回来
with torch.no_grad():
decoded_images = pipe.vae.decode(latents / 0.18215).sample
print("Decoded images shape:", decoded_images.shape)

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建取值区间为(-1, 1)的伪数据
images = torch.rand(1, 3, 512, 512).to(device) * 2 - 1
print("Input images shape:", images.shape)

# 编码到隐空间
with torch.no_grad():
latents = 0.18215 *pipe.vae.encode(images).latent_dist.mean
print("Encoded latents shape:", latents.shape)

# 再解码回来
with torch.no_grad():
decoded_images = pipe.vae.decode(latents / 0.18215).sample
print("Decoded images shape:", decoded_images.shape)

4.3.2 分词器和文本编码器

将用户输入的提示词进行分词

1
2
3
4
5
6
7
# 手动对提示文字进行分词和编码

# 分词
input_ids = pipe.tokenizer(["A painting of a flooble"])['input_ids']
print("Input ID -> decoded token")
for input_id in input_ids[0]:
print(f"{input_id} -> {pipe.tokenizer.decode(input_id)}")

输出:

1
2
3
4
5
6
7
# 手动对提示文字进行分词和编码

# 分词
input_ids = pipe.tokenizer(["A painting of a flooble"])['input_ids']
print("Input ID -> decoded token")
for input_id in input_ids[0]:
print(f"{input_id} -> {pipe.tokenizer.decode(input_id)}")
1
2
3
4
5
# 将分词结果输入
input_ids = torch.tensor(input_ids).to(device)
with torch.no_grad():
text_embeddings = pipe.text_encoder(input_ids)['last_hidden_state']
print("Text embeddings shape:", text_embeddings.shape)

输出:

1
2
3
4
5
# 将分词结果输入
input_ids = torch.tensor(input_ids).to(device)
with torch.no_grad():
text_embeddings = pipe.text_encoder(input_ids)['last_hidden_state']
print("Text embeddings shape:", text_embeddings.shape)

4.3.3 UNet

1
2
3
4
5
6
7
8
9
# 创建伪输入
timestep = pipe.scheduler.timesteps[0]
latents = torch.randn(1, 4, 64, 64).to(device)
text_embeddings = torch.randn(1, 77, 1024).to(device)

# 让模型进行预测
with torch.no_grad():
unet_output = pipe.unet(latents, timestep,text_embeddings).sample
print('UNet output shape:', unet_output.shape)# [1, 4, 64, 64]

4.3.4 调度器

1
2
3
4
plt.plot(pipe.scheduler.alphas_cumprod,label=r'$\bar{\alpha}$')
plt.xlabel('Timestep (high noise to low noise ->)')
plt.title('Noise schedule')
plt.legend()

1
2
3
4
5
6
7
from diffusers import LMSDiscreteScheduler

# 替换原来的调度器
pipe.scheduler =LMSDiscreteScheduler.from_config(pipe.scheduler.config)# 输出配置参数
print('Scheduler config:', pipe.scheduler)
# 使用新的调度器生成图片
pipe(prompt="Palette knife painting of an winter cityscape", height=480, width=480,generator=torch.Generator(device=device). manual_seed(42)).images[0]

4.3.5 DIY采样循环

1
2
3
4
guidance_scale = 8
num_inference_steps=30
prompt = "Beautiful picture of a wave breaking"
negative_prompt = "zoomed in, blurry, oversaturated, warped"
1
2
3
4
5
6
7
8
9
# 对提示文字进行编码
text_embeddings = pipe._encode_prompt(prompt, device, 1, True, negative_prompt)

# 创建随机噪声作为起点
latents = torch.randn((1, 4, 64, 64), device=device,generator=generator)
latents *= pipe.scheduler.init_noise_sigma

# 准备调度器
pipe.scheduler.set_timesteps(num_inference_steps,device=device)
1
2
3
4
5
6
7
8
9
10
# 生成过程开始
for i, t in enumerate(pipe.scheduler.timesteps):
latent_model_input = torch.cat([latents] * 2)
latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)

with torch.no_grad():
noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# classifer- redd guidence
latents = pipe.scheduler.step(noise_pred, t,latents).prev_sample
1
2
3
4
# 将隐变量映射到图片
with torch.no_grad():
image = pipe.decode_latents(latents.detach())
pipe.numpy_to_pil(image)[0]

4.4 其它管线介绍

4.4.1 Img2Img

1
2
3
# 载入Img2Img管线
model_id ="stabilityai/stable-diffusion-2-1-base"
img2img_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(model_id).to(device)
1
2
3
4
5
result_image = img2img_pipe(
prompt="An oil painting of a man on a bench",
image = init_image, # 输入待编辑图片 s
trength = 0.5, # 文本提示在设为0时完全不起作用,设为1时作用强度最大
).images[0]
1
2
3
4
# 显示结果
fig, axs = plt.subplots(1, 2, figsize=(12, 5))
axs[0].imshow(init_image);axs[0].set_title('Input Image')
axs[1].imshow(result_image);axs[1].set_title('Result')

4.4.2 Inpainting

1
2
3
4
pipe =StableDiffusionInpaintPipeline.from_pretrained("runwayml/stable-diffusion-inpainting")
pipe = pipe.to(device)# 添加提示文字,用于让模型知道补全图像时使用什么内容
prompt ="A small robot, high resolution, sitting on a parkbench"
image = pipe(prompt=prompt, image=init_image, mask_image=mask_image).images[0]
1
2
3
4
5
# 查看结果
fig, axs = plt.subplots(1, 3, figsize=(16, 5))
axs[0].imshow(init_image);axs[0].set_title('Input Image')
axs[1].imshow(mask_image);axs[1].set_title('Mask')
axs[2].imshow(image);axs[2].set_title('Result')

4.4.3 Depth2Image

1
2
3
# 载入Depth2Img管线
pipe = StableDiffusionDepth2ImgPipeline.from_pretrained("stabilityai/stable-diffusion-2-depth")
pipe = pipe.to(device)
1
2
3
# 使用提示文字进行图像补全
prompt ="An oil painting of a man on a bench"
image = pipe(prompt=prompt, image=init_image).images[0]
1
2
3
4
# 查看结果
fig, axs = plt.subplots(1, 2, figsize=(16, 5))
axs[0].imshow(init_image);axs[0].set_title('Input Image')
axs[1].imshow(image);axs[1].set_title('Result')

五.DDIM 反转

5.1 数学原理

DDIM反转的概念有些难以理解,因此,在介绍它之前,首先介绍DDPM的正向过程和反向过程,然后再介绍DDIM。

5.1.1 DDPM

在上面的章节中,你已经使用过DDPM:

1
2
from diffusers import DDPMScheduler
noise_scheduler = DDPMScheduler(num_train_timesteps=1000, beta_start=0.001, beta_end=0.004)

这里来剖析一下其内部的数学原理。

在 DDPM (Denoising Diffusion Probabilistic Models) 中,正向过程(扩散过程)和反向过程(去噪过程)是核心组成部分。正向过程通过逐步添加噪声将数据转化为噪声,反向过程则是通过逐步去噪来恢复原始数据。

1) 正向扩散过程 (Forward Diffusion Process)

假设我们有一个真实的样本$x_0$(通常是图像)。正向扩散过程是通过多步逐渐将噪声加入到原始数据中,直到最终得到一个纯噪声$x_T$。在每个时间步$t$,数据的噪声是通过以下的分布加入的:

其中:

  • $\alpha_t$是一个随时间变化的超参数,控制每一步添加噪声的量。
  • $x_t$是在时间步$t$生成的噪声图像,$x_0$是原始图像。

将上述分布转换成数学等式,即“均值+标准差*高斯噪声”,如下:

根据此公式,可以进一步推导出$x_t$和$x_0$之间的关系:

改成数学等式的形式,为:

由此可见,$x_t$可以由$x_0$一步得到!

这对于前向加噪操作是非常方便的。回想一下,在之前的章节训练时,会将随机生成的噪声施加到输入图片$x_0$上,然后随机采样一个时间步$t$,那里之所以可以随机采样时间步,就是因为$x_t$可以由$x_0$一步得到。

  1. 反向去噪过程(Reverse Diffusion Process)

这里引用知乎上一篇文章的内容(https://zhuanlan.zhihu.com/p/666552214),里面对于DDPM的整个公式推到讲解的很清楚。



可以看到,方差是一个常数。

综上所述,反向去噪过程的公式可以表示为:

改写成数学等式的形式,就得到了DDPM反向去噪过程的数学公式了:

在许多实现中,去噪模型仅关注去除噪声部分,即主要输出的是去噪后的均值,而方差作为常数项并没有显著影响反向过程的表示,因此上述DDPM的反向去噪过程可以省略方差。

5.1.2 DDIM

DDIM(Denoising Diffusion Implicit Models)是一种对传统 Denoising Diffusion Probabilistic Models(DDPM)进行改进的方法。它通过修改扩散模型的反向过程,使得可以在更少的时间步内生成样本,同时保持较高的样本质量。DDIM 是一个隐式模型,它不依赖于明确的概率分布来生成样本,而是通过优化生成的过程来加速生成。


现将DDPM和DDIM的对比总结如下:

5.2 实战:反转

5.2.1 配置

1
2
3
4
5
6
7
8
9
10
11
12
!pip install -q transformers diffusers accelerate

import torch
import requests
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from io import BytesIO
from tqdm.auto import tqdm
from matplotlib import pyplot as plt
from torchvision import transforms as tfms
from diffusers import StableDiffusionPipeline, DDIMScheduler
1
2
3
4
5
6
7
8
9
# 定义接下来将要用到的函数
def load_image(url, size=None):
response = requests.get(url,timeout=0.2)
img = Image.open(BytesIO(response.content)).convert('RGB')
if size is not None:
img = img.resize(size)
return img

device = torch.device("cuda"if torch.cuda.is_available() else"cpu")

5.2.2 载入一个预训练过的StableDiffusion管线

1
2
# 载入一个管线
pipe =StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5").to(device)
1
2
# 配置DDIM调度器
pipe.scheduler =DDIMScheduler.from_config(pipe.scheduler.config)
1
2
3
4
5
# 从中采样一次,以保证代码运行正常
prompt = 'Beautiful DSLR Photograph of a cute cat on the beach, golden hour'
negative_prompt = 'blurry, ugly, stock photo'
im = pipe(prompt, negative_prompt=negative_prompt).images[0]
im.resize((256, 256)) # 调整至有利于查看的尺寸

5.2.3 DDIM采样

1
2
3
4
5
6
# 绘制'alpha','alpha'(即α)在DDPM论文中被称为'alpha bar'(即α)。
# 为了能够清晰地表现出来,我们选择使用Diffusers中的alphas_cumprod函数来得到alphas
timesteps = pipe.scheduler.timesteps.cpu()
alphas = pipe.scheduler.alphas_cumprod[timesteps]
plt.plot(timesteps, alphas, label='alpha_t')
plt.legend()

在采样过程中,我们选择从时间步1000的纯噪声开始,慢慢向着时间步0前进,遵循公式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 采样函数(标准的DDIM采样)
@torch.no_grad()
def sample(prompt, start_step=0, start_latents=None, guidance_scale=3.5, num_inference_steps=30, num_images_per_prompt=1, do_classifier_free_guidance=True, negative_prompt='', device=device):
# 对文本提示语进行编码
text_embeddings = pipe._encode_prompt(
prompt, device, num_images_per_prompt,
do_classifier_free_guidance, negative_prompt
)
# 配置推理的步数
pipe.scheduler.set_timesteps(num_inference_steps,device=device)

# 如果没有起点,就创建一个随机的起点
if start_latents is None:
start_latents = torch.randn(1, 4, 64, 64, device=device)
start_latents *= pipe.scheduler.init_noise_sigma

latents = start_latents.clone()

for i in tqdm(range(start_step, num_inference_steps)):
t = pipe.scheduler.timesteps[i]

# 如果正在进行CFG,则对隐层进行扩展
latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input =pipe.scheduler.scale_model_input(latent_model_input, t)

# 预测残留的噪声
noise_pred = pipe.unet(latent_model_input, t,encoder_hidden_states=text_embeddings).sample

# 进行引导
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text =noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

# 使用调度器更新步骤
# latents = pipe.scheduler.step(noise_pred, t,latents).prev_sample
# 现在不用调度器,而是自行实现,这里假设方差为0
prev_t = max(1, t.item() - (1000//num_inference_steps))# 从当前时间步t推算要预测的时间步,比如t-1
alpha_t = pipe.scheduler.alphas_cumprod[t.item()]
alpha_t_prev = pipe.scheduler.alphas_cumprod[prev_t]
# DDIM论文中公式的实现:sample x_{t-1}/x_{prev} from x_t
predicted_x0 = (latents - (1-alpha_t).sqrt()*noise_pred ) / alpha_t.sqrt()
direction_pointing_to_xt = (1-alpha_t_prev).sqrt()*noise_pred
latents = alpha_t_prev.sqrt()*predicted_x0 + direction_pointing_to_xt

# 后处理
images = pipe.decode_latents(latents)
images = pipe.numpy_to_pil(images)

return images
1
sample('Watercolor painting of a beach sunset', num_inference_steps=50)[0].resize((256,256))

5.2.4 反转

首先准备一张图片,将其用VAE编码成隐向量,稍后会对其进行反转操作

1
2
3
4
5
6
7
8
input_image = load_image('https://images.pexels.com/photos/8306128/pexels-photo-8306128.jpeg', size=(512, 512))
input_image_prompt ="Photograph of a puppy on the grass"

# 使用VAE进行编码
with torch.no_grad():
latent = pipe.vae.encode(tfms.functional.to_tensor(input_image).unsqueeze(0).to(device)*2-1)

cur_latent = 0.18215 * latent.latent_dist.sample()


$\sigma_t$设置为0,公式改写成由t-1到t的形式,使得生成图像朝着越来越高的噪声方向移动:

上述公式就是DDIM反转所用到的公式,现在来代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
## 反转
@torch.no_grad()
def invert(start_latents, prompt, guidance_scale=3.5, num_inference_steps=80,
num_images_per_prompt=1,do_classifier_free_guidance=True,negative_prompt='',device=device):

# 对提示文本进行编码
text_embeddings = pipe._encode_prompt(prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt)

# 已经指定好起点
latents = start_latents.clone()

# 用一个列表保存反转的隐层
intermediate_latents = []

# 配置推理的步数
pipe.scheduler.set_timesteps(num_inference_steps,device=device)

# 反转的时间步
timesteps = reversed(pipe.scheduler.timesteps)

for i in tqdm(range(1, num_inference_steps), total=num_inference_steps-1):
# 跳过最后一次迭代
if i >= num_inference_steps - 1:
continue
t = timesteps[i]

# 如果正在进行CFG,则对隐层进行扩展(
latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)

# 预测残留的噪声
noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

# 进行引导
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

current_t = max(0, t.item() - (1000//num_inference_steps))#t
next_t = t # min(999, t.item() + (1000//num_inference_steps)) #t+1
alpha_t = pipe.scheduler.alphas_cumprod[current_t]
alpha_t_next = pipe.scheduler.alphas_cumprod[next_t]

# 反转的更新步(重新排列更新步,利用xt-1(当前隐层)得到xt(新的隐层))
latents = (latents - (1-alpha_t).sqrt()*noise_pred)*(alpha_t_next.sqrt()/alpha_t.sqrt()) + (1-alpha_t_next).sqrt()*noise_pred

# 保存
intermediate_latents.append(latents)

return torch.cat(intermediate_latents)

现在,对正常图片的隐变量进行反转,得到噪声图的隐变量:```python
inverted_latents = invert(cur_latent,input_image_prompt,num_inference_steps=50)
inverted_latents.shape# [48, 4, 64, 64]

1
2
3
4
5
6
对隐变量进行解码,恢复到相应的图片:
```python
# 解码反转的最后一个隐层
with torch.no_grad():
im = pipe.decode_latents(inverted_latents[-1].unsqueeze(0))
pipe.numpy_to_pil(im)[0]

对每个隐变量都进行解码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 假设 inverted_latents 是一个包含多个 latent 的列表
images = []
for latent in inverted_latents:
with torch.no_grad():
im = pipe.decode_latents(latent.unsqueeze(0))[0]
images.append(im)

# 创建一个绘图
fig, axs = plt.subplots(1, len(images), figsize=(15, 5))

for ax, img in zip(axs, images):
ax.imshow(img)
ax.axis('off') # 不显示坐标轴

plt.show()

使用最后一层的隐变量,执行反向去噪:

1
2
# 将上面的得到的最终的噪声进行去噪
pipe(input_image_prompt, latents=inverted_latents[-1][None],num_inference_steps=50, guidance_scale=3.5).images[0]

我们遇到一个问题:这不是最初使用的那张图片。这是因为DDIM反转需要一个重要的假设——在时刻t预测的噪声与在时刻t +1预测的噪声相同,但这个假设在反转50步或100步时是不成立的。

1
2
3
# 设置起点的原因
start_step=10
sample(input_image_prompt, start_latents=inverted_latents[-(start_step+1)][None], start_step=start_step, num_inference_steps=50)[0]

1
2
3
4
# 使用新的文本提示语进行采样
start_step=10
new_prompt = input_image_prompt.replace('puppy', 'cat')
sample(new_prompt, start_latents=inverted_latents[-(start_step+1)][None], start_step=start_step, num_inference_steps=50)[0]

5.3 组合封装

将采样和反转函数组合起来,就得到了图像编辑函数 edit

1
2
3
4
5
6
7
def edit(input_image, input_image_prompt, edit_prompt,num_steps=100, start_step=30,guidance_scale=3.5):
with torch.no_grad():
latent =pipe.vae.encode(tfms.functional.to_tensor(input_image).unsqueeze(0).to(device)*2-1)
l = 0.18215 * latent.latent_dist.sample()
inverted_latents = invert(l,input_image_prompt,num_inference_steps=num_steps)
final_im = sample(edit_prompt,start_latents=inverted_latents[-(start_step+1)][None],start_step=start_step,num_inference_steps=num_steps,guidance_scale=guidance_scale)[0]
return final_im
1
2
# 实际操作
edit(input_image, 'A puppy on the grass', 'an old grey dog on the grass', num_steps=50,start_step=10)