Spaces:
Sleeping
Sleeping
| from transformers import AutoTokenizer, AutoModel, AutoImageProcessor | |
| from sentence_transformers import SentenceTransformer | |
| import torch | |
| import torch.nn.functional as F | |
| from PIL import Image | |
| import requests | |
| import os | |
| import json | |
| import math | |
| import re | |
| import pandas as pd | |
| import numpy as np | |
| from omeka_s_api_client import OmekaSClient,OmekaSClientError | |
| from typing import List, Dict, Any, Union | |
| import io | |
| from dotenv import load_dotenv | |
| # env var | |
| load_dotenv(os.path.join(os.getcwd(), ".env")) | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| # Nomic vison model | |
| processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5") | |
| vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True) | |
| # Nomic text model | |
| text_model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True, token=HF_TOKEN) | |
| def image_url_to_pil(url: str, max_size=(512, 512)) -> Image: | |
| """ | |
| Ex usage : image_blobs = df["image_url"].apply(image_url_to_pil).tolist() | |
| """ | |
| response = requests.get(url, stream=True, timeout=5) | |
| response.raise_for_status() | |
| image = Image.open(io.BytesIO(response.content)).convert("RGB") | |
| image.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| return image | |
| def generate_img_embed(images_urls, batch_size=20): | |
| """Generate image embeddings in batches to manage memory usage. | |
| Args: | |
| images_urls (list): List of image URLs | |
| batch_size (int): Number of images to process at once | |
| """ | |
| all_embeddings = [] | |
| for i in range(0, len(images_urls), batch_size): | |
| batch_urls = images_urls[i:i + batch_size] | |
| images = [image_url_to_pil(image_url) for image_url in batch_urls] | |
| inputs = processor(images, return_tensors="pt") | |
| img_emb = vision_model(**inputs).last_hidden_state | |
| img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1) | |
| all_embeddings.append(img_embeddings.detach().numpy()) | |
| return np.vstack(all_embeddings) | |
| def generate_text_embed(sentences: List, batch_size=64): | |
| """Generate text embeddings in batches to manage memory usage. | |
| Args: | |
| sentences (List): List of text strings to encode | |
| batch_size (int): Number of sentences to process at once | |
| """ | |
| all_embeddings = [] | |
| for i in range(0, len(sentences), batch_size): | |
| batch_sentences = sentences[i:i + batch_size] | |
| embeddings = text_model.encode(batch_sentences) | |
| all_embeddings.append(embeddings) | |
| return np.vstack(all_embeddings) | |
| def add_concatenated_text_field_exclude_keys(item_dict, keys_to_exclude=None, text_field_key="text", pair_separator=" - "): | |
| if not isinstance(item_dict, dict): | |
| raise TypeError("Input must be a dictionary.") | |
| if keys_to_exclude is None: | |
| keys_to_exclude = set() # Default to empty set | |
| else: | |
| keys_to_exclude = set(keys_to_exclude) # Ensure it's a set for efficient lookup | |
| # Add the target text key to the exclusion set automatically | |
| keys_to_exclude.add(text_field_key) | |
| formatted_pairs = [] | |
| for key, value in item_dict.items(): | |
| # 1. Skip any key in the exclusion set | |
| if key in keys_to_exclude: | |
| continue | |
| # 2. Check for empty/invalid values (same logic as before) | |
| is_empty_or_invalid = False | |
| if value is None: is_empty_or_invalid = True | |
| elif isinstance(value, float) and math.isnan(value): is_empty_or_invalid = True | |
| elif isinstance(value, (str, list, tuple, dict)) and len(value) == 0: is_empty_or_invalid = True | |
| # 3. Format and add if valid | |
| if not is_empty_or_invalid: | |
| formatted_pairs.append(f"{str(key)}: {str(value)}") | |
| concatenated_text = f"search_document: {pair_separator.join(formatted_pairs)}" | |
| item_dict[text_field_key] = concatenated_text | |
| return item_dict | |
| def prepare_df_atlas(df: pd.DataFrame, id_col='id', images_col='images_urls'): | |
| # Drop completely empty columns | |
| #df = df.dropna(axis=1, how='all') | |
| # Fill remaining nulls with empty strings | |
| #df = df.fillna('') | |
| # Ensure ID column exists | |
| if id_col not in df.columns: | |
| df[id_col] = [f'{i}' for i in range(len(df))] | |
| # Ensure indexed field exists and is not empty | |
| #if indexed_col not in df.columns: | |
| # df[indexed_col] = '' | |
| #df[images_col] = df[images_col].apply(lambda x: [x[0]] if isinstance(x, list) and len(x) > 1 else x if isinstance(x, list) else [x]) | |
| df[images_col] = df[images_col].apply(lambda x: x[0] if isinstance(x, list) else x) | |
| # Optional: force all to string (can help with weird dtypes) | |
| for col in df.columns: | |
| df[col] = df[col].astype(str) | |
| return df | |
| def remove_key_value_from_dict(list_of_dict, key_to_remove): | |
| new_list = [] | |
| for dictionary in list_of_dict: | |
| new_dict = dictionary.copy() # Create a copy to avoid modifying the original list | |
| if key_to_remove in new_dict: | |
| del new_dict[key_to_remove] | |
| new_list.append(new_dict) | |
| return new_list | |
| def remove_key_value_from_dict(input_dict, key_to_remove='text'): | |
| if not isinstance(input_dict, dict): | |
| raise TypeError("Input must be a dictionary.") | |
| if key_to_remove in input_dict: | |
| del input_dict[key_to_remove] | |
| return input_dict |