import random import matplotlib.pyplot as plt import nltk from nltk.tokenize import word_tokenize, sent_tokenize from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from nltk.text import Text from nltk.probability import FreqDist from cleantext import clean import textract import urllib.request from io import BytesIO import sys import pandas as pd import cv2 import re from wordcloud import WordCloud, ImageColorGenerator from textblob import TextBlob from PIL import Image import os import gradio as gr from dotenv import load_dotenv import groq import json import traceback import numpy as np import unidecode import contractions from sklearn.feature_extraction.text import TfidfVectorizer load_dotenv() import nltk import ssl def ensure_nltk_resources(): try: nltk.data.find('tokenizers/punkt') nltk.data.find('corpora/stopwords') except LookupError: print("NLTK resources not found. Downloading...") try: # Handling potential SSL issues (common on some systems) _create_unverified_https_context = ssl._create_unverified_context except AttributeError: pass else: ssl._create_default_https_context = _create_unverified_https_context nltk.download(['stopwords', 'wordnet', 'words']) nltk.download('punkt') nltk.download('punkt_tab') print("NLTK resources downloaded successfully.") ensure_nltk_resources() # Download NLTK resources (Ensure this runs once or handle caching) # nltk.download(['stopwords', 'wordnet', 'words']) # nltk.download('punkt') # nltk.download('punkt_tab') # Initialize Groq client groq_api_key = os.getenv("GROQ_API_KEY") groq_client = groq.Groq(api_key=groq_api_key) if groq_api_key else None # Stopwords customization stop_words = set(stopwords.words('english')) stop_words.update({'ask', 'much', 'thank', 'etc.', 'e', 'We', 'In', 'ed', 'pa', 'This', 'also', 'A', 'fu', 'To', '5', 'ing', 'er', '2'}) # Ensure stop_words is a set # --- Parsing & Preprocessing Functions --- def Parsing(parsed_text): try: if hasattr(parsed_text, 'name'): file_path = parsed_text.name else: file_path = parsed_text # Ensure textract handles encoding correctly or handle errors raw_party = textract.process(file_path) # Removed encoding/method for broader compatibility decoded_text = raw_party.decode('utf-8', errors='ignore') # Decode bytes to string, handling errors return clean(decoded_text) # Pass decoded string to clean except Exception as e: print(f"Error parsing PDF: {e}") return f"Error parsing PDF: {e}" def clean_text(text): text = text.encode("ascii", errors="ignore").decode("ascii") text = unidecode.unidecode(text) text = contractions.fix(text) text = re.sub(r"\n", " ", text) text = re.sub(r"\t", " ", text) text = re.sub(r"/ ", " ", text) text = text.strip() text = re.sub(" +", " ", text).strip() text = [word for word in text.split() if word not in stop_words] return ' '.join(text) def Preprocess(textParty): text1Party = re.sub('[^A-Za-z0-9]+', ' ', textParty) pattern = re.compile(r'\b(' + r'|'.join(stopwords.words('english')) + r')\b\s*') text2Party = pattern.sub('', text1Party) return text2Party # --- Core Analysis Functions --- def generate_summary(text): if not groq_client: return "Summarization is not available. Please set up your GROQ_API_KEY in the .env file." if len(text) > 10000: text = text[:10000] try: completion = groq_client.chat.completions.create( model="llama3-8b-8192", # Or your preferred model messages=[ {"role": "system", "content": "You are a helpful assistant that summarizes political manifestos. Provide a concise, objective summary that captures the key policy proposals, themes, and promises in the manifesto."}, {"role": "user", "content": f"Please summarize the following political manifesto text in about 300-500 words, focusing on the main policy areas, promises, and themes:\n{text}"} ], temperature=0.3, max_tokens=800 ) return completion.choices[0].message.content except Exception as e: return f"Error generating summary: {str(e)}" def fDistance(text2Party): word_tokens_party = word_tokenize(text2Party) fdistance = FreqDist(word_tokens_party).most_common(10) mem = {x[0]: x[1] for x in fdistance} vectorizer = TfidfVectorizer(max_features=15, stop_words='english') try: tfidf_matrix = vectorizer.fit_transform(sent_tokenize(text2Party)) feature_names = vectorizer.get_feature_names_out() tfidf_scores = {} sentences = sent_tokenize(text2Party) for i, word in enumerate(feature_names): scores = [] for j in range(tfidf_matrix.shape[0]): # Iterate through sentences if i < tfidf_matrix.shape[1]: # Check if word index is valid for this sentence vector scores.append(tfidf_matrix[j, i]) if scores: tfidf_scores[word] = sum(scores) / len(scores) # Average TF-IDF score across sentences combined_scores = {} all_words = set(list(mem.keys()) + list(tfidf_scores.keys())) max_freq = max(mem.values()) if mem else 1 max_tfidf = max(tfidf_scores.values()) if tfidf_scores else 1 for word in all_words: freq_score = mem.get(word, 0) / max_freq tfidf_score = tfidf_scores.get(word, 0) / max_tfidf combined_scores[word] = (freq_score * 0.3) + (tfidf_score * 0.7) top_words = dict(sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)[:10]) return normalize(top_words) except ValueError as ve: # Handle case where TF-IDF fails (e.g., empty after processing) print(f"Warning: TF-IDF failed, using only frequency: {ve}") # Fallback to just normalized frequency if TF-IDF fails if mem: max_freq = max(mem.values()) return {k: v / max_freq for k, v in list(mem.items())[:10]} # Return top 10 freq, normalized else: return {} def normalize(d, target=1.0): raw = sum(d.values()) factor = target / raw if raw != 0 else 0 return {key: value * factor for key, value in d.items()} # --- Visualization Functions with Error Handling --- def safe_plot(func, *args, **kwargs): """Executes a plotting function and returns the image, handling errors.""" buf = None # Initialize buffer try: # Ensure a clean figure state fig = plt.figure() # Create a new figure explicitly func(*args, **kwargs) buf = BytesIO() # Try saving with bbox_inches, but catch potential apply_aspect error try: plt.savefig(buf, format='png', bbox_inches='tight') except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed ({ae}), saving without it.") buf.seek(0) # Reset buffer as it might be partially written buf = BytesIO() # Get a fresh buffer plt.savefig(buf, format='png') # Save without bbox_inches else: raise # Re-raise if it's a different AttributeError buf.seek(0) img = Image.open(buf) plt.close(fig) # Explicitly close the specific figure return img except Exception as e: print(f"Plotting error in safe_plot: {e}") if buf: buf.close() # Ensure buffer is closed on error if it was created traceback.print_exc() # Try to return a placeholder or None plt.close('all') # Aggressive close on error return None def fDistancePlot(text2Party): """Generates the frequency distribution plot.""" def plot_func(): tokens = word_tokenize(text2Party) if not tokens: plt.text(0.5, 0.5, "No data to plot", ha='center', va='center', transform=plt.gca().transAxes) # Use Axes coordinates return fdist = FreqDist(tokens) fdist.plot(15, title='Frequency Distribution') plt.xticks(rotation=45, ha='right') plt.tight_layout() return safe_plot(plot_func) def DispersionPlot(textParty): """Generates the word dispersion plot.""" buf = None # Initialize buffer try: word_tokens_party = word_tokenize(textParty) if not word_tokens_party: print("Warning: No tokens found for dispersion plot.") return None moby = Text(word_tokens_party) fdistance = FreqDist(word_tokens_party) # Get top 5 words, handle potential IndexError if less than 5 unique words common_words = fdistance.most_common(6) if len(common_words) < 5: word_Lst = [word for word, _ in common_words] else: word_Lst = [common_words[x][0] for x in range(5)] if not word_Lst: print("Warning: No common words found for dispersion plot.") return None # --- Key Fix: Manage figure explicitly without passing 'ax' --- fig = plt.figure(figsize=(10, 5)) # Create figure explicitly plt.title('Dispersion Plot') # Call dispersion_plot without 'ax' argument moby.dispersion_plot(word_Lst) plt.tight_layout() buf = BytesIO() # Handle potential apply_aspect error for dispersion plot try: fig.savefig(buf, format='png', bbox_inches='tight') except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed for Dispersion Plot ({ae}), saving without it.") buf.seek(0) buf = BytesIO() # Get a fresh buffer fig.savefig(buf, format='png') else: raise # Re-raise if it's a different AttributeError buf.seek(0) img = Image.open(buf) plt.close(fig) # Close the specific figure created return img except Exception as e: print(f"Dispersion plot error: {e}") if buf: buf.close() # Ensure buffer is closed on error traceback.print_exc() plt.close('all') # Aggressive close on error return None # Return None on error def word_cloud_generator(parsed_text_name, text_Party): """Generates the word cloud image.""" buf = None # Initialize buffer try: filename_lower = "" if hasattr(parsed_text_name, 'name') and parsed_text_name.name: filename_lower = parsed_text_name.name.lower() elif isinstance(parsed_text_name, str): filename_lower = parsed_text_name.lower() mask_path = None if 'bjp' in filename_lower: mask_path = 'bjpImg2.jpeg' elif 'congress' in filename_lower: mask_path = 'congress3.jpeg' elif 'aap' in filename_lower: mask_path = 'aapMain2.jpg' if text_Party.strip() == "": raise ValueError("Text for word cloud is empty") # Generate word cloud object if mask_path and os.path.exists(mask_path): orgImg = Image.open(mask_path) if orgImg.mode != 'RGB': orgImg = orgImg.convert('RGB') mask = np.array(orgImg) wordcloud = WordCloud(max_words=3000, mask=mask, background_color='white', mode='RGBA').generate(text_Party) else: wordcloud = WordCloud(max_words=2000, background_color='white', mode='RGBA').generate(text_Party) # --- Key Fix: Explicitly manage figure and axes for word cloud --- fig, ax = plt.subplots(figsize=(8, 6)) # Create new figure and axes ax.imshow(wordcloud, interpolation='bilinear') ax.axis("off") fig.tight_layout(pad=0) # Remove padding buf = BytesIO() # Handle potential apply_aspect error for word cloud too try: fig.savefig(buf, format='png', bbox_inches='tight', dpi=300, facecolor='white') # Added dpi and facecolor except AttributeError as ae: if "apply_aspect" in str(ae): print(f"Warning: bbox_inches='tight' failed for Word Cloud ({ae}), saving without it.") buf.seek(0) buf = BytesIO() fig.savefig(buf, format='png', dpi=300, facecolor='white') else: raise buf.seek(0) img = Image.open(buf) plt.close(fig) # Close the specific figure return img except Exception as e: print(f"Word cloud error: {e}") if buf: buf.close() # Ensure buffer is closed on error traceback.print_exc() plt.close('all') # Aggressive close on error return None # Return None on error # Initial design for concordance based search def get_all_phases_containing_tar_wrd(target_word, tar_passage, left_margin=10, right_margin=10, numLins=4): """ Function to get all the phrases that contain the target word in a text/passage. """ if not target_word or target_word.strip() == "": return "Please enter a search term" tokens = nltk.word_tokenize(tar_passage) text = nltk.Text(tokens) c = nltk.ConcordanceIndex(text.tokens, key=lambda s: s.lower()) offsets = c.offsets(target_word) if not offsets: return f"Word '{target_word}' not found." concordance_txt = [ text.tokens[max(0, offset - left_margin):offset + right_margin] for offset in offsets[:numLins] ] result = [' '.join(con_sub) for con_sub in concordance_txt] return '\n'.join(result) # Use newline for better readability in textbox def get_contextual_search_result(target_word, tar_passage, groq_client_instance, max_context_length=8000): """ Uses the LLM to provide contextual information about the target word within the passage. """ if not target_word or target_word.strip() == "": return "Please enter a search term." if not groq_client_instance: return "Contextual search requires the LLM API. Please set up your GROQ_API_KEY." # Basic check if word exists (optional, LLM can handle it too) if target_word.lower() not in tar_passage.lower(): return f"The term '{target_word}' was not found in the manifesto text." # Truncate passage if too long for the model/context window # You might need to adjust this based on your model's limits and desired performance if len(tar_passage) > max_context_length: # Simple truncation; could be improved to ensure sentences are complete tar_passage = tar_passage[:max_context_length] print(f"Warning: Passage truncated for LLM search context to {max_context_length} characters.") prompt = f""" You are given a political manifesto text and a specific search term. Your task is to find all relevant mentions of the search term in the text and provide a concise, informative summary of the context surrounding each mention. Focus on the key ideas, policies, or points related to the search term. If the term is not found or not relevant, state that clearly. Search Term: {target_word} Manifesto Text: {tar_passage} """ try: completion = groq_client_instance.chat.completions.create( model="llama3-8b-8192", # Use the same or a suitable model messages=[ {"role": "system", "content": "You are a helpful assistant skilled at analyzing political texts and extracting relevant information based on a search query."}, {"role": "user", "content": prompt} ], temperature=0.2, # Low temperature for more factual extraction max_tokens=1000 # Adjust based on expected output length ) result = completion.choices[0].message.content.strip() return result if result else f"No specific context for '{target_word}' could be generated." except Exception as e: error_msg = f"Error during contextual search for '{target_word}': {str(e)}" print(error_msg) traceback.print_exc() # Fallback to concordance if LLM fails? # return get_all_phases_containing_tar_wrd_fallback(target_word, tar_passage) return error_msg # Or return the error message directly def analysis(Manifesto, Search): try: if Manifesto is None: # Ensure return order matches the outputs list return "No file uploaded", {}, None, None, None, None, None, "No file uploaded" if Search.strip() == "": Search = "government" raw_party = Parsing(Manifesto) if isinstance(raw_party, str) and raw_party.startswith("Error"): return raw_party, {}, None, None, None, None, None, "Parsing failed" text_Party = clean_text(raw_party) text_Party_processed = Preprocess(text_Party) # --- Perform Search FIRST using the ORIGINAL text for better context --- # Pass the original raw text for richer context to the LLM searChRes = get_contextual_search_result(Search, raw_party, groq_client) # --- Then proceed with other analyses --- summary = generate_summary(raw_party) # Use raw_party for summary for more context? # --- Sentiment Analysis --- if not text_Party_processed.strip(): # Handle empty text after processing df_dummy = pd.DataFrame({'Polarity_Label': ['Neutral'], 'Subjectivity_Label': ['Low']}) polarity_val = 0.0 subjectivity_val = 0.0 else: polarity_val = TextBlob(text_Party_processed).sentiment.polarity subjectivity_val = TextBlob(text_Party_processed).sentiment.subjectivity polarity_label = 'Positive' if polarity_val > 0 else 'Negative' if polarity_val < 0 else 'Neutral' subjectivity_label = 'High' if subjectivity_val > 0.5 else 'Low' df_dummy = pd.DataFrame({'Polarity_Label': [polarity_label], 'Subjectivity_Label': [subjectivity_label]}) # --- Generate Plots with Safe Plotting --- # Pass the potentially empty text and handle inside plotting functions sentiment_plot = safe_plot(lambda: df_dummy['Polarity_Label'].value_counts().plot(kind='bar', color="#FF9F45", title='Sentiment Analysis')) subjectivity_plot = safe_plot(lambda: df_dummy['Subjectivity_Label'].value_counts().plot(kind='bar', color="#B667F1", title='Subjectivity Analysis')) freq_plot = fDistancePlot(text_Party_processed) dispersion_plot = DispersionPlot(text_Party_processed) wordcloud = word_cloud_generator(Manifesto, text_Party_processed) # Pass Manifesto object itself fdist_Party = fDistance(text_Party_processed) # searChRes is now generated earlier return searChRes, fdist_Party, sentiment_plot, subjectivity_plot, wordcloud, freq_plot, dispersion_plot, summary except Exception as e: error_msg = f"Critical error in analysis function: {str(e)}" print(error_msg) traceback.print_exc() # Return error messages/images in the correct order return error_msg, {}, None, None, None, None, None, "Analysis failed" # --- Gradio Interface (remains largely the same, just ensuring output variable names match) --- # Use Blocks for custom layout with gr.Blocks(title='Manifesto Analysis') as demo: gr.Markdown("# Manifesto Analysis") # Input Section with gr.Row(): with gr.Column(scale=1): # Adjust scale if needed file_input = gr.File(label="Upload Manifesto PDF", file_types=[".pdf"]) with gr.Column(scale=1): search_input = gr.Textbox(label="Search Term", placeholder="Enter a term to search in the manifesto") submit_btn = gr.Button("Analyze Manifesto", variant='primary') # Make button prominent # Output Section using Tabs with gr.Tabs(): # --- Summary Tab --- with gr.TabItem("Summary"): summary_output = gr.Textbox(label='AI-Generated Summary', lines=10, interactive=False) # --- Search Results Tab --- with gr.TabItem("Search Results"): # Use the specific output variable defined in the layout search_output = gr.Textbox(label='Context Based Search Results', lines=15, interactive=False, max_lines=20) # Increased lines/max_lines # --- Key Topics Tab --- with gr.TabItem("Key Topics"): topics_output = gr.Label(label="Most Relevant Topics (LLM Enhanced)", num_top_classes=10) # Show top 10 # --- Visualizations Tab --- with gr.TabItem("Visualizations"): # Use Rows and Columns for better arrangement with gr.Row(): # Row 1: Sentiment & Subjectivity with gr.Column(): sentiment_output = gr.Image(label='Sentiment Analysis', interactive=False, height=400) # Set height with gr.Column(): subjectivity_output = gr.Image(label='Subjectivity Analysis', interactive=False, height=400) with gr.Row(): # Row 2: Word Cloud & Frequency with gr.Column(): wordcloud_output = gr.Image(label='Word Cloud', interactive=False, height=400) with gr.Column(): freq_output = gr.Image(label='Frequency Distribution', interactive=False, height=400) with gr.Row(): # Row 3: Dispersion Plot (Full width) with gr.Column(): dispersion_output = gr.Image(label='Dispersion Plot', interactive=False, height=400) # Adjust height as needed # --- Link Button Click to Function and Outputs --- # Ensure the order of outputs matches the function return order submit_btn.click( fn=analysis, inputs=[file_input, search_input], outputs=[ search_output, # 1 (Now contextual) topics_output, # 2 sentiment_output, # 3 subjectivity_output, # 4 wordcloud_output, # 5 freq_output, # 6 dispersion_output, # 7 summary_output # 8 ], concurrency_limit=1 # Limit concurrent analyses if needed ) # --- Examples --- gr.Examples( examples=[ ["Example/AAP_Manifesto_2019.pdf", "government"], ["Example/Bjp_Manifesto_2019.pdf", "environment"], ["Example/Congress_Manifesto_2019.pdf", "safety"] ], inputs=[file_input, search_input], outputs=[search_output, topics_output, sentiment_output, subjectivity_output, wordcloud_output, freq_output, dispersion_output, summary_output], # Link examples to outputs fn=analysis # Run analysis on example click ) # Launch the app if __name__ == "__main__": demo.launch(debug=True, share=False, show_error=True)