| | import cv2 |
| | import os |
| | import shutil |
| | import time |
| | from SinglePhoto import FaceSwapper |
| |
|
| | def extract_frames(video_path, frames_dir): |
| | if not os.path.exists(frames_dir): |
| | os.makedirs(frames_dir) |
| | last_idx = -1 |
| | else: |
| | |
| | existing = [f for f in os.listdir(frames_dir) if f.startswith("frame_") and f.endswith(".jpg")] |
| | if existing: |
| | last_idx = max([int(f.split("_")[1].split(".")[0]) for f in existing]) |
| | else: |
| | last_idx = -1 |
| |
|
| | cap = cv2.VideoCapture(video_path) |
| | frame_paths = [] |
| | idx = 0 |
| | while True: |
| | ret, frame = cap.read() |
| | if not ret: |
| | break |
| | frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg") |
| | if idx > last_idx: |
| | cv2.imwrite(frame_path, frame) |
| | frame_paths.append(frame_path) |
| | idx += 1 |
| | cap.release() |
| | return frame_paths |
| |
|
| | def frames_to_video(frames_dir, output_video_path, fps): |
| | frames = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) |
| | if not frames: |
| | print("No frames found in directory.") |
| | return |
| | first_frame = cv2.imread(frames[0]) |
| | height, width, layers = first_frame.shape |
| | fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
| | out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) |
| | for frame_path in frames: |
| | frame = cv2.imread(frame_path) |
| | out.write(frame) |
| | out.release() |
| |
|
| | def main(): |
| | |
| | video_path = os.path.join("VideoSwapping", "data_dst.mp4") |
| | source_image_path = os.path.join("VideoSwapping", "data_src.jpg") |
| | frames_dir = os.path.join("VideoSwapping", "video_frames") |
| | swapped_dir = os.path.join("VideoSwapping", "swapped_frames") |
| | output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4") |
| | source_face_idx = 1 |
| |
|
| | |
| | while True: |
| | try: |
| | user_input = input("Enter target_face_idx (default is 1): ").strip() |
| | if user_input == "": |
| | dest_face_idx = 1 |
| | break |
| | dest_face_idx = int(user_input) |
| | break |
| | except ValueError: |
| | print("Invalid input. Please enter an integer value.") |
| |
|
| | print("Choose an option:") |
| | print("1. Extract frames only") |
| | print("2. Face swap only (requires extracted frames)") |
| | print("3. Both extract frames and face swap") |
| | choice = input("Enter 1, 2, or 3: ").strip() |
| |
|
| | frame_paths = [] |
| | if choice == "1" or choice == "3": |
| | print("Extracting frames from video (resuming if needed)...") |
| | frame_paths = extract_frames(video_path, frames_dir) |
| | print(f"Extracted {len(frame_paths)} frames to {frames_dir}.") |
| | if choice == "1": |
| | return |
| |
|
| | if choice == "2": |
| | |
| | if not os.path.exists(frames_dir): |
| | print("Frames directory does not exist. Please extract frames first.") |
| | return |
| | frame_paths = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) |
| |
|
| | if choice == "2" or choice == "3": |
| | |
| | if not os.path.exists(swapped_dir): |
| | os.makedirs(swapped_dir) |
| |
|
| | |
| | swapper = FaceSwapper() |
| |
|
| | |
| | print("Swapping faces on frames...") |
| | start_time = time.time() |
| | for idx, frame_path in enumerate(frame_paths): |
| | frame_name = os.path.basename(frame_path) |
| | |
| | if frame_name.startswith("frame_"): |
| | swapped_name = "swapped_" + frame_name[len("frame_"):] |
| | else: |
| | swapped_name = "swapped_" + frame_name |
| | out_path = os.path.join(swapped_dir, swapped_name) |
| | if os.path.exists(out_path): |
| | |
| | print(f"Frame {idx+1}/{len(frame_paths)} already swapped, skipping...", end='\r') |
| | continue |
| | try: |
| | try: |
| | swapped = swapper.swap_faces( |
| | source_path=source_image_path, |
| | source_face_idx=source_face_idx, |
| | target_path=frame_path, |
| | target_face_idx=dest_face_idx |
| | ) |
| | except ValueError as ve: |
| | if "Target image contains" in str(ve): |
| | print(f"\nFrame {idx}: Target face idx {dest_face_idx} not found, trying with idx 1.",end='\r') |
| | swapped = swapper.swap_faces( |
| | source_path=source_image_path, |
| | source_face_idx=source_face_idx, |
| | target_path=frame_path, |
| | target_face_idx=1 |
| | ) |
| | else: |
| | raise ve |
| | cv2.imwrite(out_path, swapped) |
| | except Exception as e: |
| | print(f"\nFrame {idx}: {e}") |
| | |
| | cv2.imwrite(out_path, cv2.imread(frame_path)) |
| | |
| | elapsed = time.time() - start_time |
| | avg_time = elapsed / (idx + 1) |
| | remaining = avg_time * (len(frame_paths) - (idx + 1)) |
| | mins, secs = divmod(int(remaining), 60) |
| | print(f"Swapping frame {idx+1}/{len(frame_paths)} | Est. time left: {mins:02d}:{secs:02d}", end='\r') |
| | print() |
| |
|
| | |
| | cap = cv2.VideoCapture(video_path) |
| | fps = cap.get(cv2.CAP_PROP_FPS) |
| | cap.release() |
| |
|
| | |
| | print("Combining swapped frames into video...") |
| | frames_to_video(swapped_dir, output_video_path, fps) |
| | print(f"Done! Output video saved as {output_video_path}") |
| |
|
| | |
| | answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower() |
| | if answer == 'n': |
| | try: |
| | shutil.rmtree(frames_dir) |
| | shutil.rmtree(swapped_dir) |
| | print("Temporary folders deleted.") |
| | except Exception as e: |
| | print(f"Error deleting folders: {e}") |
| | else: |
| | print("Temporary folders kept.") |
| |
|
| | if __name__ == "__main__": |
| | main() |