Initial commit

This commit is contained in:
René Mathieu
2026-01-17 13:49:51 +01:00
commit 0fef8d96c5
1897 changed files with 396119 additions and 0 deletions

View File

@@ -0,0 +1,529 @@
# Copyright 2008-2021 pydicom authors. See LICENSE file for details.
"""
Produce runnable python code which can recreate DICOM objects or files.
Can run as a script to produce code for an entire file,
or import and use specific functions to provide code for pydicom DICOM classes
"""
# Run this from the same directory as a "base" dicom file and
# this code will output to screen the dicom parameters like:
# ds.PatientName = 'TEST'
# etc for all parameters in the file.
# This can then be pasted into a python file and parameters edited as necessary
# to create a DICOM file from scratch
import argparse
import os.path
import re
import sys
from typing import cast
from collections.abc import Callable
from collections import deque
import pydicom
from pydicom.datadict import dictionary_keyword
from pydicom.dataelem import DataElement
from pydicom.dataset import Dataset
from pydicom.tag import BaseTag
from pydicom.valuerep import BYTES_VR, AMBIGUOUS_VR, VR
from pydicom.cli.main import filespec_help, filespec_parser
# Line terminator used when joining the generated code lines into one string
line_term = "\n"
# Precompiled search patterns for camel_to_underscore()
first_cap_re = re.compile("(.)([A-Z][a-z]+)")
all_cap_re = re.compile("([a-z0-9])([A-Z])")
def camel_to_underscore(name: str) -> str:
    """Convert `name` from CamelCase to lower_case_with_underscores.

    Based on https://stackoverflow.com/questions/1175208
    """
    # First pass: split before a capital that starts a capitalized word,
    # second pass: split between a lowercase/digit and a capital.
    spaced = first_cap_re.sub(r"\1_\2", name)
    spaced = all_cap_re.sub(r"\1_\2", spaced)
    return spaced.lower()
def tag_repr(tag: BaseTag) -> str:
    """Return the tag value formatted as ``(0xgggg, 0xeeee)``."""
    return "(0x{0.group:04X}, 0x{0.element:04X})".format(tag)
def default_name_filter(name: str) -> str:
    """Reduce some names in code to a more readable short form.

    Parameters
    ----------
    name : str
        A sequence variable name or sequence item name.

    Returns
    -------
    str
        The lower_case form of `name`, with known long words abbreviated
        when a conversion is known.
    """
    shortened = camel_to_underscore(name)
    # Well-known long words and their conventional abbreviations
    for long_form, short_form in (
        ("control_point", "cp"),
        ("reference", "ref"),
        ("fraction_group", "frxn_gp"),
    ):
        shortened = shortened.replace(long_form, short_form)
    return shortened
# Functions to produce python code
def code_imports() -> str:
    """Code the import statements needed by other codify results.

    Returns
    -------
    str
        The import statement lines joined with the module line terminator.
    """
    imports = (
        "import pydicom",
        "from pydicom.dataset import Dataset, FileMetaDataset",
        "from pydicom.sequence import Sequence",
    )
    return line_term.join(imports)
def code_dataelem(
    dataelem: DataElement,
    dataset_name: str = "ds",
    exclude_size: int | None = None,
    include_private: bool = False,
    var_names: deque | None = None,
) -> str:
    """Code lines for a single DICOM data element

    Parameters
    ----------
    dataelem : DataElement
        The DataElement instance to turn into code
    dataset_name : str
        The variable name of the Dataset containing `dataelem`
    exclude_size : int | None
        If specified, values longer than this (in bytes)
        will only have a commented string for a value,
        causing a syntax error when the code is run,
        and thus prompting the user to remove or fix that line.
    include_private : bool
        If ``False`` (default) private elements are skipped; only used when
        `dataelem` is a sequence, where it is passed on to ``code_sequence``.
    var_names: deque | None
        Used internally to ensure unique variable names in nested sequences.

    Returns
    -------
    str
        A string containing code to recreate the data element
        If the data element is a sequence, calls code_sequence
    """
    if dataelem.VR == VR.SQ:
        return code_sequence(
            dataelem, dataset_name, exclude_size, include_private, var_names=var_names
        )
    # If in DICOM dictionary, set using the keyword
    # If not (e.g. is private element), set using add_new method
    have_keyword = True
    try:
        keyword = dictionary_keyword(dataelem.tag)
    except KeyError:
        have_keyword = False
    # If the value representation of the data element is AT (Attribute Tag),
    # then format it as a tag
    if dataelem.VR == "AT":
        valuerep = tag_repr(dataelem.value)
    else:
        valuerep = repr(dataelem.value)
    if exclude_size:
        # Binary-like VRs (excluding US_SS ambiguity resolved to numbers) with
        # values longer than exclude_size get a deliberate syntax-error line
        if (
            dataelem.VR in (BYTES_VR | AMBIGUOUS_VR) - {VR.US_SS}
            and not isinstance(dataelem.value, int | float)
            and len(dataelem.value) > exclude_size
        ):
            valuerep = f"# XXX Array of {len(dataelem.value)} bytes excluded"
    if have_keyword:
        line = f"{dataset_name}.{keyword} = {valuerep}"
    else:
        tag = tag_repr(dataelem.tag)
        vr = dataelem.VR
        line = f"{dataset_name}.add_new({tag}, '{vr}', {valuerep})"
    return line
def code_sequence(
    dataelem: DataElement,
    dataset_name: str = "ds",
    exclude_size: int | None = None,
    include_private: bool = False,
    name_filter: Callable[[str], str] = default_name_filter,
    var_names: deque | None = None,
) -> str:
    """Code lines for recreating a Sequence data element

    Parameters
    ----------
    dataelem : DataElement
        The DataElement instance whose value is the Sequence
    dataset_name : str
        Variable name of the dataset containing the Sequence
    exclude_size : int, optional
        If not ``None``, values longer than this (in bytes) will only have a
        commented string for a value, causing a syntax error when the code is
        run, and thus prompting the user to remove or fix that line.
    include_private: bool
        If ``False`` (default) private elements are skipped, otherwise private
        data elements will be coded.
    name_filter: Callable[[str], str]
        A callable taking a sequence name or sequence item name, and returning
        a shorter name for easier code reading
    var_names: deque | None
        Used internally to ensure unique variable names in nested sequences.

    Returns
    -------
    str
        A string containing code lines to recreate a DICOM sequence
    """
    # Normally var_names is given from code_dataset, but for some tests need
    # to initialize it
    if var_names is None:
        var_names = deque()
    def unique_name(name: str) -> str:
        # Append "_<n>" when `name` is already used by an enclosing sequence,
        # so nested sequences never reuse a parent's variable name
        name_count = (
            cast(deque, var_names).count(name) - 1
        ) # type:ignore[redundant-cast]
        return name if name_count == 0 else name + f"_{name_count}"
    lines = []
    seq = dataelem.value
    seq_name = dataelem.name
    seq_item_name = seq_name.replace(" Sequence", "")
    try:
        seq_keyword = dictionary_keyword(dataelem.tag)
    except KeyError:
        # Private/unknown tag: build a keyword from the tag value itself
        seq_keyword = f"Tag{dataelem.tag:08x}"
    # Create comment line to document the start of Sequence
    lines.append("")
    lines.append("# " + seq_name)
    # Code line to create a new Sequence object
    seq_var = name_filter(seq_keyword)
    var_names.append(seq_var)
    orig_seq_var = seq_var
    seq_var = unique_name(seq_var)
    lines.append(seq_var + " = Sequence()")
    # Code line to add the sequence to its parent
    lines.append(dataset_name + "." + seq_keyword + " = " + seq_var)
    # Code lines to add sequence items to the Sequence
    for i, ds in enumerate(seq):
        # Determine index to use. If seq item has a data element with 'Index',
        # use that; if one with 'Number', use that, else start at 1
        index_keyword = seq_keyword.replace("Sequence", "") + "Index"
        number_keyword = seq_keyword.replace("Sequence", "") + "Number"
        if hasattr(ds, index_keyword):
            index_str = str(getattr(ds, index_keyword))
        elif hasattr(ds, number_keyword):
            index_str = str(getattr(ds, number_keyword))
        else:
            index_str = str(i + 1)
        # Code comment line to mark start of sequence item
        lines.append("")
        lines.append("# " + seq_name + ": " + seq_item_name + " " + index_str)
        # Determine the variable name to use for the sequence item (dataset)
        ds_name = orig_seq_var.replace("_sequence", "") + index_str
        # Append "_#" if name already in use (in parent sequences)
        var_names.append(ds_name)
        ds_name = unique_name(ds_name)
        # Code the sequence item dataset
        code_item = code_dataset(
            ds, ds_name, exclude_size, include_private, var_names=var_names
        )
        # Remove variable name from stored list, this dataset complete
        var_names.pop()
        # Code dataset creation and appending that to sequence, then the rest
        # This keeps the logic close together, rather than after many items set
        code_split = code_item.splitlines()
        lines.append(code_split[0]) # "<ds_name> = Dataset()"
        lines.append(f"{seq_var}.append({ds_name})")
        lines.extend(code_split[1:])
    # Remove sequence variable name we've used
    var_names.pop()
    # Join the lines and return a single string
    return line_term.join(lines)
def code_dataset(
    ds: Dataset,
    dataset_name: str = "ds",
    exclude_size: int | None = None,
    include_private: bool = False,
    is_file_meta: bool = False,
    var_names: deque | None = None,
) -> str:
    """Return Python code for creating `ds`.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        The dataset to codify.
    dataset_name : str, optional
        The Python variable name to use for the dataset, default ``'ds'``.
    exclude_size : int, optional
        If not ``None``, values longer than this (in bytes) will only have a
        commented string for a value, causing a syntax error when the code is
        run, and thus prompting the user to remove or fix that line.
    include_private : bool, optional
        If ``False`` (default) private elements are skipped, otherwise private
        data elements will be coded.
    is_file_meta : bool, optional
        ``True`` if `ds` contains file meta information elements.
    var_names: deque, optional
        Used internally to ensure unique variable names in nested sequences.

    Returns
    -------
    str
        The codified dataset.
    """
    if var_names is None:
        var_names = deque()
    lines = []
    # File meta information requires the FileMetaDataset class
    ds_class = " = FileMetaDataset()" if is_file_meta else " = Dataset()"
    lines.append(dataset_name + ds_class)
    for dataelem in ds:
        # If a private data element and flag says so, skip it and go to next
        if not include_private and dataelem.tag.is_private:
            continue
        # Otherwise code the line and add it to the lines list
        code_line = code_dataelem(
            dataelem, dataset_name, exclude_size, include_private, var_names=var_names
        )
        lines.append(code_line)
        # Add blank line if just coded a sequence
        if dataelem.VR == VR.SQ:
            lines.append("")
    # If sequence was end of this dataset, remove the extra blank line
    if len(lines) and lines[-1] == "":
        lines.pop()
    # Join all the code lines and return them
    return line_term.join(lines)
def code_file(
    filename: str, exclude_size: int | None = None, include_private: bool = False
) -> str:
    """Write a complete source code file to recreate a DICOM file

    Parameters
    ----------
    filename : str
        Complete path and filename of a DICOM file to convert
    exclude_size : int | None
        If not None, values longer than this (in bytes)
        will only have a commented string for a value,
        causing a syntax error when the code is run,
        and thus prompting the user to remove or fix that line.
    include_private : bool
        If ``False`` (default), private elements are skipped
        If ``True``, private data elements will be coded.

    Returns
    -------
    str
        A string containing code lines to recreate the entire DICOM file
    """
    # force=True so non-conformant files (e.g. missing preamble) still read
    ds = pydicom.dcmread(filename, force=True)
    return code_file_from_dataset(ds, exclude_size, include_private)
def code_file_from_dataset(
    ds: Dataset, exclude_size: int | None = None, include_private: bool = False
) -> str:
    """Write a complete source code file to recreate a DICOM file

    Parameters
    ----------
    ds : Dataset
        A pydicom Dataset to convert
    exclude_size : int | None
        If not None, values longer than this (in bytes)
        will only have a commented string for a value,
        causing a syntax error when the code is run,
        and thus prompting the user to remove or fix that line.
    include_private : bool
        If ``False`` (default), private elements are skipped
        If ``True``, private data elements will be coded.

    Returns
    -------
    str
        A string containing code lines to recreate the entire DICOM file
    """
    lines = []
    # Code a nice header for the python file
    filename = ds.get("filename")
    # Fix: interpolate the actual filename into the header comment; the
    # original emitted the literal text "(unknown)" even when a filename
    # was available.
    identifier = f"DICOM file '{filename}'" if filename else "non-file dataset"
    lines.append("# -*- coding: utf-8 -*-")
    lines.append(f"# Coded version of {identifier}")
    lines.append("# Produced by pydicom codify utility script")
    # Code the necessary imports
    lines.append(code_imports())
    lines.append("")
    # Code the file_meta information
    if hasattr(ds, "file_meta"):
        lines.append("# File meta info data elements")
        code_meta = code_dataset(
            ds.file_meta,
            "file_meta",
            exclude_size,
            include_private,
            is_file_meta=True,
        )
        lines.append(code_meta)
        lines.append("")
    # Code the main dataset
    lines.append("# Main data elements")
    code_ds = code_dataset(
        ds, exclude_size=exclude_size, include_private=include_private
    )
    lines.append(code_ds)
    lines.append("")
    # Add the file meta to the dataset, and set transfer syntax
    if hasattr(ds, "file_meta"):
        lines.append("ds.file_meta = file_meta")
    implicit_vr, little_endian = ds.original_encoding
    lines.append(f"ds.set_original_encoding({implicit_vr}, {little_endian})")
    # Return the complete code string
    return line_term.join(lines)
def set_parser_arguments(
    parser: argparse.ArgumentParser, default_exclude_size: int
) -> None:
    """Add the codify command-line arguments to `parser`.

    Parameters
    ----------
    parser : argparse.ArgumentParser
        The parser (or sub-parser) to add the codify arguments to.
    default_exclude_size : int
        Default cutoff (in bytes) above which binary values are excluded.
    """
    parser.add_argument("filespec", help=filespec_help, type=filespec_parser)
    outfile_help = (
        "Filename to write Python code to, if not specified then code is "
        "written to stdout"
    )
    parser.add_argument(
        "outfile",
        nargs="?",
        type=argparse.FileType("w", encoding="UTF-8"),
        help=outfile_help,
        default=sys.stdout,
    )
    exclude_help = (
        "Exclude binary data larger than specified (default: "
        f"{default_exclude_size} bytes)"
    )
    parser.add_argument(
        "-e",
        "--exclude-size",
        type=int,
        default=default_exclude_size,
        help=exclude_help,
    )
    parser.add_argument(
        "-p",
        "--include-private",
        action="store_true",
        help="Include private data elements (default is to exclude them)",
    )
    save_as_help = (
        "Specify the filename for ds.save_as(save_filename); "
        "otherwise the input name + '_from_codify' will be used"
    )
    parser.add_argument("-s", "--save-as", help=save_as_help)
def do_codify(args: argparse.Namespace) -> None:
    """Codify the dataset selected by the parsed command-line `args`.

    Writes the generated Python code to ``args.outfile`` (an open file
    object, or ``sys.stdout`` by default), appending a ``ds.save_as`` line.

    Raises
    ------
    NotImplementedError
        If more than one filespec was given, or the filespec's selected
        element is not a Dataset.
    """
    # Convert the requested dataset to python/pydicom code lines
    if len(args.filespec) != 1:
        raise NotImplementedError("Codify can only work on a single DICOM file input")
    ds, element = args.filespec[0]
    filename = ds.filename
    if element and not isinstance(element, Dataset):
        raise NotImplementedError(
            f"Codify can only code a Dataset, not a {type(element)}"
        )
    code_str = code_file_from_dataset(
        element or ds, args.exclude_size, args.include_private
    )
    # If requested, write a code line to save the dataset
    if args.save_as:
        save_as_filename = args.save_as
    else:
        # Default output name: input name with "_from_codify" suffix
        base, _ = os.path.splitext(filename)
        save_as_filename = base + "_from_codify" + ".dcm"
    save_line = f"\nds.save_as(r'{save_as_filename}', enforce_file_format=True)"
    code_str += save_line
    # Write the code lines to specified file or to standard output
    # For test_util, captured output .name throws error, ignore it:
    try:
        if args.outfile.name != "<stdout>":
            print(f"Writing code to file '{args.outfile.name}'")
    except AttributeError:
        pass
    args.outfile.write(code_str)
def main(default_exclude_size: int, args: list[str] | None = None) -> None:
    """Create Python code according to user options.

    Parameters
    ----------
    default_exclude_size : int
        Values longer than this will be coded as a commented syntax error.
    args : list[str], optional
        Command-line arguments to parse. If ``None`` then :attr:`sys.argv`
        is used.
    """
    epilog_text = (
        "Binary data (e.g. pixels) larger than --exclude-size "
        f"(default {default_exclude_size} bytes) is not included. A "
        "dummy line with a syntax error is produced. "
        "Private data elements are not included by default."
    )
    parser = argparse.ArgumentParser(
        description="Produce python/pydicom code from a DICOM file",
        epilog=epilog_text,
    )
    set_parser_arguments(parser, default_exclude_size)
    namespace = parser.parse_args(args)
    do_codify(namespace)
# Run the codify CLI when executed as a script (100-byte binary-size cutoff)
if __name__ == "__main__": # pragma: no cover
    main(default_exclude_size=100)

View File

@@ -0,0 +1,135 @@
# Copyright 2008-2021 pydicom authors. See LICENSE file for details.
"""Utility functions used in debugging writing and reading"""
from io import BytesIO
import os
import sys
from typing import BinaryIO, TYPE_CHECKING
from pydicom.valuerep import VR
if TYPE_CHECKING: # pragma: no cover
from pydicom.dataset import Dataset
def print_character(ordchr: int) -> str:
    """Return a printable character, or '.' for non-printable ones."""
    # Printable ASCII range 32..125; backslash (92) is also replaced
    # (NOTE(review): presumably to keep dump output unambiguous — confirm)
    is_printable = 31 < ordchr < 126 and ordchr != 92
    return chr(ordchr) if is_printable else "."
def filedump(
    filename: str | bytes | os.PathLike,
    start_address: int = 0,
    stop_address: int | None = None,
) -> str:
    """Dump out the contents of a file to a standard hex dump 16 bytes wide

    Parameters
    ----------
    filename : str | bytes | os.PathLike
        Path of the file to dump.
    start_address : int, optional
        Offset at which the dump starts (default ``0``).
    stop_address : int | None, optional
        Offset at which the dump ends; ``None`` (default) dumps to EOF.

    Returns
    -------
    str
        The formatted hex dump.
    """
    with open(filename, "rb") as f:
        return hexdump(f, start_address, stop_address)
def datadump(
    data: bytes, start_address: int = 0, stop_address: int | None = None
) -> str:
    """Return a hex string representation of `data`.

    Wraps `data` in a BytesIO so it can be dumped with :func:`hexdump`.
    """
    return hexdump(BytesIO(data), start_address, stop_address)
def hexdump(
    f: BinaryIO,
    start_address: int = 0,
    stop_address: int | None = None,
    show_address: bool = True,
) -> str:
    """Return a formatted string of hex bytes and characters in data.

    This is a utility function for debugging file writing.

    Parameters
    ----------
    f : BinaryIO
        The file-like to dump.
    start_address : int, optional
        The offset where the dump should start (default ``0``)
    stop_address : int, optional
        The offset where the dump should end, by default the entire file will
        be dumped.  Note a `stop_address` of ``0`` is treated like ``None``.
    show_address : bool, optional
        If ``True`` (default) then include the offset of each line of output.

    Returns
    -------
    str
    """
    s = []
    # Determine the maximum number of characters for the offset
    # (seek to end-of-file returns the file size)
    max_offset_len = len(f"{f.seek(0, 2):X}")
    if stop_address:
        max_offset_len = len(f"{stop_address:X}")
    f.seek(start_address)
    while True:
        offset = f.tell()
        if stop_address and offset > stop_address:
            break
        data = f.read(16)  # each output line covers 16 bytes
        if not data:
            break  # end of file
        current = []
        if show_address:
            # Offset at the start of the current line
            current.append(f"{offset:0{max_offset_len}X} ")
        # Add hex version of the current line
        b = " ".join([f"{x:02X}" for x in data])
        current.append(f"{b:<49}") # if fewer than 16 bytes, pad out to length
        # Append the ASCII version of the current line (or . if not ASCII)
        current.append("".join([print_character(x) for x in data]))
        s.append("".join(current))
    return "\n".join(s)
def pretty_print(
    ds: "Dataset", indent_level: int = 0, indent_chars: str = " "
) -> None:
    """Print a dataset directly, with indented levels.

    This is just like Dataset._pretty_str, but more useful for debugging as it
    prints each item immediately rather than composing a string, making it
    easier to immediately see where an error in processing a dataset starts.

    Parameters
    ----------
    ds : Dataset
        The dataset to print.
    indent_level : int, optional
        Current nesting depth; incremented for each nested sequence item.
    indent_chars : str, optional
        String repeated `indent_level` times as the line prefix.
    """
    indent = indent_chars * indent_level
    next_indent = indent_chars * (indent_level + 1)
    for elem in ds:
        if elem.VR == VR.SQ: # a sequence
            # Print the sequence header, then recurse into each item
            print(f"{indent}{elem.tag} {elem.name} -- {len(elem.value)} item(s)")
            for dataset in elem.value:
                pretty_print(dataset, indent_level + 1)
                print(next_indent + "---------")
        else:
            print(indent + repr(elem))
# Command-line interface: dump a file, with optional start/stop offsets.
if __name__ == "__main__": # pragma: no cover
    filename = sys.argv[1]
    start_address = 0
    stop_address = None
    if len(sys.argv) > 2: # then have start address
        # int(x, 0) accepts decimal or prefixed hex/octal/binary literals.
        # Replaces eval(), which would execute arbitrary expressions from
        # the command line.
        start_address = int(sys.argv[2], 0)
    if len(sys.argv) > 3:
        stop_address = int(sys.argv[3], 0)
    print(filedump(filename, start_address, stop_address))

View File

@@ -0,0 +1,123 @@
# Copyright 2008-2021 pydicom authors. See LICENSE file for details.
"""Code to fix non-standard dicom issues in files
"""
from typing import TYPE_CHECKING, Any
from pydicom import config
from pydicom import datadict
from pydicom import values
from pydicom.valuerep import VR
if TYPE_CHECKING: # pragma: no cover
from pydicom.dataelem import RawDataElement
def fix_separator_callback(
    raw_elem: "RawDataElement", **kwargs: Any
) -> "RawDataElement":
    """Used by fix_separator as the callback function from read_dataset

    Expects `kwargs` as installed by :func:`fix_separator`:
    ``invalid_separator`` (bytes), ``for_VRs`` (tuple of VR strings) and
    ``process_unknown_VRs`` (bool).  Returns either the original
    RawDataElement, or one with the separator byte replaced by backslash.
    """
    return_val = raw_elem
    try_replace = False
    # If elements are implicit VR, attempt to determine the VR
    if raw_elem.VR is None:
        try:
            vr = datadict.dictionary_VR(raw_elem.tag)
        # Not in the dictionary, process if flag says to do so
        except KeyError:
            try_replace = kwargs["process_unknown_VRs"]
        else:
            try_replace = vr in kwargs["for_VRs"]
    else:
        try_replace = raw_elem.VR in kwargs["for_VRs"]
    if try_replace:
        # Note value has not been decoded yet when this function called,
        # so need to replace backslash as bytes
        new_value = None
        if raw_elem.value is not None:
            if kwargs["invalid_separator"] == b" ":
                # Space separator: strip padding first so trailing spaces are
                # preserved rather than turned into separators, then restore
                stripped_val = raw_elem.value.strip()
                strip_count = len(raw_elem.value) - len(stripped_val)
                new_value = (
                    stripped_val.replace(kwargs["invalid_separator"], b"\\")
                    + b" " * strip_count
                )
            else:
                new_value = raw_elem.value.replace(kwargs["invalid_separator"], b"\\")
        return_val = raw_elem._replace(value=new_value)
    return return_val
def fix_separator(
    invalid_separator: bytes,
    for_VRs: tuple[str, ...] = ("DS", "IS"),
    process_unknown_VRs: bool = True,
) -> None:
    """Set up a callback function to fix RawDataElement values using
    some other separator than the dicom standard backslash character

    Parameters
    ----------
    invalid_separator : bytes
        A single byte to replace with dicom backslash, in raw data element
        values before they have been decoded or processed by pydicom
    for_VRs : tuple of str, optional
        The VRs for which the replacement will be done.
        If the VR is unknown (for example, if a private element),
        then process_unknown_VRs is used to determine whether to replace or
        not.
    process_unknown_VRs: bool, optional
        If True (default) then attempt the fix even if the VR is not known.

    Returns
    -------
    No return value. However, the callback function will return either
    the original RawDataElement instance, or a fixed one.
    """
    # Install the callback and its arguments in the global pydicom config
    config.data_element_callback = fix_separator_callback
    config.data_element_callback_kwargs = {
        "invalid_separator": invalid_separator,
        "for_VRs": for_VRs,
        "process_unknown_VRs": process_unknown_VRs,
    }
def fix_mismatch_callback(
    raw_elem: "RawDataElement", **kwargs: Any
) -> "RawDataElement":
    """Used by fix_mismatch as the callback function from read_dataset.

    Tries to convert the raw value using its stated VR; if that fails, tries
    each VR in ``kwargs['with_VRs']`` and adopts any that converts cleanly.
    Returns either the original RawDataElement or one with a replaced VR.
    """
    if raw_elem.VR is None:
        # Implicit VR with no way to check -- return unchanged
        return raw_elem
    try:
        values.convert_value(raw_elem.VR, raw_elem)
    except ValueError:
        for vr in kwargs["with_VRs"]:
            try:
                values.convert_value(vr, raw_elem)
            except ValueError:
                pass
            else:
                # NOTE(review): no break here, so if several VRs convert
                # successfully the *last* one in with_VRs wins -- confirm
                # whether first-match behavior was intended
                raw_elem = raw_elem._replace(VR=vr)
    return raw_elem
def fix_mismatch(with_VRs: tuple[str, ...] = (VR.PN, VR.DS, VR.IS)) -> None:
    """Set up a callback function to check that RawDataElements are
    translatable with their provided VRs. If not, re-attempt translation
    using some other translators.

    Parameters
    ----------
    with_VRs : tuple of str, optional
        A tuple of VR strings to attempt if the raw data element value cannot
        be translated with the raw data element's VR. Default
        ``('PN', 'DS', 'IS')``.

    Returns
    -------
    No return value. The callback function will return either
    the original RawDataElement instance, or one with a fixed VR.
    """
    # Install the callback and its arguments in the global pydicom config
    config.data_element_callback = fix_mismatch_callback
    config.data_element_callback_kwargs = {"with_VRs": with_VRs}

View File

@@ -0,0 +1,45 @@
# Copyright 2008-2018 pydicom authors. See LICENSE file for details.
"""Miscellaneous utility routines relating to hex and byte strings"""
from binascii import a2b_hex, b2a_hex
from pydicom.charset import default_encoding
def hex2bytes(hexstring: str | bytes) -> bytes:
"""Return bytestring for a string of hex bytes separated by whitespace
This is useful for creating specific byte sequences for testing, using
python's implied concatenation for strings with comments allowed.
Examples
--------
::
hex_string = (
"08 00 32 10 " # (0008,1032) SQ "Procedure Code Sequence"
"08 00 00 00 " # length 8
"fe ff 00 e0 " # (FFFE,E000) Item Tag
)
byte_string = hex2bytes(hex_string)
Note in the example that all lines except the first must
start with a space, alternatively the space could
end the previous line.
"""
# This works in both 3.x and 2.x because the first conditional evaluates to
# true in 2.x so the difference in bytes constructor doesn't matter
if isinstance(hexstring, bytes):
return a2b_hex(hexstring.replace(b" ", b""))
if isinstance(hexstring, str):
return a2b_hex(bytes(hexstring.replace(" ", ""), default_encoding))
raise TypeError("argument shall be bytes or string type")
def bytes2hex(byte_string: bytes) -> str:
    """Return a space-separated hex string representation of encoded bytes."""
    hex_str = b2a_hex(byte_string).decode()
    # Group the hex digits in pairs, one pair per original byte
    pairs = (hex_str[i : i + 2] for i in range(0, len(hex_str), 2))
    return " ".join(pairs)

View File

@@ -0,0 +1,178 @@
# Copyright 2008-2021 pydicom authors. See LICENSE file for details.
"""Read a dicom media file"""
import os
from struct import Struct, unpack
from types import TracebackType
from typing import cast, BinaryIO
from collections.abc import Iterator, Callable
from pydicom.misc import size_in_bytes
from pydicom.datadict import dictionary_VR
from pydicom.tag import TupleTag, ItemTag
from pydicom.uid import UID
from pydicom.valuerep import EXPLICIT_VR_LENGTH_32
# Explicit VR codes whose headers use a 4-byte length field, as byte strings
extra_length_VRs_b = tuple(vr.encode("ascii") for vr in EXPLICIT_VR_LENGTH_32)
# Common transfer syntax UIDs as raw (undecoded) byte strings
ExplicitVRLittleEndian = b"1.2.840.10008.1.2.1"
ImplicitVRLittleEndian = b"1.2.840.10008.1.2"
DeflatedExplicitVRLittleEndian = b"1.2.840.10008.1.2.1.99"
ExplicitVRBigEndian = b"1.2.840.10008.1.2.2"
# Tuple yielded per element: ((group, elem), VR, length, value, value_tell)
_ElementType = tuple[tuple[int, int], bytes | None, int, bytes | None, int]
class dicomfile:
    """Context-manager based DICOM file object with data element iteration"""
    def __init__(self, filename: str | bytes | os.PathLike) -> None:
        """Open `filename` for binary reading and check for a DICOM prefix."""
        self.fobj = fobj = open(filename, "rb")
        # Read the DICOM preamble, if present
        self.preamble: bytes | None = fobj.read(0x80)
        dicom_prefix = fobj.read(4)
        if dicom_prefix != b"DICM":
            # No preamble/"DICM" prefix: rewind and parse from the start
            self.preamble = None
            fobj.seek(0)
    def __enter__(self) -> "dicomfile":
        return self
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Always close the underlying file; exceptions are not suppressed
        self.fobj.close()
        return None
    def __iter__(self) -> Iterator[_ElementType]:
        """Yield raw elements: the file meta group first, then the dataset."""
        # Need the transfer_syntax later
        tsyntax: UID | None = None
        # Yield the file meta info elements
        file_meta = data_element_generator(
            self.fobj,
            is_implicit_VR=False,
            is_little_endian=True,
            stop_when=lambda group, elem: group != 2,
        )
        for elem in file_meta:
            if elem[0] == (0x0002, 0x0010):
                # (0002,0010) Transfer Syntax UID: needed to parse the rest
                value = cast(bytes, elem[3])
                tsyntax = UID(value.strip(b" \0").decode("ascii"))
            yield elem
        # Continue to yield elements from the main data
        if not tsyntax:
            raise NotImplementedError("No transfer syntax in file meta info")
        ds_gen = data_element_generator(
            self.fobj, tsyntax.is_implicit_VR, tsyntax.is_little_endian
        )
        for elem in ds_gen:
            yield elem
def data_element_generator(
    fp: BinaryIO,
    is_implicit_VR: bool,
    is_little_endian: bool,
    stop_when: Callable[[int, int], bool] | None = None,
    defer_size: str | int | float | None = None,
) -> Iterator[_ElementType]:
    """Yield raw data elements read from the file-like `fp`.

    Yields
    ------
    tuple
        A 5-tuple ``((group, elem), VR, length, value, value_tell)`` where
        ``VR`` is ``None`` for implicit VR data, ``value`` is ``None`` for
        deferred values and for undefined-length SQ elements, and
        ``value_tell`` is the file offset of the start of the value field.

    Parameters
    ----------
    fp : BinaryIO
        The file-like, positioned at the first element header to read.
    is_implicit_VR : bool
        ``True`` if the data is implicit VR encoded.
    is_little_endian : bool
        ``True`` if the data is little-endian encoded.
    stop_when : Callable[[int, int], bool] | None
        If given, called with ``(group, elem)`` for each element; when it
        returns ``True`` the file is rewound to that element's header and
        iteration stops.
    defer_size : str | int | float | None
        If given, values longer than this (in bytes) are skipped and
        yielded with a ``value`` of ``None``.
    """
    endian_chr = "<" if is_little_endian else ">"
    if is_implicit_VR:
        element_struct = Struct(endian_chr + "HHL")
    else: # Explicit VR
        # tag, VR, 2-byte length (or 0 if special VRs)
        element_struct = Struct(endian_chr + "HH2sH")
        extra_length_struct = Struct(endian_chr + "L") # for special VRs
        extra_length_unpack = extra_length_struct.unpack # for lookup speed
    # Make local variables so have faster lookup
    fp_read = fp.read
    fp_tell = fp.tell
    element_struct_unpack = element_struct.unpack
    defer_size = size_in_bytes(defer_size)
    while True:
        # Read tag, VR, length, get ready to read value
        bytes_read = fp_read(8)
        if len(bytes_read) < 8:
            return # at end of file
        if is_implicit_VR:
            # must reset VR each time; could have set last iteration (e.g. SQ)
            vr = None
            group, elem, length = element_struct_unpack(bytes_read)
        else: # explicit VR
            group, elem, vr, length = element_struct_unpack(bytes_read)
            if vr in extra_length_VRs_b:
                # These VRs have a 2-byte reserved field then a 4-byte length
                length = extra_length_unpack(fp_read(4))[0]
        # Positioned to read the value, but may not want to -- check stop_when
        value_tell = fp_tell()
        if stop_when is not None:
            if stop_when(group, elem):
                # Rewind to the start of this element's header before stopping
                rewind_length = 8
                if not is_implicit_VR and vr in extra_length_VRs_b:
                    rewind_length += 4
                fp.seek(value_tell - rewind_length)
                return
        # Reading the value
        # First case (most common): reading a value with a defined length
        if length != 0xFFFFFFFF:
            if defer_size is not None and length > defer_size:
                # Flag as deferred by setting value to None, and skip bytes
                value = None
                fp.seek(fp_tell() + length)
            else:
                value = fp_read(length)
            yield ((group, elem), vr, length, value, value_tell)
        # Second case: undefined length - must seek to delimiter,
        # unless is SQ type, in which case is easier to parse it, because
        # undefined length SQs and items of undefined lengths can be nested
        # and it would be error-prone to read to the correct outer delimiter
        else:
            # Try to look up type to see if is a SQ
            # if private tag, won't be able to look it up in dictionary,
            # in which case just ignore it and read the bytes unless it is
            # identified as a Sequence
            if vr is None:
                try:
                    vr = dictionary_VR((group, elem)).encode("ascii")
                except KeyError:
                    # Look ahead to see if it consists of items and
                    # is thus a SQ
                    next_tag = TupleTag(
                        cast(
                            tuple[int, int],
                            unpack(endian_chr + "HH", fp_read(4)),
                        )
                    )
                    # Rewind the file
                    fp.seek(fp_tell() - 4)
                    if next_tag == ItemTag:
                        vr = b"SQ"
            if vr == b"SQ":
                yield ((group, elem), vr, length, None, value_tell)
            else:
                raise NotImplementedError(
                    "This reader does not handle undefined length except for SQ"
                )