BEiT: BERT Pre-Training of Image Transformers Code
1. modeling_discrete_vae.py
https://github.com/microsoft/unilm/blob/master/beit/modeling_discrete_vae.py
# DALL-E discrete VAE wrapper (BasicVAE is a thin base class defined earlier in the same file)
import os

import torch
import torch.nn as nn
import torch.nn.functional as F

from dall_e import load_model


class Dalle_VAE(BasicVAE):
    def __init__(self, image_size):
        super().__init__()
        self.encoder = None
        self.decoder = None
        self.image_size = image_size

    def load_model(self, model_dir, device):
        # load the pretrained DALL-E encoder/decoder checkpoints
        self.encoder = load_model(os.path.join(model_dir, "encoder.pkl"), device)
        self.decoder = load_model(os.path.join(model_dir, "decoder.pkl"), device)

    def decode(self, img_seq):
        bsz = img_seq.size()[0]
        img_seq = img_seq.view(bsz, self.image_size // 8, self.image_size // 8)
        z = F.one_hot(img_seq, num_classes=self.encoder.vocab_size).permute(0, 3, 1, 2).float()
        return self.decoder(z).float()

    def get_codebook_indices(self, images):
        # visual tokens = argmax over the encoder's vocabulary logits
        z_logits = self.encoder(images)
        return torch.argmax(z_logits, axis=1)

    def get_codebook_probs(self, images):
        z_logits = self.encoder(images)
        return nn.Softmax(dim=1)(z_logits)

    def forward(self, img_seq_prob, no_process=False):
        if no_process:
            return self.decoder(img_seq_prob.float()).float()
        else:
            bsz, seq_len, num_class = img_seq_prob.size()
            z = img_seq_prob.view(bsz, self.image_size // 8, self.image_size // 8, self.encoder.vocab_size)
            return self.decoder(z.permute(0, 3, 1, 2).float()).float()
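A minimal usage sketch (not from the repo) of how the pretraining pipeline turns an image batch into visual-token labels. The checkpoint directory, the random input tensor, and the 112x112 tokenizer view are assumptions here; 112 // 8 = 14 matches the 14x14 patch grid of a 224x224 image with 16x16 patches, and the real pipeline normalizes the tokenizer view before encoding.

# Hypothetical usage: images -> grid of visual-token indices
import torch

d_vae = Dalle_VAE(image_size=112)
d_vae.load_model(model_dir="/path/to/dalle_weights", device=torch.device("cpu"))  # placeholder path

tokenizer_view = torch.rand(2, 3, 112, 112)           # batch of 2 tokenizer views (stand-in data)
with torch.no_grad():
    input_ids = d_vae.get_codebook_indices(tokenizer_view)
print(input_ids.shape)                                 # (2, 14, 14) token ids in [0, vocab_size)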
# Custom DiscreteVAE (ResBlock, default and eval_decorator are helpers defined earlier in the same file)
from math import sqrt

from einops import rearrange
from torch import einsum


class DiscreteVAE(BasicVAE):
    def __init__(
        self,
        image_size = 256,
        num_tokens = 512,
        codebook_dim = 512,
        num_layers = 3,
        hidden_dim = 64,
        channels = 3,
        smooth_l1_loss = False,
        temperature = 0.9,
        straight_through = False,
        kl_div_loss_weight = 0.
    ):
        super().__init__()
        # assert log2(image_size).is_integer(), 'image size must be a power of 2'
        assert num_layers >= 1, 'number of layers must be greater than or equal to 1'

        self.image_size = image_size
        self.num_tokens = num_tokens
        self.num_layers = num_layers
        self.temperature = temperature
        self.straight_through = straight_through
        self.codebook = nn.Embedding(num_tokens, codebook_dim)

        enc_layers = []
        dec_layers = []

        enc_in = channels
        dec_in = codebook_dim

        # each layer halves (encoder) / doubles (decoder) the spatial resolution
        for layer_id in range(num_layers):
            enc_layers.append(nn.Sequential(nn.Conv2d(enc_in, hidden_dim, 4, stride=2, padding=1), nn.ReLU()))
            enc_layers.append(ResBlock(chan_in=hidden_dim, hidden_size=hidden_dim, chan_out=hidden_dim))
            enc_in = hidden_dim
            dec_layers.append(nn.Sequential(nn.ConvTranspose2d(dec_in, hidden_dim, 4, stride=2, padding=1), nn.ReLU()))
            dec_layers.append(ResBlock(chan_in=hidden_dim, hidden_size=hidden_dim, chan_out=hidden_dim))
            dec_in = hidden_dim

        enc_layers.append(nn.Conv2d(hidden_dim, num_tokens, 1))
        dec_layers.append(nn.Conv2d(hidden_dim, channels, 1))

        self.encoder = nn.Sequential(*enc_layers)
        self.decoder = nn.Sequential(*dec_layers)

        self.loss_fn = F.smooth_l1_loss if smooth_l1_loss else F.mse_loss
        self.kl_div_loss_weight = kl_div_loss_weight

    def get_image_size(self):
        return self.image_size

    def get_image_tokens_size(self):
        return self.image_size // 8

    @torch.no_grad()
    @eval_decorator
    def get_codebook_indices(self, images):
        logits = self.forward(images, return_logits = True)
        codebook_indices = logits.argmax(dim = 1)
        return codebook_indices

    @torch.no_grad()
    @eval_decorator
    def get_codebook_probs(self, images):
        logits = self.forward(images, return_logits = True)
        return nn.Softmax(dim=1)(logits)

    def decode(
        self,
        img_seq
    ):
        image_embeds = self.codebook(img_seq)
        b, n, d = image_embeds.shape
        h = w = int(sqrt(n))

        image_embeds = rearrange(image_embeds, 'b (h w) d -> b d h w', h = h, w = w)
        images = self.decoder(image_embeds)
        return images

    def forward(
        self,
        img,
        return_loss = False,
        return_recons = False,
        return_logits = False,
        temp = None
    ):
        device, num_tokens, image_size, kl_div_loss_weight = img.device, self.num_tokens, self.image_size, self.kl_div_loss_weight
        assert img.shape[-1] == image_size and img.shape[-2] == image_size, f'input must have the correct image size {image_size}'

        logits = self.encoder(img)

        if return_logits:
            return logits  # return logits for getting hard image indices for DALL-E training

        temp = default(temp, self.temperature)
        # relaxed (Gumbel-softmax) sampling over the codebook, optionally straight-through
        soft_one_hot = F.gumbel_softmax(logits, tau = temp, dim = 1, hard = self.straight_through)
        sampled = einsum('b n h w, n d -> b d h w', soft_one_hot, self.codebook.weight)
        out = self.decoder(sampled)

        if not return_loss:
            return out

        # reconstruction loss
        recon_loss = self.loss_fn(img, out)

        # kl divergence against a uniform prior over the codebook
        logits = rearrange(logits, 'b n h w -> b (h w) n')
        qy = F.softmax(logits, dim = -1)

        log_qy = torch.log(qy + 1e-10)
        log_uniform = torch.log(torch.tensor([1. / num_tokens], device = device))
        kl_div = F.kl_div(log_uniform, log_qy, None, None, 'batchmean', log_target = True)

        loss = recon_loss + (kl_div * kl_div_loss_weight)

        if not return_recons:
            return loss
        return loss, out
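A hypothetical single training step for this custom tokenizer, just to show the return_loss / return_recons interface; the batch, optimizer, and hyperparameters below are placeholders and not from the repo. BEiT itself pretrains against the frozen DALL-E tokenizer above rather than training this class.

# Sketch only: optimize reconstruction (plus optional KL term) of the custom VAE
vae = DiscreteVAE(image_size=224, num_tokens=8192, codebook_dim=512, num_layers=3)
opt = torch.optim.Adam(vae.parameters(), lr=1e-4)

images = torch.rand(2, 3, 224, 224)                    # stand-in image batch
loss, recons = vae(images, return_loss=True, return_recons=True)
loss.backward()
opt.step()
opt.zero_grad()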
2. masking_generator.py
https://github.com/microsoft/unilm/blob/master/beit/masking_generator.py
import math
import random

import numpy as np


class MaskingGenerator:
    def __init__(
            self, input_size, num_masking_patches, min_num_patches=4, max_num_patches=None,
            min_aspect=0.3, max_aspect=None):
        if not isinstance(input_size, tuple):
            input_size = (input_size, ) * 2
        self.height, self.width = input_size

        self.num_patches = self.height * self.width
        self.num_masking_patches = num_masking_patches

        self.min_num_patches = min_num_patches
        self.max_num_patches = num_masking_patches if max_num_patches is None else max_num_patches

        max_aspect = max_aspect or 1 / min_aspect
        self.log_aspect_ratio = (math.log(min_aspect), math.log(max_aspect))

    def __repr__(self):
        repr_str = "Generator(%d, %d -> [%d ~ %d], max = %d, %.3f ~ %.3f)" % (
            self.height, self.width, self.min_num_patches, self.max_num_patches,
            self.num_masking_patches, self.log_aspect_ratio[0], self.log_aspect_ratio[1])
        return repr_str

    def get_shape(self):
        return self.height, self.width

    def _mask(self, mask, max_mask_patches):
        delta = 0
        for attempt in range(10):
            # sample a block with random area and aspect ratio, then count how many
            # previously unmasked patches it would add
            target_area = random.uniform(self.min_num_patches, max_mask_patches)
            aspect_ratio = math.exp(random.uniform(*self.log_aspect_ratio))
            h = int(round(math.sqrt(target_area * aspect_ratio)))
            w = int(round(math.sqrt(target_area / aspect_ratio)))
            if w < self.width and h < self.height:
                top = random.randint(0, self.height - h)
                left = random.randint(0, self.width - w)

                num_masked = mask[top: top + h, left: left + w].sum()
                # Overlap
                if 0 < h * w - num_masked <= max_mask_patches:
                    for i in range(top, top + h):
                        for j in range(left, left + w):
                            if mask[i, j] == 0:
                                mask[i, j] = 1
                                delta += 1

                if delta > 0:
                    break
        return delta

    def __call__(self):
        # dtype=int instead of the original np.int, which was removed in recent NumPy releases
        mask = np.zeros(shape=self.get_shape(), dtype=int)
        mask_count = 0
        while mask_count < self.num_masking_patches:
            max_mask_patches = self.num_masking_patches - mask_count
            max_mask_patches = min(max_mask_patches, self.max_num_patches)

            delta = self._mask(mask, max_mask_patches)
            if delta == 0:
                break
            else:
                mask_count += delta

        return mask
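A quick check of the blockwise masking strategy: for a 224x224 image with 16x16 patches the patch grid is 14x14 = 196, and BEiT masks roughly 40% of the patches. The exact values below (75 masked patches, minimum block size 16) are illustrative settings, not read from the snippet above.

# Sketch: sample one blockwise mask for a 14x14 patch grid
masked_position_generator = MaskingGenerator(
    input_size=14, num_masking_patches=75, min_num_patches=16)
bool_masked_pos = masked_position_generator()          # (14, 14) array of 0/1
print(bool_masked_pos.sum())                           # at most 75 masked patches, usually close to it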
3. modeling_pretrain.py
https://github.com/microsoft/unilm/blob/master/beit/modeling_pretrain.py
# PatchEmbed, Block, RelativePositionBias and trunc_normal_ come from the repo's
# modeling_finetune.py / timm; only the masked-image-modeling model is shown here.
class VisionTransformerForMaskedImageModeling(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_chans=3, vocab_size=8192, embed_dim=768, depth=12,
                 num_heads=12, mlp_ratio=4., qkv_bias=True, qk_scale=None, drop_rate=0., attn_drop_rate=0.,
                 drop_path_rate=0., norm_layer=None, init_values=None, attn_head_dim=None,
                 use_abs_pos_emb=True, use_rel_pos_bias=False, use_shared_rel_pos_bias=False, init_std=0.02, **kwargs):
        super().__init__()
        self.num_features = self.embed_dim = embed_dim  # num_features for consistency with other models
        self.patch_embed = PatchEmbed(
            img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim)
        num_patches = self.patch_embed.num_patches

        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.mask_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        if use_abs_pos_emb:
            self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
        else:
            self.pos_embed = None
        self.pos_drop = nn.Dropout(p=drop_rate)

        # https://jeonsworld.github.io/NLP/rel_pe/
        if use_shared_rel_pos_bias:
            self.rel_pos_bias = RelativePositionBias(window_size=self.patch_embed.patch_shape, num_heads=num_heads)
        else:
            self.rel_pos_bias = None

        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)]  # stochastic depth decay rule
        self.blocks = nn.ModuleList([
            Block(
                dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale,
                drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer,
                init_values=init_values, window_size=self.patch_embed.patch_shape if use_rel_pos_bias else None,
                attn_head_dim=attn_head_dim,
            )
            for i in range(depth)])
        self.norm = norm_layer(embed_dim)

        self.init_std = init_std
        # prediction head over the visual-token vocabulary (8192 DALL-E codes by default)
        self.lm_head = nn.Linear(embed_dim, vocab_size)

        if self.pos_embed is not None:
            trunc_normal_(self.pos_embed, std=self.init_std)
        trunc_normal_(self.cls_token, std=self.init_std)
        trunc_normal_(self.mask_token, std=self.init_std)
        trunc_normal_(self.lm_head.weight, std=self.init_std)
        self.apply(self._init_weights)
        self.fix_init_weight()

    def fix_init_weight(self):
        def rescale(param, layer_id):
            param.div_(math.sqrt(2.0 * layer_id))

        for layer_id, layer in enumerate(self.blocks):
            rescale(layer.attn.proj.weight.data, layer_id + 1)
            rescale(layer.mlp.fc2.weight.data, layer_id + 1)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=self.init_std)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            trunc_normal_(m.weight, std=self.init_std)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    @torch.jit.ignore
    def no_weight_decay(self):
        return {'pos_embed', 'cls_token'}

    def get_num_layers(self):
        return len(self.blocks)

    def forward_features(self, x, bool_masked_pos):
        x = self.patch_embed(x, bool_masked_pos=bool_masked_pos)
        batch_size, seq_len, _ = x.size()

        cls_tokens = self.cls_token.expand(batch_size, -1, -1)  # stole cls_tokens impl from Phil Wang, thanks
        mask_token = self.mask_token.expand(batch_size, seq_len, -1)

        # replace the masked visual tokens by mask_token
        w = bool_masked_pos.unsqueeze(-1).type_as(mask_token)
        x = x * (1 - w) + mask_token * w

        x = torch.cat((cls_tokens, x), dim=1)
        if self.pos_embed is not None:
            x = x + self.pos_embed
        x = self.pos_drop(x)

        rel_pos_bias = self.rel_pos_bias() if self.rel_pos_bias is not None else None
        for blk in self.blocks:
            x = blk(x, rel_pos_bias=rel_pos_bias)

        return self.norm(x)

    def forward(self, x, bool_masked_pos, return_all_tokens=False):
        x = self.forward_features(x, bool_masked_pos=bool_masked_pos)
        x = x[:, 1:]  # drop the [CLS] token
        if return_all_tokens:
            return self.lm_head(x)
        else:
            # return predictions only at the masked positions
            return self.lm_head(x[bool_masked_pos])
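To see how the three files fit together, here is a condensed, hypothetical sketch of one pretraining step (the actual loop lives in the repo's engine_for_pretraining.py): the DALL-E tokenizer supplies visual-token labels, the masking generator decides which patches to corrupt, and the ViT predicts the tokens at the masked positions with a cross-entropy loss. It reuses d_vae and masked_position_generator from the sketches above; the norm_layer choice, the random tensors, and all shapes are assumptions.

# Sketch: one BEiT masked-image-modeling step
import numpy as np
import torch
import torch.nn as nn
from functools import partial

model = VisionTransformerForMaskedImageModeling(
    img_size=224, patch_size=16, vocab_size=8192, embed_dim=768, depth=12, num_heads=12,
    norm_layer=partial(nn.LayerNorm, eps=1e-6))

images = torch.rand(2, 3, 224, 224)                   # augmented view fed to the ViT
tokenizer_view = torch.rand(2, 3, 112, 112)           # second view fed to the DALL-E tokenizer
bool_masked_pos = torch.as_tensor(
    np.stack([masked_position_generator() for _ in range(2)])).flatten(1).to(torch.bool)  # (2, 196)

with torch.no_grad():
    input_ids = d_vae.get_codebook_indices(tokenizer_view).flatten(1)  # (2, 196) visual tokens
    labels = input_ids[bool_masked_pos]               # targets at the masked positions only

logits = model(images, bool_masked_pos=bool_masked_pos)   # (num_masked, 8192)
loss = nn.CrossEntropyLoss()(input=logits, target=labels)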
Written by 박민식
Edited by 김주영