230 lines
9.2 KiB
Python
Executable File
230 lines
9.2 KiB
Python
Executable File
import os
|
|
import dicom_processing
|
|
import asyncio
|
|
import aiofiles
|
|
import subprocess
|
|
from shutil import copyfile
|
|
from pathlib import Path
|
|
|
|
|
|
def get_drive_type(path):
|
|
try:
|
|
result = subprocess.run(["/usr/sbin/diskutil", "info", path], capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print(f"Fehler beim Zugriff auf Laufwerksinformationen für {path}")
|
|
return 'Unbekannt'
|
|
|
|
# Überprüfung auf optische Laufwerke basierend auf 'Optical Drive Type'
|
|
optical_drive_types = [
|
|
'CD-ROM', 'CD-R', 'CD-RW', 'DVD-ROM', 'DVD-R', 'DVD-R DL',
|
|
'DVD-RW', 'DVD+R', 'DVD+R DL', 'DVD+RW'
|
|
]
|
|
# Prüfen, ob einer der optischen Laufwerkstypen in der Ausgabe vorhanden ist
|
|
if any(drive_type in result.stdout for drive_type in optical_drive_types):
|
|
return 'CD-ROM'
|
|
|
|
# Überprüfung auf SSD
|
|
if 'Solid State: Yes' in result.stdout:
|
|
return 'SSD'
|
|
|
|
# Überprüfung auf HDD
|
|
if 'Solid State: No' in result.stdout:
|
|
return 'HDD'
|
|
|
|
# Füge hier weitere spezifische Überprüfungen ein, falls nötig
|
|
|
|
except Exception as e:
|
|
print(f"Fehler beim Ermitteln des Laufwerkstyps: {e}")
|
|
|
|
return 'Unbekannt'
|
|
|
|
def check_for_cd_at_start():
|
|
volumes_path = "/Volumes"
|
|
for volume in os.listdir(volumes_path):
|
|
full_path = os.path.join(volumes_path, volume)
|
|
if get_drive_type(full_path) == 'CD-ROM':
|
|
print(f"CD-ROM gefunden: {full_path}")
|
|
return full_path
|
|
return None
|
|
|
|
def eject_cd_if_volume_path(path):
|
|
if get_drive_type(path) == 'CD-ROM':
|
|
try:
|
|
# Überprüfen, ob der Pfad existiert
|
|
if os.path.exists(path):
|
|
print(f"Der Pfad {path} beginnt mit /Volumes. Versuche, die CD auszuwerfen...")
|
|
# Ausführen des Befehls zum Auswerfen der CD
|
|
subprocess.run(["/usr/bin/drutil", "tray", "eject"], check=True)
|
|
print("CD erfolgreich ausgeworfen.")
|
|
else:
|
|
print(f"Der Pfad {path} existiert nicht.")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Fehler beim Auswerfen der CD: {e}")
|
|
else:
|
|
print(f"Der Pfad {path} beginnt nicht mit /Volumes. Keine Aktion erforderlich.")
|
|
|
|
def find_dicom_files(source_folder):
|
|
dicom_files = []
|
|
for root, dirs, files in os.walk(source_folder):
|
|
for file in files:
|
|
if file.upper() == 'DICOMDIR': # Überprüfung, ob der Dateiname 'DICOMDIR' entspricht
|
|
continue # Überspringt den aktuellen Iterationsschritt im Loop, wenn der Dateiname 'DICOMDIR' ist
|
|
file_path = os.path.join(root, file)
|
|
# Überprüft, ob die Datei eine .dcm oder .dicom Erweiterung hat oder wenn kein Punkt im Dateinamen ist (impliziert, dass es keine Dateierweiterung gibt), wird die is_dicom_file Funktion genutzt
|
|
if file.endswith((".dcm", ".dicom")) or ('.' not in file):
|
|
dicom_files.append(file_path)
|
|
if not dicom_files: # Überprüft, ob die Liste leer ist
|
|
return 'non_found'
|
|
return dicom_files, len(dicom_files) # Rückgabe der Liste und der Anzahl der Dateien
|
|
|
|
def get_files_in_folder(transfer_folder):
|
|
file_paths = []
|
|
# Überprüfe, ob der Ordner existiert
|
|
if os.path.exists(transfer_folder):
|
|
# Durchlaufe alle Dateien im Ordner und füge ihre Pfade zur Liste hinzu
|
|
for root, _, files in os.walk(transfer_folder):
|
|
for file in files:
|
|
file_paths.append(os.path.join(root, file))
|
|
return file_paths
|
|
else:
|
|
print(f"Der Ordner '{transfer_folder}' existiert nicht.")
|
|
return 'no_transfer_folder'
|
|
|
|
|
|
|
|
def next_dat_folder(transfer_folder):
|
|
existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))]
|
|
dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:]))
|
|
if dat_folders:
|
|
last_number = int(dat_folders[-1][4:])
|
|
new_folder = f"dat_{str(last_number + 1).zfill(2)}"
|
|
else:
|
|
new_folder = "dat_01"
|
|
return os.path.join(transfer_folder, new_folder)
|
|
|
|
def get_latest_dat_folder(transfer_folder):
|
|
existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))]
|
|
dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:]))
|
|
if dat_folders:
|
|
latest_folder = dat_folders[-1]
|
|
else:
|
|
latest_folder = None # Oder: return None, um anzuzeigen, dass kein Ordner existiert
|
|
return os.path.join(transfer_folder, latest_folder) if latest_folder else None
|
|
|
|
|
|
async def copy_dicom_files(dicom_files, source_folder, transfer_folder, progress_callback=None):
|
|
if not os.path.exists(transfer_folder):
|
|
os.makedirs(transfer_folder)
|
|
destination_folder = next_dat_folder(transfer_folder)
|
|
if not os.path.exists(destination_folder):
|
|
os.makedirs(destination_folder)
|
|
|
|
if progress_callback:
|
|
progress_callback(0, len(dicom_files))
|
|
|
|
file_counter = {} # Ein Dictionary, um die Zählung für jeden Dateinamen zu speichern
|
|
|
|
async def copy_file(source, dest_base):
|
|
nonlocal file_counter
|
|
filename = os.path.basename(source)
|
|
# Bestimmen Sie die neue Dateinummer, falls der Dateiname bereits existiert
|
|
count = file_counter.get(filename, 0)
|
|
# Neuen Dateinamen mit fortlaufender Nummer erstellen
|
|
new_filename = f"{Path(filename).stem}-{str(count).zfill(3)}{Path(filename).suffix}"
|
|
dest = os.path.join(dest_base, new_filename)
|
|
# Aktualisieren Sie den Zähler für diesen Dateinamen
|
|
file_counter[filename] = count + 1
|
|
|
|
print(f"Kopieren startet: {source} -> {dest}")
|
|
async with aiofiles.open(source, 'rb') as src_file, aiofiles.open(dest, 'wb') as dest_file:
|
|
while True:
|
|
content = await src_file.read(1024*1024) # Lese in 1MB Chunks
|
|
if not content:
|
|
break
|
|
await dest_file.write(content)
|
|
print(f"Kopieren abgeschlossen: {source} -> {dest}")
|
|
if progress_callback:
|
|
progress_callback(1, len(dicom_files))
|
|
|
|
drive_type = get_drive_type(source_folder)
|
|
|
|
# Anpassung des Semaphore-Wertes basierend auf dem Laufwerkstyp
|
|
sem_value = 1 if drive_type == 'CD-ROM' else 10
|
|
|
|
sem = asyncio.Semaphore(sem_value)
|
|
|
|
async def sem_copy_file(file_path):
|
|
async with sem:
|
|
await copy_file(file_path, destination_folder)
|
|
|
|
tasks = [asyncio.create_task(sem_copy_file(file_path)) for file_path in dicom_files]
|
|
|
|
await asyncio.gather(*tasks)
|
|
|
|
# Setze den Fortschritt zurück oder melde den Abschluss
|
|
if progress_callback:
|
|
progress_callback(len(dicom_files), len(dicom_files))
|
|
|
|
|
|
|
|
def format_patient_name(full_name):
|
|
name_parts = full_name.split('^')
|
|
name = name_parts[0] if len(name_parts) > 0 else ""
|
|
vorname = name_parts[1] if len(name_parts) > 1 else ""
|
|
return name, vorname
|
|
|
|
def format_birthdate(birthdate):
|
|
if not birthdate:
|
|
print("Warnung: Kein Geburtsdatum angegeben")
|
|
return None
|
|
|
|
birthdate_str = str(birthdate).strip()
|
|
if len(birthdate_str) == 8:
|
|
return f"{birthdate_str[6:8]}.{birthdate_str[4:6]}.{birthdate_str[0:4]}"
|
|
else:
|
|
print(f"Ungültiges Geburtsdatum-Format: {birthdate_str}. Erwartetes Format: YYYYMMDD")
|
|
return None
|
|
|
|
def compare_patient_info(name, vorname, geburtsdatum, name_source, vorname_source, geburtsdatum_source):
|
|
# Vergleiche jede Komponente der Patienteninformationen
|
|
is_name_match = (name == name_source)
|
|
is_vorname_match = (vorname == vorname_source)
|
|
is_geburtsdatum_match = (geburtsdatum == geburtsdatum_source)
|
|
|
|
# Überprüfe, ob alle Komponenten übereinstimmen
|
|
is_complete_match = is_name_match and is_vorname_match and is_geburtsdatum_match
|
|
|
|
# Rückgabe der Ergebnisse
|
|
return {
|
|
'is_complete_match': is_complete_match,
|
|
'is_name_match': is_name_match,
|
|
'is_vorname_match': is_vorname_match,
|
|
'is_geburtsdatum_match': is_geburtsdatum_match
|
|
}
|
|
|
|
def write_to_file(acq_date, modality, filename):
|
|
if not acq_date or not modality:
|
|
print("Warnung: Unvollständige Daten für Ausgabedatei")
|
|
return
|
|
|
|
if len(acq_date) < 8:
|
|
print(f"Warnung: Ungültiges Datumsformat: {acq_date}")
|
|
return
|
|
|
|
formatted_date = f"{acq_date[6:8]}.{acq_date[4:6]}.{acq_date[0:4]}"
|
|
print(f"Versuche, Daten in {filename} zu schreiben...")
|
|
|
|
try:
|
|
# Stelle sicher, dass das Verzeichnis existiert
|
|
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
|
|
with open(filename, "a", encoding='utf-8') as file:
|
|
file.write(f"{modality} vom {formatted_date}\n")
|
|
print(f"Daten erfolgreich in {filename} geschrieben.")
|
|
except IOError as e:
|
|
print(f"Fehler beim Schreiben in die Datei {filename}: {e}")
|
|
except OSError as e:
|
|
print(f"OS-Fehler beim Schreiben in die Datei {filename}: {e}")
|
|
except Exception as e:
|
|
print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
|
|
|