Spaces:
Runtime error
Runtime error
| """ | |
| https://github.com/Trinkle23897/Fast-Poisson-Image-Editing | |
| MIT License | |
| Copyright (c) 2022 Jiayi Weng | |
| Permission is hereby granted, free of charge, to any person obtaining a copy | |
| of this software and associated documentation files (the "Software"), to deal | |
| in the Software without restriction, including without limitation the rights | |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| copies of the Software, and to permit persons to whom the Software is | |
| furnished to do so, subject to the following conditions: | |
| The above copyright notice and this permission notice shall be included in all | |
| copies or substantial portions of the Software. | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| SOFTWARE. | |
| """ | |
| import time | |
| import argparse | |
| import os | |
| import fpie | |
| from process import ALL_BACKEND, CPU_COUNT, DEFAULT_BACKEND | |
| from fpie.io import read_images, write_image | |
| from process import BaseProcessor, EquProcessor, GridProcessor | |
| from PIL import Image | |
| import numpy as np | |
| import skimage | |
| import skimage.measure | |
| import scipy | |
| import scipy.signal | |
| class PhotometricCorrection: | |
| def __init__(self,quite=False): | |
| self.get_parser("cli") | |
| args=self.parser.parse_args(["--method","grid","-g","src","-s","a","-t","a","-o","a"]) | |
| args.mpi_sync_interval = getattr(args, "mpi_sync_interval", 0) | |
| self.backend=args.backend | |
| self.args=args | |
| self.quite=quite | |
| proc: BaseProcessor | |
| proc = GridProcessor( | |
| args.gradient, | |
| args.backend, | |
| args.cpu, | |
| args.mpi_sync_interval, | |
| args.block_size, | |
| args.grid_x, | |
| args.grid_y, | |
| ) | |
| print( | |
| f"[PIE]Successfully initialize PIE {args.method} solver " | |
| f"with {args.backend} backend" | |
| ) | |
| self.proc=proc | |
| def run(self, original_image, inpainted_image, mode="mask_mode"): | |
| print(f"[PIE] start") | |
| if mode=="disabled": | |
| return inpainted_image | |
| input_arr=np.array(original_image) | |
| if input_arr[:,:,-1].sum()<1: | |
| return inpainted_image | |
| output_arr=np.array(inpainted_image) | |
| mask=input_arr[:,:,-1] | |
| mask=255-mask | |
| if mask.sum()<1 and mode=="mask_mode": | |
| mode="" | |
| if mode=="mask_mode": | |
| mask = skimage.measure.block_reduce(mask, (8, 8), np.max) | |
| mask = mask.repeat(8, axis=0).repeat(8, axis=1) | |
| else: | |
| mask[8:-9,8:-9]=255 | |
| mask = mask[:,:,np.newaxis].repeat(3,axis=2) | |
| nmask=mask.copy() | |
| output_arr2=output_arr[:,:,0:3].copy() | |
| input_arr2=input_arr[:,:,0:3].copy() | |
| output_arr2[nmask<128]=0 | |
| input_arr2[nmask>=128]=0 | |
| output_arr2+=input_arr2 | |
| src = output_arr2[:,:,0:3] | |
| tgt = src.copy() | |
| proc=self.proc | |
| args=self.args | |
| if proc.root: | |
| n = proc.reset(src, mask, tgt, (args.h0, args.w0), (args.h1, args.w1)) | |
| proc.sync() | |
| if proc.root: | |
| result = tgt | |
| t = time.time() | |
| if args.p == 0: | |
| args.p = args.n | |
| for i in range(0, args.n, args.p): | |
| if proc.root: | |
| result, err = proc.step(args.p) # type: ignore | |
| print(f"[PIE] Iter {i + args.p}, abs_err {err}") | |
| else: | |
| proc.step(args.p) | |
| if proc.root: | |
| dt = time.time() - t | |
| print(f"[PIE] Time elapsed: {dt:.4f}s") | |
| # make sure consistent with dummy process | |
| return Image.fromarray(result) | |
| def get_parser(self,gen_type: str) -> argparse.Namespace: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument( | |
| "-v", "--version", action="store_true", help="show the version and exit" | |
| ) | |
| parser.add_argument( | |
| "--check-backend", action="store_true", help="print all available backends" | |
| ) | |
| if gen_type == "gui" and "mpi" in ALL_BACKEND: | |
| # gui doesn't support MPI backend | |
| ALL_BACKEND.remove("mpi") | |
| parser.add_argument( | |
| "-b", | |
| "--backend", | |
| type=str, | |
| choices=ALL_BACKEND, | |
| default=DEFAULT_BACKEND, | |
| help="backend choice", | |
| ) | |
| parser.add_argument( | |
| "-c", | |
| "--cpu", | |
| type=int, | |
| default=CPU_COUNT, | |
| help="number of CPU used", | |
| ) | |
| parser.add_argument( | |
| "-z", | |
| "--block-size", | |
| type=int, | |
| default=1024, | |
| help="cuda block size (only for equ solver)", | |
| ) | |
| parser.add_argument( | |
| "--method", | |
| type=str, | |
| choices=["equ", "grid"], | |
| default="equ", | |
| help="how to parallelize computation", | |
| ) | |
| parser.add_argument("-s", "--source", type=str, help="source image filename") | |
| if gen_type == "cli": | |
| parser.add_argument( | |
| "-m", | |
| "--mask", | |
| type=str, | |
| help="mask image filename (default is to use the whole source image)", | |
| default="", | |
| ) | |
| parser.add_argument("-t", "--target", type=str, help="target image filename") | |
| parser.add_argument("-o", "--output", type=str, help="output image filename") | |
| if gen_type == "cli": | |
| parser.add_argument( | |
| "-h0", type=int, help="mask position (height) on source image", default=0 | |
| ) | |
| parser.add_argument( | |
| "-w0", type=int, help="mask position (width) on source image", default=0 | |
| ) | |
| parser.add_argument( | |
| "-h1", type=int, help="mask position (height) on target image", default=0 | |
| ) | |
| parser.add_argument( | |
| "-w1", type=int, help="mask position (width) on target image", default=0 | |
| ) | |
| parser.add_argument( | |
| "-g", | |
| "--gradient", | |
| type=str, | |
| choices=["max", "src", "avg"], | |
| default="max", | |
| help="how to calculate gradient for PIE", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| type=int, | |
| help="how many iteration would you perfer, the more the better", | |
| default=5000, | |
| ) | |
| if gen_type == "cli": | |
| parser.add_argument( | |
| "-p", type=int, help="output result every P iteration", default=0 | |
| ) | |
| if "mpi" in ALL_BACKEND: | |
| parser.add_argument( | |
| "--mpi-sync-interval", | |
| type=int, | |
| help="MPI sync iteration interval", | |
| default=100, | |
| ) | |
| parser.add_argument( | |
| "--grid-x", type=int, help="x axis stride for grid solver", default=8 | |
| ) | |
| parser.add_argument( | |
| "--grid-y", type=int, help="y axis stride for grid solver", default=8 | |
| ) | |
| self.parser=parser | |
| if __name__ =="__main__": | |
| import sys | |
| import io | |
| import base64 | |
| from PIL import Image | |
| def base64_to_pil(base64_str): | |
| data = base64.b64decode(str(base64_str)) | |
| pil = Image.open(io.BytesIO(data)) | |
| return pil | |
| def pil_to_base64(out_pil): | |
| out_buffer = io.BytesIO() | |
| out_pil.save(out_buffer, format="PNG") | |
| out_buffer.seek(0) | |
| base64_bytes = base64.b64encode(out_buffer.read()) | |
| base64_str = base64_bytes.decode("ascii") | |
| return base64_str | |
| correction_func=PhotometricCorrection(quite=True) | |
| while True: | |
| buffer = sys.stdin.readline() | |
| print(f"[PIE] suprocess {len(buffer)} {type(buffer)} ") | |
| if len(buffer)==0: | |
| break | |
| if isinstance(buffer,str): | |
| lst=buffer.strip().split(",") | |
| else: | |
| lst=buffer.decode("ascii").strip().split(",") | |
| img0=base64_to_pil(lst[0]) | |
| img1=base64_to_pil(lst[1]) | |
| ret=correction_func.run(img0,img1,mode=lst[2]) | |
| ret_base64=pil_to_base64(ret) | |
| if isinstance(buffer,str): | |
| sys.stdout.write(f"{ret_base64}\n") | |
| else: | |
| sys.stdout.write(f"{ret_base64}\n".encode()) | |
| sys.stdout.flush() |