# Copyright 2008-2020 pydicom authors. See LICENSE file for details.
"""DICOM File-set handling."""

from collections.abc import Iterator, Iterable, Callable
import copy
import os
from pathlib import Path
import re
import shutil
from tempfile import TemporaryDirectory
from typing import Optional, Union, Any, cast
import uuid

from pydicom.charset import default_encoding
from pydicom.datadict import tag_for_keyword, dictionary_description
from pydicom.dataelem import DataElement
from pydicom.dataset import Dataset, FileMetaDataset, FileDataset
from pydicom.filebase import DicomBytesIO, DicomFileLike
from pydicom.filereader import dcmread
from pydicom.filewriter import write_dataset, write_data_element, write_file_meta_info
from pydicom.misc import warn_and_log
from pydicom.tag import Tag, BaseTag
import pydicom.uid as sop
from pydicom.uid import (
    generate_uid,
    UID,
    ExplicitVRLittleEndian,
    ImplicitVRLittleEndian,
    MediaStorageDirectoryStorage,
)


# Regex for conformant File ID paths - PS3.10 Section 8.5
_RE_FILE_ID = re.compile("^[A-Z0-9_]*$")
# Prefixes to use when generating File ID components
_PREFIXES = {
    "PATIENT": "PT",
    "STUDY": "ST",
    "SERIES": "SE",
    "IMAGE": "IM",
    "RT DOSE": "RD",
    "RT STRUCTURE SET": "RS",
    "RT PLAN": "RP",
    "RT TREAT RECORD": "RX",
    "PRESENTATION": "PR",
    "WAVEFORM": "WV",
    "SR DOCUMENT": "SR",
    "KEY OBJECT DOC": "KY",
    "SPECTROSCOPY": "SP",
    "RAW DATA": "RW",
    "REGISTRATION": "RG",
    "FIDUCIAL": "FD",
    "HANGING PROTOCOL": "HG",
    "ENCAP DOC": "ED",
    "VALUE MAP": "VM",
    "STEREOMETRIC": "SX",
    "PALETTE": "PA",
    "IMPLANT": "IP",
    "IMPLANT ASSY": "IA",
    "IMPLANT GROUP": "IG",
    "PLAN": "PL",
    "MEASUREMENT": "MX",
    "SURFACE": "SF",
    "SURFACE SCAN": "SS",
    "TRACT": "TR",
    "ASSESSMENT": "AS",
    "RADIOTHERAPY": "RT",
    "PRIVATE": "P",
}
_FIRST_OFFSET = "OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity"
_NEXT_OFFSET = "OffsetOfTheNextDirectoryRecord"
_LOWER_OFFSET = "OffsetOfReferencedLowerLevelDirectoryEntity"
_LAST_OFFSET = "OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity"

def generate_filename(
    prefix: str = "", start: int = 0, alphanumeric: bool = False
) -> Iterator[str]:
    """Yield File IDs for a File-set.

    Maximum number of File IDs is:

    * Numeric: (10 ** (8 - len(`prefix`))) - `start`
    * Alphanumeric: (35 ** (8 - len(`prefix`))) - `start`

    .. versionchanged:: 3.0

        The characters used when `alphanumeric` is ``True`` have been reduced
        to [0-9][A-I,K-Z]

    Parameters
    ----------
    prefix : str, optional
        The prefix to use for all filenames, default (``""``).
    start : int, optional
        The starting index to use for the suffixes (default ``0``),
        i.e. if you want to start at ``'00010'`` then `start` should be ``10``.
    alphanumeric : bool, optional
        If ``False`` (default) then only generate suffixes using the characters
        [0-9], otherwise use [0-9][A-I,K-Z].

    Yields
    ------
    str
        A unique filename with 8 characters, with each incremented by 1 from
        the previous one (i.e. ``'00000000'``, ``'00000001'``, ``'00000002'``,
        and so on).
    """
    if len(prefix) > 7:
        raise ValueError("The 'prefix' must be less than 8 characters long")

    chars = "0123456789ABCDEFGHIKLMNOPQRSTUVWXYZ"
    if not alphanumeric:
        chars = chars[:10]

    idx = start
    b = len(chars)
    length = 8 - len(prefix)
    while idx < b**length:
        n = idx
        suffix = ""
        while n:
            suffix += chars[n % b]
            n //= b

        yield f"{prefix}{suffix[::-1]:>0{length}}"
        idx += 1

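# Usage sketch (illustrative): the generator above yields zero-padded File IDs,
# e.g. '00000000', '00000001', ... with the defaults, or 'PT000000', 'PT000001',
# ... when a prefix is given:
#
#   fname_gen = generate_filename(prefix="PT")
#   first = next(fname_gen)  # 'PT000000'
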
def is_conformant_file_id(path: Path) -> bool:
    """Return ``True`` if `path` is a conformant File ID.

    **Conformance**

    * :dcm:`No more than 8 components<part03/sect_F.3.2.2.html>` (parts) in
      the path
    * :dcm:`No more than 8 characters per component<part03/sect_F.3.2.2.html>`
    * :dcm:`Characters in a component must be ASCII<part10/sect_8.2.html>`
    * :dcm:`Valid characters in a component are 0-9, A-Z and _
      <part10/sect_8.5.html>`

    Parameters
    ----------
    path : pathlib.Path
        The path to check, relative to the File-set root directory.

    Returns
    -------
    bool
        ``True`` if `path` is conformant, ``False`` otherwise.
    """
    # No more than 8 characters per component
    parts = path.parts
    if any([len(pp) > 8 for pp in parts]):
        return False

    # No more than 8 components
    if len(parts) > 8:
        return False

    # Characters in the path are ASCII
    chars = "".join(parts)
    try:
        chars.encode(encoding="ascii", errors="strict")
    except UnicodeEncodeError:
        return False

    # Characters are in [0-9][A-Z] and _
    if re.match(_RE_FILE_ID, chars):
        return True

    return False

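# Usage sketch (illustrative): a pydicom-style File ID passes the checks above,
# while a typical filename with lowercase letters or an extension does not:
#
#   is_conformant_file_id(Path("PT000000/ST000000/SE000000/IM000000"))  # True
#   is_conformant_file_id(Path("image-01.dcm"))  # False
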
class RecordNode(Iterable["RecordNode"]):
    """Representation of a DICOMDIR's directory record.

    Attributes
    ----------
    children : list of RecordNode
        The current node's child nodes (if any)
    instance : FileInstance or None
        If the current node is a leaf node, a
        :class:`~pydicom.fileset.FileInstance` for the corresponding SOP
        Instance.
    """

    def __init__(self, record: Dataset | None = None) -> None:
        """Create a new ``RecordNode``.

        Parameters
        ----------
        record : pydicom.dataset.Dataset, optional
            A *Directory Record Sequence's* directory record.
        """
        self.children: list[RecordNode] = []
        self.instance: FileInstance | None = None
        self._parent: RecordNode | None = None
        self._record: Dataset

        if record:
            self._set_record(record)

        # When the record is encoded as part of the *Directory Record Sequence*
        # this is the offset to the start of the sequence item containing
        # the record - not guaranteed to be up-to-date
        self._offset = 0
        # The offset to the start of the encoded record's *Offset of the
        # Next Directory Record* and *Offset of Referenced Lower Level
        # Directory Entity* values - use _encode_record() to set them
        self._offset_next = 0
        self._offset_lower = 0

    def add(self, leaf: "RecordNode") -> None:
        """Add a leaf to the tree.

        Parameters
        ----------
        leaf : pydicom.fileset.RecordNode
            A leaf node (i.e. one with a
            :class:`~pydicom.fileset.FileInstance`) to be added to the tree
            (if not already present).
        """
        # Move up to the branch's furthest ancestor with a directory record
        node = leaf.root
        if node is self:
            node = node.children[0]

        # Move back down, inserting at the point where the node is unique
        current = self.root
        while node in current and node.children:
            current = current[node]
            node = node.children[0]

        node.parent = current

    @property
    def ancestors(self) -> list["RecordNode"]:
        """Return a list of the current node's ancestors, ordered from nearest
        to furthest.
        """
        return [nn for nn in self.reverse() if nn is not self]

    @property
    def component(self) -> str:
        """Return a File ID component as :class:`str` for the current node."""
        if self.is_root:
            raise ValueError("The root node doesn't contribute a File ID component")

        prefix = _PREFIXES[self.record_type]
        if self.record_type == "PRIVATE":
            prefix = f"{prefix}{self.depth}"

        chars = "0123456789ABCDEFGHIKLMNOPQRSTUVWXYZ"
        if not self.file_set._use_alphanumeric:
            chars = chars[:10]

        suffix = ""
        n = self.index
        b = len(chars)
        while n:
            suffix += chars[n % b]
            n //= b

        idx = f"{suffix[::-1]:>0{8 - len(prefix)}}"

        return f"{prefix}{idx}"

    def __contains__(self, key: Union[str, "RecordNode"]) -> bool:
        """Return ``True`` if the current node has a child matching `key`."""
        if isinstance(key, RecordNode):
            key = key.key

        return key in [child.key for child in self.children]

    def __delitem__(self, key: Union[str, "RecordNode"]) -> None:
        """Remove one of the current node's children and if the current node
        becomes childless recurse upwards and delete it from its parent.
        """
        if isinstance(key, RecordNode):
            key = key.key

        if key not in self:
            raise KeyError(key)

        self.children = [ii for ii in self.children if ii.key != key]

        # Recurse upwards to the root, removing any empty nodes
        if not self.children and not self.is_root:
            del self.parent[self]

    @property
    def depth(self) -> int:
        """Return the number of nodes to the level below the tree root."""
        return len(list(self.reverse())) - 1

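    # Usage sketch (illustrative): for a PATIENT-level node at sibling index 3
    # in a File-set using numeric File IDs, ``component`` evaluates to
    # 'PT000003'; a PRIVATE node also embeds its depth, e.g. 'P1000003'.
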
    def _encode_record(self, force_implicit: bool = False) -> int:
        """Encode the node's directory record.

        * Encodes the record as explicit VR little endian
        * Sets the ``RecordNode._offset_next`` and ``RecordNode._offset_lower``
          attributes to the position of the start of the values of the *Offset
          of the Next Directory Record* and *Offset of Referenced Lower Level
          Directory Entity* elements. Note that the offsets are relative to
          the start of the current directory record.

        The values for the *Offset Of The Next Directory Record* and *Offset
        of Referenced Lower Level Directory Entity* elements are not guaranteed
        to be correct.

        Parameters
        ----------
        force_implicit : bool, optional
            ``True`` to force using implicit VR encoding, which is
            non-conformant. Default ``False``.

        Returns
        -------
        int
            The length of the encoded directory record.

        See Also
        --------
        :meth:`~pydicom.fileset.RecordNode._update_record_offsets`
        """
        fp = DicomBytesIO()
        fp.is_little_endian = True
        fp.is_implicit_VR = force_implicit

        encoding = self._record.get("SpecificCharacterSet", default_encoding)

        for tag in sorted(self._record.keys()):
            if tag.element == 0 and tag.group > 6:
                continue

            # (0004,1400) Offset Of The Next Directory Record
            # (0004,1420) Offset Of Referenced Lower Level Directory Entity
            # Offset from start of tag to start of value for VR UL is always 8
            # however the absolute position may change with transfer syntax
            if tag == 0x00041400:
                self._offset_next = fp.tell() + 8
            elif tag == 0x00041420:
                self._offset_lower = fp.tell() + 8

            write_data_element(fp, self._record[tag], encoding)

        return len(fp.getvalue())

    @property
    def _file_id(self) -> Path | None:
        """Return the *Referenced File ID* as a :class:`~pathlib.Path`.

        Returns
        -------
        pathlib.Path or None
            The *Referenced File ID* from the directory record as a
            :class:`pathlib.Path` or ``None`` if the element value is null.
        """
        if "ReferencedFileID" in self._record:
            elem = self._record["ReferencedFileID"]
            if elem.VM == 1:
                return Path(cast(str, self._record.ReferencedFileID))
            if elem.VM > 1:
                return Path(*cast(list[str], self._record.ReferencedFileID))

            return None

        raise AttributeError("No 'Referenced File ID' in the directory record")

    @property
    def file_set(self) -> "FileSet":
        """Return the tree's :class:`~pydicom.fileset.FileSet`."""
        return self.root.file_set

    def __getitem__(self, key: Union[str, "RecordNode"]) -> "RecordNode":
        """Return the current node's child using its
        :attr:`~pydicom.fileset.RecordNode.key`
        """
        if isinstance(key, RecordNode):
            key = key.key

        for child in self.children:
            if key == child.key:
                return child

        raise KeyError(key)

    @property
    def has_instance(self) -> bool:
        """Return ``True`` if the current node corresponds to an instance."""
        return self.instance is not None

    @property
    def index(self) -> int:
        """Return the index of the current node amongst its siblings."""
        if not self.parent:
            return 0

        return self.parent.children.index(self)

    @property
    def is_root(self) -> bool:
        """Return ``True`` if the current node is the tree's root node."""
        return False

    def __iter__(self) -> Iterator["RecordNode"]:
        """Yield this node (unless it's the root node) and all nodes below it."""
        if not self.is_root:
            yield self

        for child in self.children:
            yield from child

    @property
    def key(self) -> str:
        """Return a unique key for the node's record as :class:`str`."""
        rtype = self.record_type
        if rtype == "PATIENT":
            # PS3.3, Annex F.5.1: Each Patient ID is unique within a File-set
            return cast(str, self._record.PatientID)
        if rtype == "STUDY":
            # PS3.3, Annex F.5.2: Type 1C
            if "StudyInstanceUID" in self._record:
                return cast(UID, self._record.StudyInstanceUID)
            else:
                return cast(UID, self._record.ReferencedSOPInstanceUIDInFile)
        if rtype == "SERIES":
            return cast(UID, self._record.SeriesInstanceUID)
        if rtype == "PRIVATE":
            return cast(UID, self._record.PrivateRecordUID)

        # PS3.3, Table F.3-3: Required if record references an instance
        try:
            return cast(UID, self._record.ReferencedSOPInstanceUIDInFile)
        except AttributeError as exc:
            raise AttributeError(
                f"Invalid '{rtype}' record - missing required element "
                "'Referenced SOP Instance UID in File'"
            ) from exc

    @property
    def next(self) -> Optional["RecordNode"]:
        """Return the node after the current one (if any), or ``None``."""
        if not self.parent:
            return None

        try:
            return self.parent.children[self.index + 1]
        except IndexError:
            return None

    @property
    def parent(self) -> "RecordNode":
        """Return the current node's parent (if it has one)."""
        return cast("RecordNode", self._parent)

    @parent.setter
    def parent(self, node: "RecordNode") -> None:
        """Set the parent of the current node."""
        self._parent = node
        if node is not None and self not in node.children:
            node.children.append(self)

    def prettify(self, indent_char: str = " ") -> list[str]:
        """Return the tree structure as a list of pretty strings, starting at
        the current node (unless the current node is the root node).

        Parameters
        ----------
        indent_char : str, optional
            The characters to use to indent each level of the tree.
        """

        def leaf_summary(node: "RecordNode", indent_char: str) -> list[str]:
            """Summarize the leaves at the current level."""
            # Examples:
            # IMAGE: 15 SOP Instances (10 initial, 9 additions, 4 removals)
            # RTDOSE: 1 SOP Instance
            out = []
            if not node.children:
                indent = indent_char * node.depth
                sibs = [ii for ii in node.parent if ii.has_instance]
                # Split into record types
                rtypes = {ii.record_type for ii in sibs}
                for record_type in sorted(rtypes):
                    # nr = initial + additions
                    nr = [ii for ii in sibs if ii.record_type == record_type]
                    # All leaves should have a corresponding FileInstance
                    add = len(
                        [
                            ii
                            for ii in nr
                            if cast(FileInstance, ii.instance).for_addition
                        ]
                    )
                    rm = len(
                        [ii for ii in nr if cast(FileInstance, ii.instance).for_removal]
                    )
                    initial = len(nr) - add
                    result = len(nr) - rm

                    changes = []
                    if (add or rm) and initial > 0:
                        changes.append(f"{initial} initial")
                    if add:
                        plural = "s" if add > 1 else ""
                        changes.append(f"{add} addition{plural}")
                    if rm:
                        plural = "s" if rm > 1 else ""
                        changes.append(f"{rm} removal{plural}")

                    summary = (
                        f"{indent}{record_type}: {result} "
                        f"SOP Instance{'' if result == 1 else 's'}"
                    )
                    if changes:
                        summary += f" ({', '.join(changes)})"

                    out.append(summary)

            return out

        s = []
        for node in self:
            indent = indent_char * node.depth
            if node.children:
                s.append(f"{indent}{node}")
                # Summarise any leaves at the next level
                for child in node.children:
                    if child.has_instance:
                        s.extend(leaf_summary(child, indent_char))
                        break
            elif node.depth == 0 and node.has_instance:
                node.instance = cast(FileInstance, node.instance)
                # Single-level records
                line = f"{indent}{node.record_type}: 1 SOP Instance"
                if node.instance.for_addition:
                    line += " (to be added)"
                elif node.instance.for_removal:
                    line += " (to be removed)"

                s.append(line)

        return s

    @property
    def previous(self) -> Optional["RecordNode"]:
        """Return the node before the current one (if any), or ``None``."""
        if not self.parent:
            return None

        if self.index == 0:
            return None

        return self.parent.children[self.index - 1]

    def _set_record(self, ds: Dataset) -> None:
        """Set the node's initial directory record dataset.

        The record is used as a starting point when filling the DICOMDIR's
        *Directory Record Sequence* and is modified as required during
        encoding.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset
            Set the node's initial directory record dataset, must be conformant
            to :dcm:`Part 3, Annex F of the DICOM Standard
            <part03/chapter_F.html>`.
        """
        offset = getattr(ds, "seq_item_tell", None)
        rtype = ds.get("DirectoryRecordType", None)
        rtype = f"{rtype} " if rtype else ""
        msg = f"The {rtype}directory record is missing"
        if offset:
            msg = f"The {rtype}directory record at offset {offset} is missing"

        keywords = ["DirectoryRecordType"]
        missing = [kw for kw in keywords if kw not in ds]
        if missing:
            msg = f"{msg} one or more required elements: {', '.join(missing)}"
            raise ValueError(msg)

        if _NEXT_OFFSET not in ds:
            setattr(ds, _NEXT_OFFSET, 0)
        if _LOWER_OFFSET not in ds:
            setattr(ds, _LOWER_OFFSET, 0)
        ds.RecordInUseFlag = 0xFFFF
        self._record = ds

        try:
            self.key
        except (AttributeError, ValueError) as exc:
            raise ValueError(f"{msg} a required element") from exc

    @property
    def record_type(self) -> str:
        """Return the record's *Directory Record Type* as :class:`str`."""
        return cast(str, self._record.DirectoryRecordType)

    def remove(self, node: "RecordNode") -> None:
        """Remove a leaf from the tree.

        Parameters
        ----------
        node : pydicom.fileset.RecordNode
            The leaf node (i.e. one with a
            :class:`~pydicom.fileset.FileInstance`) to remove.
        """
        if not node.has_instance:
            raise ValueError("Only leaf nodes can be removed")

        del node.parent[node]

    def reverse(self) -> Iterable["RecordNode"]:
        """Yield nodes up to the level below the tree's root node."""
        node = self
        while node.parent:
            yield node
            node = node.parent

        if not node.is_root:
            yield node

    @property
    def root(self) -> "RecordNode":
        """Return the tree's root node."""
        if self.parent:
            return self.parent.root

        return self

    def __str__(self) -> str:
        """Return a string representation of the node."""
        if self.is_root:
            return "ROOT"

        ds = self._record
        record_type = f"{self.record_type}"

        s = []
        if self.record_type == "PATIENT":
            s += [f"PatientID='{ds.PatientID}'", f"PatientName='{ds.PatientName}'"]
        elif self.record_type == "STUDY":
            s += [f"StudyDate={ds.StudyDate}", f"StudyTime={ds.StudyTime}"]
            if getattr(ds, "StudyDescription", None):
                s.append(f"StudyDescription='{ds.StudyDescription}'")
        elif self.record_type == "SERIES":
            s += [f"Modality={ds.Modality}", f"SeriesNumber={ds.SeriesNumber}"]
            if getattr(ds, "SeriesDescription", None):
                s.append(f"SeriesDescription='{ds.SeriesDescription}'")
        elif self.record_type == "IMAGE":
            s.append(f"InstanceNumber={ds.InstanceNumber}")
        else:
            s.append(f"{self.key}")

        return f"{record_type}: {', '.join(s)}"

    def _update_record_offsets(self) -> None:
        """Update the record's offset elements.

        Updates the values for *Offset of the Next Directory Record* and
        *Offset of Referenced Lower Level Directory Entity*, provided all of
        the nodes have had their *_offset* attribute set correctly.
        """
        next_elem = self._record[_NEXT_OFFSET]
        next_elem.value = 0
        if self.next:
            next_elem.value = self.next._offset

        lower_elem = self._record[_LOWER_OFFSET]
        lower_elem.value = 0
        if self.children:
            self._record[_LOWER_OFFSET].value = self.children[0]._offset


class RootNode(RecordNode):
    """The root node for the File-set's record tree."""

    def __init__(self, fs: "FileSet") -> None:
        """Create a new root node.

        Parameters
        ----------
        fs : pydicom.fileset.FileSet
            The File-set the record tree belongs to.
        """
        super().__init__()

        self._fs = fs

    @property
    def file_set(self) -> "FileSet":
        """Return the tree's :class:`~pydicom.fileset.FileSet`."""
        return self._fs

    @property
    def is_root(self) -> bool:
        """Return ``True`` if the current node is the tree's root node."""
        return True


class FileInstance:
    """Representation of a File in the File-set.

    Attributes
    ----------
    node : pydicom.fileset.RecordNode
        The leaf record that references this instance.
    """

    def __init__(self, node: RecordNode) -> None:
        """Create a new FileInstance.

        Parameters
        ----------
        node : pydicom.fileset.RecordNode
            The record that references this instance.
        """

        class Flags:
            add: bool
            remove: bool

        self._uuid = uuid.uuid4()
        self._flags = Flags()
        self._apply_stage("x")
        self._stage_path: Path | None = None
        self.node = node

    def _apply_stage(self, flag: str) -> None:
        """Apply staging to the instance.

        Parameters
        ----------
        flag : str
            The staging to apply, one of ``'+'``, ``'-'`` or ``'x'``.
            This will flag the instance for addition to or removal from the
            File-set, or to reset the staging, respectively.
        """
        # Clear flags
        if flag == "x":
            self._flags.add = False
            self._flags.remove = False
            self._stage_path = None
        elif flag == "+":
            # remove + add = no change
            if self._flags.remove:
                self._flags.remove = False
                self._stage_path = None
            else:
                self._flags.add = True
                self._stage_path = self.file_set._stage["path"] / f"{self._uuid}"

        elif flag == "-":
            # add + remove = no change
            if self._flags.add:
                self._flags.add = False
                self._stage_path = None
            else:
                self._flags.remove = True
                self._stage_path = None

    def __contains__(self, name: str | int) -> bool:
        """Return ``True`` if the element with keyword or tag `name` is
        in one of the corresponding directory records.

        Parameters
        ----------
        name : str or int
            The element keyword or tag to search for.

        Returns
        -------
        bool
            ``True`` if the corresponding element is present, ``False``
            otherwise.
        """
        try:
            self[name]
        except KeyError:
            return False

        return True

    @property
    def FileID(self) -> str:
        """Return the File ID of the referenced instance."""
        root = self.node.root
        components = [ii.component for ii in self.node.reverse() if ii is not root]
        return os.fspath(Path(*components[::-1]))

    @property
    def file_set(self) -> "FileSet":
        """Return the :class:`~pydicom.fileset.FileSet` this instance belongs
        to.
        """
        return self.node.file_set

    @property
    def for_addition(self) -> bool:
        """Return ``True`` if the instance has been staged for addition to
        the File-set.
        """
        return self._flags.add

    @property
    def for_moving(self) -> bool:
        """Return ``True`` if the instance will be moved to a new location
        within the File-set.
        """
        if self.for_addition:
            return False

        if self["ReferencedFileID"].VM == 1:
            file_id = self.FileID.split(os.path.sep)
            return [self.ReferencedFileID] != file_id

        return cast(bool, self.ReferencedFileID != self.FileID.split(os.path.sep))

    @property
    def for_removal(self) -> bool:
        """Return ``True`` if the instance has been staged for removal from
        the File-set.
        """
        return self._flags.remove

    def __getattribute__(self, name: str) -> Any:
        """Return the class attribute value for `name`.

        Parameters
        ----------
        name : str
            An element keyword or a class attribute name.

        Returns
        -------
        object
            If `name` matches a DICOM keyword and the element is
            present in one of the directory records then returns the
            corresponding element's value. Otherwise returns the class
            attribute's value (if present). Directory records are searched
            from the lowest (i.e. an IMAGE or similar record type) to the
            highest (PATIENT or similar).
        """
        tag = tag_for_keyword(name)
        if tag is not None:
            tag = Tag(tag)
            for node in self.node.reverse():
                if tag in node._record:
                    return node._record[tag].value

        return super().__getattribute__(name)

    def __getitem__(self, key: str | int) -> DataElement:
        """Return the DataElement with keyword or tag `key`.

        Parameters
        ----------
        key : str or int
            An element keyword or tag.

        Returns
        -------
        pydicom.dataelem.DataElement
            The DataElement corresponding to `key`, if present in one of the
            directory records. Directory records are searched
            from the lowest (i.e. an IMAGE or similar record type) to the
            highest (PATIENT or similar).
        """
        if isinstance(key, BaseTag):
            tag = key
        else:
            tag = Tag(key)

        if tag == 0x00080018:
            # SOP Instance UID
            tag = Tag(0x00041511)
        elif tag == 0x00080016:
            # SOP Class UID
            tag = Tag(0x00041510)
        elif tag == 0x00020010:
            # Transfer Syntax UID
            tag = Tag(0x00041512)

        for node in self.node.reverse():
            if tag in node._record:
                return node._record[tag]

        raise KeyError(tag)

    @property
    def is_private(self) -> bool:
        """Return ``True`` if the instance is privately defined."""
        return self.node.record_type == "PRIVATE"

    @property
    def is_staged(self) -> bool:
        """Return ``True`` if the instance is staged for moving, addition or
        removal.
        """
        return self.for_addition or self.for_moving or self.for_removal

    def load(self) -> Dataset:
        """Return the referenced instance as a
        :class:`~pydicom.dataset.Dataset`.
        """
        if self.for_addition:
            return dcmread(cast(Path, self._stage_path))

        return dcmread(self.path)

    @property
    def path(self) -> str:
        """Return the path to the corresponding instance as :class:`str`.

        Returns
        -------
        str
            The absolute path to the corresponding instance. If the instance is
            staged for addition to the File-set this will be a path to the
            staged file in the temporary staging directory.
        """
        if self.for_addition:
            return os.fspath(cast(Path, self._stage_path))

        # If not staged for addition then File Set must exist on file system
        return os.fspath(
            cast(Path, self.file_set.path) / cast(Path, self.node._file_id)
        )

    @property
    def SOPClassUID(self) -> UID:
        """Return the *SOP Class UID* of the referenced instance."""
        return cast(UID, self.ReferencedSOPClassUIDInFile)

    @property
    def SOPInstanceUID(self) -> UID:
        """Return the *SOP Instance UID* of the referenced instance."""
        return cast(UID, self.ReferencedSOPInstanceUIDInFile)

    @property
    def TransferSyntaxUID(self) -> UID:
        """Return the *Transfer Syntax UID* of the referenced instance."""
        return cast(UID, self.ReferencedTransferSyntaxUIDInFile)


DSPathType = Dataset | str | os.PathLike


class FileSet:
    """Representation of a DICOM File-set."""

    def __init__(self, ds: DSPathType | None = None) -> None:
        """Create or load a File-set.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset, str or PathLike, optional
            If loading a File-set, the DICOMDIR dataset or the path
            to the DICOMDIR file.
        """
        # The nominal path to the root of the File-set
        self._path: Path | None = None
        # The root node of the record tree used to fill out the DICOMDIR's
        # *Directory Record Sequence*.
        # The tree for instances currently in the File-set
        self._tree = RootNode(self)

        # For tracking changes to the File-set
        self._stage: dict[str, Any] = {
            "t": TemporaryDirectory(),
            "+": {},  # instances staged for addition
            "-": {},  # instances staged for removal
            "~": False,  # instances staged for moving
            "^": False,  # a File-set Identification module element has changed
        }
        self._stage["path"] = Path(self._stage["t"].name)

        # The DICOMDIR instance, not guaranteed to be up-to-date
        self._ds = Dataset()
        # The File-set's managed SOP Instances as list of FileInstance
        self._instances: list[FileInstance] = []
        # Use alphanumeric or numeric File IDs
        self._use_alphanumeric = False

        # The File-set ID
        self._id: str | None = None
        # The File-set UID
        self._uid: UID | None = None
        # The File-set Descriptor File ID
        self._descriptor: str | None = None
        # The Specific Character Set of File-set Descriptor File
        self._charset: str | None = None

        # Check the DICOMDIR dataset and create the record tree
        if ds:
            self.load(ds)
        else:
            # New File-set
            self.UID = generate_uid()

    def add(self, ds_or_path: DSPathType) -> FileInstance:
        """Stage an instance for addition to the File-set.

        If the instance has been staged for removal then calling
        :meth:`~pydicom.fileset.FileSet.add` will cancel the staging
        and the instance will not be removed.

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            The instance to add to the File-set, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the instance.

        Returns
        -------
        FileInstance
            The :class:`~pydicom.fileset.FileInstance` that was added.

        See Also
        --------
        :meth:`~pydicom.fileset.FileSet.add_custom`
        """
        ds: Dataset | FileDataset
        if isinstance(ds_or_path, str | os.PathLike):
            ds = dcmread(ds_or_path)
        else:
            ds = ds_or_path

        key = ds.SOPInstanceUID
        have_instance = [ii for ii in self if ii.SOPInstanceUID == key]

        # If staged for removal, keep instead - check this now because
        # `have_instance` is False when instance staged for removal
        if key in self._stage["-"]:
            instance = self._stage["-"][key]
            del self._stage["-"][key]
            self._instances.append(instance)
            instance._apply_stage("+")

            return cast(FileInstance, instance)

        # The instance is already in the File-set (and not staged for removal)
        # May or may not be staged for addition/movement
        if have_instance:
            return have_instance[0]

        # If not already in the File-set, stage for addition
        # Create the directory records and tree nodes for the dataset
        # For instances that won't contain PRIVATE records we shouldn't have
        # to worry about exceeding the maximum component depth of 8
        record_gen = self._recordify(ds)
        record = next(record_gen)
        parent = RecordNode(record)
        node = parent  # May be only a single record
        for record in record_gen:
            node = RecordNode(record)
            node.parent = parent
            parent = node

        instance = FileInstance(node)
        node.instance = instance
        self._tree.add(node)

        # Save the dataset to the stage
        self._stage["+"][instance.SOPInstanceUID] = instance
        self._instances.append(instance)
        instance._apply_stage("+")
        ds.save_as(instance.path, enforce_file_format=True)

        return cast(FileInstance, instance)

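    # Usage sketch (illustrative; the file paths below are hypothetical): an
    # added instance is staged in a temporary directory until the File-set is
    # written out with ``FileSet.write()``:
    #
    #   fs = FileSet()
    #   instance = fs.add("CT_small.dcm")   # Dataset or path to an instance
    #   fs.write("/path/to/fileset_root")   # applies the staged addition
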
    def add_custom(self, ds_or_path: DSPathType, leaf: RecordNode) -> FileInstance:
        """Stage an instance for addition to the File-set using custom records.

        This method allows you to add a SOP instance and customize the
        directory records that will be used when writing the DICOMDIR file. It
        must be used when you require PRIVATE records and may be used instead
        of modifying :attr:`~pydicom.fileset.DIRECTORY_RECORDERS` with your
        own record definition functions when the default functions aren't
        suitable.

        The following elements will be added automatically to the supplied
        directory records if required and not present:

        * (0004,1400) *Offset of the Next Directory Record*
        * (0004,1410) *Record In-use Flag*
        * (0004,1420) *Offset of Referenced Lower-Level Directory Entity*
        * (0004,1500) *Referenced File ID*
        * (0004,1510) *Referenced SOP Class UID in File*
        * (0004,1511) *Referenced SOP Instance UID in File*
        * (0004,1512) *Referenced Transfer Syntax UID in File*

        If the instance has been staged for removal then calling
        :meth:`~pydicom.fileset.FileSet.add_custom` will cancel the staging
        and the instance will not be removed.

        Examples
        --------

        Add a SOP Instance using a two record hierarchy of PATIENT -> PRIVATE

        .. code-block:: python

            from pydicom import Dataset, examples
            from pydicom.fileset import FileSet, RecordNode
            from pydicom.uid import generate_uid

            # The instance to be added
            ds = examples.ct

            # Define the leaf node (the PRIVATE record)
            record = Dataset()
            record.DirectoryRecordType = "PRIVATE"
            record.PrivateRecordUID = generate_uid()
            leaf_node = RecordNode(record)

            # Define the top node (the PATIENT record)
            record = Dataset()
            record.DirectoryRecordType = "PATIENT"
            record.PatientID = ds.PatientID
            record.PatientName = ds.PatientName
            top_node = RecordNode(record)

            # Set the node relationship
            leaf_node.parent = top_node

            # Add the instance to the File-set
            fs = FileSet()
            instance = fs.add_custom(ds, leaf_node)

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            The instance to add to the File-set, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the instance.
        leaf : pydicom.fileset.RecordNode
            The leaf node for the instance, should have its ancestor nodes set
            correctly as well as their corresponding directory records. Should
            have no more than 7 ancestors due to the semantics used by
            :class:`~pydicom.fileset.FileSet` when creating the directory
            structure.

        Returns
        -------
        FileInstance
            The :class:`~pydicom.fileset.FileInstance` that was added.

        See Also
        --------
        :meth:`~pydicom.fileset.FileSet.add`
        """
        ds: Dataset | FileDataset
        if isinstance(ds_or_path, str | os.PathLike):
            ds = dcmread(ds_or_path)
        else:
            ds = ds_or_path

        # Check the supplied nodes
        if leaf.depth > 7:
            raise ValueError(
                "The 'leaf' node must not have more than 7 ancestors as "
                "'FileSet' supports a maximum directory structure depth of 8"
            )

        key = ds.SOPInstanceUID
        have_instance = [ii for ii in self if ii.SOPInstanceUID == key]

        # If staged for removal, keep instead - check this now because
        # `have_instance` is False when instance staged for removal
        if key in self._stage["-"]:
            instance = self._stage["-"][key]
            del self._stage["-"][key]
            self._instances.append(instance)
            instance._apply_stage("+")

            return cast(FileInstance, instance)

        if have_instance:
            return have_instance[0]

        # Ensure the leaf node's record contains the required elements
        leaf._record.ReferencedFileID = None
        leaf._record.ReferencedSOPClassUIDInFile = ds.SOPClassUID
        leaf._record.ReferencedSOPInstanceUIDInFile = key
        leaf._record.ReferencedTransferSyntaxUIDInFile = ds.file_meta.TransferSyntaxUID

        instance = FileInstance(leaf)
        leaf.instance = instance
        self._tree.add(leaf)

        # Save the dataset to the stage
        self._stage["+"][instance.SOPInstanceUID] = instance
        self._instances.append(instance)
        instance._apply_stage("+")
        ds.save_as(instance.path, enforce_file_format=True)

        return cast(FileInstance, instance)

    def clear(self) -> None:
        """Clear the File-set."""
        self._tree.children = []
        self._instances = []
        self._path = None
        self._ds = Dataset()
        self._id = None
        self._uid = generate_uid()
        self._descriptor = None
        self._charset = None

        # Clean and reset the stage
        self._stage["+"] = {}
        self._stage["-"] = {}
        self._stage["~"] = False
        self._stage["^"] = False
        self._stage["t"].cleanup()
        self._stage["t"] = TemporaryDirectory()
        self._stage["path"] = Path(self._stage["t"].name)

    def copy(self, path: str | os.PathLike, force_implicit: bool = False) -> "FileSet":
        """Copy the File-set to a new root directory and return the copied
        File-set.

        Changes staged to the original :class:`~pydicom.fileset.FileSet` will
        be applied to the new File-set. The original
        :class:`~pydicom.fileset.FileSet` will remain staged.

        Parameters
        ----------
        path : str or PathLike
            The root directory where the File-set is to be copied to.
        force_implicit : bool, optional
            If ``True`` force the DICOMDIR file to be encoded using *Implicit
            VR Little Endian* which is non-conformant to the DICOM Standard
            (default ``False``).

        Returns
        -------
        pydicom.fileset.FileSet
            The copied File-set as a :class:`~pydicom.fileset.FileSet`.
        """
        # !! We can't change anything public in the original FileSet !!

        path = Path(path)
        if self.path and Path(self.path) == path:
            raise ValueError("Cannot copy the File-set as the 'path' is unchanged")

        if len(self) > 10**6:
            self._use_alphanumeric = True

        if len(self) > 35**6:
            raise NotImplementedError(
                "pydicom doesn't support writing File-sets with more than "
                "1838265625 managed instances"
            )

        # Removals are detached from the tree
        detached_nodes = []
        for instance in self._stage["-"].values():
            detached_nodes.append(instance.node)
            self._tree.remove(instance.node)

        file_ids = []
        for instance in self:
            file_ids.append(instance.ReferencedFileID)
            dst = path / Path(instance.FileID)
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(instance.path, dst)
            instance.node._record.ReferencedFileID = instance.FileID.split(os.path.sep)

        # Create the DICOMDIR file
        p = path / "DICOMDIR"
        with open(p, "wb") as fp:
            f = DicomFileLike(fp)
            self._write_dicomdir(f, copy_safe=True, force_implicit=force_implicit)

        # Reset the *Referenced File ID* values
        # The order here doesn't matter because removed instances aren't
        # yielded by iter(self)
        for instance, file_id in zip(self, file_ids):
            instance.node._record.ReferencedFileID = file_id

        # Reattach the removed nodes
        for node in detached_nodes:
            self._tree.add(node)

        fs = FileSet()
        fs.load(p, raise_orphans=True)

        return fs

    def _create_dicomdir(self) -> Dataset:
        """Return a new minimal DICOMDIR dataset."""
        ds = Dataset()
        ds.filename = None

        ds.file_meta = FileMetaDataset()
        ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
        ds.file_meta.MediaStorageSOPInstanceUID = self.UID
        ds.file_meta.MediaStorageSOPClassUID = MediaStorageDirectoryStorage

        ds.FileSetID = self.ID
        ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity = 0
        ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity = 0
        ds.FileSetConsistencyFlag = 0
        ds.DirectoryRecordSequence = []

        if self.descriptor_file_id:
            ds.FileSetDescriptorFileID = self.descriptor_file_id
        if self.descriptor_character_set:
            ds.SpecificCharacterSetOfFileSetDescriptorFile = (
                self.descriptor_character_set
            )

        return ds

    @property
    def descriptor_character_set(self) -> str | None:
        """Return the *Specific Character Set of File-set Descriptor File*
        (if available) or ``None``.
        """
        return self._charset

    @descriptor_character_set.setter
    def descriptor_character_set(self, val: str | None) -> None:
        """Set the *Specific Character Set of File-set Descriptor File*.

        The descriptor file itself is used for user comments related to the
        File-set (e.g. a README file) and is up to the user to create.

        Parameters
        ----------
        val : str or None
            The value to use for the DICOMDIR's (0004,1142) *Specific
            Character Set of File-set Descriptor File*. See :dcm:`C.12.1.1.2
            in Part 3 of the DICOM Standard
            <part03/sect_C.12.html#sect_C.12.1.1.2>` for defined terms.

        See Also
        --------
        :attr:`~pydicom.fileset.FileSet.descriptor_file_id` set the descriptor
        file ID for the file that uses the character set.
        """
        if val == self._charset:
            return

        self._charset = val
        if self._ds:
            self._ds.SpecificCharacterSetOfFileSetDescriptorFile = val
        self._stage["^"] = True

    @property
    def descriptor_file_id(self) -> str | None:
        """Return the *File-set Descriptor File ID* (if available) or ``None``."""
        return self._descriptor

    @descriptor_file_id.setter
    def descriptor_file_id(self, val: str | None) -> None:
        """Set the *File-set Descriptor File ID*.

        The descriptor file itself is used for user comments related to the
        File-set (e.g. a README file) and is up to the user to create.

        Parameters
        ----------
        val : str, list of str or None
            The value to use for the DICOMDIR's (0004,1141) *File-set
            Descriptor File ID*. Should be the relative path to the descriptor
            file and has a maximum length of 8 components, with each component
            up to 16 characters long.

        Raises
        ------
        ValueError
            If `val` has more than 8 items or if any item is longer than 16
            characters.

        See Also
        --------
        :attr:`~pydicom.fileset.FileSet.descriptor_character_set` the
        character set used in the descriptor file, required if an expanded or
        replaced character set is used.
        """
        if val == self._descriptor:
            return

        if val is None:
            pass
        elif isinstance(val, list):
            try:
                assert len(val) <= 8
                for component in val:
                    assert isinstance(component, str)
                    assert 0 <= len(component) <= 16
            except AssertionError:
                raise ValueError(
                    "The 'File-set Descriptor File ID' has a maximum of 8 "
                    "components, each between 0 and 16 characters long"
                )

            # Push the value through Path to clean it up and check validity
            val = list(Path(*val).parts)
        elif isinstance(val, str):
            if not 0 <= len(val) <= 16:
                raise ValueError(
                    "Each 'File-set Descriptor File ID' component has a "
                    "maximum length of 16 characters"
                )
        else:
            raise TypeError(
                "The 'DescriptorFileID' must be a str, list of str, or None"
            )

        self._descriptor = val
        if self._ds:
            self._ds.FileSetDescriptorFileID = self._descriptor
        self._stage["^"] = True

    def find(self, load: bool = False, **kwargs: Any) -> list[FileInstance]:
        """Return matching instances in the File-set.

        **Limitations**

        * Only single value matching is supported, so neither
          ``PatientID=['1234567', '7654321']`` nor ``PatientID='1234567',
          PatientID='7654321'`` will work (although the first example will
          work if the *Patient ID* is actually multi-valued).
        * Repeating group and private elements cannot be used when searching.

        Parameters
        ----------
        load : bool, optional
            If ``True``, then load the SOP Instances belonging to the
            File-set and perform the search against their available elements.
            Otherwise (default) search only the elements available in the
            corresponding directory records (more efficient, but only a limited
            number of elements are available).
        **kwargs
            Search parameters, as element keyword=value (i.e.
            ``PatientID='1234567', StudyDescription="My study"``).

        Returns
        -------
        list of pydicom.fileset.FileInstance
            A list of matching instances.
        """
        if not kwargs:
            return self._instances[:]

        # Flag whether or not the query elements are in the DICOMDIR records
        has_elements = False

        def match(ds: Dataset | FileInstance, **kwargs: Any) -> bool:
            nonlocal has_elements
            if load:
                ds = ds.load()

            # Check that all query elements are present
            if all([kw in ds for kw in kwargs]):
                has_elements = True

            for kw, val in kwargs.items():
                try:
                    assert ds[kw].value == val
                except (AssertionError, KeyError):
                    return False

            return True

        matches = [instance for instance in self if match(instance, **kwargs)]

        if not load and not has_elements:
            warn_and_log(
                "None of the records in the DICOMDIR dataset contain all "
                "the query elements, consider using the 'load' parameter "
                "to expand the search to the corresponding SOP instances"
            )

        return matches

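    # Usage sketch (illustrative; the keyword values are hypothetical): search
    # using only the directory records, or load each instance for a full search:
    #
    #   matches = fs.find(PatientID="1234567")
    #   matches = fs.find(load=True, StudyDescription="My study")
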
    def find_values(
        self,
        elements: str | int | list[str | int],
        instances: list[FileInstance] | None = None,
        load: bool = False,
    ) -> list[Any] | dict[str | int, list[Any]]:
        """Return a list of unique values for given element(s).

        Parameters
        ----------
        elements : str, int or pydicom.tag.BaseTag, or list of these
            The keyword or tag of the element(s) to search for.
        instances : list of pydicom.fileset.FileInstance, optional
            Search within the given instances. If not used then all available
            instances will be searched.
        load : bool, optional
            If ``True``, then load the SOP Instances belonging to the
            File-set and perform the search against their available elements.
            Otherwise (default) search only the elements available in the
            corresponding directory records (more efficient, but only a limited
            number of elements are available).

        Returns
        -------
        list of object(s), or dict of lists of object(s)

            * If a single element was queried: a list of value(s) for the
              element available in the instances.
            * If a list of elements was queried: a dict of element/value pairs
              with lists of value(s) for the elements available in the
              instances.
        """
        element_list = elements if isinstance(elements, list) else [elements]
        has_element = {element: False for element in element_list}
        results: dict[str | int, list[Any]] = {element: [] for element in element_list}
        iter_instances = instances or iter(self)
        instance: Dataset | FileInstance
        for instance in iter_instances:
            if load:
                instance = instance.load()

            for element in element_list:
                if element not in instance:
                    continue

                has_element[element] = True
                val = instance[element].value
                # Not very efficient, but we can't use set
                if val not in results[element]:
                    results[element].append(val)

        missing_elements = [element for element, v in has_element.items() if not v]
        if not load and missing_elements:
            warn_and_log(
                "None of the records in the DICOMDIR dataset contain "
                f"{missing_elements}, consider using the 'load' parameter "
                "to expand the search to the corresponding SOP instances"
            )

        if not isinstance(elements, list):
            return results[element_list[0]]

        return results

    @property
    def ID(self) -> str | None:
        """Return the *File-set ID* (if available) or ``None``."""
        return self._id

    @ID.setter
    def ID(self, val: str | None) -> None:
        """Set the File-set ID.

        Parameters
        ----------
        val : str or None
            The value to use for the DICOMDIR's (0004,1130) *File-set ID*.

        Raises
        ------
        ValueError
            If `val` is greater than 16 characters long.
        """
        if val == self._id:
            return

        if val is None or 0 <= len(val) <= 16:
            self._id = val
            if self._ds:
                self._ds.FileSetID = val
            self._stage["^"] = True
        else:
            raise ValueError("The maximum length of the 'File-set ID' is 16 characters")

    @property
    def is_staged(self) -> bool:
        """Return ``True`` if the File-set is new or has changes staged."""
        return any(self._stage[c] for c in "+-^~")

    def __iter__(self) -> Iterator[FileInstance]:
        """Yield :class:`~pydicom.fileset.FileInstance` from the File-set."""
        yield from self._instances[:]

    def __len__(self) -> int:
        """Return the number of instances in the File-set."""
        return len(self._instances)

    def load(
        self,
        ds_or_path: DSPathType,
        include_orphans: bool = True,
        raise_orphans: bool = False,
    ) -> None:
        """Load an existing File-set.

        Existing File-sets that do not use the same directory structure as
        *pydicom* will be staged to be moved to a new structure. This is
        because the DICOM Standard attaches no semantics to *how* the files
        in a File-set are to be structured, so it's impossible to determine
        what the layout will be when changes are to be made.

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            An existing File-set's DICOMDIR, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the DICOMDIR file
            as :class:`str` or pathlike.
        include_orphans : bool, optional
            If ``True`` (default) include instances referenced by orphaned
            directory records in the File-set.
        raise_orphans : bool, optional
            If ``True`` then raise an exception if orphaned directory records
            are found in the File-set (default ``False``).
        """
        if isinstance(ds_or_path, Dataset):
            ds = ds_or_path
        else:
            ds = dcmread(ds_or_path)

        sop_class = ds.file_meta.get("MediaStorageSOPClassUID", None)
        if sop_class != MediaStorageDirectoryStorage:
            raise ValueError(
                "Unable to load the File-set as the supplied dataset is "
                "not a 'Media Storage Directory' instance"
            )

        tsyntax = ds.file_meta.TransferSyntaxUID
        if tsyntax != ExplicitVRLittleEndian:
            warn_and_log(
                "The DICOMDIR dataset uses an invalid transfer syntax "
                f"'{tsyntax.name}' and will be updated to use 'Explicit VR "
                "Little Endian'"
            )

        try:
            path = Path(cast(str, ds.filename)).resolve(strict=True)
        except FileNotFoundError:
            raise FileNotFoundError(
                "Unable to load the File-set as the 'filename' attribute "
                "for the DICOMDIR dataset is not a valid path: "
                f"{ds.filename}"
            )
        except TypeError:
            # Custom message if DICOMDIR from bytes, etc
            raise TypeError(
                "Unable to load the File-set as the DICOMDIR dataset must "
                "have a 'filename' attribute set to the path of the "
                "DICOMDIR file"
            )

        self.clear()
        self._id = cast(str | None, ds.get("FileSetID", None))
        uid = cast(UID | None, ds.file_meta.get("MediaStorageSOPInstanceUID"))
        if not uid:
            uid = generate_uid()
            ds.file_meta.MediaStorageSOPInstanceUID = uid
        self._uid = uid
        self._descriptor = cast(str | None, ds.get("FileSetDescriptorFileID", None))
        self._charset = cast(
            str | None, ds.get("SpecificCharacterSetOfFileSetDescriptorFile", None)
        )
        self._path = path.parent
        self._ds = ds

        # Create the record tree
        self._parse_records(ds, include_orphans, raise_orphans)

        bad_instances = []
        for instance in self:
            # Check that the referenced file exists
            file_id = instance.node._file_id
            if file_id is None:
                bad_instances.append(instance)
                continue

            try:
                # self.path is already set at this point
                (cast(Path, self.path) / file_id).resolve(strict=True)
            except FileNotFoundError:
                bad_instances.append(instance)
                warn_and_log(
                    "The referenced SOP Instance for the directory record at "
                    f"offset {instance.node._offset} does not exist: "
                    f"{cast(Path, self.path) / file_id}"
                )
                continue

            # If the instance's existing directory structure doesn't match
            # the pydicom semantics then stage for movement
            if instance.for_moving:
                self._stage["~"] = True

        for instance in bad_instances:
            self._instances.remove(instance)

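    # Usage sketch (illustrative; the DICOMDIR path is hypothetical): an
    # existing File-set is usually loaded via the constructor, which calls
    # ``load()``:
    #
    #   fs = FileSet("/path/to/DICOMDIR")
    #   for instance in fs:
    #       ds = instance.load()  # read the referenced SOP Instance
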
    def _parse_records(
        self, ds: Dataset, include_orphans: bool, raise_orphans: bool = False
    ) -> None:
        """Parse the records in an existing DICOMDIR.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset
            The File-set's DICOMDIR dataset.
        include_orphans : bool
            If ``True`` then include within the File-set orphaned records that
            contain a valid (and unique) *Referenced File ID* element. Orphaned
            records are those that aren't placed within the *Directory Record
            Sequence* hierarchy.
        raise_orphans : bool, optional
            If ``True`` then raise an exception if orphaned directory records
            are found in the File-set (default ``False``).
        """
        # First pass: get the offsets for each record
        records = {}
        for record in cast(Iterable[Dataset], ds.DirectoryRecordSequence):
            offset = cast(int, record.seq_item_tell)
            node = RecordNode(record)
            node._offset = offset
            records[offset] = node

        # Define the top-level nodes
        if records:
            node = records[ds[_FIRST_OFFSET].value]
            node.parent = self._tree
            while getattr(node._record, _NEXT_OFFSET, None):
                node = records[node._record[_NEXT_OFFSET].value]
                node.parent = self._tree

        # Second pass: build the record hierarchy
        # Records not in the hierarchy will be ignored
        # Branches without a valid leaf node File ID will be removed
        def recurse_node(node: RecordNode) -> None:
            child_offset = getattr(node._record, _LOWER_OFFSET, None)
            if child_offset:
                child = records[child_offset]
                child.parent = node

                next_offset = getattr(child._record, _NEXT_OFFSET, None)
                while next_offset:
                    child = records[next_offset]
                    child.parent = node
                    next_offset = getattr(child._record, _NEXT_OFFSET, None)
            elif "ReferencedFileID" not in node._record:
                # No children = leaf node, leaf nodes must reference a File ID
                del node.parent[node]

            # The leaf node references the FileInstance
            if "ReferencedFileID" in node._record:
                node.instance = FileInstance(node)
                self._instances.append(node.instance)

            for child in node.children:
                recurse_node(child)

        for node in self._tree.children:
            recurse_node(node)

        if len(records) == len(list(iter(self._tree))):
            return

        if raise_orphans:
            raise ValueError("The DICOMDIR contains orphaned directory records")

        # DICOMDIR contains orphaned records
        # Determine which nodes are both orphaned and reference an instance
        missing_set = set(records.keys()) - {ii._offset for ii in self._tree}
        missing = [records[o] for o in missing_set]
        missing = [r for r in missing if "ReferencedFileID" in r._record]

        if missing and not include_orphans:
            warn_and_log(
                f"The DICOMDIR has {len(missing)} orphaned directory records "
                "that reference an instance that will not be included in the "
                "File-set"
            )
            return

        for node in missing:
            # Get the path to the orphaned instance
            original_value = node._record.ReferencedFileID
            file_id = node._file_id
            if file_id is None:
                continue

            # self.path is set for an existing File Set
            path = cast(Path, self.path) / file_id
            if node.record_type == "PRIVATE":
                instance = self.add_custom(path, node)
            else:
                instance = self.add(path)

            # Because the record is new the Referenced File ID isn't set
            instance.node._record.ReferencedFileID = original_value

    @property
    def path(self) -> str | None:
        """Return the absolute path to the File-set root directory as
        :class:`str` (if set) or ``None`` otherwise.
        """
        if self._path is not None:
            return os.fspath(self._path)

        return self._path

def _recordify(self, ds: Dataset) -> Iterator[Dataset]:
|
|
"""Yield directory records for a SOP Instance.
|
|
|
|
Parameters
|
|
----------
|
|
ds : pydicom.dataset.Dataset
|
|
The SOP Instance to create DICOMDIR directory records for.
|
|
|
|
Yields
|
|
------
|
|
ds : pydicom.dataset.Dataset
|
|
A directory record for the instance, ordered from highest to
|
|
lowest level.
|
|
|
|
Raises
|
|
------
|
|
ValueError
|
|
If unable to create the required directory records because of
|
|
a missing required element or element value.
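
        Example
        -------
        A sketch of the expected record types for a CT image instance; ``fs``
        and ``ct_dataset`` are illustrative names, and the dataset is assumed
        to contain the required Type 1 elements:

        .. code-block:: python

            records = list(fs._recordify(ct_dataset))
            print([r.DirectoryRecordType for r in records])
            # ['PATIENT', 'STUDY', 'SERIES', 'IMAGE']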
        """
        # Single-level records: leaf
        record_type = _single_level_record_type(ds)
        if record_type != "PATIENT":
            try:
                record = DIRECTORY_RECORDERS[record_type](ds)
            except ValueError as exc:
                raise ValueError(
                    f"Unable to use the default '{record_type}' "
                    f"record creator: {exc}. See DICOM PS3.3 Section F.5. "
                    "Either update the instance, "
                    "define your own record creation function or use "
                    "'FileSet.add_custom()' instead"
                ) from exc

            record.OffsetOfTheNextDirectoryRecord = 0
            record.RecordInUseFlag = 0xFFFF
            record.OffsetOfReferencedLowerLevelDirectoryEntity = 0
            record.DirectoryRecordType = record_type
            record.ReferencedFileID = None
            record.ReferencedSOPClassUIDInFile = ds.SOPClassUID
            record.ReferencedSOPInstanceUIDInFile = ds.SOPInstanceUID
            record.ReferencedTransferSyntaxUIDInFile = ds.file_meta.TransferSyntaxUID

            yield record
            return

        # Four-level records: PATIENT -> STUDY -> SERIES -> leaf
        records = []
        leaf_type = _four_level_record_type(ds)
        for record_type in ["PATIENT", "STUDY", "SERIES", leaf_type]:
            try:
                record = DIRECTORY_RECORDERS[record_type](ds)
            except ValueError as exc:
                raise ValueError(
                    f"Unable to use the default '{record_type}' "
                    f"record creator: {exc}. See DICOM PS3.3 Section F.5. "
                    "Either update the instance, "
                    "define your own record creation function or use "
                    "'FileSet.add_custom()' instead"
                ) from exc

            record.OffsetOfTheNextDirectoryRecord = 0
            record.RecordInUseFlag = 0xFFFF
            record.OffsetOfReferencedLowerLevelDirectoryEntity = 0
            record.DirectoryRecordType = record_type
            if "SpecificCharacterSet" in ds:
                record.SpecificCharacterSet = ds.SpecificCharacterSet

            records.append(record)

        # Add the instance referencing elements to the leaf
        leaf = records[3]
        leaf.ReferencedFileID = None
        leaf.ReferencedSOPClassUIDInFile = ds.SOPClassUID
        leaf.ReferencedSOPInstanceUIDInFile = ds.SOPInstanceUID
        leaf.ReferencedTransferSyntaxUIDInFile = ds.file_meta.TransferSyntaxUID

        yield from records

    def remove(self, instance: FileInstance | list[FileInstance]) -> None:
        """Stage instance(s) for removal from the File-set.

        If the instance has been staged for addition to the File-set, calling
        :meth:`~pydicom.fileset.FileSet.remove` will cancel the staging and
        the instance will not be added.

        Parameters
        ----------
        instance : pydicom.fileset.FileInstance or a list of FileInstance
            The instance(s) to remove from the File-set.
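
        Example
        -------
        A minimal sketch; the DICOMDIR path and the query value below are
        placeholders:

        .. code-block:: python

            from pydicom.fileset import FileSet

            fs = FileSet("/path/to/fileset/DICOMDIR")
            fs.remove(fs.find(PatientID="1234"))
            fs.write(use_existing=True)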
        """
        if isinstance(instance, list):
            for item in instance:
                self.remove(item)
            return

        if instance not in self._instances:
            raise ValueError("No such instance in the File-set")

        # If staged for addition, no longer add
        if instance.SOPInstanceUID in self._stage["+"]:
            leaf = instance.node
            del leaf.parent[leaf]
            del self._stage["+"][instance.SOPInstanceUID]
            # Delete file from stage
            try:
                Path(instance.path).unlink()
            except FileNotFoundError:
                pass
            instance._apply_stage("-")
            self._instances.remove(instance)

        # Stage for removal if not already done
        elif instance.SOPInstanceUID not in self._stage["-"]:
            instance._apply_stage("-")
            self._stage["-"][instance.SOPInstanceUID] = instance
            self._instances.remove(instance)

    def __str__(self) -> str:
        """Return a string representation of the FileSet."""
        s = [
            "DICOM File-set",
            f"  Root directory: {self.path or '(no value available)'}",
            f"  File-set ID: {self.ID or '(no value available)'}",
            f"  File-set UID: {self.UID}",
            (
                f"  Descriptor file ID: "
                f"{self.descriptor_file_id or '(no value available)'}"
            ),
            (
                f"  Descriptor file character set: "
                f"{self.descriptor_character_set or '(no value available)'}"
            ),
        ]
        if self.is_staged:
            changes = []
            if not self._ds:
                changes.append("DICOMDIR creation")
            else:
                changes.append("DICOMDIR update")

            if self._stage["~"]:
                changes.append("directory structure update")

            if self._stage["+"]:
                suffix = "s" if len(self._stage["+"]) > 1 else ""
                changes.append(f"{len(self._stage['+'])} addition{suffix}")
            if self._stage["-"]:
                suffix = "s" if len(self._stage["-"]) > 1 else ""
                changes.append(f"{len(self._stage['-'])} removal{suffix}")

            s.append(f"  Changes staged for write(): {', '.join(changes)}")

        if not self._tree.children:
            return "\n".join(s)

        s.append("\n  Managed instances:")
        s.extend([f"    {ii}" for ii in self._tree.prettify()])

        return "\n".join(s)

    @property
    def UID(self) -> UID:
        """Return the File-set's UID."""
        return cast(UID, self._uid)

    @UID.setter
    def UID(self, uid: UID) -> None:
        """Set the File-set UID.

        Parameters
        ----------
        uid : pydicom.uid.UID
            The UID to use as the new File-set UID.
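
        Example
        -------
        A short sketch; ``fs`` is assumed to be an existing
        :class:`~pydicom.fileset.FileSet` instance:

        .. code-block:: python

            from pydicom.uid import generate_uid

            # The new UID is staged until FileSet.write() is called
            fs.UID = generate_uid()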
        """
        if uid == self._uid:
            return

        uid = UID(uid)
        assert uid.is_valid
        self._uid = uid
        if self._ds:
            self._ds.file_meta.MediaStorageSOPInstanceUID = uid

        self._stage["^"] = True

    def write(
        self,
        path: str | os.PathLike | None = None,
        use_existing: bool = False,
        force_implicit: bool = False,
    ) -> None:
        """Write the File-set, or changes to the File-set, to the file system.

        .. warning::

            If modifying an existing File-set it's **strongly recommended**
            that you follow standard data management practices and ensure that
            you have an up-to-date backup of the original data.

        By default, for both new and existing File-sets, *pydicom* uses the
        following directory structure semantics when writing out changes:

        * For instances defined using the standard four levels of directory
          records (i.e. PATIENT/STUDY/SERIES + one of the record types
          such as IMAGE or RT DOSE): ``PTxxxxxx/STxxxxxx/SExxxxxx/`` with a
          filename such as ``IMxxxxxx`` (for IMAGE), where the first two
          characters are dependent on the record type and ``xxxxxx`` is a
          numeric or alphanumeric index.
        * For instances defined using the standard one-level directory record
          (i.e. PALETTE, IMPLANT): a filename such as ``PAxxxxxx`` (for
          PALETTE).
        * For instances defined using PRIVATE directory records the structure
          will be along the lines of ``P0xxxxxx/P1xxxxxx/P2xxxxxx`` for
          PRIVATE/PRIVATE/PRIVATE and ``PTxxxxxx/STxxxxxx/P2xxxxxx`` for
          PATIENT/STUDY/PRIVATE.

        When only changes to the DICOMDIR file are required, or instances have
        only been removed from an existing File-set, you can use the
        `use_existing` keyword parameter to keep the existing directory
        structure and only update the DICOMDIR file.

        Parameters
        ----------
        path : str or PathLike, optional
            For new File-sets, the absolute path to the root directory where
            the File-set will be written. Using `path` with an existing
            File-set will raise :class:`ValueError`.
        use_existing : bool, optional
            If ``True`` and no instances have been added to the File-set
            (removals are OK), then only update the DICOMDIR file, keeping
            the current directory structure rather than converting everything
            to the semantics used by *pydicom* for File-sets (default
            ``False``).
        force_implicit : bool, optional
            If ``True`` force the DICOMDIR file to be encoded using *Implicit
            VR Little Endian*, which is non-conformant to the DICOM Standard
            (default ``False``).

        Raises
        ------
        ValueError
            If `use_existing` is ``True`` but instances have been staged
            for addition to the File-set.
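
        Example
        -------
        A minimal sketch of creating and writing a new File-set; the dataset
        path and output directory below are placeholders:

        .. code-block:: python

            from pydicom import dcmread
            from pydicom.fileset import FileSet

            fs = FileSet()
            fs.add(dcmread("my_image.dcm"))
            fs.write("/path/to/fileset_root")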
        """
        if not path and self.path is None:
            raise ValueError(
                "The path to the root directory is required for a new File-set"
            )

        if path and self.path:
            raise ValueError(
                "The path for an existing File-set cannot be changed, use "
                "'FileSet.copy()' to write the File-set to a new location"
            )

        if path:
            self._path = Path(path)

        # Don't write unless changed or new
        if not self.is_staged:
            return

        # Path to the DICOMDIR file
        p = cast(Path, self._path) / "DICOMDIR"

        # Re-use the existing directory structure if only moves or removals
        # are required and `use_existing` is True
        major_change = bool(self._stage["+"])
        if use_existing and major_change:
            raise ValueError(
                "'FileSet.write()' called with 'use_existing' but additions "
                "to the File-set's managed instances are staged"
            )

        if not use_existing:
            major_change |= self._stage["~"]

        # Worst case scenario if all instances in one directory
        if len(self) > 10**6:
            self._use_alphanumeric = True

        if len(self) > 35**6:
            raise NotImplementedError(
                "pydicom doesn't support writing File-sets with more than "
                "1838265625 managed instances"
            )

        # Remove the removals - must be first because the File IDs will be
        # incorrect with the removals still in the tree
        for instance in self._stage["-"].values():
            try:
                Path(instance.path).unlink()
            except FileNotFoundError:
                pass
            self._tree.remove(instance.node)

        if use_existing and not major_change:
            with open(p, "wb") as fp:
                f = DicomFileLike(fp)
                self._write_dicomdir(f, force_implicit=force_implicit)

            self.load(p, raise_orphans=True)

            return

        # We need to be careful not to overwrite the source file
        # for a different (later) instance
        # Check for collisions between the new and old File IDs
        # and copy any to the stage
        fout = {Path(ii.FileID) for ii in self}
        fin = {
            ii.node._file_id for ii in self if ii.SOPInstanceUID not in self._stage["+"]
        }
        collisions = fout & fin
        for instance in [ii for ii in self if ii.node._file_id in collisions]:
            self._stage["+"][instance.SOPInstanceUID] = instance
            instance._apply_stage("+")
            shutil.copyfile(self._path / instance.node._file_id, instance.path)

        for instance in self:
            dst = self._path / instance.FileID
            dst.parent.mkdir(parents=True, exist_ok=True)
            fn: Callable
            if instance.SOPInstanceUID in self._stage["+"]:
                src = instance.path
                fn = shutil.copyfile
            else:
                src = self._path / instance.node._file_id
                fn = shutil.move

            fn(os.fspath(src), os.fspath(dst))
            instance.node._record.ReferencedFileID = instance.FileID.split(os.path.sep)

        # Create the DICOMDIR file
        with open(p, "wb") as fp:
            f = DicomFileLike(fp)
            self._write_dicomdir(f, force_implicit=force_implicit)

        # Reload the File-set
        # We're doing things wrong if we have orphans so raise
        self.load(p, raise_orphans=True)

    def _write_dicomdir(
        self, fp: DicomFileLike, copy_safe: bool = False, force_implicit: bool = False
    ) -> None:
        """Encode and write the File-set's DICOMDIR dataset.

        Parameters
        ----------
        fp : file-like
            The file-like to write the encoded DICOMDIR dataset to. Must
            have ``write()``, ``tell()`` and ``seek()`` methods.
        copy_safe : bool, optional
            If ``True`` then the function doesn't make any changes to the
            public parts of the current :class:`~pydicom.fileset.FileSet`
            instance.
        force_implicit : bool, optional
            Force encoding the DICOMDIR with 'Implicit VR Little Endian' which
            is non-conformant to the DICOM Standard (default ``False``).
        """
        ds = self._ds
        if copy_safe or not ds:
            ds = self._create_dicomdir()

        # By default, always convert to the correct syntax
        ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
        seq_offset = 12
        if force_implicit:
            ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian
            seq_offset = 8

        fp.is_implicit_VR = ds.file_meta.TransferSyntaxUID.is_implicit_VR
        fp.is_little_endian = ds.file_meta.TransferSyntaxUID.is_little_endian

        # Reset the offsets
        first_elem = ds[_FIRST_OFFSET]
        first_elem.value = 0
        last_elem = ds[_LAST_OFFSET]
        last_elem.value = 0

        # Write the preamble, DICM marker and File Meta
        fp.write(b"\x00" * 128 + b"DICM")
        write_file_meta_info(fp, ds.file_meta, enforce_standard=True)

        # Write the dataset
        # Write up to the *Offset of the First Directory Record...* element
        write_dataset(fp, ds[:0x00041200])
        tell_offset_first = fp.tell()  # Start of *Offset of the First...*
        # Write up to (but not including) the *Directory Record Sequence*
        write_dataset(fp, ds[0x00041200:0x00041220])

        # Rebuild and encode the *Directory Record Sequence*
        # Step 1: Determine the offsets for all the records
        offset = fp.tell() + seq_offset  # Start of the first seq. item tag
        for node in self._tree:
            # RecordNode._offset is the start of each record's seq. item tag
            node._offset = offset
            offset += 8  # a sequence item's (tag + length)
            # Copy safe - only modifies RecordNode._offset
            offset += node._encode_record(force_implicit)
            # If the sequence item has undefined length then it uses a
            # sequence item delimiter item
            if node._record.is_undefined_length_sequence_item:
                offset += 8

        # Step 2: Update the records and add to *Directory Record Sequence*
        ds.DirectoryRecordSequence = []
        for node in self._tree:
            record = node._record
            if not copy_safe:
                node._update_record_offsets()
            else:
                record = copy.deepcopy(record)
                next_elem = record[_NEXT_OFFSET]
                next_elem.value = 0
                if node.next:
                    next_elem.value = node.next._offset

                lower_elem = record[_LOWER_OFFSET]
                lower_elem.value = 0
                if node.children:
                    record[_LOWER_OFFSET].value = node.children[0]._offset

            cast(list[Dataset], ds.DirectoryRecordSequence).append(record)

        # Step 3: Encode *Directory Record Sequence* and the rest
        write_dataset(fp, ds[0x00041220:])

        # Update the first and last record offsets
        if self._tree.children:
            first_elem.value = self._tree.children[0]._offset
            last_elem.value = self._tree.children[-1]._offset
            # Re-write the record offset pointer elements
            fp.seek(tell_offset_first)
            write_data_element(fp, first_elem)
            write_data_element(fp, last_elem)
            # Go to the end
            fp.seek(0, 2)


# Functions for creating Directory Records
def _check_dataset(ds: Dataset, keywords: list[str]) -> None:
    """Check the dataset module for the Type 1 `keywords`.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        The dataset to check.
    keywords : list of str
        The DICOM keywords for Type 1 elements that are to be checked.

    Raises
    ------
    ValueError
        If an element is missing from the dataset, or if it is present but
        has no value.
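
    Example
    -------
    A short sketch of the expected behaviour; the element values are
    illustrative only:

    .. code-block:: python

        from pydicom import Dataset

        ds = Dataset()
        ds.PatientID = "1234"
        _check_dataset(ds, ["PatientID"])  # returns None
        _check_dataset(ds, ["StudyID"])  # raises ValueError - element missing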
    """
    for kw in keywords:
        tag = Tag(cast(int, tag_for_keyword(kw)))
        name = dictionary_description(tag)
        if kw not in ds:
            raise ValueError(f"The instance's {tag} '{name}' element is missing")

        if ds[kw].VM != 0:
            continue

        raise ValueError(f"The instance's {tag} '{name}' element cannot be empty")


def _define_patient(ds: Dataset) -> Dataset:
    """Return a PATIENT directory record from `ds`."""
    _check_dataset(ds, ["PatientID"])

    record = Dataset()
    record.PatientName = ds.get("PatientName")
    record.PatientID = ds.PatientID

    return record


def _define_study(ds: Dataset) -> Dataset:
    """Return a STUDY directory record from `ds`."""
    _check_dataset(ds, ["StudyDate", "StudyTime", "StudyID"])

    record = Dataset()
    record.StudyDate = ds.StudyDate
    record.StudyTime = ds.StudyTime
    record.StudyDescription = ds.get("StudyDescription")
    if "StudyInstanceUID" in ds:
        _check_dataset(ds, ["StudyInstanceUID"])
        record.StudyInstanceUID = ds.StudyInstanceUID
    record.StudyID = ds.StudyID
    record.AccessionNumber = ds.get("AccessionNumber")

    return record


def _define_series(ds: Dataset) -> Dataset:
    """Return a SERIES directory record from `ds`."""
    _check_dataset(ds, ["Modality", "SeriesInstanceUID", "SeriesNumber"])

    record = Dataset()
    record.Modality = ds.Modality
    record.SeriesInstanceUID = ds.SeriesInstanceUID
    record.SeriesNumber = ds.SeriesNumber

    return record


def _define_image(ds: Dataset) -> Dataset:
    """Return an IMAGE directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber

    return record


def _define_rt_dose(ds: Dataset) -> Dataset:
    """Return an RT DOSE directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "DoseSummationType"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.DoseSummationType = ds.DoseSummationType

    return record


def _define_rt_structure_set(ds: Dataset) -> Dataset:
    """Return an RT STRUCTURE SET directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "StructureSetLabel"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.StructureSetLabel = ds.StructureSetLabel
    record.StructureSetDate = ds.get("StructureSetDate")
    record.StructureSetTime = ds.get("StructureSetTime")

    return record


def _define_rt_plan(ds: Dataset) -> Dataset:
    """Return an RT PLAN directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "RTPlanLabel"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.RTPlanLabel = ds.RTPlanLabel
    record.RTPlanDate = ds.get("RTPlanDate")
    record.RTPlanTime = ds.get("RTPlanTime")

    return record


def _define_rt_treatment_record(ds: Dataset) -> Dataset:
    """Return an RT TREAT RECORD directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.TreatmentDate = ds.get("TreatmentDate")
    record.TreatmentTime = ds.get("TreatmentTime")

    return record


def _define_presentation(ds: Dataset) -> Dataset:
    """Return a PRESENTATION directory record from `ds`."""
    _check_dataset(
        ds,
        [
            "PresentationCreationDate",
            "PresentationCreationTime",
            "InstanceNumber",
            "ContentLabel",
        ],
    )

    record = Dataset()
    record.PresentationCreationDate = ds.PresentationCreationDate
    record.PresentationCreationTime = ds.PresentationCreationTime
    # Content Identification Macro
    record.InstanceNumber = ds.InstanceNumber
    record.ContentLabel = ds.ContentLabel
    record.ContentDescription = ds.get("ContentDescription")
    record.ContentCreatorName = ds.get("ContentCreatorName")
    if "ReferencedSeriesSequence" in ds:
        _check_dataset(ds, ["ReferencedSeriesSequence"])
        record.ReferencedSeriesSequence = ds.ReferencedSeriesSequence
    if "BlendingSequence" in ds:
        _check_dataset(ds, ["BlendingSequence"])
        record.BlendingSequence = ds.BlendingSequence

    return record


def _define_sr_document(ds: Dataset) -> Dataset:
    """Return an SR DOCUMENT directory record from `ds`."""
    _check_dataset(
        ds,
        [
            "InstanceNumber",
            "CompletionFlag",
            "VerificationFlag",
            "ContentDate",
            "ContentTime",
            "ConceptNameCodeSequence",
        ],
    )

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.CompletionFlag = ds.CompletionFlag
    record.VerificationFlag = ds.VerificationFlag
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime
    if "VerificationDateTime" in ds:
        _check_dataset(ds, ["VerificationDateTime"])
        record.VerificationDateTime = ds.VerificationDateTime
    record.ConceptNameCodeSequence = ds.ConceptNameCodeSequence
    if "ContentSequence" in ds:
        _check_dataset(ds, ["ContentSequence"])
        record.ContentSequence = ds.ContentSequence

    return record


def _define_key_object_doc(ds: Dataset) -> Dataset:
    """Return a KEY OBJECT DOC directory record from `ds`."""
    _check_dataset(
        ds,
        [
            "InstanceNumber",
            "ContentDate",
            "ContentTime",
            "ConceptNameCodeSequence",
        ],
    )

    record = Dataset()
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime
    record.InstanceNumber = ds.InstanceNumber
    record.ConceptNameCodeSequence = ds.ConceptNameCodeSequence
    if "ContentSequence" in ds:
        _check_dataset(ds, ["ContentSequence"])
        record.ContentSequence = ds.ContentSequence

    return record


def _define_spectroscopy(ds: Dataset) -> Dataset:
    """Return a SPECTROSCOPY directory record from `ds`."""
    _check_dataset(
        ds,
        [
            "ImageType",
            "ContentDate",
            "ContentTime",
            "InstanceNumber",
            "NumberOfFrames",
            "Rows",
            "Columns",
            "DataPointRows",
            "DataPointColumns",
        ],
    )

    record = Dataset()
    record.ImageType = ds.ImageType
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime
    record.InstanceNumber = ds.InstanceNumber
    if "ReferencedImageEvidenceSequence" in ds:
        _check_dataset(ds, ["ReferencedImageEvidenceSequence"])

        record.ReferencedImageEvidenceSequence = ds.ReferencedImageEvidenceSequence

    record.NumberOfFrames = ds.NumberOfFrames
    record.Rows = ds.Rows
    record.Columns = ds.Columns
    record.DataPointRows = ds.DataPointRows
    record.DataPointColumns = ds.DataPointColumns

    return record


def _define_hanging_protocol(ds: Dataset) -> Dataset:
    """Return a HANGING PROTOCOL directory record from `ds`."""
    _check_dataset(
        ds,
        [
            "HangingProtocolCreator",
            "HangingProtocolCreationDateTime",
            "HangingProtocolDefinitionSequence",
            "NumberOfPriorsReferenced",
        ],
    )

    record = Dataset()
    record.HangingProtocolCreator = ds.HangingProtocolCreator
    record.HangingProtocolCreationDateTime = ds.HangingProtocolCreationDateTime
    record.HangingProtocolDefinitionSequence = ds.HangingProtocolDefinitionSequence
    record.NumberOfPriorsReferenced = ds.NumberOfPriorsReferenced
    record.HangingProtocolUserIdentificationCodeSequence = ds.get(
        "HangingProtocolUserIdentificationCodeSequence", []
    )

    return record


def _define_encap_doc(ds: Dataset) -> Dataset:
    """Return an ENCAP DOC directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "MIMETypeOfEncapsulatedDocument"])

    record = Dataset()
    record.ContentDate = ds.get("ContentDate")
    record.ContentTime = ds.get("ContentTime")
    record.InstanceNumber = ds.InstanceNumber
    record.DocumentTitle = ds.get("DocumentTitle")
    if "HL7InstanceIdentifier" in ds:
        _check_dataset(ds, ["HL7InstanceIdentifier"])
        record.HL7InstanceIdentifier = ds.HL7InstanceIdentifier
    record.ConceptNameCodeSequence = ds.get("ConceptNameCodeSequence")

    record.MIMETypeOfEncapsulatedDocument = ds.MIMETypeOfEncapsulatedDocument

    return record


def _define_palette(ds: Dataset) -> Dataset:
    """Return a PALETTE directory record from `ds`."""
    _check_dataset(ds, ["ContentLabel"])

    record = Dataset()
    record.ContentLabel = ds.ContentLabel
    record.ContentDescription = ds.get("ContentDescription")

    return record


def _define_implant(ds: Dataset) -> Dataset:
    """Return an IMPLANT directory record from `ds`."""
    _check_dataset(ds, ["Manufacturer", "ImplantName", "ImplantPartNumber"])

    record = Dataset()
    record.Manufacturer = ds.Manufacturer
    record.ImplantName = ds.ImplantName
    if "ImplantSize" in ds:
        _check_dataset(ds, ["ImplantSize"])
        record.ImplantSize = ds.ImplantSize
    record.ImplantPartNumber = ds.ImplantPartNumber

    return record


def _define_implant_assy(ds: Dataset) -> Dataset:
    """Return an IMPLANT ASSY directory record from `ds`."""
    _check_dataset(
        ds, ["ImplantAssemblyTemplateName", "Manufacturer", "ProcedureTypeCodeSequence"]
    )

    record = Dataset()
    record.ImplantAssemblyTemplateName = ds.ImplantAssemblyTemplateName
    record.Manufacturer = ds.Manufacturer
    record.ProcedureTypeCodeSequence = ds.ProcedureTypeCodeSequence

    return record


def _define_implant_group(ds: Dataset) -> Dataset:
    """Return an IMPLANT GROUP directory record from `ds`."""
    _check_dataset(ds, ["ImplantTemplateGroupName", "ImplantTemplateGroupIssuer"])

    record = Dataset()
    record.ImplantTemplateGroupName = ds.ImplantTemplateGroupName
    record.ImplantTemplateGroupIssuer = ds.ImplantTemplateGroupIssuer

    return record


def _define_surface_scan(ds: Dataset) -> Dataset:
    """Return a SURFACE SCAN directory record from `ds`."""
    _check_dataset(ds, ["ContentDate", "ContentTime"])

    record = Dataset()
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime

    return record


def _define_assessment(ds: Dataset) -> Dataset:
    """Return an ASSESSMENT directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "InstanceCreationDate"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.InstanceCreationDate = ds.InstanceCreationDate
    record.InstanceCreationTime = ds.get("InstanceCreationTime")

    return record


def _define_radiotherapy(ds: Dataset) -> Dataset:
    """Return a RADIOTHERAPY directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    if "UserContentLabel" in ds:
        _check_dataset(ds, ["UserContentLabel"])
        record.UserContentLabel = ds.UserContentLabel
    if "UserContentLongLabel" in ds:
        _check_dataset(ds, ["UserContentLongLabel"])
        record.UserContentLongLabel = ds.UserContentLongLabel

    record.ContentDescription = ds.get("ContentDescription")
    record.ContentCreatorName = ds.get("ContentCreatorName")

    return record


def _define_generic_content(ds: Dataset) -> Dataset:
    """Return a WAVEFORM/RAW DATA directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "ContentDate", "ContentTime"])

    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime

    return record


def _define_generic_content_id(ds: Dataset) -> Dataset:
    """Return a generic content identification directory record from `ds`."""
    _check_dataset(ds, ["InstanceNumber", "ContentDate", "ContentTime", "ContentLabel"])

    # Content Identification Macro
    record = Dataset()
    record.InstanceNumber = ds.InstanceNumber
    record.ContentDate = ds.ContentDate
    record.ContentTime = ds.ContentTime
    record.ContentLabel = ds.ContentLabel
    record.ContentDescription = ds.get("ContentDescription")
    record.ContentCreatorName = ds.get("ContentCreatorName")

    return record


def _define_empty(ds: Dataset) -> Dataset:
    """Return an empty directory record from `ds`."""
    return Dataset()


DIRECTORY_RECORDERS = {
    "PATIENT": _define_patient,  # TOP LEVEL
    "STUDY": _define_study,  # INTERMEDIATE or LEAF
    "SERIES": _define_series,  # INTERMEDIATE
    "IMAGE": _define_image,  # LEAF
    "RT DOSE": _define_rt_dose,  # LEAF
    "RT STRUCTURE SET": _define_rt_structure_set,  # LEAF
    "RT PLAN": _define_rt_plan,  # LEAF
    "RT TREAT RECORD": _define_rt_treatment_record,  # LEAF
    "PRESENTATION": _define_presentation,  # LEAF
    "WAVEFORM": _define_generic_content,  # LEAF
    "SR DOCUMENT": _define_sr_document,  # LEAF
    "KEY OBJECT DOC": _define_key_object_doc,  # LEAF
    "SPECTROSCOPY": _define_spectroscopy,  # LEAF
    "RAW DATA": _define_generic_content,  # LEAF
    "REGISTRATION": _define_generic_content_id,  # LEAF
    "FIDUCIAL": _define_generic_content_id,  # LEAF
    "HANGING PROTOCOL": _define_hanging_protocol,  # TOP LEVEL and LEAF
    "ENCAP DOC": _define_encap_doc,  # LEAF
    "VALUE MAP": _define_generic_content_id,  # LEAF
    "STEREOMETRIC": _define_empty,  # LEAF
    "PALETTE": _define_palette,  # TOP LEVEL and LEAF
    "IMPLANT": _define_implant,  # TOP LEVEL and LEAF
    "IMPLANT ASSY": _define_implant_assy,  # TOP LEVEL and LEAF
    "IMPLANT GROUP": _define_implant_group,  # TOP LEVEL and LEAF
    "PLAN": _define_empty,  # LEAF
    "MEASUREMENT": _define_generic_content_id,  # LEAF
    "SURFACE": _define_generic_content_id,  # LEAF
    "SURFACE SCAN": _define_surface_scan,  # LEAF
    "TRACT": _define_generic_content_id,  # LEAF
    "ASSESSMENT": _define_assessment,  # LEAF
    "RADIOTHERAPY": _define_radiotherapy,  # LEAF
}
"""A :class:`dict` containing the directory record creation functions.

The functions are used to create non-PRIVATE records for a given SOP Instance
as ``{"RECORD TYPE": callable}``, where ``"RECORD TYPE"`` should match one of
the allowable values - except PRIVATE - for (0004,1430) *Directory Record
Type*. By overriding the function for a given record type you can customize
the directory records that will be included in the DICOMDIR file.

Example
-------

.. code-block:: python

    from pydicom.fileset import DIRECTORY_RECORDERS, FileSet

    def my_recorder(ds: Dataset) -> Dataset:
        record = Dataset()
        record.OffsetOfTheNextDirectoryRecord = 0
        record.RecordInUseFlag = 0xFFFF
        record.OffsetOfReferencedLowerLevelDirectoryEntity = 0
        record.DirectoryRecordType = "PATIENT"
        if "SpecificCharacterSet" in ds:
            record.SpecificCharacterSet = ds.SpecificCharacterSet

        record.PatientName = ds.get("PatientName")
        record.PatientID = ds.PatientID

        return record

    DIRECTORY_RECORDERS["PATIENT"] = my_recorder

    # Use the updated directory recorder
    fs = FileSet()
    fs.add('my_instance.dcm')

The function should take a single parameter which is the SOP Instance to be
added to the File-set as a :class:`~pydicom.dataset.Dataset` and return a
:class:`~pydicom.dataset.Dataset` with a single directory record matching the
directory record type. See :dcm:`Annex F.3.2.2<chtml/part03/sect_F.3.2.2.html>`
for possible record types.

For PRIVATE records you must use the
:meth:`~pydicom.fileset.FileSet.add_custom` method instead.
"""
_SINGLE_LEVEL_SOP_CLASSES = {
    sop.HangingProtocolStorage: "HANGING PROTOCOL",
    sop.ColorPaletteStorage: "PALETTE",
    sop.GenericImplantTemplateStorage: "IMPLANT",
    sop.ImplantAssemblyTemplateStorage: "IMPLANT ASSY",
    sop.ImplantTemplateGroupStorage: "IMPLANT GROUP",
}
_FOUR_LEVEL_SOP_CLASSES = {
    sop.RTDoseStorage: "RT DOSE",
    sop.RTStructureSetStorage: "RT STRUCTURE SET",
    sop.RTBeamsTreatmentRecordStorage: "RT TREAT RECORD",
    sop.RTBrachyTreatmentRecordStorage: "RT TREAT RECORD",
    sop.RTTreatmentSummaryRecordStorage: "RT TREAT RECORD",
    sop.RTIonBeamsTreatmentRecordStorage: "RT TREAT RECORD",
    sop.GrayscaleSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.ColorSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.PseudoColorSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.BlendingSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.XAXRFGrayscaleSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.BasicStructuredDisplayStorage: "PRESENTATION",
    sop.BasicVoiceAudioWaveformStorage: "WAVEFORM",
    sop.TwelveLeadECGWaveformStorage: "WAVEFORM",
    sop.GeneralECGWaveformStorage: "WAVEFORM",
    sop.AmbulatoryECGWaveformStorage: "WAVEFORM",
    sop.HemodynamicWaveformStorage: "WAVEFORM",
    sop.CardiacElectrophysiologyWaveformStorage: "WAVEFORM",
    sop.ArterialPulseWaveformStorage: "WAVEFORM",
    sop.RespiratoryWaveformStorage: "WAVEFORM",
    sop.GeneralAudioWaveformStorage: "WAVEFORM",
    sop.RoutineScalpElectroencephalogramWaveformStorage: "WAVEFORM",
    sop.ElectromyogramWaveformStorage: "WAVEFORM",
    sop.ElectrooculogramWaveformStorage: "WAVEFORM",
    sop.SleepElectroencephalogramWaveformStorage: "WAVEFORM",
    sop.MultichannelRespiratoryWaveformStorage: "WAVEFORM",
    sop.BodyPositionWaveformStorage: "WAVEFORM",
    sop.BasicTextSRStorage: "SR DOCUMENT",
    sop.EnhancedSRStorage: "SR DOCUMENT",
    sop.ComprehensiveSRStorage: "SR DOCUMENT",
    sop.MammographyCADSRStorage: "SR DOCUMENT",
    sop.ChestCADSRStorage: "SR DOCUMENT",
    sop.ProcedureLogStorage: "SR DOCUMENT",
    sop.XRayRadiationDoseSRStorage: "SR DOCUMENT",
    sop.SpectaclePrescriptionReportStorage: "SR DOCUMENT",
    sop.ColonCADSRStorage: "SR DOCUMENT",
    sop.MacularGridThicknessAndVolumeReportStorage: "SR DOCUMENT",
    sop.ImplantationPlanSRStorage: "SR DOCUMENT",
    sop.Comprehensive3DSRStorage: "SR DOCUMENT",
    sop.RadiopharmaceuticalRadiationDoseSRStorage: "SR DOCUMENT",
    sop.ExtensibleSRStorage: "SR DOCUMENT",
    sop.AcquisitionContextSRStorage: "SR DOCUMENT",
    sop.SimplifiedAdultEchoSRStorage: "SR DOCUMENT",
    sop.PatientRadiationDoseSRStorage: "SR DOCUMENT",
    sop.PlannedImagingAgentAdministrationSRStorage: "SR DOCUMENT",
    sop.PerformedImagingAgentAdministrationSRStorage: "SR DOCUMENT",
    sop.KeyObjectSelectionDocumentStorage: "KEY OBJECT DOC",
    sop.MRSpectroscopyStorage: "SPECTROSCOPY",
    sop.RawDataStorage: "RAW DATA",
    sop.SpatialRegistrationStorage: "REGISTRATION",
    sop.DeformableSpatialRegistrationStorage: "REGISTRATION",
    sop.SpatialFiducialsStorage: "FIDUCIAL",
    sop.RealWorldValueMappingStorage: "VALUE MAP",
    sop.StereometricRelationshipStorage: "STEREOMETRIC",
    sop.LensometryMeasurementsStorage: "MEASUREMENT",
    sop.AutorefractionMeasurementsStorage: "MEASUREMENT",
    sop.KeratometryMeasurementsStorage: "MEASUREMENT",
    sop.SubjectiveRefractionMeasurementsStorage: "MEASUREMENT",
    sop.VisualAcuityMeasurementsStorage: "MEASUREMENT",
    sop.OphthalmicAxialMeasurementsStorage: "MEASUREMENT",
    sop.OphthalmicVisualFieldStaticPerimetryMeasurementsStorage: "MEASUREMENT",
    sop.SurfaceSegmentationStorage: "SURFACE",
    sop.SurfaceScanMeshStorage: "SURFACE SCAN",
    sop.SurfaceScanPointCloudStorage: "SURFACE SCAN",
    sop.TractographyResultsStorage: "TRACT",
    sop.ContentAssessmentResultsStorage: "ASSESSMENT",
}


def _single_level_record_type(ds: Dataset) -> str:
    """Return a single-level *Directory Record Type* for `ds`."""
    sop_class = cast(UID | None, getattr(ds, "SOPClassUID", None))

    try:
        return _SINGLE_LEVEL_SOP_CLASSES[sop_class]  # type: ignore[index]
    except KeyError:
        return "PATIENT"


def _four_level_record_type(ds: Dataset) -> str:
    """Return the fourth-level *Directory Record Type* for `ds`."""
    modality = getattr(ds, "Modality", None)
    if modality in ["RTINTENT", "RTSEGANN", "RTRAD"]:
        return "RADIOTHERAPY"

    if modality == "PLAN":
        return "PLAN"

    if "EncapsulatedDocument" in ds:
        return "ENCAP DOC"

    if "RTPlanLabel" in ds:
        return "RT PLAN"

    sop_class = cast(UID | None, getattr(ds, "SOPClassUID", None))

    try:
        return _FOUR_LEVEL_SOP_CLASSES[sop_class]  # type: ignore[index]
    except KeyError:
        return "IMAGE"