Skip to content

base


Processor

Bases: DataCollection

An abstract base class for data processing classes.

Provides methods for initialising source data from download-toolbox defined configurations, process the data, and saving the processed data to normalised netCDF files.

TODO: the majority of actual data processing, for the moment, is being isolated in the child implementation of NormalisingChannelProcessor whilst I work out what is going on regarding the relevant processing and the inheritance hierarchy that support future, diverse implementations for alternative data types not based on xarray

Source code in preprocess_toolbox/base.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class Processor(DataCollection):
    """An abstract base class for data processing classes.

    Provides methods for initialising source data from download-toolbox defined
    configurations, process the data, and saving the processed data to normalised netCDF files.

    TODO: the majority of actual data processing, for the moment, is being isolated in the
     child implementation of NormalisingChannelProcessor whilst I work out what is going
     on regarding the relevant processing and the inheritance hierarchy that support future,
     diverse implementations for alternative data types not based on xarray

    """

    SUFFIXES = ["abs"]

    def __init__(self,
                 dataset_config: DatasetConfig,
                 absolute_vars: list,
                 identifier: str,
                 base_path: os.PathLike = os.path.join(".", "processed"),
                 config_path: os.PathLike = None,
                 dtype: np.typecodes = np.float32,
                 processed_files: dict = None,
                 update_key: str = None,
                 **kwargs) -> None:
        """

        Args:
            dataset_config:
            absolute_vars:
            identifier:
            base_path:
            dtype:
            update_key:
            **kwargs:
        """
        super().__init__(base_path=base_path,
                         config_path=config_path,
                         config_type="processed",
                         identifier=identifier,
                         path_components=[])

        self.config.output_path = "." if config_path is None else config_path

        self._abs_vars = absolute_vars if absolute_vars else []
        self._dtype = dtype

        self._processed_files = dict() if processed_files is None else processed_files

        self._update_key = self.identifier if not update_key else update_key

    def get_data_var_folder(self,
                            var_name: str,
                            append: object = None,
                            missing_error: bool = False) -> os.PathLike:
        """Returns the path for a specific data variable.

        Appends additional folders to the path if specified in the `append` parameter.

        :param var_name: The data variable.
        :param append: Additional folders to append to the path.
        :param missing_error: Flag to specify if missing directories should be treated as an error.

        :return str: The path for the specific data variable.
        """
        if not append:
            append = []

        data_var_path = os.path.join(self.path, *[var_name, *append])

        if not os.path.exists(data_var_path):
            if not missing_error:
                os.makedirs(data_var_path, exist_ok=True)
            else:
                raise OSError("Directory {} is missing and this is "
                              "flagged as an error!".format(data_var_path))

        return data_var_path

    def save_processed_file(self,
                            var_name: str,
                            name: str,
                            data: object,
                            convert: bool = True,
                            overwrite: bool = False) -> str:
        """Save processed data to netCDF file.

        Args:
            var_name: The name of the variable.
            name: The name of the file.
            data: The data to be saved.
            convert: Whether to convert data to the processors data type
            overwrite: Whether to overwrite extant files

        Returns:
            object: The path of the saved netCDF file.

        """
        file_path = os.path.join(self.path, name)
        if overwrite or not os.path.exists(file_path):
            logging.debug("Writing to {}".format(file_path))
            if convert:
                data = data.astype(self._dtype)
            data.to_netcdf(file_path)

        if var_name not in self.processed_files.keys():
            self.processed_files[var_name] = list()

        if file_path not in self.processed_files[var_name]:
            logging.debug("Adding {} file: {}".format(var_name, file_path))
            self.processed_files[var_name].append(file_path)
        # else:
        #     logging.warning("{} already exists in {} processed list".format(file_path, var_name))
        return file_path

    def get_dataset(self,
                    var_names: list = None):
        logging.debug("Finding files for {}".format(", ".join(var_names if var_names is not None else "everything")))

        var_files = [var_filepaths
                     for vn in var_names
                     for var_filepaths in self.processed_files[vn]] \
            if var_names is not None else \
                    [var_filepaths
                     for vn in self.processed_files.keys()
                     for var_filepaths in self.processed_files[vn].values()]

        logging.info("Got {} filenames to open dataset with!".format(len(var_files)))
        logging.debug(pformat(var_files))

        # TODO: where's my parallel mfdataset please!?
        with (dask.config.set(**{'array.slicing.split_large_chunks': True})):
            ds = xr.open_mfdataset(
                var_files,
                combine="nested",
                concat_dim="time",
                coords="minimal",
                compat="override",
                chunks=dict(time=1, ),
            )
            ds = ds.astype(self._dtype)
        return ds

    @abstractmethod
    def process(self):
        pass

    @property
    def abs_vars(self):
        return self._abs_vars

    @property
    def dtype(self):
        return self._dtype

    @property
    def processed_files(self) -> dict:
        """A dict with the processed files organised by variable name."""
        return self._processed_files

    @property
    def update_key(self):
        return self._update_key

processed_files property

A dict with the processed files organised by variable name.

__init__(dataset_config, absolute_vars, identifier, base_path=os.path.join('.', 'processed'), config_path=None, dtype=np.float32, processed_files=None, update_key=None, **kwargs)

Args: dataset_config: absolute_vars: identifier: base_path: dtype: update_key: **kwargs:

Source code in preprocess_toolbox/base.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __init__(self,
             dataset_config: DatasetConfig,
             absolute_vars: list,
             identifier: str,
             base_path: os.PathLike = os.path.join(".", "processed"),
             config_path: os.PathLike = None,
             dtype: np.typecodes = np.float32,
             processed_files: dict = None,
             update_key: str = None,
             **kwargs) -> None:
    """

    Args:
        dataset_config:
        absolute_vars:
        identifier:
        base_path:
        dtype:
        update_key:
        **kwargs:
    """
    super().__init__(base_path=base_path,
                     config_path=config_path,
                     config_type="processed",
                     identifier=identifier,
                     path_components=[])

    self.config.output_path = "." if config_path is None else config_path

    self._abs_vars = absolute_vars if absolute_vars else []
    self._dtype = dtype

    self._processed_files = dict() if processed_files is None else processed_files

    self._update_key = self.identifier if not update_key else update_key

get_data_var_folder(var_name, append=None, missing_error=False)

Returns the path for a specific data variable.

Appends additional folders to the path if specified in the append parameter.

Parameters:

Name Type Description Default
var_name str

The data variable.

required
append object

Additional folders to append to the path.

None
missing_error bool

Flag to specify if missing directories should be treated as an error.

False

Returns:

Type Description
PathLike

The path for the specific data variable.

Source code in preprocess_toolbox/base.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_data_var_folder(self,
                        var_name: str,
                        append: object = None,
                        missing_error: bool = False) -> os.PathLike:
    """Returns the path for a specific data variable.

    Appends additional folders to the path if specified in the `append` parameter.

    :param var_name: The data variable.
    :param append: Additional folders to append to the path.
    :param missing_error: Flag to specify if missing directories should be treated as an error.

    :return str: The path for the specific data variable.
    """
    if not append:
        append = []

    data_var_path = os.path.join(self.path, *[var_name, *append])

    if not os.path.exists(data_var_path):
        if not missing_error:
            os.makedirs(data_var_path, exist_ok=True)
        else:
            raise OSError("Directory {} is missing and this is "
                          "flagged as an error!".format(data_var_path))

    return data_var_path

save_processed_file(var_name, name, data, convert=True, overwrite=False)

Save processed data to netCDF file.

Args: var_name: The name of the variable. name: The name of the file. data: The data to be saved. convert: Whether to convert data to the processors data type overwrite: Whether to overwrite extant files

Returns: object: The path of the saved netCDF file.

Source code in preprocess_toolbox/base.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def save_processed_file(self,
                        var_name: str,
                        name: str,
                        data: object,
                        convert: bool = True,
                        overwrite: bool = False) -> str:
    """Save processed data to netCDF file.

    Args:
        var_name: The name of the variable.
        name: The name of the file.
        data: The data to be saved.
        convert: Whether to convert data to the processors data type
        overwrite: Whether to overwrite extant files

    Returns:
        object: The path of the saved netCDF file.

    """
    file_path = os.path.join(self.path, name)
    if overwrite or not os.path.exists(file_path):
        logging.debug("Writing to {}".format(file_path))
        if convert:
            data = data.astype(self._dtype)
        data.to_netcdf(file_path)

    if var_name not in self.processed_files.keys():
        self.processed_files[var_name] = list()

    if file_path not in self.processed_files[var_name]:
        logging.debug("Adding {} file: {}".format(var_name, file_path))
        self.processed_files[var_name].append(file_path)
    # else:
    #     logging.warning("{} already exists in {} processed list".format(file_path, var_name))
    return file_path