PRISM / app.py
devranx's picture
Upload 20 files
03e275e verified
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from werkzeug.utils import secure_filename
import os
from pathlib import Path
import shutil
import io
import zipfile
import base64
import json
from pipeline import classify # Ensure this is your classification logic
app = Flask(__name__)
# Configuration
UPLOAD_FOLDER_SINGLE = 'static/uploads/single'
UPLOAD_FOLDER_MULTIPLE = 'static/uploads/multiple'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
app.config['UPLOAD_FOLDER_SINGLE'] = UPLOAD_FOLDER_SINGLE
app.config['UPLOAD_FOLDER_MULTIPLE'] = UPLOAD_FOLDER_MULTIPLE
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # Increase to 100MB
app.config['MAX_CONTENT_LENGTH_REPORT'] = 500 * 1024 * 1024 # 500MB for reports
# Ensure upload directories exist
os.makedirs(UPLOAD_FOLDER_SINGLE, exist_ok=True)
os.makedirs(UPLOAD_FOLDER_MULTIPLE, exist_ok=True)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def clear_uploads(folder):
upload_dir = Path(folder)
if upload_dir.exists():
shutil.rmtree(upload_dir)
upload_dir.mkdir(parents=True)
# Routes
@app.route('/')
def index():
return render_template('index.html')
@app.route('/single')
def single():
return render_template('single.html')
@app.route('/multiple')
def multiple():
return render_template('multiple.html')
@app.route('/clear_uploads', methods=['POST'])
def clear_uploads_route():
try:
clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])
clear_uploads(app.config['UPLOAD_FOLDER_MULTIPLE'])
return jsonify({'message': 'Upload folders cleared successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Single Image Classification Routes
@app.route('/upload_single', methods=['POST'])
def upload_single():
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if file and allowed_file(file.filename):
# Clear old files
clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])
# Save new file
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER_SINGLE'], filename)
file.save(filepath)
return jsonify({
'message': 'File uploaded successfully',
'filename': filename
})
return jsonify({'error': 'Invalid file type'}), 400
@app.route('/classify_single', methods=['POST'])
def classify_single():
data = request.get_json()
filename = data.get('filename')
if not filename:
return jsonify({'error': 'No filename provided'}), 400
filepath = os.path.join(app.config['UPLOAD_FOLDER_SINGLE'], filename)
if not os.path.exists(filepath):
return jsonify({'error': 'File not found'}), 404
try:
classification_result, result_table, failure_labels = classify(filepath)
return jsonify({
'classification': classification_result,
'result_table': result_table
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Multiple Image Classification Routes
@app.route('/upload_multiple', methods=['POST'])
def upload_multiple():
if 'file' not in request.files:
return jsonify({'error': 'No file uploaded'}), 400
file = request.files['file']
if not file or not allowed_file(file.filename):
return jsonify({'error': 'Invalid file type'}), 400
try:
# Save to temp directory
temp_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], 'temp')
os.makedirs(temp_dir, exist_ok=True)
filename = secure_filename(file.filename)
filepath = os.path.join(temp_dir, filename)
file.save(filepath)
# Read the file and convert to base64
with open(filepath, 'rb') as img_file:
img_data = base64.b64encode(img_file.read()).decode()
return jsonify({
'filename': filename,
'image': img_data,
'status': 'Queued'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/classify_multiple', methods=['POST'])
def classify_multiple():
temp_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], 'temp')
# Get list of files to process
files = [f for f in os.listdir(temp_dir)
if os.path.isfile(os.path.join(temp_dir, f)) and allowed_file(f)]
results = []
for filename in files:
filepath = os.path.join(temp_dir, filename)
# Process single image
classification_result, result_table, failure_labels = classify(filepath)
# Move file to appropriate directory
category_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], classification_result.lower())
os.makedirs(category_dir, exist_ok=True)
dest_path = os.path.join(category_dir, filename)
shutil.move(filepath, dest_path)
# Read the processed image
with open(dest_path, 'rb') as img_file:
img_data = base64.b64encode(img_file.read()).decode()
# Return single result immediately
result = {
'filename': filename,
'status': classification_result,
'labels': failure_labels,
'image': img_data
}
# Use Flask's Response streaming to send results one by one
return jsonify(result)
return jsonify({'message': 'All files processed'})
@app.route('/download_multiple/<category>')
def download_multiple(category):
if category not in ['pass', 'fail']:
return jsonify({'error': 'Invalid category'}), 400
category_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], category)
if not os.path.exists(category_dir):
return jsonify({'error': 'No images found'}), 404
memory_file = io.BytesIO()
with zipfile.ZipFile(memory_file, 'w') as zf:
for filename in os.listdir(category_dir):
filepath = os.path.join(category_dir, filename)
if os.path.isfile(filepath) and allowed_file(filename):
zf.write(filepath, filename)
memory_file.seek(0)
return send_file(
memory_file,
mimetype='application/zip',
as_attachment=True,
download_name=f'{category}_images.zip'
)
@app.route('/save_report', methods=['POST'])
def save_report():
try:
data = request.get_json()
date = data.get('date')
time = data.get('time')
summary_content = data.get('summary')
table_data = data.get('tableData', [])
if not all([date, time, summary_content, table_data]):
return jsonify({'error': 'Invalid data provided'}), 400
# Create the table HTML
table_rows = []
for item in table_data:
labels_html = ''
if item['status'].lower() == 'fail' and item['labels']:
labels = ' '.join(f'<span class="label">{label}</span>' for label in item['labels'])
labels_html = f'<div class="labels">{labels}</div>'
row = f"""
<tr>
<td class="serial">{item['serialNo']}</td>
<td class="image-col">
<img src="{item['image']}" alt="{item['filename']}"
class="thumbnail" onclick="openModal(this)"
style="cursor: pointer;">
<div class="filename">{item['filename']}</div>
</td>
<td class="result-col">
<span class="result-{item['status'].lower()}">{item['status']}</span>
</td>
<td class="labels-col">
{labels_html if labels_html else '-'}
</td>
</tr>
"""
table_rows.append(row)
# Create directories
reports_dir = Path('Reports')
reports_dir.mkdir(exist_ok=True)
date_dir = reports_dir / date
date_dir.mkdir(exist_ok=True)
# Read the template
template_path = Path('templates/report_template.html')
with open(template_path, 'r', encoding='utf-8') as f:
template = f.read()
# Fill in the template
html_content = template.format(
date=date,
time=time,
summary_content=summary_content,
table_rows='\n'.join(table_rows)
)
# Save the file
filename = f'Report_{date}_{time}.html'
report_path = date_dir / filename
with open(report_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Add new route for chunked report saving
@app.route('/save_report_chunk', methods=['POST'])
def save_report_chunk():
try:
chunk_data = request.get_json()
if not chunk_data:
return jsonify({'error': 'No data received'}), 400
# Log received data for debugging
print(f"Received chunk data: {chunk_data.keys()}")
# Validate all required fields are present
required_fields = ['chunkNumber', 'totalChunks', 'date', 'time', 'chunk']
if not all(field in chunk_data for field in required_fields):
missing_fields = [field for field in required_fields if field not in chunk_data]
return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400
# Extract and validate data
try:
chunk_number = int(chunk_data['chunkNumber'])
total_chunks = int(chunk_data['totalChunks'])
date = str(chunk_data['date'])
time = str(chunk_data['time'])
chunk = chunk_data['chunk']
summary = chunk_data.get('summary', '')
except (ValueError, TypeError) as e:
return jsonify({'error': f'Invalid data format: {str(e)}'}), 400
# Validate chunk data
if not isinstance(chunk, list):
return jsonify({'error': 'Chunk must be an array'}), 400
# Create directories with error handling
try:
reports_dir = Path('Reports')
reports_dir.mkdir(exist_ok=True)
date_dir = reports_dir / date
date_dir.mkdir(exist_ok=True)
temp_dir = date_dir / 'temp'
temp_dir.mkdir(exist_ok=True)
except Exception as e:
return jsonify({'error': f'Failed to create directories: {str(e)}'}), 500
# Save chunk with error handling
try:
chunk_file = temp_dir / f'chunk_{chunk_number}.json'
chunk_data_to_save = {
'data': chunk,
'summary': summary if chunk_number == 0 else ''
}
with open(chunk_file, 'w', encoding='utf-8') as f:
json.dump(chunk_data_to_save, f)
except Exception as e:
return jsonify({'error': f'Failed to save chunk: {str(e)}'}), 500
# Process final chunk
if chunk_number == total_chunks - 1:
try:
# Collect all chunks
all_data = []
summary_content = ''
# Verify all chunks exist
for i in range(total_chunks):
chunk_file = temp_dir / f'chunk_{i}.json'
if not chunk_file.exists():
return jsonify({'error': f'Missing chunk file: chunk_{i}.json'}), 500
with open(chunk_file, 'r', encoding='utf-8') as f:
chunk_content = json.load(f)
all_data.extend(chunk_content['data'])
if chunk_content['summary']:
summary_content = chunk_content['summary']
# Verify template exists
template_path = Path('templates/report_template.html')
multiple_path = Path('templates/multiple.html')
if not template_path.exists() or not multiple_path.exists():
return jsonify({'error': 'Template files not found'}), 500
# Copy the entire multiple.html content and remove upload section
with open(multiple_path, 'r', encoding='utf-8') as f:
template = f.read()
# Remove the upload section from template
upload_start = template.find('<div class="upload-section">')
upload_end = template.find('</div>', upload_start) + 6
template = template.replace(template[upload_start:upload_end], '')
# Replace placeholders
template = template.replace('<h1>Multiple Images Classification</h1>', '<h1>Classification Report</h1>')
template = template.replace('<p>Classify and segregate multiple images at once...</p>', '<p>{date} {time}</p>')
template = template.replace('<div id="summary"></div>', '<div id="summary">{summary_content}</div>')
template = template.replace('<tbody id="results-tbody">', '<tbody>{table_rows}</tbody>')
# Extract styles from multiple.html
with open(multiple_path, 'r', encoding='utf-8') as f:
multiple_content = f.read()
style_start = multiple_content.find('<style>') + 7
style_end = multiple_content.find('</style>')
styles = multiple_content[style_start:style_end].strip()
# Read and process the report template
with open(template_path, 'r', encoding='utf-8') as f:
template = f.read()
# Replace style placeholder with actual styles
template = template.replace('{multiple_styles}', styles)
# Replace single braces with double braces to escape them
template = template.replace('{', '{{').replace('}', '}}')
# Restore our format placeholders
template = template.replace('{{date}}', '{date}') \
.replace('{{time}}', '{time}') \
.replace('{{summary_content}}', '{summary_content}') \
.replace('{{table_rows}}', '{table_rows}')
# Create table rows with escaped curly braces (only failed items)
table_rows = []
serial_counter = 1 # Keep track of numbering
for item in all_data:
try:
if item['status'].lower() == 'fail': # Only process failed items
labels_html = ''
if item['labels']:
labels = ' '.join(f'<span class="label">{label}</span>'
for label in item['labels'])
labels_html = f'<div class="labels">{labels}</div>'
row = f"""
<tr>
<td class="serial">{serial_counter}</td>
<td class="image-col">
<img src="{item['image']}" alt="{item['filename']}"
class="thumbnail" onclick="openModal(this)"
style="cursor: pointer;">
<div class="filename">{item['filename']}</div>
</td>
<td class="result-col">
<span class="result-fail">{item['status']}</span>
</td>
<td class="labels-col">
{labels_html if labels_html else '-'}
</td>
</tr>
"""
table_rows.append(row)
serial_counter += 1
except KeyError as e:
return jsonify({'error': f'Missing required field in data: {str(e)}'}), 500
# Generate final HTML
try:
html_content = template.format(
date=date,
time=time,
summary_content=summary_content,
table_rows='\n'.join(table_rows)
)
except Exception as e:
return jsonify({'error': f'Failed to generate HTML: {str(e)}'}), 500
# Save final report
try:
filename = f'Report_{date}_{time}.html'
report_path = date_dir / filename
with open(report_path, 'w', encoding='utf-8') as f:
f.write(html_content)
except Exception as e:
return jsonify({'error': f'Failed to save report: {str(e)}'}), 500
# Cleanup
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Failed to clean up temp directory: {str(e)}")
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': f'Error processing final chunk: {str(e)}'}), 500
return jsonify({'success': True, 'message': 'Chunk received'})
except Exception as e:
print(f"Error in save_report_chunk: {str(e)}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Clear uploads on startup
clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])
clear_uploads(app.config['UPLOAD_FOLDER_MULTIPLE'])
app.run(host='0.0.0.0', port=7860, debug=False)