import streamlit as st import pandas as pd import hashlib import numpy as np from sklearn.preprocessing import LabelEncoder def difference_hash(input_data): """ Computes the difference hash (dHash) for the given input data. Args: input_data (str): The input data as a string. Returns: str: The difference hash value as a hexadecimal string. """ # Split the input data into lines lines = input_data.splitlines() # Split the lines into columns columns = [line.split(',') for line in lines] # Create a DataFrame from the columns df = pd.DataFrame(columns) # Convert string columns to numeric using LabelEncoder for col in df.select_dtypes(include=['object']).columns: encoder = LabelEncoder() df[col] = encoder.fit_transform(df[col]) # Convert the Pandas DataFrame to a NumPy array input_data = df.values # Compute the difference between adjacent rows differences = np.diff(input_data, axis=0) # Compute the hash by comparing each difference to zero and making the 1D array hash_bits = [1 if diff >= 0 else 0 for diff in differences.flatten()] # Convert the hash bits to a hexadecimal string hash_value = hex(int(''.join(str(bit) for bit in hash_bits), 2))[2:][:128] return hash_value def hamming_distance(hash1, hash2): """ Computes the Hamming distance between two hash values. Args: hash1 (str): The first hash value as a hexadecimal string. hash2 (str): The second hash value as a hexadecimal string. Returns: float: The normalized Hamming distance between the two hash values (between 0 and 1). """ # Convert hash strings to integers int_hash1 = int(hash1, 16) int_hash2 = int(hash2, 16) # Compute the XOR of the integers xor_value = int_hash1 ^ int_hash2 # Count the number of set bits in the XOR value (Hamming distance) hamming_dist = bin(xor_value).count('1') # Calculate the maximum possible Hamming distance max_hamming_dist = max(bin(int_hash1).count('1'), bin(int_hash2).count('1')) # Normalize the Hamming distance to a value between 0 and 1 normalized_hamming_dist = hamming_dist / max_hamming_dist return normalized_hamming_dist def jaccard_similarity(text1, text2): lines1 = text1.splitlines() lines2 = text2.splitlines() intersection = len(set(lines1).intersection(set(lines2))) union = len(set(lines1 + lines2)) if union == 0: return 1.0 # Handle the case where both files are empty return intersection / union # Read datasets from files def read_file(file_path): with open(file_path, 'r') as file: text = file.read() return text def compare_datasets(ref_file, compare_files): ref_text = read_file(ref_file) ref_simhash = difference_hash(ref_text) outputs = [] for file_path in compare_files: text = read_file(file_path) simhash_value = difference_hash(text) hamming_dist = hamming_distance(ref_simhash, simhash_value) jaccard_sim = jaccard_similarity(ref_text, text) # Adjust the rate of change based on the current value of combined_similarity if hamming_dist > 0.5: rate_of_change = 0.01 # Slow decrement else: rate_of_change = 0.01 # Fast increment # Combine Hamming distance and Jaccard similarity combined_similarity = max(0, min(1, (1 - rate_of_change * hamming_dist) * jaccard_sim)) print(f"For file {file}:") print("Combined Similarity:", combined_similarity) # Set threshold value threshold = 0.1 outputs.append(f"For file {file_path}:\nCombined Similarity: {combined_similarity}\nThe datasets are {'similar' if combined_similarity >= threshold else 'dissimilar'}") return "\n\n".join(outputs) def main(): st.title("Dataset Similarity Comparison") st.write("Compare datasets for similarity using simhash and Jaccard similarity.") ref_file = st.file_uploader("Upload Reference File", type=["csv"]) compare_files = st.file_uploader("Upload Files to Compare", type=["csv"], accept_multiple_files=True) if ref_file and compare_files: compare_results = compare_datasets(ref_file, [f.name for f in compare_files]) st.write(compare_results) if __name__ == "__main__": main()