# ruff: noqa: D101, D102
"""
FitsImageDataSet is for if you have image data in a single directory and some sort of tabular catalog file.
At minimum, your tabular catalog **must** contain the following:
#. A unique ID column for each astronomical object you are interested in
#. A filename column containing the filename of the fits image file.
#. If you have multiple images with the same object ID, they must have separate rows in the catalog, one for each image. There must be a column describing the filter on the telescope that differentiates these objects
We recommend all your fits images be roughly the same size.
Setting up hyrax to use FitsImageDataSet works as follows in a notebook. The same configuration options can go
in a configuration file if you are running from the CLI
.. code-block:: python
import hyrax
h = hyrax.Hyrax()
h.config["data_set"]["name"] = "FitsImageDataSet"
h.config["general"]["data_dir"] = "/file/path/to/where/your/fits/files/are"
# Location of your catalog file. Any file format supported by astropy.Table will work
h.config["data_set"]["filter_catalog"] = "/file/path/to/your/catalog.fits"
# Size in pixels to send to ML model. All images must be this size or larger on
# both dimensions
h.config["data_set"]["crop_to"] = (100,100)
# This is good to simply attempt to construct the dataset. Once things are working you might try
# to train or infer
dataset = h.prepare()
This is the minimal setup that can work; however, there are several other configuration options you may need
to set depending on your usage.
The column names for the required columns are configurable. By default we use ``object_id``, ``filter``, and
``filename``; however, by setting ``h.config["data_set"]["object_id_column_name"]`` you can set the correct
name for your catalog file. ``h.config["data_set"]["filter_column_name"]`` and
``h.config["data_set"]["filename_column_name"]`` work in a corresponding manner.
If your dataset does not fit in memory on your system, we recommend setting
``h.config["data_set"]["use_cache"]`` and ``h.config["data_set"]["preload_cache"]`` to ``False``.
Both are ``True`` by default. The former caches all tensors read during an epoch into system RAM, with the
intent of speeding up later epochs of training if your disk has low bandwidth. The latter begins this process
of caching all tensors into system RAM in a background thread as soon as the ``FitsImageDataSet`` is
constructed, front-running the ``train`` or ``infer`` verb requesting tensors. The intent of this optimization
is to speed up the first epoch of training in the case where your disk has high latency. Both will result in
crashes if there is not enough room in your system RAM for the entire dataset.
If you need to truncate your dataset to fit in RAM, the easiest way is to select a small number of rows
from your original catalog file. FitsImageDataSet will only attempt to load images that exist in the catalog.
""" # noqa: E501
import logging
import time
from collections.abc import Generator
from pathlib import Path
from typing import Union
import numpy as np
import numpy.typing as npt
from torch.utils.data import Dataset
from .data_set_registry import HyraxDataset, HyraxImageDataset
from .tensor_cache_mixin import TensorCacheMixin
[docs]
logger = logging.getLogger(__name__)
[docs]
files_dict = dict[str, dict[str, str]]
[docs]
class FitsImageDataSet(HyraxDataset, HyraxImageDataset, TensorCacheMixin, Dataset):
"""
Dataset for Fits Images, typically cutouts.
"""
[docs]
_called_from_test = False
[docs]
def __init__(self, config: dict, data_location=None):
"""
.. py:method:: __init__
Initialize a FitsImageDataSet
Most work is done in ``_init_from_path`` and functions it calls in order to allow
subclasses to override behavior.
Parameters
----------
config : dict
Nested configuration dictionary for hyrax
data_location : Optional[Union[Path, str]]
The directory location of the data that this dataset class will access
"""
self.set_function_transform()
[docs]
self.object_id_column_name = (
config["data_set"]["object_id_column_name"]
if config["data_set"]["object_id_column_name"]
else "object_id"
)
[docs]
self.filter_column_name = (
config["data_set"]["filter_column_name"] if config["data_set"]["filter_column_name"] else "filter"
)
[docs]
self.filename_column_name = (
config["data_set"]["filename_column_name"]
if config["data_set"]["filename_column_name"]
else "filename"
)
self._init_from_path(data_location)
# Relies on self.filters_ref and self.filter_catalog_table which are both determined
# inside _init_from_path()
logger.debug("Preparing Metadata")
metadata = self._prepare_metadata()
super().__init__(config, metadata)
self._before_preload()
# Initialize tensor caching from mixin
self._init_tensor_cache(config)
[docs]
def _init_from_path(self, path: Union[Path, str]):
"""__init__ helper. Initialize an HSC data set from a path. This involves several filesystem scan
operations and will ultimately open and read the header info of every fits file in the given directory
Parameters
----------
path : Union[Path, str]
Path or string specifying the directory path that is the root of all filenames in the
catalog table
"""
self.path = path
# This is common code
filter_catalog = None
if self.config["data_set"]["filter_catalog"]:
filter_catalog = Path(self.config["data_set"]["filter_catalog"])
self.filter_catalog_table = self._read_filter_catalog(filter_catalog)
self.files = self._parse_filter_catalog(self.filter_catalog_table)
if self.files is None:
msg = "Cannot continue without files. Please ensure the table passed in "
msg += "config['data_set']['filter_catalog'] is well formed. It should minimally be "
msg += f"a table readable by Astropy's Table.read() with columns: {self.object_id_column_name}, "
msg += f"{self.filename_column_name}, and {self.filter_column_name}. This may also occur because "
msg += "of a misimplemented subclass"
raise RuntimeError(msg)
first_filter_dict = next(iter(self.files.values()))
self.num_filters = len(first_filter_dict)
self._set_crop_transform()
logger.info(f"FitsImageDataSet has {len(self)} objects")
[docs]
def _read_filter_catalog(self, filter_catalog_path: Path | None):
from astropy.table import Table
if filter_catalog_path is None:
msg = "Must provide a filter catalog in config['data_set']['filter_catalog']"
raise RuntimeError(msg)
if not filter_catalog_path.exists():
msg = f"Filter catalog file {filter_catalog_path} given in config does not exist."
raise RuntimeError(msg)
table = Table.read(filter_catalog_path)
colnames = table.colnames
object_id_missing = self.object_id_column_name not in colnames
filename_missing = self.filename_column_name not in colnames
filter_missing = self.filter_column_name not in colnames
if object_id_missing:
msg = f"Filter catalog file {filter_catalog_path} has no column '{self.object_id_column_name}'"
raise RuntimeError(msg)
if filename_missing:
msg = f"Filter catalog file {filter_catalog_path} has no column '{self.filename_column_name}'"
raise RuntimeError(msg)
if filter_missing:
msg = f"Filter catalog file {filter_catalog_path} has no column '{self.filter_column_name}'. "
logger.warning(msg)
_, counts = np.unique(table[self.object_id_column_name], return_counts=True)
if np.max(counts) == 1:
msg = "Object IDs are unique, filling in the same filter value across all objects"
logger.warning(msg)
table.add_column(np.full(len(table), "Unknown_filter"), name=self.filter_column_name)
else:
msg = "Object IDs are not unique. you must add a 'filter' column to your table or name "
msg += "the appropriate column by setting config['data_set']['filter_column_name']"
raise RuntimeError(msg)
table.add_index(self.object_id_column_name)
if not filter_missing:
table.add_index(self.filter_column_name)
return table
[docs]
def _parse_filter_catalog(self, table) -> None:
"""Sets self.files by parsing the catalog.
Subclasses may override this function to control parsing of the table more directly, but the
overriding class must create the files dict which has type dict[object_id -> dict[filter -> filename]]
with object_id, filter, and filename all strings. In the case of no filter distinction, a single
flag value may be used for the filter dict keys in the inner dicts.
Parameters
----------
table : Table
The catalog we read in
"""
filter_catalog: files_dict = {}
for row in table:
object_id = str(row[self.object_id_column_name])
filter = row[self.filter_column_name]
filename = row[self.filename_column_name]
# Insert into the filter catalog.
if object_id not in filter_catalog:
filter_catalog[object_id] = {}
filter_catalog[object_id][filter] = filename
return filter_catalog
[docs]
def _before_preload(self) -> None:
# Provided so subclasses can make edits to the class after full initialization
# but before the cache preload thread starts iterating over the datastructure and
# fetching
pass
[docs]
def shape(self) -> tuple[int, int, int]:
"""Shape of the individual cutouts this will give to a model
Returns
-------
tuple[int,int,int]
Tuple describing the dimensions of the 3 dimensional tensor handed back to models
The first index is the number of filters
The second index is the width of each image
The third index is the height of each image
"""
return (self.num_filters, self.cutout_shape[0], self.cutout_shape[1])
[docs]
def __len__(self) -> int:
"""Returns number of objects in this loader
Returns
-------
int
number of objects in this data loader
"""
return len(self.files)
[docs]
def get_object_id(self, idx: int) -> str:
"""Get the object ID at the given index
Parameters
----------
idx : int
Index of the object ID to return
Returns
-------
str
The object ID at the given index
"""
if idx >= len(self.files) or idx < 0:
raise IndexError("Index out of range")
# Use the list of object IDs for explicit indexing
return list(self.files.keys())[idx]
[docs]
def get_image(self, idx: int):
"""Get the image at the given index as a PyTorch Tensor.
Parameters
----------
idx : int
Index of the image to return
Returns
-------
torch.Tensor
The image at the given index as a PyTorch Tensor.
"""
object_id = self.get_object_id(idx)
return self._object_id_to_tensor(object_id)
[docs]
def __getitem__(self, idx: int):
if idx >= len(self.files) or idx < 0:
raise IndexError
object_id = self.get_object_id(idx)
return {
"data": {
"object_id": object_id,
"image": self.get_image(idx),
"index": idx,
},
"object_id": object_id,
}
[docs]
def __contains__(self, object_id: str) -> bool:
"""Allows you to do `object_id in dataset` queries. Used by testing code.
Parameters
----------
object_id : str
The object ID you'd like to know if is in the dataset
Returns
-------
bool
True of the object_id given is in the data set
"""
return object_id in list(self.files.keys())
[docs]
def _get_file(self, index: int) -> Path:
"""Private indexing method across all files.
Returns the file path corresponding to the given index.
The index is zero-based and defined in the same manner as the total order of _all_files() and
_object_files() iterator. Useful if you have an np.array() or list built from _all_files() and you
need to select an individual item.
Only valid after self.object_ids, self.files, self.path, and self.num_filters have been initialized
in __init__
Parameters
----------
index : int
Index, see above for order semantics
Returns
-------
Path
The path to the file
"""
object_index = int(index / self.num_filters)
object_id = list(self.files.keys())[object_index]
filters = self.files[object_id]
filter_names = sorted(list(filters))
filter = filter_names[index % self.num_filters]
return self._file_to_path(filters[filter])
[docs]
def ids(self, log_every=None) -> Generator[str]:
"""Public read-only iterator over all object_ids that enforces a strict total order across
objects. Will not work prior to self.files initialization in __init__
Yields
------
Iterator[str]
Object IDs currently in the dataset
"""
log = log_every is not None and isinstance(log_every, int)
for index, object_id in enumerate(self.files):
if log and index != 0 and index % log_every == 0:
logger.info(f"Processed {index + 1} objects")
yield str(object_id)
else:
if log:
logger.info(f"Processed {index + 1} objects")
[docs]
def _all_files(self):
"""
Private read-only iterator over all files that enforces a strict total order across
objects and filters. Will not work prior to self.files, and self.path initialization in __init__
Yields
------
Path
The path to the file.
"""
for object_id in self.ids():
for filename in self._object_files(object_id):
yield filename
[docs]
def _filter_filename(self, object_id):
"""
Private read-only iterator over all files for a given object. This enforces a strict total order
across filters. Will not work prior to self.files initialization in __init__
Yields
------
filter_name, file name
The name of a filter and the file name for the fits file.
The file name is relative to self.path
"""
filters = self.files[object_id]
filter_names = sorted(list(filters))
for filter_name in filter_names:
yield filter_name, filters[filter_name]
[docs]
def _object_files(self, object_id):
"""
Private read-only iterator over all files for a given object. This enforces a strict total order
across filters. Will not work prior to self.files, and self.path initialization in __init__
Yields
------
Path
The path to the file.
"""
for _, filename in self._filter_filename(object_id):
yield self._file_to_path(filename)
[docs]
def _file_to_path(self, filename: str) -> Path:
"""Turns a filename into a full path suitable for open. Equivalent to:
`Path(self.path) / Path(filename)`
Parameters
----------
filename : str
The filename string
Returns
-------
Path
A full path that is openable.
"""
return Path(self.path) / Path(filename)
[docs]
def _read_object_id(self, object_id: str):
from astropy.io import fits
start_time = time.monotonic_ns()
# Read all the files corresponding to this object
data = []
for filepath in self._object_files(object_id):
file_start_time = time.monotonic_ns()
raw_data = fits.getdata(filepath, memmap=False)
data.append(raw_data)
self._log_duration_tensorboard("file_read_time_s", file_start_time)
self._log_duration_tensorboard("object_read_time_s", start_time)
data_torch = self._convert_to_torch(data)
self._log_duration_tensorboard("object_total_read_time_s", start_time)
return data_torch
[docs]
def _convert_to_torch(self, data: list[npt.ArrayLike]):
from torch import from_numpy
start_time = time.monotonic_ns()
# Push all the filter data into a tensor object
data_np = np.array(data)
data_torch = from_numpy(data_np.astype(np.float32))
# Apply our transform stack
data_torch = self.transform(data_torch) if self.transform is not None else data_torch
self._log_duration_tensorboard("object_convert_tensor_time_s", start_time)
return data_torch
# TODO: Performance Change when files are read/cache pytorch tensors?
#
# This function loads from a file every time __getitem__ is called
# Do we want to pre-cache these into memory in init?
# Do we want to memoize them on first __getitem__ call?
#
# For now we just do it the naive way
[docs]
def _load_tensor_for_cache(self, object_id: str):
"""Implementation of TensorCacheMixin abstract method."""
return self._read_object_id(object_id)
[docs]
def _object_id_to_tensor(self, object_id: str):
"""Converts an object_id to a pytorch tensor with dimensions (self.num_filters, self.cutout_shape[0],
self.cutout_shape[1]). This is done by reading the file and slicing away any excess pixels at the
far corners of the image from (0,0).
The current implementation reads the files once the first time they are accessed, and then
keeps them in a dict for future accesses.
Parameters
----------
object_id : str
The object_id requested
Returns
-------
torch.Tensor
A tensor with dimension (self.num_filters, self.cutout_shape[0], self.cutout_shape[1])
"""
return self._object_id_to_tensor_cached(object_id)