Spaces:
Build error
Build error
| import os | |
| from typing import Callable | |
| from clu import checkpoint | |
| from flax import linen as nn | |
| import gradio as gr | |
| from huggingface_hub import snapshot_download | |
| import jax | |
| import jax.numpy as jnp | |
| import numpy as np | |
| from PIL import Image | |
| from invariant_slot_attention.configs.clevr_with_masks.equiv_transl_scale_v2 import get_config | |
| from invariant_slot_attention.lib import utils | |
| def load_model(config, checkpoint_dir): | |
| rng = jax.random.PRNGKey(42) | |
| # Initialize model | |
| model = utils.build_model_from_config(config.model) | |
| def init_model(rng): | |
| rng, init_rng, model_rng, dropout_rng = jax.random.split(rng, num=4) | |
| init_conditioning = None | |
| init_inputs = jnp.ones([1, 1, 128, 128, 3], jnp.float32) | |
| initial_vars = model.init( | |
| {"params": model_rng, "state_init": init_rng, "dropout": dropout_rng}, | |
| video=init_inputs, conditioning=init_conditioning, | |
| padding_mask=jnp.ones(init_inputs.shape[:-1], jnp.int32)) | |
| # Split into state variables (e.g. for batchnorm stats) and model params. | |
| # Note that `pop()` on a FrozenDict performs a deep copy. | |
| state_vars, initial_params = initial_vars.pop("params") # pytype: disable=attribute-error | |
| # Filter out intermediates (we don't want to store these in the TrainState). | |
| state_vars = utils.filter_key_from_frozen_dict( | |
| state_vars, key="intermediates") | |
| return state_vars, initial_params | |
| state_vars, initial_params = init_model(rng) | |
| opt_state = None | |
| state = utils.TrainState( | |
| step=1, opt_state=opt_state, params=initial_params, rng=rng, | |
| variables=state_vars) | |
| ckpt = checkpoint.Checkpoint(checkpoint_dir) | |
| state = ckpt.restore(state, checkpoint=checkpoint_dir + "/ckpt-0") | |
| return model, state, rng | |
| def load_image(name): | |
| img = Image.open(f"images/{name}.png") | |
| img = img.crop((64, 29, 64 + 192, 29 + 192)) | |
| img = img.resize((128, 128)) | |
| img = np.array(img)[:, :, :3] / 255. | |
| img = jnp.array(img, dtype=jnp.float32) | |
| return img | |
| download_path = snapshot_download(repo_id="ondrejbiza/isa", allow_patterns="clevr_isa_ts_v2*") | |
| checkpoint_dir = os.path.join(download_path, "clevr_isa_ts_v2") | |
| model, state, rng = load_model(get_config(), checkpoint_dir) | |
| rng, init_rng = jax.random.split(rng, num=2) | |
| class DecoderWrapper(nn.Module): | |
| decoder: Callable[[], nn.Module] | |
| def __call__(self, slots, train=False): | |
| return self.decoder()(slots, train) | |
| decoder_model = DecoderWrapper(decoder=model.decoder) | |
| with gr.Blocks() as demo: | |
| local_slots = gr.State(np.zeros((11, 64), dtype=np.float32)) | |
| local_orig_pos = gr.State(np.zeros((11, 2), dtype=np.float32)) | |
| local_orig_scale = gr.State(np.zeros((11, 2), dtype=np.float32)) | |
| local_pos = gr.State(np.zeros((11, 2), dtype=np.float32)) | |
| local_scale = gr.State(np.ones((11, 2), dtype=np.float32)) | |
| local_probs = gr.State(np.zeros((11, 128, 128), dtype=np.float32)) | |
| with gr.Row(): | |
| gr_choose_image = gr.Dropdown( | |
| [f"img{i}" for i in range(1, 9)], label="CLEVR Image", info="Start by a picking an image from the CLEVR dataset." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr_image_1 = gr.Image(type="numpy", shape=(112, 112), source="canvas", label="Decoding") | |
| with gr.Column(): | |
| gr_image_2 = gr.Image(type="numpy", shape=(112, 112), source="canvas", label="Segmentation") | |
| with gr.Column(): | |
| gr_slot_slider = gr.Slider(1, 11, value=1, step=1, label="Slot Index", | |
| info="Change slot index too see the segmentation mask, position and scale of each slot.") | |
| gr_y_slider = gr.Slider(-1, 1, value=0, step=0.01, label="X") | |
| gr_x_slider = gr.Slider(-1, 1, value=0, step=0.01, label="Y") | |
| gr_sy_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Width Multiplier") | |
| gr_sx_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Height Multiplier") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr_button_render = gr.Button("Render", variant="primary", info="Render a new image with altered positions and scales.") | |
| with gr.Column(): | |
| gr_button_reset = gr.Button("Reset", info="Reset slot statistics.") | |
| def update_image_and_segmentation(name, idx): | |
| idx = idx - 1 | |
| img_input = load_image(name) | |
| out = model.apply( | |
| {"params": state.params, **state.variables}, | |
| video=img_input[None, None], | |
| rngs={"state_init": init_rng}, | |
| train=False) | |
| probs = np.array(nn.softmax(out["outputs"]["segmentation_logits"][0, 0, :, :, :, 0], axis=0)) | |
| img = np.array(out["outputs"]["video"][0, 0]) | |
| img = np.clip(img, 0, 1) | |
| slots_ = np.array(out["states"]) | |
| slots = slots_[0, 0, :, :-4] | |
| pos = slots_[0, 0, :, -4: -2] | |
| scale = slots_[0, 0, :, -2:] | |
| return (img * 255).astype(np.uint8), (probs[idx] * 255).astype(np.uint8), float(pos[idx, 0]), \ | |
| float(pos[idx, 1]), probs, slots, pos, np.ones((11, 2), dtype=np.float32), pos, scale | |
| gr_choose_image.change( | |
| fn=update_image_and_segmentation, | |
| inputs=[gr_choose_image, gr_slot_slider], | |
| outputs=[gr_image_1, gr_image_2, gr_x_slider, gr_y_slider, local_probs, | |
| local_slots, local_pos, local_scale, local_orig_pos, local_orig_scale] | |
| ) | |
| def update_sliders(idx, local_probs, local_pos, local_scale): | |
| idx = idx - 1 # 1-indexing to 0-indexing | |
| return (local_probs[idx] * 255).astype(np.uint8), float(local_pos[idx, 0]), \ | |
| float(local_pos[idx, 1]), float(local_scale[idx, 0]), float(local_scale[idx, 1]) | |
| gr_slot_slider.release( | |
| fn=update_sliders, | |
| inputs=[gr_slot_slider, local_probs, local_pos, local_scale], | |
| outputs=[gr_image_2, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider] | |
| ) | |
| def update_pos_x(idx, val, local_pos): | |
| local_pos[idx - 1, 0] = val | |
| return local_pos | |
| def update_pos_y(idx, val, local_pos): | |
| local_pos[idx - 1, 1] = val | |
| return local_pos | |
| def update_scale_x(idx, val, local_scale): | |
| local_scale[idx - 1, 0] = val | |
| return local_scale | |
| def update_scale_y(idx, val, local_scale): | |
| local_scale[idx - 1, 1] = val | |
| return local_scale | |
| gr_x_slider.release( | |
| fn=update_pos_x, | |
| inputs=[gr_slot_slider, gr_x_slider, local_pos], | |
| outputs=local_pos | |
| ) | |
| gr_y_slider.release( | |
| fn=update_pos_y, | |
| inputs=[gr_slot_slider, gr_y_slider, local_pos], | |
| outputs=local_pos | |
| ) | |
| gr_sx_slider.release( | |
| fn=update_scale_x, | |
| inputs=[gr_slot_slider, gr_sx_slider, local_scale], | |
| outputs=local_scale | |
| ) | |
| gr_sy_slider.release( | |
| fn=update_scale_y, | |
| inputs=[gr_slot_slider, gr_sy_slider, local_scale], | |
| outputs=local_scale | |
| ) | |
| def render(idx, local_slots, local_pos, local_scale, local_orig_scale): | |
| idx = idx - 1 | |
| slots = np.concatenate([local_slots, local_pos, local_scale * local_orig_scale], axis=-1) | |
| slots = jnp.array(slots) | |
| out = decoder_model.apply( | |
| {"params": state.params, **state.variables}, | |
| slots=slots[None, None], | |
| train=False | |
| ) | |
| probs = np.array(nn.softmax(out["segmentation_logits"][0, 0, :, :, :, 0], axis=0)) | |
| image = np.array(out["video"][0, 0]) | |
| image = np.clip(image, 0, 1) | |
| return (image * 255).astype(np.uint8), (probs[idx] * 255).astype(np.uint8), probs | |
| gr_button_render.click( | |
| fn=render, | |
| inputs=[gr_slot_slider, local_slots, local_pos, local_scale, local_orig_scale], | |
| outputs=[gr_image_1, gr_image_2, local_probs] | |
| ) | |
| def reset(idx, local_orig_pos): | |
| idx = idx - 1 | |
| return np.copy(local_orig_pos), np.ones((11, 2), dtype=np.float32), float(local_orig_pos[idx, 0]), \ | |
| float(local_orig_pos[idx, 1]), 1., 1. | |
| gr_button_reset.click( | |
| fn=reset, | |
| inputs=[gr_slot_slider, local_orig_pos], | |
| outputs=[local_pos, local_scale, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider] | |
| ) | |
| demo.launch() | |