# SuperKontext / app.py
# AshBlanc's picture
# Update app.py
# d8471f7 verified
import os
from dotenv import load_dotenv
import gradio as gr
import google.generativeai as genai
from PIL import Image
import json
import asyncio
import threading
from typing import Optional, List, Dict, Any
import time
from functools import lru_cache
# Load environment variables through python-dotenv. This runs before the
# APIManager instance below is created, whose setup_api() reads
# GEMINI_API_KEY from os.environ.
load_dotenv()
# ### 1. Enhanced Configuration with Error Handling
class APIManager:
    """Centralized Gemini API configuration and model registry.

    On construction the API key is read from the ``GEMINI_API_KEY``
    environment variable and handed to ``genai.configure``. The per-task
    ``GenerativeModel`` instances are built lazily, on first access of
    :attr:`models`, rather than inside ``__init__``: the module creates its
    global instance before the ``*_SYSTEM_INSTRUCTION`` constants are
    assigned, so building the models during construction always failed with
    a ``NameError`` (reported only as a warning) and left the registry empty.

    Attributes:
        api_key: The key read from the environment, or ``None`` if absent.
        is_configured: ``True`` once ``genai.configure`` has succeeded.
        models: Dict mapping ``'vision'``, ``'initial'``, ``'refiner'`` and
            ``'rewriter'`` to model instances; empty when the API is not
            configured or the models could not be created.
    """

    def __init__(self):
        self.api_key = None
        self.is_configured = False
        self._models = {}
        self.setup_api()

    @property
    def models(self):
        """Return the model registry, building it on first use.

        Initialization is retried on each access for as long as the registry
        is empty, so an access that happens too early (before the system
        instruction constants exist) does not disable the models for good.
        """
        if self.is_configured and not self._models:
            self.initialize_models()
        return self._models

    @models.setter
    def models(self, value):
        # Keeps plain attribute assignment (``manager.models = {...}``) working.
        self._models = value

    def setup_api(self):
        """Read the API key from the environment and configure ``genai``.

        Never raises: any failure is printed and reflected in
        ``is_configured`` being ``False``.
        """
        try:
            self.api_key = os.environ.get('GEMINI_API_KEY')
            if not self.api_key:
                raise ValueError("GEMINI_API_KEY not found in environment variables.")
            genai.configure(api_key=self.api_key)
            self.is_configured = True
            print("🔑 API Key configured successfully.")
        except Exception as e:
            print(f"🔴 Error during API configuration: {e}")
            self.is_configured = False

    def initialize_models(self):
        """Create one ``GenerativeModel`` per task and store them in ``models``.

        Never raises: on failure a warning is printed and the registry is
        left as it was. The dict is assigned in one step, so it is either
        fully populated or untouched.
        """
        try:
            self.models = {
                'vision': genai.GenerativeModel(
                    'gemini-2.5-pro',
                    system_instruction=VISION_SYSTEM_INSTRUCTION),
                'initial': genai.GenerativeModel(
                    'gemini-2.5-flash-lite-preview-06-17',
                    system_instruction=PROMPT_ENGINEER_SYSTEM_INSTRUCTION),
                'refiner': genai.GenerativeModel(
                    'gemini-2.5-pro',
                    system_instruction=PROMPT_REFINER_SYSTEM_INSTRUCTION),
                'rewriter': genai.GenerativeModel(
                    'gemini-2.5-flash',
                    system_instruction=META_PROMPT_SYSTEM_INSTRUCTION),
            }
        except Exception as e:
            print(f"⚠️ Warning: Could not pre-initialize models: {e}")
# Global API manager instance, created at import time: constructing it runs
# APIManager.setup_api(), which reads GEMINI_API_KEY and configures genai.
# NOTE: this statement executes before the *_SYSTEM_INSTRUCTION constants
# defined further down in this module have been assigned.
api_manager = APIManager()
# ### 2. OPTIMIZED System Instructions - Short, Clear, High-Quality
# Each constant below is passed as `system_instruction` to a
# genai.GenerativeModel (see APIManager.initialize_models and the fallback
# models built inside the processing functions).

# System instruction for the 'vision' model, used for both screenshot and
# text-description analysis.
VISION_SYSTEM_INSTRUCTION = """Extract key actionable insights from screenshots or descriptions in 3-4 bullet points:
• UI/UX elements and layout structure
• Content type and user intent
• Technical context and requirements
• Specific pain points or opportunities
Be concise, specific, and focus on elements that inform prompt creation."""
# System instruction for the 'initial' model that drafts the first prompt.
PROMPT_ENGINEER_SYSTEM_INSTRUCTION = """Create concise, high-performance prompts that maximize AI effectiveness.
REQUIREMENTS:
- Start with clear role definition
- Use specific, actionable instructions
- Include necessary context only
- Specify exact output format
- Keep under 150 words unless complexity demands more
OUTPUT: Single optimized prompt ready for immediate use."""
# System instruction for the 'refiner' model that applies user feedback.
PROMPT_REFINER_SYSTEM_INSTRUCTION = """Refine the given prompt based on feedback while preserving core intent.
FOCUS:
- Address specific feedback points
- Maintain original purpose
- Improve clarity and effectiveness
- Optimize structure and language
OUTPUT: Single improved prompt that directly addresses the feedback."""
# System instruction for the 'rewriter' model. It asks for a JSON array of
# three strings, which rewrite_prompt_with_prewrite() parses with json.loads.
META_PROMPT_SYSTEM_INSTRUCTION = """Generate 3 distinct, improved variations of the input prompt.
VARIATION STRATEGY:
1. Enhanced clarity and structure
2. Different approach or perspective
3. Optimized for specific use case
OUTPUT FORMAT: Return ONLY valid JSON array of exactly 3 strings:
["Variation 1", "Variation 2", "Variation 3"]"""
# ### 3. Enhanced Processing Functions with Better Error Handling
def analyze_screenshot(pil_image: Image.Image) -> str:
    """Ask the vision model for prompt-relevant insights about a screenshot.

    Args:
        pil_image: The screenshot as a PIL image.

    Returns:
        The model's stripped text response, or a fixed placeholder sentence
        when that response is empty. Failures are reported in-band as a
        string starting with "Error": invalid image, unconfigured API, or
        any exception raised while calling the model (which is also printed).
    """
    if not isinstance(pil_image, Image.Image):
        return "Error: Invalid image provided."
    if not api_manager.is_configured:
        return "Error: API not configured. Please check your API key."

    try:
        # Prefer the pre-built model; build a one-off fallback if it is missing.
        vision_model = api_manager.models.get('vision')
        if not vision_model:
            vision_model = genai.GenerativeModel(
                'gemini-2.0-flash-exp',
                system_instruction=VISION_SYSTEM_INSTRUCTION,
            )

        request_parts = [
            "Analyze this screenshot and extract key insights for prompt creation:",
            pil_image,
        ]
        insights = vision_model.generate_content(request_parts).text.strip()
        if insights:
            return insights
        return "No meaningful content detected in the screenshot."
    except Exception as exc:
        failure = f"Error in vision analysis: {str(exc)}"
        print(failure)
        return failure
def analyze_text_description(text: str) -> str:
    """Ask the vision/analysis model for prompt-relevant insights about a text.

    Args:
        text: Free-form task description. ``None``, empty and
            whitespace-only values are all treated as "no text provided".

    Returns:
        The model's stripped text response, or a fixed placeholder sentence
        when that response is empty. Failures are reported in-band as a
        string starting with "Error": missing text, unconfigured API, or any
        exception raised while calling the model (which is also printed).
    """
    # Guard against None as well as blank strings: this check runs outside
    # the try block, so an AttributeError here would escape to the caller.
    if not text or not text.strip():
        return "Error: No text provided for analysis."
    if not api_manager.is_configured:
        return "Error: API not configured."

    try:
        # Prefer the pre-built model; build a one-off fallback if it is missing.
        model = api_manager.models.get('vision') or genai.GenerativeModel(
            'gemini-2.0-flash-exp',
            system_instruction=VISION_SYSTEM_INSTRUCTION,
        )
        response = model.generate_content(
            f"Analyze this description for prompt creation insights:\n\n{text}"
        )
        result = response.text.strip()
        return result if result else "Unable to extract meaningful insights from the description."
    except Exception as e:
        error_msg = f"Error in text analysis: {str(e)}"
        print(error_msg)
        return error_msg
def initial_prompt_stream(analysis_text: str, goal: str):
    """Stream the first optimized prompt for the given analysis and goal.

    Args:
        analysis_text: Context insights produced by the analysis step.
        goal: Optional user goal. ``None``, empty and whitespace-only values
            all fall back to a generic default goal.

    Yields:
        The accumulated (stripped) prompt text after each non-empty chunk
        from the model. If the model produces no text at all, a generic
        fallback prompt built from the goal is yielded instead. Failures are
        reported in-band as a single yielded string starting with "Error".
    """
    if not api_manager.is_configured:
        yield "Error: API not configured. Please check your API key."
        return

    try:
        # Prefer the pre-built model; build a one-off fallback if it is missing.
        model = api_manager.models.get('initial') or genai.GenerativeModel(
            'gemini-2.0-flash-exp',
            system_instruction=PROMPT_ENGINEER_SYSTEM_INSTRUCTION,
        )

        # Strip first, then apply the default, so that a whitespace-only goal
        # does not end up as an empty "GOAL:" line in the request.
        user_goal = (goal or "").strip() or "Create an optimized prompt based on the analysis"
        prompt = (
            f"CONTEXT: {analysis_text}\n"
            f"GOAL: {user_goal}\n"
            "Create a concise, high-performance prompt that achieves this goal."
        )

        final_prompt_full = ""
        for chunk in model.generate_content(prompt, stream=True):
            if chunk.text:
                final_prompt_full += chunk.text
                yield final_prompt_full.strip()

        if not final_prompt_full.strip():
            fallback = f"You are an expert assistant. {user_goal}. Provide clear, actionable guidance with specific examples."
            yield fallback
    except Exception as e:
        error_msg = f"Error in prompt generation: {str(e)}"
        print(error_msg)
        yield error_msg
def refinement_prompt_stream(original_prompt: str, feedback: str):
    """Stream a refined version of a prompt based on user feedback.

    Args:
        original_prompt: The prompt to improve.
        feedback: The user's description of what should change.

    Yields:
        The accumulated (stripped) refined prompt after each non-empty chunk
        from the model. If the model produces no text, ``original_prompt`` is
        yielded unchanged. Failures are reported in-band as a single yielded
        string starting with "Error".
    """
    if not api_manager.is_configured:
        yield "Error: API not configured. Please check your API key."
        return

    try:
        # Prefer the pre-built model; build a one-off fallback if it is missing.
        refiner = api_manager.models.get('refiner')
        if not refiner:
            refiner = genai.GenerativeModel(
                'gemini-2.0-flash-exp',
                system_instruction=PROMPT_REFINER_SYSTEM_INSTRUCTION,
            )

        request_text = (
            f"ORIGINAL: {original_prompt}\n"
            f"FEEDBACK: {feedback}\n"
            "Refine the prompt based on this feedback."
        )

        refined_so_far = ""
        for piece in refiner.generate_content(request_text, stream=True):
            if piece.text:
                refined_so_far += piece.text
                yield refined_so_far.strip()

        if not refined_so_far.strip():
            yield original_prompt  # Nothing came back: keep the original.
    except Exception as exc:
        failure = f"Error in prompt refinement: {str(exc)}"
        print(failure)
        yield failure
def rewrite_prompt_with_prewrite(original_prompt: str) -> List[str]:
    """Ask the rewriter model for three variations of a prompt.

    The model is asked for a JSON array of strings. Markdown code fences are
    removed from the response, and if the response does not start with ``[``
    the first ``[...]`` span is extracted before parsing.

    Args:
        original_prompt: The prompt to rewrite.

    Returns:
        A list of exactly three strings. Missing variations are padded with
        ``""`` and extra ones are dropped. Non-string JSON elements are
        converted to text (``null`` becomes ``""``) so the ``List[str]``
        contract always holds. On failure the first element is a message
        starting with "Error" and the other two are empty strings.
    """
    import re  # kept function-local, as in the original implementation

    if not api_manager.is_configured:
        return ["Error: API not configured. Please check your API key.", "", ""]

    try:
        # Prefer the pre-built model; build a one-off fallback if it is missing.
        model = api_manager.models.get('rewriter') or genai.GenerativeModel(
            'gemini-2.0-flash-exp',
            system_instruction=META_PROMPT_SYSTEM_INSTRUCTION,
        )
        rewrite_prompt = (
            f"ORIGINAL PROMPT: {original_prompt}\n"
            "Generate 3 improved variations. Output ONLY JSON array of 3 strings."
        )
        response = model.generate_content(rewrite_prompt)

        # Clean up common formatting issues (markdown code fences).
        response_text = response.text.strip()
        for fence in ("```json", "```text", "```"):
            response_text = response_text.replace(fence, "")
        response_text = response_text.strip()

        # Try to extract the JSON array if it is wrapped in other text.
        if not response_text.startswith('['):
            json_match = re.search(r'\[.*\]', response_text, re.DOTALL)
            if json_match:
                response_text = json_match.group(0)

        variations = json.loads(response_text)
        if isinstance(variations, list) and variations:
            # Callers treat every element as a string (e.g. call .strip()),
            # so coerce anything the model returned that is not one.
            texts = [
                "" if item is None
                else item if isinstance(item, str)
                else json.dumps(item, ensure_ascii=False)
                for item in variations[:3]
            ]
            texts.extend([""] * (3 - len(texts)))
            return texts
        return ["Error: AI returned an invalid format.", "", ""]
    except json.JSONDecodeError:
        return ["Error: Could not parse AI response as JSON.", "", ""]
    except Exception as e:
        error_msg = f"Error in prompt rewriting: {str(e)}"
        print(error_msg)
        return [error_msg, "", ""]
# ### 4. Enhanced Gradio Interface Functions
def create_enhanced_interface():
    """Build the Gradio Blocks UI and wire up all of its event handlers.

    The layout has an input column (screenshot, task description, optional
    goal, generate/reset buttons) and an output column (context analysis,
    generated prompt, and three initially hidden refinement panels).

    The module-level helpers report failures in-band as strings starting
    with "Error". The handlers defined here recognise those strings so that
    an error message is shown to the user but is never stored as analysis
    context or as the current prompt.

    Returns:
        The configured ``gr.Blocks`` instance (not launched).
    """
    # Custom CSS for better styling
    custom_css = """
.gradio-container {
max-width: 1200px !important;
margin: auto;
}
.generate-btn {
background: linear-gradient(45deg, #007bff, #0056b3) !important;
border: none !important;
color: white !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
.generate-btn:hover {
transform: translateY(-2px) !important;
box-shadow: 0 4px 12px rgba(0, 123, 255, 0.3) !important;
}
.status-indicator {
padding: 8px 16px;
border-radius: 20px;
font-size: 14px;
font-weight: 500;
margin: 8px 0;
}
.status-success {
background: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.status-error {
background: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.compact-box {
background: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
}
.prompt-box {
background: #e3f2fd;
border: 2px solid #2196f3;
border-radius: 12px;
padding: 20px;
margin: 15px 0;
font-family: 'Consolas', 'Monaco', monospace;
font-size: 14px;
line-height: 1.5;
}
"""
    # Enhanced theme
    theme = gr.themes.Soft(
        primary_hue=gr.themes.colors.blue,
        secondary_hue=gr.themes.colors.neutral,
        neutral_hue=gr.themes.colors.slate,
        font=(gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"),
    )

    # Prefixes of the in-band error strings yielded/returned by
    # initial_prompt_stream, refinement_prompt_stream and
    # rewrite_prompt_with_prewrite respectively.
    generation_errors = ("Error in prompt generation:", "Error: API not configured")
    refinement_errors = ("Error in prompt refinement:", "Error: API not configured")
    rewrite_errors = ("Error:", "Error in prompt rewriting:")

    # NOTE(review): the container nesting below was reconstructed from a
    # source whose indentation had been stripped -- confirm it against the
    # intended layout.
    with gr.Blocks(theme=theme, css=custom_css, title="AI Prompt Engineer Pro") as interface:
        # State management: the latest successful analysis and the prompt
        # currently considered "the" prompt for refinement.
        analysis_state = gr.State()
        first_prompt_state = gr.State()

        # Header
        gr.Markdown("""
# 🚀 SuperKontext - context is all you need
### Transform your ideas into crystal-clean prompts
""")

        # Status indicator (evaluated once, when the UI is built)
        with gr.Row():
            with gr.Column():
                if api_manager.is_configured:
                    gr.HTML('<div class="status-indicator status-success">✅ API Connected & Ready</div>')
                else:
                    gr.HTML('<div class="status-indicator status-error">❌ API Configuration Error</div>')

        # Main interface
        with gr.Row(equal_height=True):
            # Input column
            with gr.Column(scale=1, min_width=400):
                gr.Markdown("### 📝 Input Context")

                # Screenshot section
                with gr.Group():
                    gr.Markdown("#### 📸 Screenshot (Optional)")
                    image_input = gr.Image(
                        type="pil",
                        label="Upload Screenshot",
                        sources=['upload'],
                        interactive=True,
                        height=250,
                    )

                # Text description section
                with gr.Group():
                    gr.Markdown("#### ✍️ Describe Your Task")
                    situation_input = gr.Textbox(
                        label="What do you need help with?",
                        placeholder="Example: Create a marketing email for a new product launch, Write code documentation, Analyze customer feedback data, etc.",
                        lines=5,
                        max_lines=8,
                    )

                with gr.Accordion("🎯 Specific Goal (Optional)", open=False):
                    goal_input = gr.Textbox(
                        label="Specific outcome you want",
                        placeholder="e.g., 'Generate 5 subject line options' or 'Create step-by-step instructions'",
                        lines=2,
                    )

                with gr.Row():
                    submit_btn = gr.Button(
                        "🚀 Generate Prompt",
                        variant="primary",
                        scale=3,
                        elem_classes=["generate-btn"],
                    )
                    clear_btn = gr.Button("🔄 Reset", scale=1)

            # Output column
            with gr.Column(scale=2, min_width=600):
                # Context Analysis - Compact and focused
                with gr.Group():
                    gr.Markdown("### 🔍 Context Analysis")
                    analysis_output = gr.Textbox(
                        label="Key Insights",
                        lines=4,
                        interactive=False,
                        show_copy_button=True,
                        placeholder="Context analysis will appear here...",
                        elem_classes=["compact-box"],
                    )

                # Final Prompt - Highlighted and prominent
                with gr.Group():
                    gr.Markdown("### ✅ Optimized Prompt")
                    final_prompt_output = gr.Textbox(
                        label="Your Crystal-Clear Prompt",
                        lines=8,
                        interactive=False,
                        show_copy_button=True,
                        placeholder="Your optimized prompt will appear here...",
                        elem_classes=["prompt-box"],
                    )

                # Refinement interface - Streamlined
                with gr.Row(visible=False) as satisfaction_row:
                    with gr.Column():
                        gr.Markdown("### 🎨 Refinement Options")
                        with gr.Row():
                            like_btn = gr.Button("👍 Perfect!", variant="secondary", scale=1)
                            auto_refine_btn = gr.Button("🤖 Auto-Refine", variant="primary", scale=1)
                            dislike_btn = gr.Button("✏️ Custom Feedback", variant="secondary", scale=1)

                # Auto-refinement section
                with gr.Column(visible=False) as prewrite_col:
                    gr.Markdown("### 🔄 Choose Your Preferred Version")
                    prewrite_choices = gr.Radio(
                        label="Select the best variation:",
                        type="value",
                        interactive=True,
                    )
                    select_version_btn = gr.Button("✅ Use This Version", variant="primary")

                # Manual feedback section
                with gr.Column(visible=False) as feedback_col:
                    gr.Markdown("### 💬 Custom Refinement")
                    feedback_input = gr.Textbox(
                        label="How should we improve it?",
                        placeholder="e.g., 'Make it more specific', 'Add examples', 'Change tone to professional'",
                        lines=2,
                    )
                    refine_btn = gr.Button("🛠️ Refine Prompt", variant="primary")

        # ### Enhanced Interface Functions
        # Handlers return/yield dicts keyed by component, so each update may
        # touch only a subset of the outputs registered for the event.

        def hide_refinement_panels():
            """Return updates that hide all three refinement panels."""
            return {
                satisfaction_row: gr.update(visible=False),
                feedback_col: gr.update(visible=False),
                prewrite_col: gr.update(visible=False),
            }

        def analysis_failure(message):
            """Return the update for a failed analysis: show *message*, clear state."""
            return {
                **hide_refinement_panels(),
                analysis_output: message,
                final_prompt_output: "",
                analysis_state: None,
            }

        def run_analysis_step(pil_image: Optional[Image.Image], situation_text: str):
            """Analyze the screenshot and/or description and store the result.

            On any failure ``analysis_state`` is set to ``None`` so that the
            chained generation step does not use an error message as context.
            """
            description = (situation_text or "").strip()

            # Reset UI state
            yield {
                **hide_refinement_panels(),
                analysis_output: "🔍 Analyzing context...",
                final_prompt_output: "",
            }

            # Validation
            if not api_manager.is_configured:
                yield analysis_failure(
                    "❌ Error: API Key not configured. Please check your GEMINI_API_KEY environment variable."
                )
                return
            if pil_image is None and not description:
                yield analysis_failure(
                    "⚠️ Please provide either a screenshot or task description to proceed."
                )
                return

            # Perform analysis
            try:
                if pil_image is not None and description:
                    # Both provided - analyze screenshot and add text context
                    screenshot_analysis = analyze_screenshot(pil_image)
                    if screenshot_analysis.startswith("Error"):
                        # Surface the failure instead of hiding it behind
                        # the "SCREENSHOT:" prefix.
                        analysis_text = screenshot_analysis
                    else:
                        analysis_text = f"SCREENSHOT: {screenshot_analysis}\n\nTEXT CONTEXT: {description}"
                elif pil_image is not None:
                    # Only screenshot
                    analysis_text = analyze_screenshot(pil_image)
                else:
                    # Only text description
                    analysis_text = analyze_text_description(description)
            except Exception as e:
                error_msg = f"❌ Error during analysis: {str(e)}"
                print(error_msg)
                yield analysis_failure(error_msg)
                return

            if not analysis_text or analysis_text.startswith("Error"):
                yield analysis_failure(
                    analysis_text or "Unable to generate analysis. Please try again."
                )
                return

            yield {
                **hide_refinement_panels(),
                analysis_output: analysis_text,
                final_prompt_output: "",
                analysis_state: analysis_text,
            }

        def run_streaming_generation(analysis: str, goal: str):
            """Stream the generated prompt into the output box.

            The refinement options are revealed, and the prompt stored, only
            when generation did not end in an error message.
            """
            if not analysis:
                yield {
                    final_prompt_output: "❌ Error: No analysis available for prompt generation.",
                    first_prompt_state: None,
                    satisfaction_row: gr.update(visible=False),
                }
                return

            yield {
                final_prompt_output: "🚀 Generating optimized prompt...",
                satisfaction_row: gr.update(visible=False),
            }

            final_prompt_full = ""
            for final_prompt_full in initial_prompt_stream(analysis, goal):
                yield {final_prompt_output: final_prompt_full}

            failed = final_prompt_full.startswith(generation_errors)
            yield {
                final_prompt_output: final_prompt_full,
                first_prompt_state: None if failed else final_prompt_full,
                satisfaction_row: gr.update(visible=not failed),
            }

        def handle_auto_refine(original_prompt: str):
            """Fetch rewritten variations and offer the usable ones as choices."""
            back_to_options = {
                prewrite_col: gr.update(visible=False),
                satisfaction_row: gr.update(visible=True),
                feedback_col: gr.update(visible=False),
            }
            if not original_prompt:
                return back_to_options

            variations = rewrite_prompt_with_prewrite(original_prompt)
            # Keep only non-blank strings; anything else cannot be a prompt.
            valid_variations = [v for v in variations if isinstance(v, str) and v.strip()]

            # A failed rewrite comes back as [error message, "", ""]: report
            # it instead of offering the message as a selectable prompt.
            if len(valid_variations) == 1 and valid_variations[0].startswith(rewrite_errors):
                gr.Warning(valid_variations[0])
                return back_to_options
            if not valid_variations:
                return back_to_options

            return {
                prewrite_col: gr.update(visible=True),
                prewrite_choices: gr.update(choices=valid_variations, value=valid_variations[0]),
                satisfaction_row: gr.update(visible=False),
                feedback_col: gr.update(visible=False),
            }

        def select_rewritten_prompt(selected_prompt: str):
            """Adopt the chosen variation as the current prompt."""
            if not selected_prompt or not selected_prompt.strip():
                return {
                    final_prompt_output: "❌ Error: No prompt selected.",
                    first_prompt_state: None,
                    satisfaction_row: gr.update(visible=False),
                    prewrite_col: gr.update(visible=False),
                }
            return {
                final_prompt_output: selected_prompt,
                first_prompt_state: selected_prompt,
                satisfaction_row: gr.update(visible=True),
                prewrite_col: gr.update(visible=False),
            }

        def handle_manual_feedback():
            """Show feedback input area."""
            return {
                feedback_col: gr.update(visible=True),
                satisfaction_row: gr.update(visible=False),
                prewrite_col: gr.update(visible=False),
            }

        def handle_like():
            """Hide refinement options when user is satisfied."""
            return hide_refinement_panels()

        def refine_with_manual_feedback(original_prompt: str, feedback: str):
            """Stream a refinement of the current prompt based on user feedback.

            The stored prompt is replaced only when refinement succeeds; on
            failure the error is displayed but the previous prompt is kept so
            the user can try again.
            """
            if not (feedback or "").strip():
                yield {
                    final_prompt_output: original_prompt,
                    first_prompt_state: original_prompt,
                    satisfaction_row: gr.update(visible=True),
                    feedback_col: gr.update(visible=False),
                }
                return

            yield {
                final_prompt_output: "🛠️ Refining prompt based on your feedback...",
                satisfaction_row: gr.update(visible=False),
            }

            final_prompt_full = ""
            for final_prompt_full in refinement_prompt_stream(original_prompt, feedback):
                yield {final_prompt_output: final_prompt_full}

            succeeded = bool(final_prompt_full) and not final_prompt_full.startswith(refinement_errors)
            yield {
                final_prompt_output: final_prompt_full,
                first_prompt_state: final_prompt_full if succeeded else original_prompt,
                satisfaction_row: gr.update(visible=True),
                feedback_col: gr.update(visible=False),
            }

        def clear_all():
            """Enhanced reset function with complete state clearing."""
            return {
                image_input: None,
                situation_input: "",
                goal_input: "",
                analysis_output: "",
                final_prompt_output: "",
                satisfaction_row: gr.update(visible=False),
                feedback_col: gr.update(visible=False),
                prewrite_col: gr.update(visible=False),
                prewrite_choices: gr.update(choices=[], value=None),
                feedback_input: "",
                analysis_state: None,
                first_prompt_state: None,
            }

        # Event handlers
        analysis_outputs = [
            satisfaction_row, feedback_col, prewrite_col,
            analysis_output, final_prompt_output, analysis_state,
        ]
        streaming_outputs = [final_prompt_output, first_prompt_state, satisfaction_row]
        refinement_outputs = [final_prompt_output, first_prompt_state, satisfaction_row, feedback_col]

        # Event bindings: the generate button and pressing Enter in the goal
        # box both run analysis followed by streaming generation.
        for generate_trigger in (submit_btn.click, goal_input.submit):
            generate_trigger(
                fn=run_analysis_step,
                inputs=[image_input, situation_input],
                outputs=analysis_outputs,
                show_progress="minimal",
            ).then(
                fn=run_streaming_generation,
                inputs=[analysis_state, goal_input],
                outputs=streaming_outputs,
                show_progress="minimal",
            )

        # Refinement handlers
        like_btn.click(
            fn=handle_like,
            outputs=[satisfaction_row, feedback_col, prewrite_col],
        )
        auto_refine_btn.click(
            fn=handle_auto_refine,
            inputs=[first_prompt_state],
            outputs=[prewrite_col, prewrite_choices, satisfaction_row, feedback_col],
        )
        dislike_btn.click(
            fn=handle_manual_feedback,
            outputs=[feedback_col, satisfaction_row, prewrite_col],
        )
        select_version_btn.click(
            fn=select_rewritten_prompt,
            inputs=[prewrite_choices],
            outputs=[final_prompt_output, first_prompt_state, satisfaction_row, prewrite_col],
        )
        # The refine button and pressing Enter in the feedback box are equivalent.
        for refine_trigger in (refine_btn.click, feedback_input.submit):
            refine_trigger(
                fn=refine_with_manual_feedback,
                inputs=[first_prompt_state, feedback_input],
                outputs=refinement_outputs,
            )

        # Reset functionality
        clear_btn.click(
            fn=clear_all,
            outputs=[
                image_input, situation_input, goal_input,
                analysis_output, final_prompt_output, satisfaction_row,
                feedback_col, prewrite_col, prewrite_choices, feedback_input,
                analysis_state, first_prompt_state,
            ],
        )

    return interface
# ### 5. Launch Configuration
if __name__ == "__main__":
    # Build the interface, then serve it with the settings collected below.
    launch_options = dict(
        debug=True,
        share=False,
        inbrowser=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        favicon_path=None,
        ssl_verify=False,
        quiet=False,
    )
    demo = create_enhanced_interface()
    demo.launch(**launch_options)