Skip to content
Snippets Groups Projects

Probe Functionality

Merged Florian Zill requested to merge FZill/ogstools:point_sampling into main
2 files
+ 153
132
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -7,140 +7,10 @@ import meshio
import numpy as np
import pyvista as pv
import vtuIO
from meshio.xdmf.time_series import (
ReadError,
cell_data_from_raw,
xdmf_to_numpy_type,
)
from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator
from tqdm.auto import tqdm
class TimeSeriesReader(meshio.xdmf.TimeSeriesReader):
def __init__(self, filename):
super().__init__(filename)
def read_data(self, k: int):
point_data = {}
cell_data_raw = {}
other_data = {}
t = None
for c in list(self.collection[k]):
if c.tag == "Time":
t = float(c.attrib["Value"])
elif c.tag == "Attribute":
name = c.get("Name")
if len(list(c)) != 1:
raise ReadError()
data_item = list(c)[0]
data = self._read_data_item(data_item)
if c.get("Center") == "Node":
point_data[name] = data
elif c.get("Center") == "Cell":
cell_data_raw[name] = data
elif c.get("Center") == "Other":
other_data[name] = data
else:
raise ReadError()
else:
# skip the xi:included mesh
continue
if self.cells is None:
raise ReadError()
cell_data = cell_data_from_raw(self.cells, cell_data_raw)
if t is None:
raise ReadError()
return t, point_data, cell_data, other_data
def _read_data_item(self, data_item):
dims = [int(d) for d in data_item.get("Dimensions").split()]
# Actually, `NumberType` is XDMF2 and `DataType` XDMF3, but many files out there
# use both keys interchangeably.
if data_item.get("DataType"):
if data_item.get("NumberType"):
raise ReadError()
data_type = data_item.get("DataType")
elif data_item.get("NumberType"):
if data_item.get("DataType"):
raise ReadError()
data_type = data_item.get("NumberType")
else:
# Default, see
# <https://xdmf.org/index.php/XDMF_Model_and_Format#XML_Element_.28Xdmf_ClassName.29_and_Default_XML_Attributes>
data_type = "Float"
try:
precision = data_item.attrib["Precision"]
except KeyError:
precision = "4"
data_format = data_item.attrib["Format"]
if data_format == "XML":
return np.fromstring(
data_item.text,
dtype=xdmf_to_numpy_type[(data_type, precision)],
sep=" ",
).reshape(dims)
if data_format == "Binary":
return np.fromfile(
data_item.text.strip(),
dtype=xdmf_to_numpy_type[(data_type, precision)],
).reshape(dims)
if data_format != "HDF":
msg = f"Unknown XDMF Format '{data_format}'."
raise ReadError(msg)
file_info = data_item.text.strip()
file_h5path__selections = file_info.split("|")
file_h5path = file_h5path__selections[0]
selections = (
file_h5path__selections[1]
if len(file_h5path__selections) > 1
else None
)
filename, h5path = file_h5path.split(":")
if selections:
# offsets, slices, current_data_extends, global_data_extends by dimension
m = [
list(map(int, att.split(" "))) for att in selections.split(":")
]
t = np.transpose(m)
selection = tuple(
slice(start, start + extend, step)
for start, step, extend, _ in t
)
else:
selection = ()
# The HDF5 file path is given with respect to the XDMF (XML) file.
dirpath = self.filename.resolve().parent
full_hdf5_path = dirpath / filename
if full_hdf5_path in self.hdf5_files:
f = self.hdf5_files[full_hdf5_path]
else:
import h5py
f = h5py.File(full_hdf5_path, "r")
self.hdf5_files[full_hdf5_path] = f
if h5path[0] != "/":
raise ReadError()
for key in h5path[1:].split("/"):
f = f[key]
# `[()]` gives a np.ndarray
return f[selection].squeeze()
from .xdmf_reader import XDMFReader
class MeshSeries:
@@ -172,7 +42,7 @@ class MeshSeries:
if self._data_type == "pvd":
self._pvd_reader = pv.PVDReader(filepath)
elif self._data_type == "xdmf":
self._xdmf_reader = TimeSeriesReader(filepath)
self._xdmf_reader = XDMFReader(filepath)
self._read_xdmf(0) # necessary to initialize hdf5_files
meshes = self.hdf5["meshes"]
self.hdf5_bulk_name = list(meshes.keys())[
Loading