Spaces:
Running
on
Zero
Running
on
Zero
| """ PyTorch NLLB CLIP model.""" | |
| import math | |
| from dataclasses import dataclass | |
| from typing import Any, Optional, Tuple, Union | |
| import torch | |
| import torch.utils.checkpoint | |
| from configuration_nllb_clip import NLLBCLIPConfig, NLLBCLIPTextConfig | |
| from torch import nn | |
| from transformers import CLIPVisionConfig | |
| from transformers.activations import ACT2FN | |
| from transformers.integrations.deepspeed import is_deepspeed_zero3_enabled | |
| from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling | |
| from transformers.modeling_utils import PreTrainedModel | |
| from transformers.utils import ModelOutput, logging | |
| logger = logging.get_logger(__name__) | |
# Contrastive loss helper, adapted from
# https://sachinruk.github.io/blog/2021-03-07-clip.html
def contrastive_loss(logits: torch.Tensor) -> torch.Tensor:
    """Return the cross-entropy of `logits` against diagonal targets.

    Row ``i`` of `logits` is scored against class index ``i``: the target
    vector is ``0 .. len(logits) - 1``, created on the same device as `logits`.
    """
    targets = torch.arange(len(logits), device=logits.device)
    return nn.functional.cross_entropy(logits, targets)
def clip_loss(similarity: torch.Tensor) -> torch.Tensor:
    """Return the symmetric contrastive loss of a similarity matrix.

    The result is the mean of `contrastive_loss` evaluated on `similarity`
    and on its transpose.
    """
    row_wise = contrastive_loss(similarity)
    column_wise = contrastive_loss(similarity.t())
    return (row_wise + column_wise) / 2.0
class CLIPVisionEmbeddings(nn.Module):
    """Embed an image as a class token followed by one token per patch.

    Patches are extracted with a bias-free convolution whose kernel size and
    stride both equal ``config.patch_size``; a learned position embedding is
    added to every token.
    """

    def __init__(self, config: CLIPVisionConfig):
        super().__init__()
        self.config = config
        self.embed_dim = config.hidden_size
        self.image_size = config.image_size
        self.patch_size = config.patch_size

        # Learned vector that is prepended to each patch sequence.
        self.class_embedding = nn.Parameter(torch.randn(self.embed_dim))

        # kernel_size == stride, so the patches do not overlap.
        self.patch_embedding = nn.Conv2d(
            in_channels=config.num_channels,
            out_channels=self.embed_dim,
            kernel_size=self.patch_size,
            stride=self.patch_size,
            bias=False,
        )

        patches_per_side = self.image_size // self.patch_size
        self.num_patches = patches_per_side**2
        # One extra position for the class token.
        self.num_positions = self.num_patches + 1
        self.position_embedding = nn.Embedding(self.num_positions, self.embed_dim)

        # Non-persistent: the index row is rebuilt here and is not saved in the state dict.
        all_positions = torch.arange(self.num_positions).expand((1, -1))
        self.register_buffer("position_ids", all_positions, persistent=False)

    def forward(self, pixel_values: torch.FloatTensor) -> torch.Tensor:
        """Return token embeddings of shape `(batch, num_positions, embed_dim)`.

        `pixel_values` is cast to the dtype of the patch-embedding weights
        before the convolution is applied.
        """
        weight_dtype = self.patch_embedding.weight.dtype
        # Convolution output is [batch, embed_dim, grid, grid].
        patches = self.patch_embedding(pixel_values.to(dtype=weight_dtype))
        # Flatten the grid and move channels last -> [batch, num_patches, embed_dim].
        patches = patches.flatten(2).transpose(1, 2)

        cls_tokens = self.class_embedding.expand(pixel_values.shape[0], 1, -1)
        tokens = torch.cat([cls_tokens, patches], dim=1)
        return tokens + self.position_embedding(self.position_ids)
class CLIPAttention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(self, config):
        """Build the q/k/v/output projections from `config`.

        Reads `config.hidden_size`, `config.num_attention_heads` and
        `config.attention_dropout`.

        Raises:
            ValueError: if `hidden_size` is not divisible by `num_attention_heads`.
        """
        super().__init__()
        self.config = config
        self.embed_dim = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.embed_dim // self.num_heads
        if self.head_dim * self.num_heads != self.embed_dim:
            raise ValueError(
                f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
                f" {self.num_heads})."
            )
        self.scale = self.head_dim**-0.5
        self.dropout = config.attention_dropout

        self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)

    def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
        """Reshape `(bsz, seq_len, embed_dim)` to `(bsz, num_heads, seq_len, head_dim)`."""
        return (
            tensor.view(bsz, seq_len, self.num_heads, self.head_dim)
            .transpose(1, 2)
            .contiguous()
        )

    def _add_mask(
        self,
        attn_weights: torch.Tensor,
        mask: torch.Tensor,
        bsz: int,
        tgt_len: int,
        src_len: int,
    ) -> torch.Tensor:
        """Add an additive `(bsz, 1, tgt_len, src_len)` mask to head-flattened scores."""
        if mask.size() != (bsz, 1, tgt_len, src_len):
            raise ValueError(
                f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {mask.size()}"
            )
        # Un-flatten the head axis so the mask broadcasts over heads, then flatten again.
        attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + mask
        return attn_weights.view(bsz * self.num_heads, tgt_len, src_len)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        causal_attention_mask: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Input shape: Batch x Time x Channel

        Args:
            hidden_states: tensor of shape `(bsz, tgt_len, embed_dim)`.
            attention_mask: optional additive mask of shape `(bsz, 1, tgt_len, src_len)`.
            causal_attention_mask: optional additive mask of the same shape,
                applied before `attention_mask`.
            output_attentions: when truthy, also return the post-softmax weights.

        Returns:
            `(attn_output, attn_weights)` where `attn_output` has shape
            `(bsz, tgt_len, embed_dim)` and `attn_weights` has shape
            `(bsz, num_heads, tgt_len, src_len)` or is `None`.

        Raises:
            ValueError: if a mask or an intermediate tensor has an unexpected shape.
        """
        bsz, tgt_len, embed_dim = hidden_states.size()

        # Queries are scaled once here instead of scaling the score matrix.
        query_states = self.q_proj(hidden_states) * self.scale
        key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
        value_states = self._shape(self.v_proj(hidden_states), -1, bsz)

        # Fold the head axis into the batch axis so a single bmm handles all heads.
        proj_shape = (bsz * self.num_heads, -1, self.head_dim)
        query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
        key_states = key_states.view(*proj_shape)
        value_states = value_states.view(*proj_shape)

        src_len = key_states.size(1)
        attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))

        if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
            raise ValueError(
                f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is"
                f" {attn_weights.size()}"
            )

        # apply the causal_attention_mask first
        if causal_attention_mask is not None:
            attn_weights = self._add_mask(
                attn_weights, causal_attention_mask, bsz, tgt_len, src_len
            )

        if attention_mask is not None:
            attn_weights = self._add_mask(
                attn_weights, attention_mask, bsz, tgt_len, src_len
            )

        attn_weights = nn.functional.softmax(attn_weights, dim=-1)

        if output_attentions:
            # this operation is a bit awkward, but it's required to
            # make sure that attn_weights keeps its gradient.
            # In order to do so, attn_weights have to be reshaped
            # twice and have to be reused in the following
            attn_weights_reshaped = attn_weights.view(
                bsz, self.num_heads, tgt_len, src_len
            )
            attn_weights = attn_weights_reshaped.view(
                bsz * self.num_heads, tgt_len, src_len
            )
        else:
            attn_weights_reshaped = None

        attn_probs = nn.functional.dropout(
            attn_weights, p=self.dropout, training=self.training
        )

        attn_output = torch.bmm(attn_probs, value_states)

        if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
            # The message reports the same shape that the check compares against.
            raise ValueError(
                f"`attn_output` should be of size {(bsz * self.num_heads, tgt_len, self.head_dim)}, but is"
                f" {attn_output.size()}"
            )

        # Restore the head axis and merge heads back into the channel axis.
        attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
        attn_output = attn_output.transpose(1, 2)
        attn_output = attn_output.reshape(bsz, tgt_len, embed_dim)

        attn_output = self.out_proj(attn_output)

        return attn_output, attn_weights_reshaped
class CLIPMLP(nn.Module):
    """Feed-forward block: linear -> activation -> linear.

    The activation is looked up in `ACT2FN` by `config.hidden_act`; the inner
    width is `config.intermediate_size` and the outer width `config.hidden_size`.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.activation_fn = ACT2FN[config.hidden_act]
        outer_dim, inner_dim = config.hidden_size, config.intermediate_size
        self.fc1 = nn.Linear(outer_dim, inner_dim)
        self.fc2 = nn.Linear(inner_dim, outer_dim)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Apply `fc1`, the activation and `fc2` in that order."""
        expanded = self.activation_fn(self.fc1(hidden_states))
        return self.fc2(expanded)
class CLIPEncoderLayer(nn.Module):
    """One encoder block: layer-norm -> self-attention and layer-norm -> MLP, each with a residual."""

    def __init__(self, config: NLLBCLIPConfig):
        super().__init__()
        self.embed_dim = config.hidden_size
        norm_eps = config.layer_norm_eps
        self.self_attn = CLIPAttention(config)
        self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=norm_eps)
        self.mlp = CLIPMLP(config)
        self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        causal_attention_mask: torch.Tensor,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.FloatTensor]:
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): additive attention mask of size
                `(batch, 1, tgt_len, src_len)`, forwarded unchanged to the self-attention module.
            causal_attention_mask (`torch.FloatTensor`): second additive mask of the same size,
                also forwarded unchanged to the self-attention module.
            output_attentions (`bool`, *optional*):
                Whether or not to append the attention weights to the returned tuple.

        Returns:
            `(hidden_states,)`, or `(hidden_states, attn_weights)` when `output_attentions` is truthy.
        """
        # Attention sub-block: normalise, attend, add the un-normalised input back.
        attn_out, attn_weights = self.self_attn(
            hidden_states=self.layer_norm1(hidden_states),
            attention_mask=attention_mask,
            causal_attention_mask=causal_attention_mask,
            output_attentions=output_attentions,
        )
        after_attn = hidden_states + attn_out

        # Feed-forward sub-block with its own residual connection.
        after_mlp = after_attn + self.mlp(self.layer_norm2(after_attn))

        if output_attentions:
            return (after_mlp, attn_weights)
        return (after_mlp,)
class CLIPEncoder(nn.Module):
    """
    Stack of `config.num_hidden_layers` [`CLIPEncoderLayer`] blocks applied in sequence.

    Args:
        config: configuration object providing `num_hidden_layers`, `output_attentions`,
            `output_hidden_states` and `use_return_dict`, plus whatever the layers read.
    """

    def __init__(self, config: NLLBCLIPConfig):
        super().__init__()
        self.config = config
        self.layers = nn.ModuleList(
            CLIPEncoderLayer(config) for _ in range(config.num_hidden_layers)
        )
        self.gradient_checkpointing = False

    def forward(
        self,
        inputs_embeds,
        attention_mask: Optional[torch.Tensor] = None,
        causal_attention_mask: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutput]:
        r"""
        Args:
            inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
                Already-embedded inputs; they are the hidden state fed to the first layer.
            attention_mask (`torch.Tensor`, *optional*):
                Passed unchanged to every layer.
            causal_attention_mask (`torch.Tensor`, *optional*):
                Passed unchanged to every layer.
            output_attentions (`bool`, *optional*):
                Whether to collect each layer's attention weights. Defaults to
                `config.output_attentions` when `None`.
            output_hidden_states (`bool`, *optional*):
                Whether to collect the hidden state before every layer and after the last one.
                Defaults to `config.output_hidden_states` when `None`.
            return_dict (`bool`, *optional*):
                Whether to return a `BaseModelOutput` instead of a plain tuple. Defaults to
                `config.use_return_dict` when `None`.

        Returns:
            A `BaseModelOutput`, or a tuple of the non-`None` entries among
            `(last_hidden_state, hidden_states, attentions)`.
        """
        if output_attentions is None:
            output_attentions = self.config.output_attentions
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict

        def make_checkpoint_fn(module):
            # checkpoint() only forwards positional inputs, so the flag is bound in a closure.
            def custom_forward(*inputs):
                return module(*inputs, output_attentions)

            return custom_forward

        state_trace = [] if output_hidden_states else None
        attention_trace = [] if output_attentions else None

        hidden_states = inputs_embeds
        for encoder_layer in self.layers:
            if state_trace is not None:
                state_trace.append(hidden_states)

            if self.gradient_checkpointing and self.training:
                layer_outputs = torch.utils.checkpoint.checkpoint(
                    make_checkpoint_fn(encoder_layer),
                    hidden_states,
                    attention_mask,
                    causal_attention_mask,
                )
            else:
                layer_outputs = encoder_layer(
                    hidden_states,
                    attention_mask,
                    causal_attention_mask,
                    output_attentions=output_attentions,
                )

            hidden_states = layer_outputs[0]
            if attention_trace is not None:
                attention_trace.append(layer_outputs[1])

        # The output of the final layer is recorded as well.
        if state_trace is not None:
            state_trace.append(hidden_states)

        encoder_states = None if state_trace is None else tuple(state_trace)
        all_attentions = None if attention_trace is None else tuple(attention_trace)

        if return_dict:
            return BaseModelOutput(
                last_hidden_state=hidden_states,
                hidden_states=encoder_states,
                attentions=all_attentions,
            )
        return tuple(
            item
            for item in (hidden_states, encoder_states, all_attentions)
            if item is not None
        )
class CLIPVisionTransformer(nn.Module):
    """Vision tower: patch embeddings -> layer norm -> encoder, pooled from the first token."""

    def __init__(self, config: CLIPVisionConfig):
        super().__init__()
        self.config = config
        width = config.hidden_size

        self.embeddings = CLIPVisionEmbeddings(config)
        # NOTE: the attribute name (including its spelling) is part of the parameter names.
        self.pre_layrnorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.encoder = CLIPEncoder(config)
        self.post_layernorm = nn.LayerNorm(width, eps=config.layer_norm_eps)

    def forward(
        self,
        pixel_values: Optional[torch.FloatTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPooling]:
        r"""
        Encode `pixel_values` and pool the result.

        Each of `output_attentions`, `output_hidden_states` and `return_dict` falls back to
        the corresponding `config` value when `None`.

        Returns:
            A `BaseModelOutputWithPooling`, or the tuple
            `(last_hidden_state, pooled_output, *remaining encoder outputs)`. The pooled output is
            the first token of the last hidden state after `post_layernorm`.

        Raises:
            ValueError: if `pixel_values` is `None`.
        """
        if output_attentions is None:
            output_attentions = self.config.output_attentions
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict

        if pixel_values is None:
            raise ValueError("You have to specify pixel_values")

        embedded = self.pre_layrnorm(self.embeddings(pixel_values))

        encoder_outputs = self.encoder(
            inputs_embeds=embedded,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        last_hidden_state = encoder_outputs[0]
        # Pool by taking position 0 of the sequence (the class token slot).
        pooled_output = self.post_layernorm(last_hidden_state[:, 0, :])

        if return_dict:
            return BaseModelOutputWithPooling(
                last_hidden_state=last_hidden_state,
                pooler_output=pooled_output,
                hidden_states=encoder_outputs.hidden_states,
                attentions=encoder_outputs.attentions,
            )
        return (last_hidden_state, pooled_output) + encoder_outputs[1:]
# `ModelOutput` subclasses declare their fields as dataclass fields, so the
# decorator is required for the annotated attributes below to become fields.
@dataclass
class NLLBCLIPOutput(ModelOutput):
    """
    Container for the outputs of the NLLB-CLIP model.

    Args:
        loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `return_loss` is `True`):
            Contrastive loss for image-text similarity.
        logits_per_image (`torch.FloatTensor` of shape `(image_batch_size, text_batch_size)`):
            The scaled dot product scores between `image_embeds` and `text_embeds`. This represents the image-text
            similarity scores.
        logits_per_text (`torch.FloatTensor` of shape `(text_batch_size, image_batch_size)`):
            The scaled dot product scores between `text_embeds` and `image_embeds`. This represents the text-image
            similarity scores.
        text_embeds (`torch.FloatTensor` of shape `(batch_size, output_dim)`):
            The text embeddings obtained by applying the projection layer to the pooled output of the text model.
        image_embeds (`torch.FloatTensor` of shape `(batch_size, output_dim)`):
            The image embeddings obtained by applying the projection layer to the pooled output of the vision model.
        text_model_output (`BaseModelOutputWithPooling`):
            The output of the text model.
        vision_model_output (`BaseModelOutputWithPooling`):
            The output of the vision model.
    """

    loss: Optional[torch.FloatTensor] = None
    logits_per_image: torch.FloatTensor = None
    logits_per_text: torch.FloatTensor = None
    text_embeds: torch.FloatTensor = None
    image_embeds: torch.FloatTensor = None
    text_model_output: BaseModelOutputWithPooling = None
    vision_model_output: BaseModelOutputWithPooling = None

    def to_tuple(self) -> Tuple[Any]:
        """Return the populated fields as a tuple, converting the two nested model outputs with their own `to_tuple()`."""
        nested = ("text_model_output", "vision_model_output")
        return tuple(
            getattr(self, key).to_tuple() if key in nested else self[key]
            for key in self.keys()
        )
class M2M100Attention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(
        self,
        embed_dim: int,
        num_heads: int,
        dropout: float = 0.0,
        is_decoder: bool = False,
        bias: bool = True,
    ):
        """Create the q/k/v/output projections.

        Raises:
            ValueError: if `embed_dim` is not divisible by `num_heads`.
        """
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.dropout = dropout
        self.head_dim = embed_dim // num_heads

        if (self.head_dim * num_heads) != self.embed_dim:
            raise ValueError(
                f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
                f" and `num_heads`: {num_heads})."
            )
        self.scaling = self.head_dim**-0.5
        self.is_decoder = is_decoder

        self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
        self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias)

    def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
        """Split the channel axis into heads: `(bsz, seq_len, embed_dim)` -> `(bsz, num_heads, seq_len, head_dim)`."""
        per_head = tensor.view(bsz, seq_len, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2).contiguous()

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_value_states: Optional[torch.Tensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        attention_mask: Optional[torch.Tensor] = None,
        layer_head_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        """Input shape: Batch x Time x Channel

        Args:
            hidden_states: query source of shape `(bsz, tgt_len, embed_dim)`.
            key_value_states: when given, keys/values are projected from it (cross-attention);
                otherwise they come from `hidden_states` (self-attention).
            past_key_value: optional `(key, value)` pair from an earlier call. In cross-attention
                it is reused as-is when its length matches `key_value_states`; in self-attention
                the freshly projected keys/values are appended to it along dim 2.
            attention_mask: optional additive mask of shape `(bsz, 1, tgt_len, src_len)`.
            layer_head_mask: optional per-head multiplier of shape `(num_heads,)`.
            output_attentions: whether to also return the attention weights.

        Returns:
            `(attn_output, attn_weights, past_key_value)`. `attn_weights` is `None` unless
            `output_attentions` is set; `past_key_value` is replaced by the current
            `(key, value)` pair only when the module was built with `is_decoder=True`.

        Raises:
            ValueError: if a mask or an intermediate tensor has an unexpected shape.
        """
        bsz, tgt_len, _ = hidden_states.size()

        # Scaling the queries is equivalent to scaling the q.k scores.
        query_states = self.q_proj(hidden_states) * self.scaling

        if key_value_states is not None:
            # Cross-attention. The length comparison between the cache and
            # `key_value_states` is there to support prefix tuning.
            cache_is_reusable = (
                past_key_value is not None
                and past_key_value[0].shape[2] == key_value_states.shape[1]
            )
            if cache_is_reusable:
                key_states, value_states = past_key_value[0], past_key_value[1]
            else:
                key_states = self._shape(self.k_proj(key_value_states), -1, bsz)
                value_states = self._shape(self.v_proj(key_value_states), -1, bsz)
        else:
            # Self-attention, optionally extending previously cached keys/values.
            key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
            value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
            if past_key_value is not None:
                key_states = torch.cat([past_key_value[0], key_states], dim=2)
                value_states = torch.cat([past_key_value[1], value_states], dim=2)

        if self.is_decoder:
            # Hand the current keys/values back so later calls can reuse them
            # (cross-attention) or extend them (uni-directional self-attention).
            # For a non-decoder module `past_key_value` is returned untouched.
            past_key_value = (key_states, value_states)

        flat_batch = bsz * self.num_heads
        proj_shape = (flat_batch, -1, self.head_dim)
        query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
        key_states = key_states.reshape(*proj_shape)
        value_states = value_states.reshape(*proj_shape)

        src_len = key_states.size(1)
        flat_shape = (flat_batch, tgt_len, src_len)
        head_shape = (bsz, self.num_heads, tgt_len, src_len)

        attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))
        if attn_weights.size() != flat_shape:
            raise ValueError(
                f"Attention weights should be of size {flat_shape}, but is"
                f" {attn_weights.size()}"
            )

        if attention_mask is not None:
            mask_shape = (bsz, 1, tgt_len, src_len)
            if attention_mask.size() != mask_shape:
                raise ValueError(
                    f"Attention mask should be of size {mask_shape}, but is {attention_mask.size()}"
                )
            attn_weights = (attn_weights.view(head_shape) + attention_mask).view(flat_shape)

        attn_weights = nn.functional.softmax(attn_weights, dim=-1)

        if layer_head_mask is not None:
            if layer_head_mask.size() != (self.num_heads,):
                raise ValueError(
                    f"Head mask for a single layer should be of size {(self.num_heads,)}, but is"
                    f" {layer_head_mask.size()}"
                )
            scaled = layer_head_mask.view(1, -1, 1, 1) * attn_weights.view(head_shape)
            attn_weights = scaled.view(flat_shape)

        attn_weights_reshaped = None
        if output_attentions:
            # The weights are viewed per-head for the caller and then viewed back,
            # so the returned tensor and the one used below share the same graph
            # and the returned weights keep their gradient.
            attn_weights_reshaped = attn_weights.view(head_shape)
            attn_weights = attn_weights_reshaped.view(flat_shape)

        attn_probs = nn.functional.dropout(
            attn_weights, p=self.dropout, training=self.training
        )
        attn_output = torch.bmm(attn_probs, value_states)

        output_shape = (flat_batch, tgt_len, self.head_dim)
        if attn_output.size() != output_shape:
            raise ValueError(
                f"`attn_output` should be of size {output_shape}, but is"
                f" {attn_output.size()}"
            )

        attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
        attn_output = attn_output.transpose(1, 2)
        # Use the `embed_dim` stored on the module rather than the one of `hidden_states`,
        # because `attn_output` can be partitioned across GPUs when using tensor-parallelism.
        attn_output = attn_output.reshape(bsz, tgt_len, self.embed_dim)

        return self.out_proj(attn_output), attn_weights_reshaped, past_key_value
# Copied from transformers.models.mbart.modeling_mbart.MBartEncoderLayer with MBart->M2M100
class M2M100EncoderLayer(nn.Module):
    """Pre-norm encoder block: self-attention followed by a two-layer feed-forward network."""

    def __init__(self, config: NLLBCLIPConfig):
        super().__init__()
        self.embed_dim = config.d_model

        self.self_attn = M2M100Attention(
            embed_dim=self.embed_dim,
            num_heads=config.encoder_attention_heads,
            dropout=config.attention_dropout,
        )
        self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim)

        self.dropout = config.dropout
        self.activation_fn = ACT2FN[config.activation_function]
        self.activation_dropout = config.activation_dropout

        self.fc1 = nn.Linear(self.embed_dim, config.encoder_ffn_dim)
        self.fc2 = nn.Linear(config.encoder_ffn_dim, self.embed_dim)
        self.final_layer_norm = nn.LayerNorm(self.embed_dim)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        layer_head_mask: torch.Tensor,
        output_attentions: bool = False,
    ) -> torch.Tensor:
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): additive attention mask of size
                `(batch, 1, tgt_len, src_len)`, forwarded to the self-attention module.
            layer_head_mask (`torch.FloatTensor`): mask for attention heads in a given layer of size
                `(encoder_attention_heads,)`.
            output_attentions (`bool`, *optional*):
                Whether or not to append the attention weights to the returned tuple.

        Returns:
            `(hidden_states,)`, or `(hidden_states, attn_weights)` when `output_attentions` is set.
        """
        drop = nn.functional.dropout
        training = self.training

        # --- self-attention sub-block ---
        shortcut = hidden_states
        attn_out, attn_weights, _ = self.self_attn(
            hidden_states=self.self_attn_layer_norm(hidden_states),
            attention_mask=attention_mask,
            layer_head_mask=layer_head_mask,
            output_attentions=output_attentions,
        )
        hidden_states = shortcut + drop(attn_out, p=self.dropout, training=training)

        # --- feed-forward sub-block ---
        shortcut = hidden_states
        ffn = self.activation_fn(self.fc1(self.final_layer_norm(hidden_states)))
        ffn = drop(ffn, p=self.activation_dropout, training=training)
        ffn = drop(self.fc2(ffn), p=self.dropout, training=training)
        hidden_states = shortcut + ffn

        # In half precision, clamp to just inside the representable range
        # whenever an inf or NaN shows up in the output.
        if hidden_states.dtype == torch.float16:
            if torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any():
                limit = torch.finfo(hidden_states.dtype).max - 1000
                hidden_states = torch.clamp(hidden_states, min=-limit, max=limit)

        if output_attentions:
            return (hidden_states, attn_weights)
        return (hidden_states,)
| def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None): | |
| """ | |
| Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. | |
| """ | |
| bsz, src_len = mask.size() | |
| tgt_len = tgt_len if tgt_len is not None else src_len | |
| expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype) | |
| inverted_mask = 1.0 - expanded_mask | |
| return inverted_mask.masked_fill( | |
| inverted_mask.to(torch.bool), torch.finfo(dtype).min | |
| ) | |
def create_position_ids_from_input_ids(
    input_ids, padding_idx, past_key_values_length=0
):
    """
    Map every non-padding token to its position number and every padding token to `padding_idx`.

    Positions count non-padding tokens along dim 1 and start at `padding_idx + 1`
    (shifted by `past_key_values_length`). This is modified from fairseq's `utils.make_positions`.
    """
    # The series of casts and type-conversions here are carefully balanced to both work with ONNX export and XLA.
    not_padding = input_ids.ne(padding_idx).int()
    running_count = torch.cumsum(not_padding, dim=1).type_as(not_padding)
    # Multiplying by the mask zeroes the padding slots, so they end up at `padding_idx`.
    shifted = (running_count + past_key_values_length) * not_padding
    return shifted.long() + padding_idx
class M2M100SinusoidalPositionalEmbedding(nn.Module):
    """This module produces sinusoidal positional embeddings of any length."""

    def __init__(
        self, num_positions: int, embedding_dim: int, padding_idx: Optional[int] = None
    ):
        """Pre-compute a table with `num_positions + offset` rows (offset is 2)."""
        super().__init__()
        self.offset = 2
        self.embedding_dim = embedding_dim
        self.padding_idx = padding_idx
        self.make_weights(num_positions + self.offset, embedding_dim, padding_idx)

    def make_weights(
        self, num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None
    ):
        """(Re)build the table and store it as the non-persistent buffer `weights`."""
        emb_weights = self.get_embedding(num_embeddings, embedding_dim, padding_idx)
        if hasattr(self, "weights"):
            # When rebuilding, keep the dtype and device of the existing buffer.
            emb_weights = emb_weights.to(
                dtype=self.weights.dtype, device=self.weights.device
            )

        self.register_buffer("weights", emb_weights, persistent=False)

    # The function takes no `self`; it must be a static method so that the
    # `self.get_embedding(...)` call in `make_weights` does not pass the instance
    # as `num_embeddings`.
    @staticmethod
    def get_embedding(
        num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None
    ):
        """
        Build sinusoidal embeddings.

        This matches the implementation in tensor2tensor, but differs slightly from the description in Section 3.5 of
        "Attention Is All You Need".

        Returns a `(num_embeddings, embedding_dim)` tensor in the default dtype: the first half of
        each row holds sines, the second half cosines, with a zero column appended when
        `embedding_dim` is odd. The row at `padding_idx`, if given, is zeroed.
        """
        half_dim = embedding_dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, dtype=torch.float) * -emb)
        emb = torch.arange(num_embeddings, dtype=torch.float).unsqueeze(
            1
        ) * emb.unsqueeze(0)
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1).view(
            num_embeddings, -1
        )
        if embedding_dim % 2 == 1:
            # zero pad
            emb = torch.cat([emb, torch.zeros(num_embeddings, 1)], dim=1)
        if padding_idx is not None:
            emb[padding_idx, :] = 0

        return emb.to(torch.get_default_dtype())

    def forward(
        self,
        input_ids: torch.Tensor = None,
        inputs_embeds: torch.Tensor = None,
        past_key_values_length: int = 0,
    ):
        """Look up positional embeddings of shape `(bsz, seq_len, embedding_dim)`.

        Position ids are derived from `input_ids` when given (padding tokens keep
        `padding_idx`), otherwise sequentially from the shape of `inputs_embeds`.
        The table is enlarged first if the sequence would run past its end. The
        result is detached from the autograd graph.
        """
        if input_ids is not None:
            bsz, seq_len = input_ids.size()
            # Create the position ids from the input token ids. Any padded tokens remain padded.
            position_ids = create_position_ids_from_input_ids(
                input_ids, self.padding_idx, past_key_values_length
            ).to(input_ids.device)
        else:
            bsz, seq_len = inputs_embeds.size()[:-1]
            position_ids = self.create_position_ids_from_inputs_embeds(
                inputs_embeds, past_key_values_length
            )

        # expand embeddings if needed
        max_pos = self.padding_idx + 1 + seq_len + past_key_values_length
        if max_pos > self.weights.size(0):
            self.make_weights(
                max_pos + self.offset, self.embedding_dim, self.padding_idx
            )

        return (
            self.weights.index_select(0, position_ids.view(-1))
            .view(bsz, seq_len, self.weights.shape[-1])
            .detach()
        )

    def create_position_ids_from_inputs_embeds(
        self, inputs_embeds, past_key_values_length
    ):
        """
        We are provided embeddings directly. We cannot infer which are padded so just generate sequential position ids.

        Args:
            inputs_embeds: torch.Tensor

        Returns: torch.Tensor
        """
        input_shape = inputs_embeds.size()[:-1]
        sequence_length = input_shape[1]

        # Sequential ids starting right after `padding_idx`.
        position_ids = torch.arange(
            self.padding_idx + 1,
            sequence_length + self.padding_idx + 1,
            dtype=torch.long,
            device=inputs_embeds.device,
        )
        return (
            position_ids.unsqueeze(0).expand(input_shape).contiguous()
            + past_key_values_length
        )
class M2M100Encoder(PreTrainedModel):
    """
    Transformer encoder made of *config.encoder_layers* self-attention layers, each one an
    [`M2M100EncoderLayer`], followed by a final layer norm.

    Args:
        config: configuration object supplying `d_model`, `dropout`, `encoder_layerdrop`,
            `pad_token_id`, `max_position_embeddings`, `scale_embedding`, `vocab_size` and
            `encoder_layers`.
        embed_tokens (nn.Embedding, *optional*): embedding whose weight is reused as the
            weight of this encoder's token embedding.
    """

    def __init__(
        self, config: NLLBCLIPConfig, embed_tokens: Optional[nn.Embedding] = None
    ):
        super().__init__(config)

        model_dim = config.d_model
        self.dropout = config.dropout
        self.layerdrop = config.encoder_layerdrop
        self.padding_idx = config.pad_token_id
        self.max_source_positions = config.max_position_embeddings
        self.embed_scale = 1.0 if not config.scale_embedding else math.sqrt(model_dim)

        self.embed_tokens = nn.Embedding(config.vocab_size, model_dim, self.padding_idx)
        if embed_tokens is not None:
            # Share the caller's embedding matrix instead of the freshly created one.
            self.embed_tokens.weight = embed_tokens.weight

        self.embed_positions = M2M100SinusoidalPositionalEmbedding(
            config.max_position_embeddings,
            model_dim,
            self.padding_idx,
        )
        self.layers = nn.ModuleList(
            [M2M100EncoderLayer(config) for _ in range(config.encoder_layers)]
        )
        self.layer_norm = nn.LayerNorm(config.d_model)

        self.gradient_checkpointing = False
        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        r"""
        Encode a batch given either token ids or pre-computed embeddings.

        Args:
            input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
                Indices of input sequence tokens in the vocabulary. Mutually exclusive with
                `inputs_embeds`.
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
                - 1 for tokens that are **not masked**,
                - 0 for tokens that are **masked**.
            head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*):
                Mask to nullify selected attention heads; its first dimension must equal the
                number of layers. Mask values selected in `[0, 1]`:
                - 1 indicates the head is **not masked**,
                - 0 indicates the head is **masked**.
            inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
                Embedded representation to use instead of looking up `input_ids` in the
                internal embedding matrix.
            output_attentions (`bool`, *optional*):
                Whether to collect the attention tensors of all layers. Defaults to the config value.
            output_hidden_states (`bool`, *optional*):
                Whether to collect the hidden states of all layers. Defaults to the config value.
            return_dict (`bool`, *optional*):
                Whether to return a [`BaseModelOutput`] instead of a plain tuple. Defaults to
                the config value.

        Raises:
            ValueError: if both or neither of `input_ids` / `inputs_embeds` are given, or if
                `head_mask` does not have one entry per layer.
        """
        if output_attentions is None:
            output_attentions = self.config.output_attentions
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict

        # Exactly one of input_ids / inputs_embeds must be supplied.
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError(
                "You cannot specify both input_ids and inputs_embeds at the same time"
            )
        if input_ids is None and inputs_embeds is None:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        if input_ids is not None:
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            input_ids = input_ids.view(-1, input_ids.size()[-1])
            inputs_embeds = self.embed_tokens(input_ids) * self.embed_scale

        positional = self.embed_positions(input_ids, inputs_embeds)
        positional = positional.to(inputs_embeds.device)

        hidden_states = nn.functional.dropout(
            inputs_embeds + positional, p=self.dropout, training=self.training
        )

        if attention_mask is not None:
            # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
            attention_mask = _expand_mask(attention_mask, inputs_embeds.dtype)

        encoder_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        # A head mask, when given, must carry one entry per encoder layer.
        if head_mask is not None and head_mask.size()[0] != len(self.layers):
            raise ValueError(
                f"The head_mask should be specified for {len(self.layers)} layers, but it is for"
                f" {head_mask.size()[0]}."
            )

        zero3_enabled = is_deepspeed_zero3_enabled()

        for layer_index, layer in enumerate(self.layers):
            if output_hidden_states:
                encoder_states += (hidden_states,)

            # LayerDrop (see https://arxiv.org/abs/1909.11556): the random draw happens for
            # every layer, but it can only cause a skip while training.
            drop_draw = torch.rand([])
            skip_the_layer = bool(self.training and (drop_draw < self.layerdrop))

            # under deepspeed zero3 all gpus must run in sync, so the layer still executes
            if zero3_enabled or not skip_the_layer:
                layer_head_mask = None if head_mask is None else head_mask[layer_index]

                if self.gradient_checkpointing and self.training:
                    # Bind the layer as a default so recomputation during backward uses
                    # this iteration's module rather than the loop variable's last value.
                    def run_layer(*inputs, _module=layer):
                        return _module(*inputs, output_attentions)

                    layer_outputs = torch.utils.checkpoint.checkpoint(
                        run_layer,
                        hidden_states,
                        attention_mask,
                        layer_head_mask,
                    )
                else:
                    layer_outputs = layer(
                        hidden_states,
                        attention_mask,
                        layer_head_mask=layer_head_mask,
                        output_attentions=output_attentions,
                    )

                hidden_states = layer_outputs[0]

            if skip_the_layer:
                layer_outputs = (None, None)

            if output_attentions:
                all_attentions += (layer_outputs[1],)

        hidden_states = self.layer_norm(hidden_states)

        if output_hidden_states:
            encoder_states += (hidden_states,)

        if return_dict:
            return BaseModelOutput(
                last_hidden_state=hidden_states,
                hidden_states=encoder_states,
                attentions=all_attentions,
            )

        return tuple(
            item
            for item in [hidden_states, encoder_states, all_attentions]
            if item is not None
        )
class CLIPTextTransformer(nn.Module):
    """
    Text tower: an [`M2M100Encoder`] followed by a final layer norm.

    The pooled output is the normalised hidden state at sequence position 0 of every
    sequence. `eos_token_id` is stored from the config but is not read by `forward`.
    """

    def __init__(self, config: NLLBCLIPTextConfig):
        super().__init__()
        self.config = config
        self.encoder = M2M100Encoder(config)
        self.final_layer_norm = nn.LayerNorm(
            config.hidden_size, eps=config.layer_norm_eps
        )

        # For `pooled_output` computation
        self.eos_token_id = config.eos_token_id

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPooling]:
        r"""
        Encode token ids and pool the result.

        Args:
            input_ids: token ids; required. Flattened to two dimensions before encoding.
            attention_mask: forwarded unchanged to the encoder.
            output_attentions, output_hidden_states, return_dict: per-call overrides; each
                falls back to the value in `self.config` when None.

        Returns:
            A [`BaseModelOutputWithPooling`] when `return_dict` is truthy, otherwise the tuple
            `(last_hidden_state, pooled_output)` followed by the encoder's remaining outputs.

        Raises:
            ValueError: if `input_ids` is None.
        """
        if output_attentions is None:
            output_attentions = self.config.output_attentions
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict

        if input_ids is None:
            raise ValueError("You have to specify input_ids")

        flat_ids = input_ids.view(-1, input_ids.size()[-1])

        encoder_outputs = self.encoder(
            input_ids=flat_ids,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        last_hidden_state = self.final_layer_norm(encoder_outputs[0])

        # Pool by taking the first position of each sequence.
        batch_index = torch.arange(
            last_hidden_state.shape[0], device=last_hidden_state.device
        )
        pooled_output = last_hidden_state[batch_index, 0]

        if return_dict:
            return BaseModelOutputWithPooling(
                last_hidden_state=last_hidden_state,
                pooler_output=pooled_output,
                hidden_states=encoder_outputs.hidden_states,
                attentions=encoder_outputs.attentions,
            )

        return (last_hidden_state, pooled_output) + encoder_outputs[1:]
class NLLBCLIPModel(PreTrainedModel):
    """
    CLIP-style dual encoder pairing a [`CLIPTextTransformer`] text tower with a
    [`CLIPVisionTransformer`] vision tower.

    The pooled output of each tower is mapped by a bias-free linear layer into a shared
    space of size `config.projection_dim`. `forward` L2-normalises both projections and
    scales their dot products by `exp(logit_scale)`.
    """

    config_class = NLLBCLIPConfig

    def __init__(self, config: NLLBCLIPConfig):
        """
        Build both towers, the two projection layers and the learnable logit scale.

        Raises:
            ValueError: if `config.text_config` is not an `NLLBCLIPTextConfig` or
                `config.vision_config` is not a `CLIPVisionConfig`.
        """
        super().__init__(config)

        if not isinstance(config.text_config, NLLBCLIPTextConfig):
            # The message names the type that is actually checked.
            raise ValueError(
                "config.text_config is expected to be of type NLLBCLIPTextConfig but is of type"
                f" {type(config.text_config)}."
            )

        if not isinstance(config.vision_config, CLIPVisionConfig):
            raise ValueError(
                "config.vision_config is expected to be of type CLIPVisionConfig but is of type"
                f" {type(config.vision_config)}."
            )

        text_config = config.text_config
        vision_config = config.vision_config

        self.projection_dim = config.projection_dim
        self.text_embed_dim = text_config.hidden_size
        self.vision_embed_dim = vision_config.hidden_size

        self.text_model = CLIPTextTransformer(text_config)
        self.vision_model = CLIPVisionTransformer(vision_config)

        self.visual_projection = nn.Linear(
            self.vision_embed_dim, self.projection_dim, bias=False
        )
        self.text_projection = nn.Linear(
            self.text_embed_dim, self.projection_dim, bias=False
        )
        self.logit_scale = nn.Parameter(
            torch.tensor(self.config.logit_scale_init_value)
        )

        # Initialize weights and apply final processing
        self.post_init()

    def _resolve_output_options(
        self,
        output_attentions: Optional[bool],
        output_hidden_states: Optional[bool],
        return_dict: Optional[bool],
    ) -> Tuple[bool, bool, bool]:
        """Replace every option left as None with the value from the top-level model config."""
        # Use the combined model's config for these fields rather than the configs of the
        # vision and text components.
        if output_attentions is None:
            output_attentions = self.config.output_attentions
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict
        return output_attentions, output_hidden_states, return_dict

    def get_text_features(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> torch.FloatTensor:
        r"""
        Project the pooled output of the text tower into the shared embedding space.

        Args:
            input_ids: token ids passed to the text tower.
            attention_mask: attention mask passed to the text tower.
            position_ids: accepted for signature compatibility only and ignored. The text
                tower's `forward` has no such parameter, so forwarding it raised `TypeError`.
            output_attentions, output_hidden_states, return_dict: per-call overrides; each
                falls back to the model config when None.

        Returns:
            text_features (`torch.FloatTensor`): the result of applying `text_projection` to
            the text tower's pooled output. The features are not normalised here.
        """
        (
            output_attentions,
            output_hidden_states,
            return_dict,
        ) = self._resolve_output_options(
            output_attentions, output_hidden_states, return_dict
        )

        text_outputs = self.text_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        pooled_output = text_outputs[1]
        return self.text_projection(pooled_output)

    def get_image_features(
        self,
        pixel_values: Optional[torch.FloatTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> torch.FloatTensor:
        r"""
        Project the pooled output of the vision tower into the shared embedding space.

        Args:
            pixel_values: image tensor passed to the vision tower.
            output_attentions, output_hidden_states, return_dict: per-call overrides; each
                falls back to the model config when None.

        Returns:
            image_features (`torch.FloatTensor`): the result of applying `visual_projection`
            to the vision tower's pooled output. The features are not normalised here.
        """
        (
            output_attentions,
            output_hidden_states,
            return_dict,
        ) = self._resolve_output_options(
            output_attentions, output_hidden_states, return_dict
        )

        vision_outputs = self.vision_model(
            pixel_values=pixel_values,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        pooled_output = vision_outputs[1]  # pooled_output
        return self.visual_projection(pooled_output)

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        pixel_values: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        return_loss: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, NLLBCLIPOutput]:
        r"""
        Run both towers and compute scaled cosine-similarity logits between texts and images.

        Args:
            input_ids: token ids passed to the text tower.
            pixel_values: image tensor passed to the vision tower.
            attention_mask: attention mask passed to the text tower.
            return_loss: when truthy, also compute `clip_loss` on the text-to-image logits.
            output_attentions, output_hidden_states, return_dict: per-call overrides; each
                falls back to the model config when None.

        Returns:
            An [`NLLBCLIPOutput`] when `return_dict` is truthy. Otherwise the tuple
            `(logits_per_image, logits_per_text, text_embeds, image_embeds, text_outputs,
            vision_outputs)`, preceded by the loss when one was computed.
        """
        (
            output_attentions,
            output_hidden_states,
            return_dict,
        ) = self._resolve_output_options(
            output_attentions, output_hidden_states, return_dict
        )

        vision_outputs = self.vision_model(
            pixel_values=pixel_values,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        text_outputs = self.text_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        image_embeds = self.visual_projection(vision_outputs[1])
        text_embeds = self.text_projection(text_outputs[1])

        # normalized features
        image_embeds = image_embeds / image_embeds.norm(p=2, dim=-1, keepdim=True)
        text_embeds = text_embeds / text_embeds.norm(p=2, dim=-1, keepdim=True)

        # cosine similarity as logits
        logit_scale = self.logit_scale.exp()
        logits_per_text = torch.matmul(text_embeds, image_embeds.t()) * logit_scale
        logits_per_image = logits_per_text.t()

        loss = None
        if return_loss:
            loss = clip_loss(logits_per_text)

        if not return_dict:
            output = (
                logits_per_image,
                logits_per_text,
                text_embeds,
                image_embeds,
                text_outputs,
                vision_outputs,
            )
            return ((loss,) + output) if loss is not None else output

        return NLLBCLIPOutput(
            loss=loss,
            logits_per_image=logits_per_image,
            logits_per_text=logits_per_text,
            text_embeds=text_embeds,
            image_embeds=image_embeds,
            text_model_output=text_outputs,
            vision_model_output=vision_outputs,
        )