import os
import sqlite3
import sys
import xml.etree.ElementTree as ET
from contextlib import closing

import pyth.plugins.plaintext.writer as plaintext_writer
import pyth.plugins.rtf15.reader as rtf15_reader
from bs4 import BeautifulSoup
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TFAutoModelForSequenceClassification, Trainer, TrainingArguments
| |
|
# File-name suffixes accepted by the directory scan. '.html', '.xml' and
# '.rtf' are routed to their dedicated extractors; every other suffix listed
# here is read as plain UTF-8 text.
SUPPORTED_FILE_TYPES = [
    # shell / scripting
    '.sh', '.bat', '.ps1',
    # compiled languages and build files
    '.cs', '.c', '.cpp', '.h', '.cmake',
    # other source and data formats
    '.py', '.git', '.sql', '.csv', '.sqlite', '.lsl',
    # markup / rich text
    '.html', '.xml', '.rtf',
]
| |
|
def extrahiere_parameter(file_path):
    """Collect simple size statistics for a UTF-8 text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        A dict with the keys ``text``, ``anzahl_zeilen`` (number of lines),
        ``anzahl_zeichen`` (total characters, line terminators included),
        ``long_text_mode`` (True when the file has more than 1000 lines) and
        ``dimensionalität`` (always 1). Returns ``None`` when the file cannot
        be read or decoded; the error is printed.

    Note:
        ``text`` holds *file_path* itself, not the file's content.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            zeilen = handle.readlines()

        zeilen_gesamt = len(zeilen)
        return {
            "text": file_path,
            "anzahl_zeilen": zeilen_gesamt,
            "anzahl_zeichen": sum(map(len, zeilen)),
            "long_text_mode": zeilen_gesamt > 1000,
            "dimensionalität": 1,
        }
    except UnicodeDecodeError as e:
        # Not valid UTF-8 (e.g. a binary file with a listed suffix).
        print(f"Fehler beim Lesen der Datei {file_path}: {e}")
        return None
    except Exception as e:
        print(f"Allgemeiner Fehler beim Lesen der Datei {file_path}: {e}")
        return None
| |
|
def extrahiere_parameter_html(file_path):
    """Extract the text of an HTML file and derive size statistics from it.

    The file is read as UTF-8 and parsed with BeautifulSoup's built-in
    ``html.parser``; the statistics are computed on ``get_text()``.

    Args:
        file_path: Path of the HTML file.

    Returns:
        A dict with ``text`` (the extracted text), ``anzahl_zeilen`` (count of
        newline characters in that text), ``anzahl_zeichen`` (its length),
        ``long_text_mode`` (True when ``anzahl_zeilen`` exceeds 1000) and
        ``dimensionalität`` (always 1), or ``None`` if reading or parsing
        fails; the error is printed.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as quelle:
            markup = quelle.read()

        klartext = BeautifulSoup(markup, 'html.parser').get_text()
        umbrueche = klartext.count('\n')
        return {
            "text": klartext,
            "anzahl_zeilen": umbrueche,
            "anzahl_zeichen": len(klartext),
            "long_text_mode": umbrueche > 1000,
            "dimensionalität": 1,
        }
    except Exception as e:
        print(f"Fehler beim Lesen der HTML-Datei {file_path}: {e}")
        return None
| |
|
def extrahiere_parameter_xml(file_path):
    """Extract the text content of an XML file and derive size statistics.

    The document is parsed with ``xml.etree.ElementTree`` and serialised with
    ``method='text'``, i.e. without any tags.

    Args:
        file_path: Path of the XML file.

    Returns:
        A dict with ``text`` (the tag-free text), ``anzahl_zeilen`` (count of
        newline characters in that text), ``anzahl_zeichen`` (its length),
        ``long_text_mode`` (True when ``anzahl_zeilen`` exceeds 1000) and
        ``dimensionalität`` (always 1), or ``None`` if the file is missing or
        not well-formed; the error is printed.
    """
    try:
        wurzel = ET.parse(file_path).getroot()
        inhalt = ET.tostring(wurzel, encoding='unicode', method='text')
        umbrueche = inhalt.count('\n')
        return {
            "text": inhalt,
            "anzahl_zeilen": umbrueche,
            "anzahl_zeichen": len(inhalt),
            "long_text_mode": umbrueche > 1000,
            "dimensionalität": 1,
        }
    except Exception as e:
        print(f"Fehler beim Lesen der XML-Datei {file_path}: {e}")
        return None
| |
|
def extrahiere_parameter_rtf(file_path):
    """Convert an RTF file to plain text and derive size statistics from it.

    Args:
        file_path: Path of the RTF file.

    Returns:
        A dict with ``text`` (the plain-text rendering), ``anzahl_zeilen``
        (count of newline characters in that text), ``anzahl_zeichen`` (its
        length), ``long_text_mode`` (True when ``anzahl_zeilen`` exceeds 1000)
        and ``dimensionalität`` (always 1), or ``None`` if reading or
        conversion fails; the error is printed.
    """
    try:
        with open(file_path, 'rb') as file:
            # pyth exposes read()/write() as classmethods on the plugin
            # classes, not as module-level functions; calling them on the
            # module raised AttributeError for every file.
            doc = rtf15_reader.Rtf15Reader.read(file)
        text = plaintext_writer.PlaintextWriter.write(doc).getvalue()

        anzahl_zeilen = text.count('\n')
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": len(text),
            "long_text_mode": anzahl_zeilen > 1000,
            "dimensionalität": 1,
        }
    except Exception as e:
        print(f"Fehler beim Lesen der RTF-Datei {file_path}: {e}")
        return None
| |
|
def durchsuchen_und_extrahieren(root_dir, db_pfad):
    """Walk *root_dir* and store per-file statistics in an SQLite database.

    The table ``dateiparameter`` is created if it does not exist. Every file
    whose name ends in a supported suffix is passed to the matching extractor
    and, when extraction succeeds, one row is inserted for it. Files with
    other suffixes and files whose extractor returns ``None`` are skipped.

    Args:
        root_dir: Directory to scan recursively.
        db_pfad: Path of the SQLite database file.

    Returns:
        None. Database and other errors are printed, not raised.
    """
    try:
        # sqlite3's own context manager only commits or rolls back the
        # transaction; closing() additionally releases the connection.
        with closing(sqlite3.connect(db_pfad)) as conn, conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS dateiparameter
                              (id INTEGER PRIMARY KEY,
                               dateipfad TEXT,
                               anzahl_zeilen INTEGER,
                               anzahl_zeichen INTEGER,
                               long_text_mode BOOLEAN,
                               dimensionalität INTEGER)''')

            for subdir, _, files in os.walk(root_dir):
                for file in files:
                    file_path = os.path.join(subdir, file)
                    # Dedicated extractors take precedence over the generic
                    # plain-text reader.
                    if file.endswith('.html'):
                        parameter = extrahiere_parameter_html(file_path)
                    elif file.endswith('.xml'):
                        parameter = extrahiere_parameter_xml(file_path)
                    elif file.endswith('.rtf'):
                        parameter = extrahiere_parameter_rtf(file_path)
                    elif file.endswith(tuple(SUPPORTED_FILE_TYPES)):
                        parameter = extrahiere_parameter(file_path)
                    else:
                        continue

                    if parameter:
                        cursor.execute(
                            '''INSERT INTO dateiparameter (dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität)
                               VALUES (?, ?, ?, ?, ?)''',
                            (
                                file_path,
                                parameter["anzahl_zeilen"],
                                parameter["anzahl_zeichen"],
                                parameter["long_text_mode"],
                                parameter["dimensionalität"],
                            ),
                        )
            conn.commit()
        print("Parameter erfolgreich extrahiert und in der Datenbank gespeichert.")
    except sqlite3.Error as e:
        print(f"SQLite Fehler: {e}")
    except Exception as e:
        print(f"Allgemeiner Fehler: {e}")
| |
|
def extrahiere_parameter_aus_db(db_pfad):
    """Load every row of the ``dateiparameter`` table.

    Args:
        db_pfad: Path of the SQLite database file.

    Returns:
        A list of row tuples in table column order, or ``None`` if the query
        fails (for example because the table does not exist); the error is
        printed.
    """
    try:
        # closing() guarantees the connection is released; sqlite3's own
        # context manager would leave it open.
        with closing(sqlite3.connect(db_pfad)) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM dateiparameter")
            return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"SQLite Fehler: {e}")
        return None
    except Exception as e:
        print(f"Allgemeiner Fehler: {e}")
        return None
| |
|
def konvertiere_zu_hf_dataset(daten):
    """Turn database rows into a Hugging Face ``Dataset``.

    Args:
        daten: Iterable of indexable rows. Positions 1 to 5 of each row are
            used; position 0 is ignored.

    Returns:
        A ``Dataset`` with the columns ``text``, ``anzahl_zeilen``,
        ``anzahl_zeichen``, ``long_text_mode`` and ``dimensionalität``.

    Note:
        ``text`` is filled from row position 1, which is the ``dateipfad``
        column in the ``dateiparameter`` table.
    """
    # Column name -> position of its value inside a row.
    spalten_index = (
        ("text", 1),
        ("anzahl_zeilen", 2),
        ("anzahl_zeichen", 3),
        ("long_text_mode", 4),
        ("dimensionalität", 5),
    )
    spalten = {name: [] for name, _ in spalten_index}

    for zeile in daten:
        for name, position in spalten_index:
            spalten[name].append(zeile[position])

    return Dataset.from_dict(spalten)
| |
|
def trainiere_und_speichere_modell(hf_dataset, output_model_dir):
    """Fine-tune ``bert-base-uncased`` on *hf_dataset* and save the result.

    Steps: tokenize the ``text`` column, attach a label column, split 80/20
    into train and evaluation sets, train for three epochs with ``Trainer``,
    save the PyTorch model and tokenizer, then export a TensorFlow copy of
    the fine-tuned model into the same directory.

    Args:
        hf_dataset: Dataset with at least a ``text`` column.
        output_model_dir: Directory that receives checkpoints, the final
            model, the tokenizer and the TensorFlow export.

    Returns:
        None. Any exception is caught and printed.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

        def tokenize_function(examples):
            return tokenizer(examples["text"], padding="max_length", truncation=True)

        tokenized_datasets = hf_dataset.map(tokenize_function, batched=True)

        # Every example receives the same constant label 0.0, so the label
        # set below has exactly one element.
        tokenized_datasets = tokenized_datasets.map(
            lambda examples: {"label": [0.0] * len(examples["text"])},
            batched=True,
        )

        train_test_split = tokenized_datasets.train_test_split(test_size=0.2)
        train_dataset = train_test_split["train"]
        eval_dataset = train_test_split["test"]

        num_labels = len(set(train_dataset["label"]))

        model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)

        # NOTE(review): newer transformers releases rename
        # `evaluation_strategy` to `eval_strategy` — confirm against the
        # installed version.
        training_args = TrainingArguments(
            output_dir=output_model_dir,
            evaluation_strategy="epoch",
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            num_train_epochs=3,
            weight_decay=0.01,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
        )

        trainer.train()
        model.save_pretrained(output_model_dir)
        tokenizer.save_pretrained(output_model_dir)

        # TensorFlow is imported lazily so it is only needed for the export.
        import tensorflow as tf

        # Load the weights that were just fine-tuned and saved. Loading
        # "bert-base-uncased" here would export the untrained base model.
        tf_model = TFAutoModelForSequenceClassification.from_pretrained(
            output_model_dir, from_pt=True, num_labels=num_labels
        )
        tf_model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

        # Run one forward pass before saving the TensorFlow model.
        dummy_input = tf.constant(tokenizer("This is a dummy input", return_tensors="tf")["input_ids"])
        tf_model(dummy_input)
        tf_model.save_pretrained(output_model_dir)

        print(f"Das Modell wurde erfolgreich in {output_model_dir} gespeichert.")

    except Exception as e:
        print(f"Fehler beim Trainieren und Speichern des Modells: {e}")
| |
|
if __name__ == "__main__":
    # Directory to scan: first command-line argument, else the current one.
    directory_path = sys.argv[1] if len(sys.argv) > 1 else '.'

    # Resolve to an absolute path before taking the base name:
    # basename(normpath('.')) is '.', which would name the artefacts '..db'
    # and '._model'. A filesystem root has an empty base name, hence the
    # fallback.
    base_name = os.path.basename(os.path.abspath(directory_path)) or 'root'
    db_name = base_name + '.db'

    durchsuchen_und_extrahieren(directory_path, db_name)

    daten = extrahiere_parameter_aus_db(db_name)
    if daten:
        hf_dataset = konvertiere_zu_hf_dataset(daten)

        # db_name has no directory part, so the model directory is created
        # next to the database in the current working directory.
        output_model = base_name + '_model'
        output_model_dir = os.path.join(os.path.dirname(db_name), output_model)

        trainiere_und_speichere_modell(hf_dataset, output_model_dir)
    else:
        print("Keine Daten gefunden, um ein HF-Dataset zu erstellen.")
| |
|