Files
dicom2pacs/file_management.py
René Mathieu 0fef8d96c5 Initial commit
2026-01-17 13:49:51 +01:00

230 lines
9.2 KiB
Python
Executable File

import os
import dicom_processing
import asyncio
import aiofiles
import subprocess
from shutil import copyfile
from pathlib import Path
def get_drive_type(path):
try:
result = subprocess.run(["/usr/sbin/diskutil", "info", path], capture_output=True, text=True)
if result.returncode != 0:
print(f"Fehler beim Zugriff auf Laufwerksinformationen für {path}")
return 'Unbekannt'
# Überprüfung auf optische Laufwerke basierend auf 'Optical Drive Type'
optical_drive_types = [
'CD-ROM', 'CD-R', 'CD-RW', 'DVD-ROM', 'DVD-R', 'DVD-R DL',
'DVD-RW', 'DVD+R', 'DVD+R DL', 'DVD+RW'
]
# Prüfen, ob einer der optischen Laufwerkstypen in der Ausgabe vorhanden ist
if any(drive_type in result.stdout for drive_type in optical_drive_types):
return 'CD-ROM'
# Überprüfung auf SSD
if 'Solid State: Yes' in result.stdout:
return 'SSD'
# Überprüfung auf HDD
if 'Solid State: No' in result.stdout:
return 'HDD'
# Füge hier weitere spezifische Überprüfungen ein, falls nötig
except Exception as e:
print(f"Fehler beim Ermitteln des Laufwerkstyps: {e}")
return 'Unbekannt'
def check_for_cd_at_start():
volumes_path = "/Volumes"
for volume in os.listdir(volumes_path):
full_path = os.path.join(volumes_path, volume)
if get_drive_type(full_path) == 'CD-ROM':
print(f"CD-ROM gefunden: {full_path}")
return full_path
return None
def eject_cd_if_volume_path(path):
if get_drive_type(path) == 'CD-ROM':
try:
# Überprüfen, ob der Pfad existiert
if os.path.exists(path):
print(f"Der Pfad {path} beginnt mit /Volumes. Versuche, die CD auszuwerfen...")
# Ausführen des Befehls zum Auswerfen der CD
subprocess.run(["/usr/bin/drutil", "tray", "eject"], check=True)
print("CD erfolgreich ausgeworfen.")
else:
print(f"Der Pfad {path} existiert nicht.")
except subprocess.CalledProcessError as e:
print(f"Fehler beim Auswerfen der CD: {e}")
else:
print(f"Der Pfad {path} beginnt nicht mit /Volumes. Keine Aktion erforderlich.")
def find_dicom_files(source_folder):
dicom_files = []
for root, dirs, files in os.walk(source_folder):
for file in files:
if file.upper() == 'DICOMDIR': # Überprüfung, ob der Dateiname 'DICOMDIR' entspricht
continue # Überspringt den aktuellen Iterationsschritt im Loop, wenn der Dateiname 'DICOMDIR' ist
file_path = os.path.join(root, file)
# Überprüft, ob die Datei eine .dcm oder .dicom Erweiterung hat oder wenn kein Punkt im Dateinamen ist (impliziert, dass es keine Dateierweiterung gibt), wird die is_dicom_file Funktion genutzt
if file.endswith((".dcm", ".dicom")) or ('.' not in file):
dicom_files.append(file_path)
if not dicom_files: # Überprüft, ob die Liste leer ist
return 'non_found'
return dicom_files, len(dicom_files) # Rückgabe der Liste und der Anzahl der Dateien
def get_files_in_folder(transfer_folder):
file_paths = []
# Überprüfe, ob der Ordner existiert
if os.path.exists(transfer_folder):
# Durchlaufe alle Dateien im Ordner und füge ihre Pfade zur Liste hinzu
for root, _, files in os.walk(transfer_folder):
for file in files:
file_paths.append(os.path.join(root, file))
return file_paths
else:
print(f"Der Ordner '{transfer_folder}' existiert nicht.")
return 'no_transfer_folder'
def next_dat_folder(transfer_folder):
existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))]
dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:]))
if dat_folders:
last_number = int(dat_folders[-1][4:])
new_folder = f"dat_{str(last_number + 1).zfill(2)}"
else:
new_folder = "dat_01"
return os.path.join(transfer_folder, new_folder)
def get_latest_dat_folder(transfer_folder):
existing_folders = [d for d in os.listdir(transfer_folder) if os.path.isdir(os.path.join(transfer_folder, d))]
dat_folders = sorted([d for d in existing_folders if d.startswith('dat_') and d[4:].isdigit()], key=lambda x: int(x[4:]))
if dat_folders:
latest_folder = dat_folders[-1]
else:
latest_folder = None # Oder: return None, um anzuzeigen, dass kein Ordner existiert
return os.path.join(transfer_folder, latest_folder) if latest_folder else None
async def copy_dicom_files(dicom_files, source_folder, transfer_folder, progress_callback=None):
if not os.path.exists(transfer_folder):
os.makedirs(transfer_folder)
destination_folder = next_dat_folder(transfer_folder)
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
if progress_callback:
progress_callback(0, len(dicom_files))
file_counter = {} # Ein Dictionary, um die Zählung für jeden Dateinamen zu speichern
async def copy_file(source, dest_base):
nonlocal file_counter
filename = os.path.basename(source)
# Bestimmen Sie die neue Dateinummer, falls der Dateiname bereits existiert
count = file_counter.get(filename, 0)
# Neuen Dateinamen mit fortlaufender Nummer erstellen
new_filename = f"{Path(filename).stem}-{str(count).zfill(3)}{Path(filename).suffix}"
dest = os.path.join(dest_base, new_filename)
# Aktualisieren Sie den Zähler für diesen Dateinamen
file_counter[filename] = count + 1
print(f"Kopieren startet: {source} -> {dest}")
async with aiofiles.open(source, 'rb') as src_file, aiofiles.open(dest, 'wb') as dest_file:
while True:
content = await src_file.read(1024*1024) # Lese in 1MB Chunks
if not content:
break
await dest_file.write(content)
print(f"Kopieren abgeschlossen: {source} -> {dest}")
if progress_callback:
progress_callback(1, len(dicom_files))
drive_type = get_drive_type(source_folder)
# Anpassung des Semaphore-Wertes basierend auf dem Laufwerkstyp
sem_value = 1 if drive_type == 'CD-ROM' else 10
sem = asyncio.Semaphore(sem_value)
async def sem_copy_file(file_path):
async with sem:
await copy_file(file_path, destination_folder)
tasks = [asyncio.create_task(sem_copy_file(file_path)) for file_path in dicom_files]
await asyncio.gather(*tasks)
# Setze den Fortschritt zurück oder melde den Abschluss
if progress_callback:
progress_callback(len(dicom_files), len(dicom_files))
def format_patient_name(full_name):
name_parts = full_name.split('^')
name = name_parts[0] if len(name_parts) > 0 else ""
vorname = name_parts[1] if len(name_parts) > 1 else ""
return name, vorname
def format_birthdate(birthdate):
if not birthdate:
print("Warnung: Kein Geburtsdatum angegeben")
return None
birthdate_str = str(birthdate).strip()
if len(birthdate_str) == 8:
return f"{birthdate_str[6:8]}.{birthdate_str[4:6]}.{birthdate_str[0:4]}"
else:
print(f"Ungültiges Geburtsdatum-Format: {birthdate_str}. Erwartetes Format: YYYYMMDD")
return None
def compare_patient_info(name, vorname, geburtsdatum, name_source, vorname_source, geburtsdatum_source):
# Vergleiche jede Komponente der Patienteninformationen
is_name_match = (name == name_source)
is_vorname_match = (vorname == vorname_source)
is_geburtsdatum_match = (geburtsdatum == geburtsdatum_source)
# Überprüfe, ob alle Komponenten übereinstimmen
is_complete_match = is_name_match and is_vorname_match and is_geburtsdatum_match
# Rückgabe der Ergebnisse
return {
'is_complete_match': is_complete_match,
'is_name_match': is_name_match,
'is_vorname_match': is_vorname_match,
'is_geburtsdatum_match': is_geburtsdatum_match
}
def write_to_file(acq_date, modality, filename):
if not acq_date or not modality:
print("Warnung: Unvollständige Daten für Ausgabedatei")
return
if len(acq_date) < 8:
print(f"Warnung: Ungültiges Datumsformat: {acq_date}")
return
formatted_date = f"{acq_date[6:8]}.{acq_date[4:6]}.{acq_date[0:4]}"
print(f"Versuche, Daten in {filename} zu schreiben...")
try:
# Stelle sicher, dass das Verzeichnis existiert
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
with open(filename, "a", encoding='utf-8') as file:
file.write(f"{modality} vom {formatted_date}\n")
print(f"Daten erfolgreich in {filename} geschrieben.")
except IOError as e:
print(f"Fehler beim Schreiben in die Datei {filename}: {e}")
except OSError as e:
print(f"OS-Fehler beim Schreiben in die Datei {filename}: {e}")
except Exception as e:
print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")