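"""Gradio app for chatting with uploaded PDF documents.

Uploaded PDFs are split into chunks, embedded into a Chroma vector store, and
queried through a LangChain retrieval chain backed by a Hugging Face hosted LLM.
"""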
import os
import shutil
import subprocess

import gradio as gr
from langchain import HuggingFaceHub
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
# Get the directory of the current module
module_directory = os.path.dirname(os.path.abspath(__file__))


class DocumentProcessor:
    """Loads PDFs, indexes them in a Chroma vector store, and answers questions."""

    def __init__(self, document_paths, token):
        self.document_paths = document_paths
        # The token argument is currently unused; the API token is read from the
        # 'hf_token' environment variable (e.g. a Space secret) instead.
        os.environ["HUGGINGFACEHUB_API_TOKEN"] = os.getenv('hf_token')
        self.persist_directory = './docs/chroma/'
        # Larger settings (max_new_tokens=1000) and tiiuae/falcon-40b were also tried
        self.llm = HuggingFaceHub(repo_id="tiiuae/falcon-7b-instruct",
                                  model_kwargs={"max_length": 300, "max_new_tokens": 300})
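
    # Note: langchain.HuggingFaceHub is deprecated in recent LangChain releases.
    # A rough equivalent with the maintained langchain-huggingface package (an
    # assumption, not something this app ships with) would be:
    #
    #     from langchain_huggingface import HuggingFaceEndpoint
    #     llm = HuggingFaceEndpoint(repo_id="tiiuae/falcon-7b-instruct",
    #                               max_new_tokens=300)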

    def load_documents(self, file_paths):
        self.document_paths = file_paths
        # Build one PyPDFLoader per uploaded file so every PDF is indexed
        loaders = [PyPDFLoader(path) for path in self.document_paths]
        self.docs = []
        for loader in loaders:
            self.docs.extend(loader.load())
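
    # PyPDFLoader returns one Document per page, so self.docs holds page-level
    # documents carrying source/page metadata.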

    def split_documents(self):
        # chunk_size=1500 / chunk_overlap=150 was also tried
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=15)
        self.splits = text_splitter.split_documents(self.docs)
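
    # Smaller chunks retrieve more precisely but carry less context per hit;
    # chunk_size and chunk_overlap are the knobs for trading these off.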

    def change_permissions(self, directory):
        try:
            # Recursively open up permissions so the directory can be removed
            command = ["chmod", "-R", "777", directory]
            subprocess.run(command, check=True)
            print(f"Permissions for {directory} changed to 777 successfully.")
        except subprocess.CalledProcessError as e:
            print(f"An error occurred while changing permissions: {e}")

    def delete_embeddings(self):
        # Drop any previously persisted Chroma collection before re-indexing
        if os.path.isdir(self.persist_directory):
            self.change_permissions(self.persist_directory)
            print('directory exists')
            shutil.rmtree(self.persist_directory, ignore_errors=True)

    def create_embeddings(self):
        embeddings = HuggingFaceEmbeddings()
        self.vectordb_doc = Chroma.from_documents(
            documents=self.splits,
            embedding=embeddings,
            persist_directory=self.persist_directory
        )
        # _collection is a private Chroma attribute, used here only as a quick
        # sanity check on how many chunks were indexed
        print(self.vectordb_doc._collection.count())

    def get_embeddings(self):
        return self.vectordb_doc

    def parse_output(self, response):
        # The hosted endpoint echoes the prompt back, so keep only the text
        # from "Question:" onward (the question followed by the answer)
        question_index = response.find("Question:")
        if question_index != -1:
            return response[question_index:].strip()
        else:
            return "I apologize, I don't know the answer."

    def document_chain(self):
        # A variant of this prompt wrapped the context in <context>...</context> tags
        prompt = ChatPromptTemplate.from_template("""
        Answer the following question based only on the provided context: {context}
        Question: {input}
        """)
        document_chain = create_stuff_documents_chain(self.llm, prompt)
        return document_chain
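
    # create_stuff_documents_chain "stuffs" all retrieved chunks into the
    # {context} slot of the prompt and makes a single LLM call.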

    def retrieval_chain(self, document_chain, document_embeddings):
        retriever = document_embeddings.as_retriever()
        retrieval_chain = create_retrieval_chain(retriever, document_chain)
        return retrieval_chain
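
    # as_retriever() defaults to similarity search, typically returning the
    # top four chunks per query unless search_kwargs overrides it.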

    def get_response(self, retrieval_chain, message):
        response = retrieval_chain.invoke({"input": message})
        return response["answer"]
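

# A minimal usage sketch outside the Gradio UI; 'paper.pdf' and the question
# are hypothetical, and 'hf_token' must be set in the environment:
#
#     processor = DocumentProcessor(document_paths='', token='')
#     processor.load_documents(['paper.pdf'])
#     processor.split_documents()
#     processor.delete_embeddings()
#     processor.create_embeddings()
#     chain = processor.retrieval_chain(processor.document_chain(),
#                                       processor.get_embeddings())
#     print(processor.parse_output(processor.get_response(chain, 'what is title')))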


def upload_file(files, processor):
    try:
        file_paths = [file.name for file in files]
        processor.load_documents(file_paths)
        processor.split_documents()
        processor.delete_embeddings()
        processor.create_embeddings()
        gr.Info("Document uploaded, enjoy the chat now!")
        # Return the paths so the gr.File component can display what was uploaded
        return file_paths
    except Exception as e:
        print(f"An error occurred: {e}")
        gr.Warning("Upload file(s) again!")


def echo(message, history, processor):
    try:
        document_chain = processor.document_chain()
        document_embeddings = processor.get_embeddings()
        retrieval_chain = processor.retrieval_chain(document_chain, document_embeddings)
        chain_result = processor.get_response(retrieval_chain, message)
        parsed_result = processor.parse_output(chain_result)
        return parsed_result
    except Exception as e:
        print(f"An error occurred: {e}")
        gr.Warning("An error occurred, please refresh the page!")
        # Return a string so the chat UI never receives None
        return "An error occurred, please upload your file(s) and try again."


def upload_warning():
    gr.Warning("Upload PDF file(s) first!")


def main():
    css = """
    .container {
        height: 90vh;
    }
    .container_1 {
        height: 80vh;
    }
    .container_2 {
        height: 20vh;
    }
    """
    processor = DocumentProcessor(document_paths='', token='')

    with gr.Blocks(css=css) as demo:
        # Remind the user to upload documents before chatting
        demo.load(upload_warning, inputs=None, outputs=None)
        with gr.Column(elem_classes=["container"]):
            gr.Markdown("## Chat with your Data")
            with gr.Column(elem_classes=["container_2"]):
                file_output = gr.File()
                upload_button = gr.UploadButton("Click to Upload File(s)",
                                                file_types=["pdf", "doc"],
                                                file_count="multiple")

                # Handle the upload with access to the shared processor
                def process_upload(files):
                    return upload_file(files, processor)

                upload_button.upload(process_upload, upload_button, file_output)
            with gr.Column(elem_classes=["container_1"]):
                def process_echo(message, history):
                    return echo(message, history, processor)

                gr.ChatInterface(fn=process_echo,
                                 examples=["what is title", "what is summary", "create notes"])
                gr.Markdown("* Note: Answers may be incorrect; however, they can be improved.")
    demo.launch()


if __name__ == "__main__":
    main()
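
# A plausible dependency set for this script (package names only; exact
# versions are an assumption): gradio, langchain, langchain-community,
# chromadb, pypdf, sentence-transformers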