import os import dicom_processing import asyncio import aiofiles import subprocess from shutil import copyfile from pathlib import Path def get_drive_type(path): try: result = subprocess.run(["/usr/sbin/diskutil", "info", path], capture_output=True, text=True) if result.returncode != 0: print(f"Fehler beim Zugriff auf Laufwerksinformationen für {path}") return 'Unbekannt' # Überprüfung auf optische Laufwerke basierend auf 'Optical Drive Type' optical_drive_types = [ 'CD-ROM', 'CD-R', 'CD-RW', 'DVD-ROM', 'DVD-R', 'DVD-R DL', 'DVD-RW', 'DVD+R', 'DVD+R DL', 'DVD+RW' ] # Prüfen, ob einer der optischen Laufwerkstypen in der Ausgabe vorhanden ist if any(drive_type in result.stdout for drive_type in optical_drive_types): return 'CD-ROM' # Überprüfung auf SSD if 'Solid State: Yes' in result.stdout: return 'SSD' # Überprüfung auf HDD if 'Solid State: No' in result.stdout: return 'HDD' # Füge hier weitere spezifische Überprüfungen ein, falls nötig except Exception as e: print(f"Fehler beim Ermitteln des Laufwerkstyps: {e}") return 'Unbekannt' def check_for_cd_at_start(): volumes_path = "/Volumes" for volume in os.listdir(volumes_path): full_path = os.path.join(volumes_path, volume) if get_drive_type(full_path) == 'CD-ROM': print(f"CD-ROM gefunden: {full_path}") return full_path return None def eject_cd_if_volume_path(path): if get_drive_type(path) == 'CD-ROM': try: # Überprüfen, ob der Pfad existiert if os.path.exists(path): print(f"Der Pfad {path} beginnt mit /Volumes. Versuche, die CD auszuwerfen...") # Ausführen des Befehls zum Auswerfen der CD subprocess.run(["/usr/bin/drutil", "tray", "eject"], check=True) print("CD erfolgreich ausgeworfen.") else: print(f"Der Pfad {path} existiert nicht.") except subprocess.CalledProcessError as e: print(f"Fehler beim Auswerfen der CD: {e}") else: print(f"Der Pfad {path} beginnt nicht mit /Volumes. Keine Aktion erforderlich.") def find_dicom_files(source_folder): dicom_files = [] for root, dirs, files in os.walk(source_folder): for file in files: if file.upper() == 'DICOMDIR': # Überprüfung, ob der Dateiname 'DICOMDIR' entspricht continue # Überspringt den aktuellen Iterationsschritt im Loop, wenn der Dateiname 'DICOMDIR' ist file_path = os.path.join(root, file) # Überprüft, ob die Datei eine .dcm oder .dicom Erweiterung hat oder wenn kein Punkt im Dateinamen ist (impliziert, dass es keine Dateierweiterung gibt), wird die is_dicom_file Funktion genutzt if file.endswith((".dcm", ".dicom")) or ('.' not in file): dicom_files.append(file_path) if not dicom_files: # Überprüft, ob die Liste leer ist return 'non_found' return dicom_files, len(dicom_files) # Rückgabe der Liste und der Anzahl der Dateien def get_files_in_folder(transfer_folder): file_paths = [] # Überprüfe, ob der Ordner existiert if os.path.exists(transfer_folder): # Durchlaufe alle Dateien im Ordner und füge ihre Pfade zur Liste hinzu for root, _, files in os.walk(transfer_folder): for file in files: file_paths.append(os.path.join(root, file)) return file_paths else: print(f"Der Ordner '{transfer_folder}' existiert nicht.") return 'no_transfer_folder' def next_dat_folder(transfer_folder): existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))] dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:])) if dat_folders: last_number = int(dat_folders[-1][4:]) new_folder = f"dat_{str(last_number + 1).zfill(2)}" else: new_folder = "dat_01" return os.path.join(transfer_folder, new_folder) def get_latest_dat_folder(transfer_folder): existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))] dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:])) if dat_folders: latest_folder = dat_folders[-1] else: latest_folder = None # Oder: return None, um anzuzeigen, dass kein Ordner existiert return os.path.join(transfer_folder, latest_folder) if latest_folder else None async def copy_dicom_files(dicom_files, source_folder, transfer_folder, progress_callback=None): if not os.path.exists(transfer_folder): os.makedirs(transfer_folder) destination_folder = next_dat_folder(transfer_folder) if not os.path.exists(destination_folder): os.makedirs(destination_folder) if progress_callback: progress_callback(0, len(dicom_files)) file_counter = {} # Ein Dictionary, um die Zählung für jeden Dateinamen zu speichern async def copy_file(source, dest_base): nonlocal file_counter filename = os.path.basename(source) # Bestimmen Sie die neue Dateinummer, falls der Dateiname bereits existiert count = file_counter.get(filename, 0) # Neuen Dateinamen mit fortlaufender Nummer erstellen new_filename = f"{Path(filename).stem}-{str(count).zfill(3)}{Path(filename).suffix}" dest = os.path.join(dest_base, new_filename) # Aktualisieren Sie den Zähler für diesen Dateinamen file_counter[filename] = count + 1 print(f"Kopieren startet: {source} -> {dest}") async with aiofiles.open(source, 'rb') as src_file, aiofiles.open(dest, 'wb') as dest_file: while True: content = await src_file.read(1024*1024) # Lese in 1MB Chunks if not content: break await dest_file.write(content) print(f"Kopieren abgeschlossen: {source} -> {dest}") if progress_callback: progress_callback(1, len(dicom_files)) drive_type = get_drive_type(source_folder) # Anpassung des Semaphore-Wertes basierend auf dem Laufwerkstyp sem_value = 1 if drive_type == 'CD-ROM' else 10 sem = asyncio.Semaphore(sem_value) async def sem_copy_file(file_path): async with sem: await copy_file(file_path, destination_folder) tasks = [asyncio.create_task(sem_copy_file(file_path)) for file_path in dicom_files] await asyncio.gather(*tasks) # Setze den Fortschritt zurück oder melde den Abschluss if progress_callback: progress_callback(len(dicom_files), len(dicom_files)) def format_patient_name(full_name): name_parts = full_name.split('^') name = name_parts[0] if len(name_parts) > 0 else "" vorname = name_parts[1] if len(name_parts) > 1 else "" return name, vorname def format_birthdate(birthdate): if not birthdate: print("Warnung: Kein Geburtsdatum angegeben") return None birthdate_str = str(birthdate).strip() if len(birthdate_str) == 8: return f"{birthdate_str[6:8]}.{birthdate_str[4:6]}.{birthdate_str[0:4]}" else: print(f"Ungültiges Geburtsdatum-Format: {birthdate_str}. Erwartetes Format: YYYYMMDD") return None def compare_patient_info(name, vorname, geburtsdatum, name_source, vorname_source, geburtsdatum_source): # Vergleiche jede Komponente der Patienteninformationen is_name_match = (name == name_source) is_vorname_match = (vorname == vorname_source) is_geburtsdatum_match = (geburtsdatum == geburtsdatum_source) # Überprüfe, ob alle Komponenten übereinstimmen is_complete_match = is_name_match and is_vorname_match and is_geburtsdatum_match # Rückgabe der Ergebnisse return { 'is_complete_match': is_complete_match, 'is_name_match': is_name_match, 'is_vorname_match': is_vorname_match, 'is_geburtsdatum_match': is_geburtsdatum_match } def write_to_file(acq_date, modality, filename): if not acq_date or not modality: print("Warnung: Unvollständige Daten für Ausgabedatei") return if len(acq_date) < 8: print(f"Warnung: Ungültiges Datumsformat: {acq_date}") return formatted_date = f"{acq_date[6:8]}.{acq_date[4:6]}.{acq_date[0:4]}" print(f"Versuche, Daten in {filename} zu schreiben...") try: # Stelle sicher, dass das Verzeichnis existiert os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) with open(filename, "a", encoding='utf-8') as file: file.write(f"{modality} vom {formatted_date}\n") print(f"Daten erfolgreich in {filename} geschrieben.") except IOError as e: print(f"Fehler beim Schreiben in die Datei {filename}: {e}") except OSError as e: print(f"OS-Fehler beim Schreiben in die Datei {filename}: {e}") except Exception as e: print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")