"""Flask web app for classifying uploaded images and saving HTML reports.

Routes cover three areas: single-image upload/classification, multi-image
upload/classification with pass/fail segregation and zip download, and saving
classification reports (in one request or in chunks).
"""

import base64
import html
import io
import json
import os
import shutil
import zipfile
from pathlib import Path

from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from werkzeug.utils import secure_filename

from pipeline import classify  # Ensure this is your classification logic

app = Flask(__name__)

# Configuration
UPLOAD_FOLDER_SINGLE = 'static/uploads/single'
UPLOAD_FOLDER_MULTIPLE = 'static/uploads/multiple'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

# Report output location and the templates used to render reports.
REPORTS_DIR = Path('Reports')
REPORT_TEMPLATE_PATH = Path('templates/report_template.html')
MULTIPLE_TEMPLATE_PATH = Path('templates/multiple.html')
# Names of the str.format placeholders that the report template must keep.
REPORT_PLACEHOLDERS = ('date', 'time', 'summary_content', 'table_rows')

app.config['UPLOAD_FOLDER_SINGLE'] = UPLOAD_FOLDER_SINGLE
app.config['UPLOAD_FOLDER_MULTIPLE'] = UPLOAD_FOLDER_MULTIPLE
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # Increase to 100MB
# NOTE(review): nothing in this file reads MAX_CONTENT_LENGTH_REPORT.
app.config['MAX_CONTENT_LENGTH_REPORT'] = 500 * 1024 * 1024  # 500MB for reports

# Ensure upload directories exist
os.makedirs(UPLOAD_FOLDER_SINGLE, exist_ok=True)
os.makedirs(UPLOAD_FOLDER_MULTIPLE, exist_ok=True)


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def clear_uploads(folder):
    """Delete *folder* with everything in it, then recreate it empty."""
    upload_dir = Path(folder)
    if upload_dir.exists():
        shutil.rmtree(upload_dir)
    upload_dir.mkdir(parents=True)


def _report_paths(date, time):
    """Return ``(date_dir, report_path)`` for a report saved under REPORTS_DIR.

    Both values come from the client and are used as path components, so any
    value that could address a different directory is rejected.

    Raises:
        ValueError: if *date* or *time* is empty, is ``.``/``..``, or contains
            a path separator or NUL byte.
    """
    for part in (date, time):
        if part in ('', '.', '..') or any(ch in part for ch in '/\\\0'):
            raise ValueError('date and time must be plain names without path separators')
    date_dir = REPORTS_DIR / date
    return date_dir, date_dir / f'Report_{date}_{time}.html'


def _build_table_row(serial_no, item):
    """Return the HTML table row for one classification result.

    *item* must provide ``filename``, ``status`` and ``labels``; a missing key
    raises KeyError. Failure labels are rendered only for items whose status
    is ``fail``; otherwise the last cell shows ``-``.
    """
    # NOTE(review): the tags below were reconstructed; the original markup
    # (including any attributes or class names the report CSS relies on) was
    # not available. Confirm against templates/report_template.html.
    labels_html = ''
    if item['status'].lower() == 'fail' and item['labels']:
        labels = ' '.join(
            f'<span>{html.escape(str(label))}</span>' for label in item['labels']
        )
        labels_html = f'<div>{labels}</div>'

    # All values originate from the client, so escape them before embedding.
    filename = html.escape(str(item['filename']))
    status = html.escape(str(item['status']))
    return f"""
        <tr>
            <td>{html.escape(str(serial_no))}</td>
            <td>{filename}<div>{filename}</div></td>
            <td>{status}</td>
            <td>{labels_html if labels_html else '-'}</td>
        </tr>
    """


def _extract_styles(page_html):
    """Return the text between the first ``<style>`` and ``</style>``, or ''."""
    open_tag = '<style>'
    start = page_html.find(open_tag)
    if start == -1:
        return ''
    start += len(open_tag)
    end = page_html.find('</style>', start)
    if end == -1:
        return ''
    return page_html[start:end].strip()


def _load_report_template():
    """Return the report template prepared for ``str.format``.

    The ``{multiple_styles}`` placeholder is replaced by the styles taken from
    multiple.html (empty when that file is absent). Every remaining brace is
    then doubled so literal braces survive ``format``, and finally the
    placeholders in REPORT_PLACEHOLDERS are restored.
    """
    template = REPORT_TEMPLATE_PATH.read_text(encoding='utf-8')

    styles = ''
    if MULTIPLE_TEMPLATE_PATH.exists():
        styles = _extract_styles(MULTIPLE_TEMPLATE_PATH.read_text(encoding='utf-8'))
    template = template.replace('{multiple_styles}', styles)

    # Replace single braces with double braces to escape them
    template = template.replace('{', '{{').replace('}', '}}')
    # Restore our format placeholders
    for name in REPORT_PLACEHOLDERS:
        template = template.replace('{{' + name + '}}', '{' + name + '}')
    return template


def _render_report(date, time, summary_content, table_rows):
    """Return the final report HTML for the given header values and rows."""
    # NOTE(review): summary_content is inserted unescaped; presumably it is
    # HTML produced by the front end. Confirm it cannot carry untrusted markup.
    return _load_report_template().format(
        date=html.escape(date),
        time=html.escape(time),
        summary_content=summary_content,
        table_rows='\n'.join(table_rows),
    )


# Routes
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/single')
def single():
    """Render the single-image classification page."""
    return render_template('single.html')


@app.route('/multiple')
def multiple():
    """Render the multiple-image classification page."""
    return render_template('multiple.html')


@app.route('/clear_uploads', methods=['POST'])
def clear_uploads_route():
    """Empty both upload folders and report the outcome as JSON."""
    try:
        clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])
        clear_uploads(app.config['UPLOAD_FOLDER_MULTIPLE'])
        return jsonify({'message': 'Upload folders cleared successfully'})
    except Exception as e:  # route boundary: report any failure as JSON
        return jsonify({'error': str(e)}), 500


# Single Image Classification Routes
@app.route('/upload_single', methods=['POST'])
def upload_single():
    """Store one uploaded image as the only file in the single-upload folder.

    Returns the sanitised filename on success, or a 400 JSON error when the
    upload is missing, unnamed, or has a disallowed extension.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not (file and allowed_file(file.filename)):
        return jsonify({'error': 'Invalid file type'}), 400

    # Clear old files
    clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])

    # Save new file
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER_SINGLE'], filename)
    file.save(filepath)

    return jsonify({
        'message': 'File uploaded successfully',
        'filename': filename
    })


@app.route('/classify_single', methods=['POST'])
def classify_single():
    """Classify the previously uploaded single image named in the JSON body.

    Responds 400 when no filename is given, 404 when the file does not exist,
    and 500 with the error text when classification raises.
    """
    data = request.get_json(silent=True)
    requested = data.get('filename') if isinstance(data, dict) else None
    # Uploads are stored under secure_filename(), so applying it again keeps
    # valid names unchanged while stripping any directory components.
    filename = secure_filename(str(requested)) if requested else ''
    if not filename:
        return jsonify({'error': 'No filename provided'}), 400

    filepath = os.path.join(app.config['UPLOAD_FOLDER_SINGLE'], filename)
    if not os.path.exists(filepath):
        return jsonify({'error': 'File not found'}), 404

    try:
        classification_result, result_table, _failure_labels = classify(filepath)
        return jsonify({
            'classification': classification_result,
            'result_table': result_table
        })
    except Exception as e:  # route boundary: report any failure as JSON
        return jsonify({'error': str(e)}), 500


# Multiple Image Classification Routes
@app.route('/upload_multiple', methods=['POST'])
def upload_multiple():
    """Queue one uploaded image for batch classification.

    The file is saved in the ``temp`` subfolder of the multiple-upload folder
    and echoed back base64-encoded together with a ``Queued`` status.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    file = request.files['file']
    if not file or not allowed_file(file.filename):
        return jsonify({'error': 'Invalid file type'}), 400

    try:
        # Save to temp directory
        temp_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], 'temp')
        os.makedirs(temp_dir, exist_ok=True)
        filename = secure_filename(file.filename)
        filepath = os.path.join(temp_dir, filename)
        file.save(filepath)

        # Read the file and convert to base64
        with open(filepath, 'rb') as img_file:
            img_data = base64.b64encode(img_file.read()).decode()

        return jsonify({
            'filename': filename,
            'image': img_data,
            'status': 'Queued'
        })
    except Exception as e:  # route boundary: report any failure as JSON
        return jsonify({'error': str(e)}), 500


@app.route('/classify_multiple', methods=['POST'])
def classify_multiple():
    """Classify ONE queued image per request and move it to its category folder.

    Each call handles the first eligible file found in the ``temp`` folder and
    returns its result. When no eligible file is left the response is
    ``{'message': 'All files processed'}``; presumably the client keeps
    calling until it receives that message.
    """
    temp_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], 'temp')
    if not os.path.isdir(temp_dir):
        # Nothing was ever queued, so there is nothing to process.
        return jsonify({'message': 'All files processed'})

    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        if not (os.path.isfile(filepath) and allowed_file(filename)):
            continue

        try:
            # Process single image
            classification_result, _result_table, failure_labels = classify(filepath)

            # Move file to appropriate directory
            category_dir = os.path.join(
                app.config['UPLOAD_FOLDER_MULTIPLE'], classification_result.lower()
            )
            os.makedirs(category_dir, exist_ok=True)
            dest_path = os.path.join(category_dir, filename)
            shutil.move(filepath, dest_path)

            # Read the processed image
            with open(dest_path, 'rb') as img_file:
                img_data = base64.b64encode(img_file.read()).decode()
        except Exception as e:  # route boundary: report any failure as JSON
            return jsonify({'error': str(e)}), 500

        # Return this single result immediately; remaining files stay queued.
        return jsonify({
            'filename': filename,
            'status': classification_result,
            'labels': failure_labels,
            'image': img_data
        })

    return jsonify({'message': 'All files processed'})


@app.route('/download_multiple/<category>')
def download_multiple(category):
    """Send a zip of all allowed images in the ``pass`` or ``fail`` folder.

    Responds 400 for any other category and 404 when the folder is absent.
    """
    if category not in ('pass', 'fail'):
        return jsonify({'error': 'Invalid category'}), 400

    category_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], category)
    if not os.path.exists(category_dir):
        return jsonify({'error': 'No images found'}), 404

    # Build the archive in memory so no temporary zip file is left on disk.
    memory_file = io.BytesIO()
    with zipfile.ZipFile(memory_file, 'w') as zf:
        for filename in os.listdir(category_dir):
            filepath = os.path.join(category_dir, filename)
            if os.path.isfile(filepath) and allowed_file(filename):
                zf.write(filepath, filename)

    memory_file.seek(0)
    return send_file(
        memory_file,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f'{category}_images.zip'
    )


@app.route('/save_report', methods=['POST'])
def save_report():
    """Render and save a report from a single JSON request.

    The body must carry non-empty ``date``, ``time``, ``summary`` and
    ``tableData``. The report is written to
    ``Reports/<date>/Report_<date>_<time>.html``.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid data provided'}), 400

        date = data.get('date')
        time = data.get('time')
        summary_content = data.get('summary')
        table_data = data.get('tableData', [])

        if not all([date, time, summary_content, table_data]):
            return jsonify({'error': 'Invalid data provided'}), 400

        date, time = str(date), str(time)
        try:
            date_dir, report_path = _report_paths(date, time)
        except ValueError:
            return jsonify({'error': 'Invalid data provided'}), 400

        # Create the table HTML
        table_rows = [_build_table_row(item['serialNo'], item) for item in table_data]

        # Create directories
        date_dir.mkdir(parents=True, exist_ok=True)

        # Fill in the template
        html_content = _render_report(date, time, summary_content, table_rows)

        # Save the file
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return jsonify({'success': True})
    except Exception as e:  # route boundary: report any failure as JSON
        return jsonify({'error': str(e)}), 500


def _finalize_chunked_report(temp_dir, total_chunks, date, time, report_path):
    """Merge the stored chunks into the final report and return the response.

    Reads ``chunk_0.json`` .. ``chunk_<total_chunks - 1>.json`` from
    *temp_dir*, renders only the items whose status is ``fail`` (renumbered
    from 1), writes the report to *report_path*, and removes *temp_dir*.
    """
    try:
        # Collect all chunks
        all_data = []
        summary_content = ''
        for i in range(total_chunks):
            chunk_file = temp_dir / f'chunk_{i}.json'
            if not chunk_file.exists():
                return jsonify({'error': f'Missing chunk file: chunk_{i}.json'}), 500
            with open(chunk_file, 'r', encoding='utf-8') as f:
                chunk_content = json.load(f)
            all_data.extend(chunk_content['data'])
            if chunk_content['summary']:
                summary_content = chunk_content['summary']

        # Verify templates exist
        if not REPORT_TEMPLATE_PATH.exists() or not MULTIPLE_TEMPLATE_PATH.exists():
            return jsonify({'error': 'Template files not found'}), 500

        # Create table rows (only failed items), numbered in output order
        table_rows = []
        for item in all_data:
            try:
                if item['status'].lower() != 'fail':
                    continue
                table_rows.append(_build_table_row(len(table_rows) + 1, item))
            except KeyError as e:
                return jsonify({'error': f'Missing required field in data: {str(e)}'}), 500

        # Generate final HTML
        try:
            html_content = _render_report(date, time, summary_content, table_rows)
        except Exception as e:
            return jsonify({'error': f'Failed to generate HTML: {str(e)}'}), 500

        # Save final report
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
        except Exception as e:
            return jsonify({'error': f'Failed to save report: {str(e)}'}), 500

        # Cleanup is best effort: the report is already saved at this point.
        try:
            shutil.rmtree(temp_dir)
        except Exception as e:
            print(f"Warning: Failed to clean up temp directory: {str(e)}")

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': f'Error processing final chunk: {str(e)}'}), 500


# Add new route for chunked report saving
@app.route('/save_report_chunk', methods=['POST'])
def save_report_chunk():
    """Store one chunk of report data; build the report on the last chunk.

    The JSON body must contain ``chunkNumber`` (0-based), ``totalChunks``,
    ``date``, ``time`` and ``chunk`` (a list); ``summary`` is optional and is
    kept only from chunk 0. Chunks are written to ``Reports/<date>/temp``.
    When ``chunkNumber == totalChunks - 1`` all chunks are merged into
    ``Reports/<date>/Report_<date>_<time>.html``.
    """
    try:
        chunk_data = request.get_json(silent=True)
        if not chunk_data or not isinstance(chunk_data, dict):
            return jsonify({'error': 'No data received'}), 400

        # Log received data for debugging
        print(f"Received chunk data: {chunk_data.keys()}")

        # Validate all required fields are present
        required_fields = ['chunkNumber', 'totalChunks', 'date', 'time', 'chunk']
        missing_fields = [field for field in required_fields if field not in chunk_data]
        if missing_fields:
            return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400

        # Extract and validate data
        try:
            chunk_number = int(chunk_data['chunkNumber'])
            total_chunks = int(chunk_data['totalChunks'])
            date = str(chunk_data['date'])
            time = str(chunk_data['time'])
            date_dir, report_path = _report_paths(date, time)
        except (ValueError, TypeError) as e:
            return jsonify({'error': f'Invalid data format: {str(e)}'}), 400

        chunk = chunk_data['chunk']
        summary = chunk_data.get('summary', '')

        # Validate chunk data
        if not isinstance(chunk, list):
            return jsonify({'error': 'Chunk must be an array'}), 400

        # Create directories with error handling
        temp_dir = date_dir / 'temp'
        try:
            temp_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            return jsonify({'error': f'Failed to create directories: {str(e)}'}), 500

        # Save chunk with error handling
        try:
            chunk_file = temp_dir / f'chunk_{chunk_number}.json'
            chunk_data_to_save = {
                'data': chunk,
                'summary': summary if chunk_number == 0 else ''
            }
            with open(chunk_file, 'w', encoding='utf-8') as f:
                json.dump(chunk_data_to_save, f)
        except Exception as e:
            return jsonify({'error': f'Failed to save chunk: {str(e)}'}), 500

        if chunk_number != total_chunks - 1:
            return jsonify({'success': True, 'message': 'Chunk received'})

        # Process final chunk
        return _finalize_chunked_report(temp_dir, total_chunks, date, time, report_path)
    except Exception as e:  # route boundary: report any failure as JSON
        print(f"Error in save_report_chunk: {str(e)}")
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Clear uploads on startup
    clear_uploads(app.config['UPLOAD_FOLDER_SINGLE'])
    clear_uploads(app.config['UPLOAD_FOLDER_MULTIPLE'])
    app.run(host='0.0.0.0', port=7860, debug=False)