import os
import time
import numpy as np
from rich import box
from rich.table import Table
from rich.console import Console
from os.path import join
from scipy.stats import gaussian_kde
from typing import Union
# Optional imports
from vbi.optional_deps import torch, require_optional, optional_import
import re
import warnings
try:
import nbformat
import nbformat
from nbconvert import PythonExporter
except:
pass
try:
from sbi.analysis.plot import _get_default_fig_kwargs, _get_default_diag_kwargs
from sbi.analysis.plot import _update, ensure_numpy
except ImportError:
# warnings.warn(
# "sbi package is not installed: functions that require sbi (e.g. posterior_peaks) will raise if used. "
# "Install with: pip install sbi",
# UserWarning,
# stacklevel=2,
# )
pass
[docs]
def timer(func):
"""
Decorator to measure elapsed time.
Parameters
----------
func : function
Function to be decorated.
Returns
-------
function
Wrapped function that measures execution time.
"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
display_time(end - start, message="{:s}".format(func.__name__))
return result
return wrapper
[docs]
def display_time(time, message=""):
"""
Display elapsed time in hours, minutes, seconds.
Parameters
----------
time : float
Elapsed time in seconds.
message : str, optional
Optional message to display with the time. Default is empty string.
"""
hour = int(time / 3600)
minute = (int(time % 3600)) // 60
second = time - (3600.0 * hour + 60.0 * minute)
print(
"{:s} Done in {:d} hours {:d} minutes {:09.6f} seconds".format(
message, hour, minute, second
)
)
[docs]
class LoadSample(object):
"""
Utility class for loading sample datasets and connectivity matrices.
This class provides convenient methods to load structural connectivity matrices,
tract lengths, and BOLD signal data from the VBI dataset directory.
Parameters
----------
nn : int, optional
Number of nodes/regions in the connectivity matrix. Default is 84.
Supported values are typically 84 and 88.
"""
[docs]
def __init__(self, nn=84) -> None:
"""
Initialize the LoadSample utility.
Parameters
----------
nn : int, optional
Number of nodes/regions in the connectivity matrix. Default is 84.
"""
self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.nn = nn
[docs]
def get_weights(self, normalize=True):
"""
Load structural connectivity weights matrix.
Parameters
----------
normalize : bool, optional
Whether to normalize the weights by the maximum value. Default is True.
Returns
-------
np.ndarray
Structural connectivity matrix of shape (nn, nn) with diagonal set to 0.
Values are non-negative after removing negative entries.
"""
nn = self.nn
SC_name = join(
self.root_dir, "vbi/dataset", f"connectivity_{nn}", "weights.txt"
)
SC = np.loadtxt(SC_name)
np.fill_diagonal(SC, 0.0)
if normalize:
SC /= SC.max()
SC[SC < 0] = 0.0
return SC
[docs]
def get_lengths(self):
"""
Load tract lengths matrix.
Returns
-------
np.ndarray
Tract lengths matrix of shape (nn, nn) containing the physical
distances between brain regions.
"""
nn = self.nn
tract_lenghts_name = join(
self.root_dir, "vbi/dataset", f"connectivity_{nn}", "tract_lengths.txt"
)
tract_lengths = np.loadtxt(tract_lenghts_name)
return tract_lengths
[docs]
def get_bold(self):
"""
Load BOLD signal data.
Returns
-------
np.ndarray
BOLD signal data matrix of shape (nn, n_timepoints) containing
the empirical BOLD time series for each brain region.
"""
nn = self.nn
bold_name = join(
self.root_dir, "vbi", "dataset", f"connectivity_{nn}", "Bold.npz"
)
bold = np.load(bold_name)["Bold"]
return bold.T
[docs]
def get_limits(samples, limits=None):
"""
Calculate or validate parameter limits for samples.
This function computes the min/max limits for each parameter dimension
across one or more sample arrays, or validates provided limits.
Parameters
----------
samples : np.ndarray or list of np.ndarray
Sample array(s) of shape (n_samples, n_params) or list of such arrays.
If PyTorch tensors, they will be converted to numpy arrays.
limits : list or None, optional
Predefined limits as [[min1, max1], [min2, max2], ...] for each parameter.
If None or empty list, limits are computed from the data.
If single limit pair provided, it will be broadcast to all parameters.
Returns
-------
torch.Tensor
Tensor of shape (n_params, 2) containing [min, max] for each parameter.
"""
if type(samples) != list:
samples = ensure_numpy(samples)
samples = [samples]
else:
for i, sample_pack in enumerate(samples):
samples[i] = ensure_numpy(samples[i])
# Dimensionality of the problem.
dim = samples[0].shape[1]
if limits == [] or limits is None:
limits = []
for d in range(dim):
min = +np.inf
max = -np.inf
for sample in samples:
min_ = sample[:, d].min()
min = min_ if min_ < min else min
max_ = sample[:, d].max()
max = max_ if max_ > max else max
limits.append([min, max])
else:
if len(limits) == 1:
limits = [limits[0] for _ in range(dim)]
else:
limits = limits
limits = torch.as_tensor(limits)
return limits
[docs]
def get_limits_numpy(samples, limits=None):
"""
Calculate or validate parameter limits for samples (numpy-only version).
This function computes the min/max limits for each parameter dimension
across one or more sample arrays, or validates provided limits.
Parameters
----------
samples : np.ndarray or list of np.ndarray
Sample array(s) of shape (n_samples, n_params) or list of such arrays.
limits : list or None, optional
Predefined limits as [[min1, max1], [min2, max2], ...] for each parameter.
If None or empty list, limits are computed from the data.
If single limit pair provided, it will be broadcast to all parameters.
Returns
-------
np.ndarray
Array of shape (n_params, 2) containing [min, max] for each parameter.
"""
if not isinstance(samples, list):
samples = [np.asarray(samples)]
else:
samples = [np.asarray(sample) for sample in samples]
# Handle 1D arrays by ensuring they have shape (n_samples, 1)
for i, sample in enumerate(samples):
if sample.ndim == 1:
samples[i] = sample[:, np.newaxis]
# Dimensionality of the problem.
dim = samples[0].shape[1]
if limits == [] or limits is None:
limits = []
for d in range(dim):
min_val = +np.inf
max_val = -np.inf
for sample in samples:
min_ = sample[:, d].min()
min_val = min_ if min_ < min_val else min_val
max_ = sample[:, d].max()
max_val = max_ if max_ > max_val else max_val
limits.append([min_val, max_val])
else:
if len(limits) == 1:
limits = [limits[0] for _ in range(dim)]
else:
limits = limits
return np.array(limits)
[docs]
def posterior_peaks(samples, return_dict=False, **kwargs):
"""
Find the peaks (modes) of a posterior distribution using kernel density estimation.
This function estimates the probability density of the posterior samples
and identifies the locations of peak density for each parameter dimension.
Parameters
----------
samples : np.ndarray or torch.Tensor
Posterior samples of shape (n_samples, n_params).
If torch.Tensor, it will be converted to numpy array.
return_dict : bool, optional
If True, returns results as a dictionary with parameter labels as keys.
If False, returns a simple list of peak values. Default is False.
**kwargs
Additional keyword arguments passed to the plotting/analysis functions.
These may include 'labels' for parameter names.
Returns
-------
list or dict
If return_dict=False: List of peak values for each parameter.
If return_dict=True: Dictionary with parameter labels as keys and
peak values as values.
"""
# Prefer sbi-based plotting/analysis helpers when available; if they are
# not present (sbi not installed), fall back to the numpy-only
# implementation `posterior_peaks_numpy` which doesn't require sbi/torch.
try:
# Get default kwargs for KDE diagonal plots and figure kwargs
fig_kwargs = _get_default_fig_kwargs()
kde_kwargs = _get_default_diag_kwargs("kde")
# Update with user-provided kwargs
fig_kwargs = _update(fig_kwargs, kwargs)
limits = get_limits(samples)
samples = ensure_numpy(samples)
n, dim = samples.shape
try:
labels = fig_kwargs.get("labels")
except:
labels = range(dim)
peaks = {}
if labels is None:
labels = range(dim)
for i in range(dim):
peaks[labels[i]] = 0
for row in range(dim):
density = gaussian_kde(samples[:, row], bw_method=kde_kwargs["bw_method"])
xs = np.linspace(limits[row, 0], limits[row, 1], kde_kwargs["bins"])
ys = density(xs)
peaks[labels[row]] = xs[ys.argmax()]
if return_dict:
return peaks
else:
return list(peaks.values())
except NameError:
# sbi helpers not available; fallback to numpy-only implementation.
labels = kwargs.get("labels", None)
bins = kwargs.get("bins", 100)
bw_method = kwargs.get("bw_method", None)
return posterior_peaks_numpy(samples, return_dict=return_dict, labels=labels, bins=bins, bw_method=bw_method)
[docs]
def posterior_peaks_numpy(
samples, return_dict=False, labels=None, bins=100, bw_method=None
):
"""
Find the peaks (modes) of a posterior distribution using kernel density estimation (numpy-only version).
This function estimates the probability density of the posterior samples
and identifies the locations of peak density for each parameter dimension.
Does not require torch or sbi dependencies.
Parameters
----------
samples : np.ndarray
Posterior samples of shape (n_samples, n_params).
return_dict : bool, optional
If True, returns results as a dictionary with parameter labels as keys.
If False, returns a simple list of peak values. Default is False.
labels : list or None, optional
Parameter names for dictionary keys. If None, uses integer indices.
bins : int, optional
Number of bins for density estimation grid. Default is 100.
bw_method : str, float or None, optional
Bandwidth method for KDE. Can be 'scott', 'silverman', or a scalar.
If None, uses 'scott'. Default is None.
Returns
-------
list or dict
If return_dict=False: List of peak values for each parameter.
If return_dict=True: Dictionary with parameter labels as keys and
peak values as values.
Raises
------
ValueError
If samples contain insufficient data for KDE (need at least 2 samples).
"""
# Convert to numpy array if needed
samples = np.asarray(samples)
if samples.ndim == 1:
samples = samples[:, np.newaxis]
n, dim = samples.shape
# Check for sufficient data
if n < 2:
raise ValueError("KDE requires at least 2 samples. Got {} samples.".format(n))
# Get limits for each dimension
limits = get_limits_numpy(samples)
# Set up labels
if labels is None:
labels = list(range(dim))
elif len(labels) != dim:
raise ValueError(
f"Number of labels ({len(labels)}) must match number of dimensions ({dim})"
)
# Set default bandwidth method
if bw_method is None:
bw_method = "scott"
peaks = {}
for row in range(dim):
# Compute KDE for this dimension
density = gaussian_kde(samples[:, row], bw_method=bw_method)
# Create evaluation grid
xs = np.linspace(limits[row, 0], limits[row, 1], bins)
ys = density(xs)
# Find the peak (maximum density location)
peak_idx = ys.argmax()
peaks[labels[row]] = xs[peak_idx]
if return_dict:
return peaks
else:
return list(peaks.values())
[docs]
def p2j(modulePath):
"""convert python script to jupyter notebook"""
os.system(f"p2j -o {modulePath}")
[docs]
def j2p(notebookPath, modulePath=None):
"""
convert a jupyter notebook to a python module
>>> j2p("sample.ipynb", "sample.py")
"""
with open(notebookPath) as fh:
nb = nbformat.reads(fh.read(), nbformat.NO_CONVERT)
exporter = PythonExporter()
source, meta = exporter.from_notebook_node(nb)
# remove lines start with `# In[` from source
source = re.sub(r"^# In\[[0-9 ]*\]:\n", "", source, flags=re.MULTILINE)
# replace more that 1 empty lines with 1 empty line
source = re.sub(r"\n{2,}", "\n\n", source)
if modulePath is None:
modulePath = notebookPath.replace(".ipynb", ".py")
with open(modulePath, "w+") as fh:
fh.writelines(source)
[docs]
def posterior_shrinkage(
prior_samples: "Union[torch.Tensor, np.ndarray]",
post_samples: "Union[torch.Tensor, np.ndarray]",
) -> "torch.Tensor":
"""
Calculate the posterior shrinkage, quantifying how much
the posterior distribution contracts from the initial
prior distribution.
References:
https://arxiv.org/abs/1803.08393
Parameters
----------
prior_samples : array_like or torch.Tensor [n_samples, n_params]
Samples from the prior distribution.
post_samples : array-like or torch.Tensor [n_samples, n_params]
Samples from the posterior distribution.
Returns
-------
shrinkage : torch.Tensor [n_params]
The posterior shrinkage.
Raises
------
ImportError
If PyTorch is not installed.
"""
if torch is None:
raise ImportError(
"PyTorch is required for posterior_shrinkage function. "
"Please install PyTorch with: pip install torch"
)
if len(prior_samples) == 0 or len(post_samples) == 0:
raise ValueError("Input samples are empty")
if not isinstance(prior_samples, torch.Tensor):
prior_samples = torch.tensor(prior_samples, dtype=torch.float32)
if not isinstance(post_samples, torch.Tensor):
post_samples = torch.tensor(post_samples, dtype=torch.float32)
if prior_samples.ndim == 1:
prior_samples = prior_samples[:, None]
if post_samples.ndim == 1:
post_samples = post_samples[:, None]
prior_std = torch.std(prior_samples, dim=0)
post_std = torch.std(post_samples, dim=0)
return 1 - (post_std / prior_std) ** 2
[docs]
def posterior_shrinkage_numpy(
prior_samples: np.ndarray,
post_samples: np.ndarray,
) -> np.ndarray:
"""
Calculate the posterior shrinkage using numpy, quantifying how much
the posterior distribution contracts from the initial
prior distribution.
References:
https://arxiv.org/abs/1803.08393
Parameters
----------
prior_samples : np.ndarray [n_samples, n_params]
Samples from the prior distribution.
post_samples : np.ndarray [n_samples, n_params]
Samples from the posterior distribution.
Returns
-------
shrinkage : np.ndarray [n_params]
The posterior shrinkage.
Raises
------
ValueError
If input samples are empty or have incompatible shapes.
"""
if len(prior_samples) == 0 or len(post_samples) == 0:
raise ValueError("Input samples are empty")
# Convert to numpy arrays if needed
prior_samples = np.asarray(prior_samples, dtype=np.float32)
post_samples = np.asarray(post_samples, dtype=np.float32)
# Handle 1D case by adding a dimension
if prior_samples.ndim == 1:
prior_samples = prior_samples[:, None]
if post_samples.ndim == 1:
post_samples = post_samples[:, None]
# Calculate standard deviations along the sample dimension (axis=0)
prior_std = np.std(prior_samples, axis=0)
post_std = np.std(post_samples, axis=0)
# Avoid division by zero
with np.errstate(divide="ignore", invalid="ignore"):
shrinkage = 1 - (post_std / prior_std) ** 2
# Handle cases where prior_std is zero
shrinkage = np.where(prior_std == 0, 0, shrinkage)
return shrinkage
[docs]
def posterior_zscore(
true_theta: "Union[torch.Tensor, np.ndarray, float]",
post_samples: "Union[torch.Tensor, np.ndarray]",
):
"""
Calculate the posterior z-score, quantifying how much the posterior
distribution of a parameter encompasses its true value.
References:
https://arxiv.org/abs/1803.08393
Parameters
----------
true_theta : float, array-like or torch.Tensor [n_params]
The true value of the parameters.
post_samples : array-like or torch.Tensor [n_samples, n_params]
Samples from the posterior distributions.
Returns
-------
z : torch.Tensor [n_params]
The z-score of the posterior distributions.
Raises
------
ImportError
If PyTorch is not installed.
"""
if torch is None:
raise ImportError(
"PyTorch is required for posterior_zscore function. "
"Please install PyTorch with: pip install torch"
)
if len(post_samples) == 0:
raise ValueError("Input samples are empty")
if not isinstance(true_theta, torch.Tensor):
true_theta = torch.tensor(true_theta, dtype=torch.float32)
if not isinstance(post_samples, torch.Tensor):
post_samples = torch.tensor(post_samples, dtype=torch.float32)
true_theta = np.atleast_1d(true_theta)
if post_samples.ndim == 1:
post_samples = post_samples[:, None]
post_mean = torch.mean(post_samples, dim=0)
post_std = torch.std(post_samples, dim=0)
return torch.abs((post_mean - true_theta) / post_std)
[docs]
def posterior_zscore_numpy(
true_theta: Union[np.ndarray, float],
post_samples: np.ndarray,
) -> np.ndarray:
"""
Calculate the posterior z-score using numpy, quantifying how much the posterior
distribution of a parameter encompasses its true value.
References:
https://arxiv.org/abs/1803.08393
Parameters
----------
true_theta : float or np.ndarray [n_params]
The true value of the parameters.
post_samples : np.ndarray [n_samples, n_params]
Samples from the posterior distributions.
Returns
-------
z : np.ndarray [n_params]
The z-score of the posterior distributions.
Raises
------
ValueError
If input samples are empty.
"""
if len(post_samples) == 0:
raise ValueError("Input samples are empty")
# Convert to numpy arrays
true_theta = np.asarray(true_theta, dtype=np.float32)
post_samples = np.asarray(post_samples, dtype=np.float32)
true_theta = np.atleast_1d(true_theta)
if post_samples.ndim == 1:
post_samples = post_samples[:, None]
post_mean = np.mean(post_samples, axis=0)
post_std = np.std(post_samples, axis=0)
# Avoid division by zero
with np.errstate(divide="ignore", invalid="ignore"):
z_score = np.abs((post_mean - true_theta) / post_std)
# Handle cases where post_std is zero
z_score = np.where(post_std == 0, np.inf, z_score)
return z_score
[docs]
def set_diag(A: np.ndarray, k: int = 0, value: float = 0.0):
"""
set k diagonals of the given matrix to given value.
Parameters
----------
A: np.ndarray
matrix
k: int
number of diagonals
value: float
value to be set
Returns
-------
A: np.ndarray
matrix with k diagonals set to value
"""
assert len(A.shape) == 2
n = A.shape[0]
assert k < n
for i in range(-k, k + 1):
a1 = np.diag(np.random.randint(1, 2, n - abs(i)), i)
idx = np.where(a1)
A[idx] = value
return A
[docs]
def test_imports():
"""Check required dependencies, including C++ modules, print versions, and warn if unavailable."""
console = Console()
table = Table(title="Dependency Check", box=box.SIMPLE_HEAVY)
table.add_column("Package", style="bold cyan")
table.add_column("Version", style="bold green")
table.add_column("Status", style="bold yellow")
# Python package dependencies
dependencies = [
("vbi", "vbi"),
("numpy", "numpy"),
("scipy", "scipy"),
("matplotlib", "matplotlib"),
("sbi", "sbi"),
("torch", "torch"),
("cupy", "cupy"),
]
# C++ module dependencies
cpp_modules = [
("vbi.models.cpp.mpr.MPR_sde", "vbi.models.cpp.mpr", "MPR_sde"),
("vbi.models.cpp.damp_oscillator.DO", "vbi.models.cpp.damp_oscillator", "DO"),
("vbi.models.cpp._src.do.DO", "vbi.models.cpp._src.do", "DO"),
("vbi.models.cpp._src.jr_sde.JR_sde", "vbi.models.cpp._src.jr_sde", "JR_sde"),
("vbi.models.cpp._src.jr_sdde.JR_sdde", "vbi.models.cpp._src.jr_sdde", "JR_sdde"),
("vbi.models.cpp._src.km_sde.KM_sde", "vbi.models.cpp._src.km_sde", "KM_sde"),
("vbi.models.cpp._src.mpr_sde.MPR_sde", "vbi.models.cpp._src.mpr_sde", "MPR_sde"),
("vbi.models.cpp._src.mpr_sde.BoldParams", "vbi.models.cpp._src.mpr_sde", "BoldParams"),
("vbi.models.cpp._src.vep.VEP", "vbi.models.cpp._src.vep", "VEP"),
("vbi.models.cpp._src.wc_ode.WC_ode", "vbi.models.cpp._src.wc_ode", "WC_ode"),
]
# Check Python packages
for name, module in dependencies:
try:
pkg = __import__(module)
version = pkg.__version__
status = "✅ Available"
except ImportError:
version = "-"
status = "❌ Not Found"
table.add_row(name, version, status)
# Check C++ modules collectively
cpp_status = "✅ Available"
for name, module_path, module_name in cpp_modules:
try:
module = __import__(module_path, fromlist=[module_name])
getattr(module, module_name)
except (ImportError, AttributeError):
cpp_status = "❌ Failed"
break
table.add_row("All C++ Modules", "-", cpp_status)
console.print(table)
# Additional GPU checks for torch
try:
import torch
console.print(
f"[bold blue]Torch GPU available:[/bold blue] {torch.cuda.is_available()}"
)
console.print(
f"[bold blue]Torch device count:[/bold blue] {torch.cuda.device_count()}"
)
console.print(
f"[bold blue]Torch CUDA version:[/bold blue] {torch.version.cuda}"
)
except ImportError:
pass
# Additional GPU checks for cupy
try:
import cupy
console.print(
f"[bold blue]CuPy GPU available:[/bold blue] {cupy.cuda.is_available()}"
)
console.print(
f"[bold blue]CuPy device count:[/bold blue] {cupy.cuda.runtime.getDeviceCount()}"
)
info = get_cuda_info()
if isinstance(info, dict):
console.print(f"[bold blue]CUDA Version:[/bold blue] {info['cuda_version']}")
console.print(f"[bold blue]Device Name:[/bold blue] {info['device_name']}")
console.print(f"[bold blue]Total Memory:[/bold blue] {info['total_memory']:.2f} GB")
console.print(f"[bold blue]Compute Capability:[/bold blue] {info['compute_capability']}")
except ImportError:
pass
# def test_imports():
# """Check required dependencies, print versions, and warn if unavailable."""
# console = Console()
# table = Table(title="Dependency Check", box=box.SIMPLE_HEAVY)
# table.add_column("Package", style="bold cyan")
# table.add_column("Version", style="bold green")
# table.add_column("Status", style="bold yellow")
# dependencies = [
# ("vbi", "vbi"),
# ("numpy", "numpy"),
# ("scipy", "scipy"),
# ("matplotlib", "matplotlib"),
# ("sbi", "sbi"),
# ("torch", "torch"),
# ("cupy", "cupy"),
# ]
# for name, module in dependencies:
# try:
# pkg = __import__(module)
# version = pkg.__version__
# status = "✅ Available"
# except ImportError:
# version = "-"
# status = "❌ Not Found"
# table.add_row(name, version, status)
# console.print(table)
# # Additional GPU checks
# try:
# import torch
# console.print(
# f"[bold blue]Torch GPU available:[/bold blue] {torch.cuda.is_available()}"
# )
# console.print(
# f"[bold blue]Torch device count:[/bold blue] {torch.cuda.device_count()}"
# )
# console.print(
# f"[bold blue]Torch CUDA version:[/bold blue] {torch.version.cuda}"
# ) # Display CUDA version used by PyTorch
# except ImportError:
# pass
# try:
# import cupy
# console.print(
# f"[bold blue]CuPy GPU available:[/bold blue] {cupy.cuda.is_available()}"
# )
# console.print(
# f"[bold blue]CuPy device count:[/bold blue] {cupy.cuda.runtime.getDeviceCount()}"
# )
# info = get_cuda_info()
# if isinstance(info, dict):
# print(f"CUDA Version: {info['cuda_version']}")
# print(f"Device Name: {info['device_name']}")
# print(f"Total Memory: {info['total_memory']:.2f} GB")
# print(f"Compute Capability: {info['compute_capability']}")
# except ImportError:
# pass
[docs]
def get_cuda_info():
"""
Get CUDA version and device information using CuPy.
Returns:
dict: Dictionary containing CUDA version and device information
"""
import cupy as cp
try:
# Get CUDA version
cuda_version = cp.cuda.runtime.runtimeGetVersion()
major = cuda_version // 1000
minor = (cuda_version % 1000) // 10
# Get device info
device = cp.cuda.runtime.getDeviceProperties(0)
return {
"cuda_version": f"{major}.{minor}",
"device_name": device["name"].decode(),
"total_memory": device["totalGlobalMem"] / (1024**3), # Convert to GB
"compute_capability": f"{device['major']}.{device['minor']}",
}
except ImportError:
return "CuPy is not installed"
except Exception as e:
return f"Error getting CUDA information: {str(e)}"
[docs]
def pretty_dtype(dtype):
"""Map numba types to human-readable strings."""
from numba import float64, types
if dtype == types.string:
return "string"
if dtype == float64:
return "float64"
if dtype == types.int64:
return "int64"
if dtype == types.int32:
return "int32"
if dtype == types.float32:
return "float32"
if dtype == types.boolean:
return "bool"
if dtype == types.complex128:
return "complex128"
if dtype == types.complex64:
return "complex64"
if isinstance(dtype, types.Array):
return f"{dtype.dtype}[{dtype.ndim}d]"
return str(dtype)
[docs]
def print_valid_parameters(par_spec):
print("Valid parameters:")
print("────────────────────────────────────────────")
print(f"{'Name':<15} {'Datatype':<20}")
print("────────────────────────────────────────────")
for name, dtype in par_spec:
print(f"{name:<15} {pretty_dtype(dtype)}")
print("────────────────────────────────────────────")