Source code for autotab._main


import os
import gc
import json
import sys
import time
import math
import types
import shutil
import inspect
from typing import Union, Callable, Tuple
from collections import OrderedDict, defaultdict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from SeqMetrics import RegressionMetrics, ClassificationMetrics
from easy_mpl import dumbbell_plot, taylor_plot, circular_bar_plot, bar_chart

import ai4water
from ai4water import Model
from ai4water._optimize import make_space
from ai4water.hyperopt.utils import to_skopt_space
from ai4water.utils.utils import dateandtime_now, jsonize
from ai4water.hyperopt import Categorical, HyperOpt, Integer, Real
from ai4water.models import MLP, CNN, LSTM, CNNLSTM, LSTMAutoEncoder, TFT, TCN
from ai4water.experiments.utils import regression_space, classification_space, dl_space

assert ai4water.__version__ >= "1.02"


# TODO's
# custom metric
# custom model which is installed/not installed

# in order to unify the use of metrics
# Maps the problem type to a factory which builds the corresponding SeqMetrics
# calculator from true (t) and predicted (p) values. Both factories accept
# ``multiclass`` so that they can be called identically; the regression
# factory accepts the argument but does not forward it.
Metrics = {
    'regression': lambda t, p, multiclass=False, **kwargs: RegressionMetrics(t, p, **kwargs),
    'classification': lambda t, p, multiclass=False, **kwargs: ClassificationMetrics(t, p,
        multiclass=multiclass, **kwargs)
}

# Keyword arguments associated with individual classification metrics.
# NOTE(review): presumably forwarded when the metric is computed; the consumer
# is not in this part of the file - confirm.
METRICS_KWARGS = {
    'accuracy': {},
    "cross_entropy": {},
    'f1_score': {"average": "macro"},
    "precision": {"average": "macro"},
    "recall": {"average": "macro"},
    "specificity": {"average": "macro"},
}

# Registry of deep learning model builders, keyed by model name. It is
# consulted when category is "DL" and it is extended by ``add_dl_model``.
DL_MODELS = {
    "MLP": MLP,
    "LSTM":LSTM,
    "CNN":CNN,
    "CNNLSTM":CNNLSTM,
    "TFT":TFT,
    "TCN":TCN,
    "LSTMAutoEncoder":LSTMAutoEncoder
}

# path separator of the operating system, used when building model prefixes
SEP = os.sep

# Names of transformations considered when the user does not provide any.
# "none" stands for `apply no transformation on this feature`.
DEFAULT_TRANSFORMATIONS = [
    "minmax", "center", "scale", "zscore", "box-cox", "yeo-johnson",
    "quantile", "robust", "log", "log2", "log10", "sqrt", "none",
              ]

# Tells for every metric whether larger ("max") or smaller ("min") values are
# better. Metrics missing from this table raise KeyError wherever the table
# is indexed by metric name.
METRIC_TYPES = {
    "r2": "max",
    "nse": "max",
    "r2_score": "max",
    "kge": "max",
    'log_nse': 'max',
    "corr_coeff": "max",
    'accuracy': "max",
    'f1_score': 'max',
    "mse": "min",
    "rmse": "min",
    "rmsle": "min",
    "mape": "min",
    "nrmse": "min",
    "pbias": "min",
    "bias": "min",
    "med_seq_error": "min",
}

def compare_func(metric_type:str):
    """Return the numpy comparison which tells whether a score is at least
    as good as a reference value.

    ``np.less_equal`` is returned when ``metric_type`` is ``"min"``, for every
    other value ``np.greater_equal`` is returned.
    """
    return np.less_equal if metric_type == "min" else np.greater_equal


def compare_func1(metric_type:str):
    """Return the NaN-ignoring numpy reducer which picks the best value.

    ``np.nanmin`` is returned when ``metric_type`` is ``"min"``, for every
    other value ``np.nanmax`` is returned.
    """
    return np.nanmin if metric_type == "min" else np.nanmax


def fill_val(metric_type:str, best_so_far):
    """Replace a non-finite `best so far` value with a sentinel.

    A finite ``best_so_far`` is returned unchanged. Otherwise a very large
    number is returned for metrics of type ``"min"`` and a very small number
    for all other metric types, so that any real score compares as better.
    """
    if not math.isfinite(best_so_far):
        return 99999999999999 if metric_type == "min" else -9999999999
    return best_so_far


class PipelineMixin(object):
    """Stores the problem type and the default keyword arguments which
    accompany every supported transformation.

    Attributes
    ----------
    mode : str
        either ``regression`` or ``classification``
    category : str
        either ``DL`` or ``ML``
    transformations : dict
        maps the name of a transformation to the keyword arguments which are
        added to its configuration
    """

    def __init__(
            self,
            mode,
            category,
    ):
        allowed_modes = ("regression", "classification")
        assert mode in allowed_modes, f"{mode} not allowed as mode"
        self.mode = mode

        allowed_categories = ("DL", "ML")
        assert category in allowed_categories
        self.category = category

        def _log_like():
            # a new dictionary on every call so that entries never share state
            return {'treat_negatives': True, 'replace_zeros': True}

        configs = {"quantile": {'output_distribution': 'normal'}}
        for name in ("minmax", "center", "scale", "zscore"):
            configs[name] = {}
        configs["box-cox"] = _log_like()
        for name in ("yeo-johnson", "robust"):
            configs[name] = {}
        for name in ("log", "log2", "log10"):
            configs[name] = _log_like()
        configs["sqrt"] = {'treat_negatives': True}

        self.transformations = configs

[docs]class OptimizePipeline(PipelineMixin): """ optimizes model/estimator, its hyperparameters and preprocessing operation to be performed on input and output features. It consists of two hpo loops. The parent or outer loop optimizes preprocessing/feature engineering, feature selection and model selection while the child hpo loop optimizes hyperparmeters of child hpo loop. Attributes ---------- - metrics_ a pandas DataFrame of shape (parent_iterations, len(monitor)) which contains values of metrics being monitored at each parent iteration. - val_scores_ a 1d numpy array of length equal to parent_iterations which contains value of evaluation metric at each parent iteration. - parent_suggestions_: an ordered dictionary of suggestions to the parent objective function during parent hpo loop - child_val_scores_: a numpy array of shape (parent_iterations, child_iterations) containing value of eval_metric at all child hpo loops - optimizer_ an instance of ai4water.hyperopt.HyperOpt [1]_ for parent optimization - models a list of models being considered for optimization - model_space a dictionary which contains parameter space for each model Example ------- >>> from autotab import OptimizePipeline >>> from ai4water.datasets import busan_beach >>> data = busan_beach() >>> input_features = data.columns.tolist()[0:-1] >>> output_features = data.columns.tolist()[-1:] >>> pl = OptimizePipeline(input_features=input_features, >>> output_features=output_features, >>> inputs_to_transform=input_features) >>> results = pl.fit(data=data) Note ---- This optimization always solves a minimization problem even if the val_metric is $R^2$. .. [1] https://ai4water.readthedocs.io/en/latest/hpo.html#hyperopt """
def __init__(
        self,
        inputs_to_transform,
        input_transformations: Union[list, dict] = None,
        outputs_to_transform=None,
        output_transformations: Union[list, ] = None,
        models: list = None,
        parent_iterations: int = 100,
        child_iterations: int = 25,
        parent_algorithm: str = "bayes",
        child_algorithm: str = "bayes",
        eval_metric: str = None,
        cv_parent_hpo: bool = None,
        cv_child_hpo: bool = None,
        monitor: Union[list, str] = None,
        mode: str = "regression",
        num_classes:int = None,
        category:str = "ML",
        prefix: str = None,
        **model_kwargs
):
    """
    initializes the class

    Parameters
    ----------
        inputs_to_transform : list, dict
            Input features on which feature engineering/transformation is to
            be applied. If you want to apply a single transformation on a
            group of input features, then pass this as a dictionary whose
            keys are names of groups and whose values are the features in
            each group. This is helpful if the input data consists of hundred
            or thousands of input features.
        input_transformations : list, dict
            The transformations to be considered for input features. Default
            is None, in which case all transformations are considered. If
            list, then it will be the names of transformations to be
            considered for all input features. By default following
            transformations are considered

                - ``minmax``  rescale from 0 to 1
                - ``center``    center the data by subtracting mean from it
                - ``scale``     scale the data by dividing it with its standard deviation
                - ``zscore``    first performs centering and then scaling
                - ``box-cox``
                - ``yeo-johnson``
                - ``quantile``
                - ``robust``
                - ``log``
                - ``log2``
                - ``log10``
                - ``sqrt``    square root

            The user can however, specify list of transformations to be
            considered for each input feature. In such a case, this argument
            must be a dictionary whose keys are names of input features and
            values are list of transformations.

        outputs_to_transform : list, optional
            Output features on which feature engineering/transformation is to
            be applied. If None, then transformations on outputs are not
            applied.
        output_transformations :
            The transformations to be considered for outputs/targets. The
            user can consider any transformation as given for
            ``input_transformations``
        models : list, optional
            The models/algorithms to consider during optimization. If not
            given, then all available models from sklearn, xgboost, catboost
            and lgbm are considered. For neural networks, following model
            types are considered by default

                - MLP [1]_ multi layer perceptron
                - CNN [2]_  1D convolution neural network
                - LSTM [3]_ Long short term memory network
                - CNNLSTM [4]_ CNN-> LSTM
                - LSTMAutoEncoder [5]_ LSTM based autoencoder
                - TCN [6]_ Temporal convolution networks
                - TFT [7]_ Temporal fusion Transformer

            However, in such cases, the ``category`` must be ``DL``.

        parent_iterations : int, optional (default=100)
            Number of iterations for parent optimization loop
        child_iterations : int, optional
            Number of iterations for child optimization loop. If set to 0,
            the child hpo loop is not run which means the hyperparameters of
            the model are not optimized. You can customize iterations for
            each model by making use of :meth:`change_child_iteration`
            method.
        parent_algorithm : str, optional
            Algorithm for optimization of parent optimization
        child_algorithm : str, optional
            Algorithm for optimization of child optimization
        eval_metric : str, optional
            Validation metric to calculate val_score in objective function.
            The parent and child hpo loop optimizes/improves this metric.
            This metric is calculated on validation data. If cross validation
            is performed then this metric is calculated using cross
            validation. If not given, ``mse`` is used for regression and
            ``accuracy`` for classification.
        cv_parent_hpo : bool, optional (default=False)
            Whether we want to apply cross validation in parent hpo loop or
            not?. If given, the parent hpo loop will optimize the cross
            validation score. The model is fitted on whole training data
            (training+validation) after cross validation and the metrics
            printed (other than parent_val_metric) are calculated on the
            basis of the updated model i.e. the one fitted on whole training
            (training+validation) data.
        cv_child_hpo : bool, optional (default=False)
            Whether we want to apply cross validation in child hpo loop or
            not?. If False, then val_score will be calculated on validation
            data. The type of cross validator used is taken from
            model.config['cross_validator']
        monitor : Union[str, list], optional, (default=None)
            Names of performance metrics to monitor in parent hpo loop. If
            None, then R2 is monitored for regression and accuracy for
            classification. ``eval_metric`` is always monitored. A list given
            by the user is copied and never modified.
        mode : str, optional (default="regression")
            whether this is a ``regression`` problem or ``classification``
        num_classes : int, optional (default=None)
            number of classes, only relevant if mode=="classification".
        category : str, optional (default="ML")
            either "DL" or "ML". If DL, the pipeline is optimized for neural
            networks.
        prefix : str, optional
            NOTE(review): accepted but not used inside this constructor -
            confirm whether it is meant to be stored.
        **model_kwargs :
            any additional key word arguments for ai4water's Model. The
            arguments ``model``, ``x_transformation`` and
            ``y_transformation`` are not allowed.

    Raises
    ------
    ValueError
        if ``models`` contains repeating values or if ``model_kwargs``
        contains an argument which is not allowed.

    References
    ----------
    .. [1] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.MLP

    .. [2] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.CNN

    .. [3] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.LSTM

    .. [4] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.CNNLSTM

    .. [5] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.LSTMAutoEncoder

    .. [6] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.TCN

    .. [7] https://ai4water.readthedocs.io/en/latest/models/models.html#ai4water.models.TFT
    """
    if isinstance(inputs_to_transform, dict):
        # copy, because output features are added to _groups below and the
        # dictionary of the caller must not be modified
        self._groups = dict(inputs_to_transform)
        self.inputs_to_transform = list(inputs_to_transform.keys())
    else:
        self._groups = {inp: [inp] for inp in inputs_to_transform}
        self.inputs_to_transform = inputs_to_transform

    self.input_transformations = input_transformations
    self.output_transformations = output_transformations or DEFAULT_TRANSFORMATIONS

    super(OptimizePipeline, self).__init__(mode, category)

    self.num_classes = num_classes

    self.models = models
    if models is None:
        if mode == "regression":
            if category == "ML":
                self.models = list(regression_space(2).keys())
            else:
                self.models = list(dl_space(2).keys())
        else:
            if category == "ML":
                self.models = list(classification_space(2).keys())
            else:
                self.models = list(dl_space(2).keys())

    elif isinstance(models, list):
        assert all([isinstance(obj, str) for obj in models])
        if len(set(models)) != len(models):
            raise ValueError(f"models contain repeating values. \n{models}")

        if self.category == "DL":
            assert all([model in DL_MODELS.keys() for model in models]), f"""
            Only following deel learning models can be considered {DL_MODELS.keys()}
            """
        # add_model/add_dl_model/remove_model modify self.models in place,
        # therefore we keep our own copy instead of the list of the caller
        self.models = list(models)

    self.parent_iterations = parent_iterations
    self.child_iterations = child_iterations
    # for internal use, we keep child_iter for each model
    self._child_iters = {model: child_iterations for model in self.models}
    self.parent_algorithm = parent_algorithm
    self.child_algorithm = child_algorithm

    if eval_metric is None:
        if self.mode == "regression":
            eval_metric = "mse"
        else:
            eval_metric = "accuracy"
    self.eval_metric = eval_metric
    self.cv_parent_hpo = cv_parent_hpo
    self.cv_child_hpo = cv_child_hpo

    for arg in ['model', 'x_transformation', 'y_transformation']:
        if arg in model_kwargs:
            raise ValueError(f"argument {arg} not allowed")
    # model_kwargs must be set before outputs_to_transform because the
    # setter of outputs_to_transform reads output_features from model_kwargs
    self.model_kwargs = model_kwargs
    self.outputs_to_transform = outputs_to_transform
    if outputs_to_transform is not None:
        if isinstance(outputs_to_transform, str):
            outputs_to_transform = [outputs_to_transform]
        self._groups.update({outp: [outp] for outp in outputs_to_transform})

    # self.seed = None
    if monitor is None:
        if mode == "regression":
            monitor = ['r2']
        else:
            monitor = ['accuracy']
    elif isinstance(monitor, str):
        monitor = [monitor]
    elif isinstance(monitor, list):
        # eval_metric may be appended below, so work on a copy
        monitor = list(monitor)

    # evaluation_metric is monitored by default
    if eval_metric not in monitor:
        monitor.append(eval_metric)

    assert isinstance(monitor, list)

    self.monitor = monitor

    if self.category == "ML":
        if self.mode == "regression":
            space = regression_space(num_samples=10)
        else:
            space = classification_space(num_samples=10)
    else:
        space = dl_space(num_samples=10)

    # model_space contains just those models which are being considered
    self.model_space = {}
    for mod, mod_sp in space.items():
        if mod in self.models:
            self.model_space[mod] = mod_sp

    self._optimize_model = True
    self._model = None

    if self.outputs_to_transform is None:
        self._features_to_transform = self.inputs_to_transform
    else:
        self._features_to_transform = self.inputs_to_transform + self.outputs_to_transform

    # search space of batch size and learning rate is only used for DL
    self.batch_space = []
    self.lr_space = []
    if category == "DL":
        self.batch_space = [Categorical([8, 16, 32, 64], name="batch_size")]
        self.lr_space = [Real(1e-5, 0.05, num_samples=10, name="lr")]
@property
def outputs_to_transform(self):
    """names of output features on which transformations are applied"""
    return self._out_to_transform

@outputs_to_transform.setter
def outputs_to_transform(self, x):
    # a non-empty value must be a name or a list of names of output features
    if x:
        if isinstance(x, str):
            x = [x]
        assert isinstance(x, list)
        for i in x:
            assert i in self.output_features
    self._out_to_transform = x

def maybe_make_path(self):
    """returns the path ``<cwd>/results/<parent_prefix_>`` and creates the
    directory if it does not exist yet."""
    _path = os.path.join(os.getcwd(), "results", self.parent_prefix_)
    # exist_ok avoids failure when the directory appears between an
    # existence check and its creation
    os.makedirs(_path, exist_ok=True)
    return _path

@property
def mode(self):
    """the problem type which was given at initialization"""
    return self._mode

@mode.setter
def mode(self, x):
    self._mode = x

@property
def Metrics(self):
    """the factory of metrics calculator for the current mode"""
    return Metrics[self.mode]

@property
def input_features(self):
    """names of input features, taken from ``model_kwargs``.

    Raises
    ------
    ValueError
        if ``input_features`` was not given as key word argument
    """
    if 'input_features' in self.model_kwargs:
        return self.model_kwargs['input_features']
    else:
        raise ValueError(
            "input_features must be provided as key word argument")

@property
def output_features(self):
    """names of output features as list, taken from ``model_kwargs``.

    Raises
    ------
    ValueError
        if ``output_features`` was not given as key word argument
    """
    if 'output_features' in self.model_kwargs:
        _output_features = self.model_kwargs['output_features']
        if isinstance(_output_features, str):
            _output_features = [_output_features]
        return _output_features
    else:
        raise ValueError(
            "output_features must be provided as key word argument")

@property
def num_outputs(self):
    """``num_classes`` for classification otherwise number of output features"""
    if self.mode == "classification":
        return self.num_classes
    else:
        return len(self.output_features)

def _save_config(self):
    """writes the output of ``self.config()`` as ``config.json`` in ``self.path``"""
    cpath = os.path.join(self.path, "config.json")
    config = self.config()
    with open(cpath, 'w') as fp:
        json.dump(jsonize(config), fp, indent=4)
    return
def update_model_space(self, space: dict) -> None:
    """replaces the search space of one or more models which are already
    being considered.

    Parameters
    ---------
    space
        a dictionary whose keys are names of models and values are parameter
        space for that model.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        if a model in ``space`` is not among the models being considered

    Example
    -------
    >>> pl = OptimizePipeline(...)
    >>> rf_space = {'max_depth': [5,10, 15, 20],
    >>>          'n_models': [5,10, 15, 20]}
    >>> pl.update_model_space({"RandomForestRegressor": rf_space})
    """
    for model_name, raw_space in space.items():
        if model_name not in self.model_space:
            raise ValueError(f"{model_name} is not valid because it is not being considered.")
        skopt_space = to_skopt_space(raw_space)
        self.model_space[model_name] = {'param_space': list(skopt_space)}
    return
def add_dl_model(
        self,
        model: Callable,
        space:Union[list, Real, Categorical, Integer]
)->None:
    """adds a deep learning model to be considered.

    The name of the function is used as name of the model. The model is
    registered in ``DL_MODELS``, ``models``, ``model_space`` and
    ``_child_iters``. If a model with same name is already present, its
    builder and space are replaced.

    Parameters
    ----------
    model : callable
        the model to be added. It must be a python function which, when
        called without arguments, returns a dictionary with the single key
        ``layers``.
    space : list
        the search space of the model

    Raises
    ------
    NotImplementedError
        if ``model`` is not a python function
    """
    if not isinstance(model, types.FunctionType):
        raise NotImplementedError(
            f"model must be a python function but it is of type {type(model)}")

    model_config = model()
    assert isinstance(model_config, dict), (
        f"model must return a dictionary as model config but it returned {type(model_config)}")
    assert len(model_config) == 1, (
        f"model config must have length of 1 but it has length of {len(model_config)}")
    assert 'layers' in model_config, f"model config must have 'layers' key {model_config.keys()}"

    model_name = model.__name__
    space = to_skopt_space(space)

    # a repeated name in self.models would appear twice in the parent space
    if model_name not in self.models:
        self.models.append(model_name)
    DL_MODELS[model_name] = model
    self.model_space[model_name] = {'param_space': space}
    self._child_iters[model_name] = self.child_iterations
    return
def add_model(
        self,
        model: dict
) -> None:
    """adds a new model which will be considered during optimization.

    The attributes ``model_space``, ``models`` and ``_child_iters`` are
    updated. The number of child iterations of the new model is
    ``child_iterations``.

    Parameters
    ----------
    model : dict
        a dictionary of length 1 whose value should also be a dictionary
        of parameter space for that model

    Raises
    ------
    AssertionError
        if the model is already present. Use ``update_model_space`` to change
        the space of an existing model.

    Example
    -------
    >>> pl = OptimizePipeline(...)
    >>> pl.add_model({"XGBRegressor": {"n_estimators": [100, 200,300, 400, 500]}})

    """
    # the method which changes the space of an existing model is
    # update_model_space, so this is the name the message must mention
    msg = ("{} is already present. If you want to change its space, "
           "please consider using 'update_model_space' function.")

    for model_name, model_space in model.items():
        assert model_name not in self.model_space, msg.format(model_name)
        assert model_name not in self.models, msg.format(model_name)
        assert model_name not in self._child_iters, msg.format(model_name)

        model_space = to_skopt_space(model_space)
        self.model_space[model_name] = {'param_space': model_space}
        self.models.append(model_name)
        self._child_iters[model_name] = self.child_iterations

    return
def remove_model(self, models: Union[str, list]) -> None:
    """
    drops one or more models so that they are not considered any more.
    The following attributes are updated.

        - models
        - model_space
        - _child_iters

    Parameters
    ----------
    models : list, str
        name or names of model to be removed.

    Example
    -------
    >>> pl = OptimizePipeline(...)
    >>> pl.remove_model("ExtraTreeRegressor")
    """
    names = [models] if isinstance(models, str) else models

    for name in names:
        self.models.remove(name)
        del self.model_space[name]
        del self._child_iters[name]

    return
def change_child_iteration(self, model: dict):
    """
    Sets the number of child hpo iterations of individual models.

    For example we may want to run only 10 iterations for LinearRegression
    but 40 iterations for XGBRegressor. The iterations for all the remaining
    models remain same as defined by the user at the start. This method
    updates the ``_child_iters`` dictionary.

    Parameters
    ----------
    model : dict
        a dictionary whose keys are names of models and values are number
        of iterations for that model during child hpo

    Raises
    ------
    ValueError
        if a model name is not present in ``_child_iters``

    Example
    -------
    >>> pl = OptimizePipeline(...)
    >>> pl.change_child_iteration({"XGBRegressor": 10})

    If we want to change iterations for more than one models

    >>> pl.change_child_iteration(({"XGBRegressor": 30,
    >>>                             "RandomForestRegressor": 20}))
    """
    for name, num_iters in model.items():
        if name in self._child_iters:
            self._child_iters[name] = num_iters
        else:
            raise ValueError(f"{name} is not a valid model name")
    return
def space(self) -> list:
    """makes the parameter space for parent hpo

    The space consists of one dimension per feature to transform and, when
    more than one model is being considered, a categorical dimension named
    ``model``. If only one model is being considered, it is stored in
    ``_model`` and ``_optimize_model`` is set to False.
    """

    append = {}
    y_categories = []

    if self.input_transformations is None:
        x_categories = DEFAULT_TRANSFORMATIONS
    elif isinstance(self.input_transformations, list):
        x_categories = self.input_transformations
    else:
        x_categories = DEFAULT_TRANSFORMATIONS
        assert isinstance(self.input_transformations, dict)

        for feature, transformation in self.input_transformations.items():
            assert isinstance(transformation, list)
            append[feature] = transformation

    if self.outputs_to_transform:
        # if the user has provided name of any output feature
        # on which feature transformation is to be applied

        if isinstance(self.output_transformations, list):
            assert all([t in DEFAULT_TRANSFORMATIONS for t in self.output_transformations]), f"""
            transformations must be one of {DEFAULT_TRANSFORMATIONS}"""

            for out in self.output_features:
                append[out] = self.output_transformations
            y_categories = self.output_transformations

        else:
            assert isinstance(self.output_transformations, dict)
            for out_feature, y_transformations in self.output_transformations.items():
                assert out_feature in self.output_features
                assert isinstance(y_transformations, list)
                # validate the transformations of this feature, not the
                # names of features which are the keys of the dictionary
                assert all(
                    [t in DEFAULT_TRANSFORMATIONS for t in y_transformations]), f"""
                    transformations must be one of {DEFAULT_TRANSFORMATIONS}"""
                append[out_feature] = y_transformations
            # flatten, a list of lists can neither be concatenated
            # meaningfully with x_categories nor be de-duplicated
            y_categories = [
                t for ts in self.output_transformations.values() for t in ts]

    # dict.fromkeys removes duplicates while keeping the order of first
    # appearance, the order of a set of strings changes between processes
    categories = list(dict.fromkeys(list(x_categories) + list(y_categories)))

    sp = make_space(self.inputs_to_transform + (self.outputs_to_transform or []),
                    categories=categories,
                    append=append)

    if len(self.models)>1:
        algos = Categorical(self.models, name="model")
        sp = sp + [algos]
    else:
        self._optimize_model = False
        self._model = self.models[0]

    return sp

def change_batch_size_space(self, space:list, low=None, high=None):
    """changes the value of class attribute ``batch_space``.
    It should be used after pipeline initialization and before calling
    ``fit`` method.

    If ``space`` is a list, the values in it become the categories of batch
    size, otherwise an integer dimension between ``low`` and ``high`` is
    created. Only allowed when category is "DL".
    """
    assert self.category == "DL"
    # the dimension must be named batch_size, because the suggestion is
    # consumed by its name in child and parent objective functions
    if isinstance(space, list):
        self.batch_space = [Categorical(space, name="batch_size")]
    else:
        self.batch_space = [Integer(low, high, name="batch_size", num_samples=10)]
    return

def change_lr_space(self, space:list, low=None, high=None):
    """changes the value of class attribute ``lr_space``.
    It should be used after pipeline initialization and before calling
    ``fit`` method.

    If ``space`` is a list, the values in it become the categories of
    learning rate, otherwise a real dimension between ``low`` and ``high``
    is created. Only allowed when category is "DL".
    """
    assert self.category == "DL"
    if isinstance(space, list):
        self.lr_space = [Categorical(space, name="lr")]
    else:
        self.lr_space = [Real(low, high, name="lr", num_samples=10)]
    return

@property
def max_child_iters(self) -> int:
    # the number of child hpo iterations can be different based upon models
    # this property calculates maximum child iterations
    return max(self._child_iters.values())

def reset(self):
    """creates a new results path and re-initializes all the containers which
    are populated during ``fit``."""
    # called at the start of fit method
    # a new path is created every time we call .fit
    self.parent_prefix_ = f"pipeline_opt_{dateandtime_now()}"
    self.path = self.maybe_make_path()

    # value of every monitored metric at every parent iteration
    self.metrics_ = pd.DataFrame(
        np.full((self.parent_iterations, len(self.monitor)), np.nan),
        columns=self.monitor
    )

    self.parent_iter_ = 0
    self.child_iter_ = 0
    self.val_scores_ = np.full(self.parent_iterations, np.nan)

    # holds the value of a metric only at those parent iterations where
    # the metric was at least as good as the best value so far
    metrics_best = np.full((self.parent_iterations, len(self.monitor)), np.nan)
    self.metrics_best_ = pd.DataFrame(metrics_best, columns=self.monitor)

    self.parent_seeds_ = np.random.randint(0, 10000, self.parent_iterations)
    self.child_seeds_ = np.random.randint(0, 10000, self.max_child_iters)

    # each row indicates parent iteration, column indicates child iteration
    self.child_val_scores_ = np.full((self.parent_iterations,
                                      self.max_child_iters),
                                     np.nan)
    self.start_time_ = time.asctime()

    self.parent_suggestions_ = OrderedDict()

    # create container to store data for Taylor plot
    # It will be populated during postprocessing
    self.taylor_plot_data_ = {
        'simulations': {"test": {}},
        'observations': {"test": None}
    }

    self.baseline_results_ = None

    self._save_config()  # will also make path if it does not already exists

    self._print_header()
    return

def _print_header(self):
    # prints the first line on console
    formatter = "{:<5} {:<18} " + "{:<15} " * (len(self.monitor))
    print(formatter.format(
        "Iter",
        self.eval_metric,
        *self.monitor)
    )
    return
def fit(
        self,
        x:np.ndarray = None,
        y:np.ndarray = None,
        data: pd.DataFrame = None,
        validation_data:Tuple[np.ndarray, np.ndarray] = None,
        previous_results:dict = None,
        process_results:bool = True,
) -> "ai4water.hyperopt.HyperOpt":
    """
    Optimizes the pipeline for the given data.

    Parameters
    ----------
        x : np.ndarray
            input training data
        y : np.ndarray
            output/target/label data. It must be of same length as ``x``.
        data :
            A pandas dataframe which contains input (x) and output (y)
            features. Only required if ``x`` and ``y`` are not given. The
            training and validation data will be extracted from this data.
        validation_data :
            validation data on which pipeline is optimized. Only required if
            ``data`` is not given.
        previous_results : dict, optional
            results of a previous optimization. If given, they are handed
            over to ``add_previous_results`` of the parent optimizer.
        process_results : bool
            forwarded to the parent optimizer.

    Returns
    --------
        the value returned by ``fit`` method of the parent optimizer. The
        parent optimizer itself, which is an instance of
        ai4water.hyperopt.HyperOpt, is available as ``optimizer_`` attribute
        afterwards.
    """
    self.data_, self.val_data_ = verify_data(x, y, data, validation_data)

    # new path and fresh containers for this call
    self.reset()

    parent_space = self.space()
    parent_opt = HyperOpt(
        self.parent_algorithm,
        param_space=parent_space,
        objective_fn=self.parent_objective,
        num_iterations=self.parent_iterations,
        opt_path=self.path,
        verbosity = 0,
        process_results=process_results,
    )

    if previous_results is not None:
        parent_opt.add_previous_results(previous_results)

    res = parent_opt.fit()

    self.optimizer_ = parent_opt

    self.save_results()

    self.report()

    # the configuration is written again at the end of optimization
    self._save_config()

    return res
def parent_objective(
        self,
        **suggestions
) -> float:
    """
    objective function for parent hpo loop.

    This objective function is to optimize transformations for each input
    feature and the model. The hyperparameters of the suggested model are
    first optimized in the child hpo loop (if child iterations for this
    model are more than 0), then the model is built with the optimized
    hyperparameters, fitted and evaluated.

    Parameters
    ----------
        **suggestions :
            key word arguments consisting of suggested transformation for each
            input feature and the model to use

    Returns
    -------
    float
        the validation score of this parent iteration
    """

    self.CHILD_PREFIX = f"{self.parent_iter_}_{dateandtime_now()}"

    # self.seed = np.random.randint(0, 10000, 1).item()

    if self._optimize_model:
        model = suggestions['model']
    else:
        model = self._model

    x_trnas, y_trans = self._cook_transformations(suggestions)

    if self._child_iters[model]>0:
        # optimize the hyperparas of model using child objective
        opt_paras = self.optimize_model_paras(
            model,
            x_transformations=x_trnas,
            y_transformations=y_trans or None
        )
    else:
        opt_paras = {}

    kwargs = {}
    if self.category == "DL":
        # lr and batch_size are arguments of Model and not of the
        # function which builds the layers
        for arg in ['lr', 'batch_size']:
            if arg in opt_paras:
                kwargs[arg] = opt_paras.pop(arg)
        model_config = DL_MODELS[model](mode=self.mode,
                                        output_features=self.num_outputs,
                                        **opt_paras)
    else:
        model_config = {model: opt_paras}

    # fit the model with optimized hyperparameters and suggested transformations
    _model = self._build_model(
        model=model_config,
        val_metric=self.eval_metric,
        x_transformation=x_trnas,
        y_transformation=y_trans,
        prefix=f"{self.parent_prefix_}{SEP}{self.CHILD_PREFIX}",
        **kwargs
    )

    # set the global seed. This is only for internal use so that results
    # become more reproducible when the model is built again
    _model.seed_everything(int(self.parent_seeds_[self.parent_iter_]))

    self.parent_suggestions_[self.parent_iter_] = {
        # 'seed': self.seed,
        'x_transformation': x_trnas,
        'y_transformation': y_trans,
        'model': {model: opt_paras},
        'path': _model.path
    }

    val_score = self._fit_and_eval(
        model=_model,
        cross_validate=self.cv_parent_hpo,
        eval_metrics=True,
    )

    self.val_scores_[self.parent_iter_] = val_score

    # the score is printed only when it is the lowest score so far
    _val_score = val_score if np.less_equal(
        val_score, np.nanmin(self.val_scores_[:self.parent_iter_+1])) else ''

    # print the metrics being monitored
    # we fill the nan in metrics_best_ with '' so that it does not get printed
    formatter = "{:<5} {:<18.3} " + "{:<15.7} " * (len(self.monitor))
    print(formatter.format(
        self.parent_iter_,
        _val_score,
        *self.metrics_best_.loc[self.parent_iter_].fillna('').values.tolist())
    )

    self.parent_iter_ += 1

    return val_score

def optimize_model_paras(
        self,
        model: str,
        x_transformations: list,
        y_transformations: list
) -> dict:
    """optimizes hyperparameters of a model

    Runs the child hpo loop for ``model`` with the given transformations and
    returns the best parameters found by the child optimizer.
    """

    def child_objective(lr=0.001, batch_size=32, **suggestions):
        """objective function for optimization of model parameters"""

        if self.category == "DL":
            model_config = DL_MODELS[model](mode=self.mode,
                                            output_features=self.num_outputs,
                                            **suggestions)
        else:
            model_config = {model: suggestions}

        # build child model
        _model = self._build_model(
            model=model_config,
            val_metric=self.eval_metric,
            x_transformation=x_transformations,
            y_transformation=y_transformations,
            prefix=f"{self.parent_prefix_}{SEP}{self.CHILD_PREFIX}",
            lr=float(lr),
            batch_size=int(batch_size)
        )

        _model.seed_everything(int(self.child_seeds_[self.child_iter_]))

        val_score = self._fit_and_eval(
            model=_model,
            cross_validate=self.cv_child_hpo)

        # populate all child val scores
        # parent_iter_ is 0-based and is incremented only at the end of
        # parent_objective, so it is the row of the running parent iteration
        self.child_val_scores_[self.parent_iter_, self.child_iter_] = val_score

        self.child_iter_ += 1

        return val_score

    # make space
    child_space = self.model_space[model]['param_space'] + self.batch_space + self.lr_space
    self.child_iter_ = 0  # before starting child hpo, reset iteration counter

    optimizer = HyperOpt(
        self.child_algorithm,
        objective_fn=child_objective,
        num_iterations=self._child_iters[model],
        param_space=child_space,
        verbosity=0,
        process_results=False,
        opt_path=os.path.join(self.path, self.CHILD_PREFIX),
    )

    optimizer.fit()

    # free memory if possible
    gc.collect()

    # return the optimized parameters
    return optimizer.best_paras()

def _cook_transformations(self, suggestions):
    """prepares the transformation keyword argument based upon
    suggestions

    Returns a tuple of two lists, the first contains configurations of
    transformations for input features and the second for output features.
    Suggestions which are not about a feature to transform and suggestions
    whose transformation is ``none`` are skipped.
    """

    # container for transformations for all features
    x_transformations = []
    y_transformations = []

    for feature, method in suggestions.items():

        if feature in self._features_to_transform:
            if method != "none":  # don't do anything with this feature
                # get the relevant transformation for this feature
                t_config = {"method": method, "features": self._groups[feature]}

                # some preprocessing is required for log based transformations
                t_config.update(self.transformations[method])

                if feature in self.inputs_to_transform:
                    x_transformations.append(t_config)
                else:
                    y_transformations.append(t_config)

    return x_transformations, y_transformations
def _build_model(
        self,
        model: dict,
        val_metric: str,
        x_transformation,
        y_transformation,
        prefix: Union[str, None],
        verbosity:int = 0,
        batch_size:int = 32,
        lr:float = 0.001,
) -> Model:
    """
    creates the ai4water Model from the given configuration and
    ``model_kwargs``.

    When overwriting this method, the user must return an instance of
    ai4water's Model_ class.

    batch_size :
        only used when category is "DL".
    lr :
        only used when category is "DL"

    .. _Model:
        https://ai4water.readthedocs.io/en/master/model.html#ai4water._main.BaseModel
    """
    # seed=self.seed is deliberately not passed
    return Model(
        model=model,
        verbosity=verbosity,
        val_metric=val_metric,
        x_transformation=x_transformation,
        y_transformation=y_transformation,
        prefix=prefix,
        batch_size=int(batch_size),
        lr=float(lr),
        **self.model_kwargs
    )
def _fit_and_eval(
        self,
        model,
        cross_validate:bool = False,
        eval_metrics:bool = False,
) -> float:
    """
    Fit ``model`` on ``self.data_`` and return its validation score.

    Parameters
    ----------
    model :
        the model to train. It must provide ``fit`` and ``cross_val_score``.
    cross_validate : bool
        If True, the score is obtained from ``model.cross_val_score`` and the
        monitored metrics of the current parent iteration are recorded in
        ``metrics_`` and ``metrics_best_``. Otherwise the model is fitted
        once and scored by ``_eval_model_manually``.
    eval_metrics : bool
        forwarded to ``_eval_model_manually``

    Returns
    -------
    float
        the validation score
    """
    if not cross_validate:
        # a single fit followed by manual evaluation
        model.fit(**self.data_)
        return self._eval_model_manually(
            model,
            self.eval_metric,
            eval_metrics=eval_metrics,
        )

    # explicit validation data is only forwarded when it is available
    extra = {'validation_data': self.val_data_} if self.val_data_ else {}
    val_scores = model.cross_val_score(
        **extra,
        scoring=[self.eval_metric] + self.monitor,
        refit=False,
        **self.data_)

    # the first score belongs to eval_metric, the remaining ones to monitor
    val_score = val_scores.pop(0)

    for name, score in zip(self.monitor, val_scores):
        self.metrics_.at[self.parent_iter_, name] = score

        reducer = compare_func1(METRIC_TYPES[name])
        best_so_far = reducer(self.metrics_best_.loc[:self.parent_iter_, name])
        best_so_far = fill_val(METRIC_TYPES[name], best_so_far)

        is_better = compare_func(METRIC_TYPES[name])
        if is_better(score, best_so_far):
            self.metrics_best_.at[self.parent_iter_, name] = score

    return val_score
def get_best_metric(
        self,
        metric_name: str
) -> float:
    """
    Get the best recorded value of a performance metric.

    Parameters
    ----------
    metric_name : str
        Name of performance metric. It must be one of the metrics
        given as `monitor` argument.

    Returns
    -------
    float
        the smallest recorded value if the metric is of "min" type in
        ``METRIC_TYPES``, otherwise the largest one. NaNs are ignored.

    Raises
    ------
    MetricNotMonitored
        if ``metric_name`` is not being monitored
    """
    if metric_name not in self.monitor:
        raise MetricNotMonitored(metric_name, self.monitor)

    # choose the NaN-aware reduction according to the metric type
    reducer = np.nanmin if METRIC_TYPES[metric_name] == "min" else np.nanmax
    return reducer(self.metrics_[metric_name]).item()
def get_best_metric_iteration(
        self,
        metric_name: str = None
) -> int:
    """
    Get the position at which a performance metric had its best value.

    Parameters
    ----------
    metric_name : str, optional
        The metric must be recorded i.e. must be given as `monitor`
        argument. If not given, then evaluation metric is used.

    Returns
    -------
    int
        position (as returned by ``np.nanargmin``/``np.nanargmax``) of the
        best value in the recorded values of the metric

    Raises
    ------
    MetricNotMonitored
        if the metric is not being monitored
    """
    metric_name = metric_name or self.eval_metric

    if metric_name not in self.monitor:
        raise MetricNotMonitored(metric_name, self.monitor)

    # "min" type metrics are best at their minimum, all others at maximum
    locate = np.nanargmin if METRIC_TYPES[metric_name] == "min" else np.nanargmax
    return int(locate(self.metrics_[metric_name].values))
def get_best_pipeline_by_metric(
        self,
        metric_name: str = None
) -> dict:
    """
    Get the pipeline which achieved the best value of a performance metric.

    Parameters
    ----------
    metric_name : str, optional
        The name of metric whose best value is to be retrieved. The metric
        must be recorded i.e. must be given as `monitor`. If not given,
        the evaluation metric is used.

    Returns
    -------
    dict
        the entry of ``parent_suggestions_`` for the best iteration. The
        returned dictionary is the stored object itself (not a copy) and
        the key ``iter_num`` is added to it. Expected keys are

        - ``path`` path where the model is saved on disk
        - ``model`` name of model
        - ``x_transformation`` transformations for the input data
        - ``y_transformation`` transformations for the target data
        - ``iter_num`` iteration number on which this pipeline was achieved
    """
    best_iter = self.get_best_metric_iteration(metric_name or self.eval_metric)

    best_pipeline = self.parent_suggestions_[best_iter]
    # remember at which iteration this pipeline was obtained
    best_pipeline['iter_num'] = best_iter
    return best_pipeline
def get_best_pipeline_by_model(
        self,
        model_name: str,
        metric_name: str = None
) -> tuple:
    """
    returns the best pipeline with respect to a particular model and
    performance metric. The metric must be recorded i.e. must be given as
    `monitor` argument.

    Parameters
    ----------
    model_name : str
        The name of model for which best pipeline is to be found. The
        `best` is defined by ``metric_name``.
    metric_name : str, optional
        The name of metric with respect to which the best model is to be
        retrieved. If not given, the best model is defined by the
        evaluation metric.

    Returns
    -------
    tuple
        a tuple of length two

        - first value is the value of metric, rounded to 4 decimal places
        - second value is the pipeline dictionary stored in
          ``parent_suggestions_`` to which the key ``iter_num`` is added

    Raises
    ------
    MetricNotMonitored
        if the metric is not being monitored
    ModelNotUsedError
        if ``model_name`` was not used in any iteration
    """
    metric_name = metric_name or self.eval_metric

    # checks if the given metric is a valid metric or not
    if metric_name not in self.monitor:
        raise MetricNotMonitored(metric_name, self.monitor)

    # rounded metric value -> pipeline. If two iterations share the same
    # rounded value, the later iteration replaces the earlier one.
    model_container = {}

    for iter_num, iter_suggestions in self.parent_suggestions_.items():
        # 'model' is a dictionary whose key is the model name and whose
        # value is the model configuration
        if model_name not in iter_suggestions['model']:
            continue

        metric_val = round(self.metrics_.loc[int(iter_num), metric_name], 4)
        iter_suggestions['iter_num'] = iter_num
        model_container[metric_val] = iter_suggestions

    if not model_container:
        raise ModelNotUsedError(model_name)

    # NaN never compares, so it must not take part in the selection
    valid = [val for val in model_container if not math.isnan(val)]
    if not valid:
        # every recorded value is NaN, nothing better to offer
        best_val = list(model_container)[-1]
        return best_val, model_container[best_val]

    # for error-like metrics the best value is the smallest one
    select = min if METRIC_TYPES.get(metric_name, "max") == "min" else max
    best_val = select(valid)
    return best_val, model_container[best_val]
def baseline_results(
        self,
        x = None,
        y = None,
        data = None,
        test_data = None,
        fit_on_all_train_data:bool = True
) -> tuple:
    """
    Returns default performance of all models.

    It runs all the models with their default parameters and without any
    x and y transformation. These results can be considered as baseline
    results and can be compared with optimized model's results. The
    results are computed only once; later calls return the stored
    ``baseline_results_``.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. This is the data on which the performance is calculated. It
        should only be given if ``data`` argument is not given.
    fit_on_all_train_data : bool, optional (default=True)
        If true and ``data`` is given, the model is trained on
        (training+validation) data. Otherwise the model is trained with
        ``fit``.

    Returns
    -------
    tuple
        a tuple of two dictionaries.

        - a dictionary of val_scores on test data for each model
        - a dictionary of metrics being monitored for each model on test data.
    """
    train_data, test_data = verify_data(x, y, data, test_data, "test")

    if self.baseline_results_ is not None:
        # Look the entries up by key. The stored dictionary may have been
        # reloaded from a json file written with sort_keys=True, in which
        # case 'metrics' precedes 'val_scores' and positional unpacking
        # would swap the two results.
        return (self.baseline_results_['val_scores'],
                self.baseline_results_['metrics'])

    val_scores = {}
    metrics = {}

    for model_name in self.models:

        model_config = model_name
        if self.category == "DL":
            model_config = DL_MODELS[model_name](
                mode=self.mode,
                output_features=self.num_outputs)

        # build model with default parameters and without transformations
        model = self._build_model(
            model=model_config,
            val_metric=self.eval_metric,
            prefix=f"{self.parent_prefix_}{SEP}baselines",
            x_transformation=None,
            y_transformation=None
        )

        if fit_on_all_train_data and 'data' in train_data:
            model.fit_on_all_training_data(**train_data)
        else:
            # when data is given as x,y, we don't have access to validation
            # data. It is hoped that validation data is already in x if
            # data was split into 3 sets
            model.fit(**train_data)

        if test_data:
            t, p = model.predict(return_true=True, **test_data)
        else:
            t, p = model.predict(return_true=True)

        errors = self.Metrics(t, p, multiclass=model.is_multiclass)

        val_scores[model_name] = getattr(errors, self.eval_metric)(
            **METRICS_KWARGS.get(self.eval_metric, {}))

        metrics[model_name] = {
            m: getattr(errors, m)(**METRICS_KWARGS.get(m, {}))
            for m in self.monitor
        }

    results = {
        'val_scores': val_scores,
        'metrics': metrics
    }
    self.baseline_results_ = results

    # the folder is normally created by the models, but it does not exist
    # when no model was run
    out_dir = os.path.join(self.path, "baselines")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "results.json"), 'w') as fp:
        json.dump(results, fp, sort_keys=True, indent=4)

    return val_scores, metrics
def dumbbell_plot(
        self,
        x = None,
        y = None,
        data = None,
        test_data = None,
        metric_name: str = None,
        fit_on_all_train_data:bool = True,
        figsize: tuple = None,
        show: bool = True,
        save: bool = True
) -> plt.Axes:
    """
    Draw a Dumbbell_ plot which compares the baseline models with the
    optimized models.

    Note that the baseline results are obtained from ``baseline_results``
    which trains all the considered models if the results are not
    available yet, so this can be expensive.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    metric_name : str
        The name of metric with respect to which the models have to be
        compared. If not given, the evaluation metric is used.
    fit_on_all_train_data : bool, optional (default=True)
        forwarded to ``baseline_results``
    figsize : tuple
        If given, plot will be generated of this size.
    show : bool
        whether to show the plot or not
    save : bool
        whether to save the plot in ``path`` or not

    Returns
    -------
    matplotlib Axes

    .. _Dumbbell:
        https://easy-mpl.readthedocs.io/en/latest/plots.html#easy_mpl.dumbbell_plot
    """
    metric_name = metric_name or self.eval_metric

    _, bl_results = self.baseline_results(
        x=x, y=y, data=data,
        fit_on_all_train_data=fit_on_all_train_data,
        test_data=test_data)

    plt.close('all')

    bl_models = {name: scores[metric_name] for name, scores in bl_results.items()}

    optimized_models = {}
    for model_name in self.models:
        try:
            metric_val, _ = self.get_best_pipeline_by_model(model_name, metric_name)
        except ModelNotUsedError:
            # the model was not used, so its baseline result is considered
            # as its optimized result
            metric_val = bl_models[model_name]
        optimized_models[model_name] = metric_val

    # model name -> [baseline value, optimized value]
    merged = {}
    for scores in (bl_models, optimized_models):
        for name, score in scores.items():
            merged.setdefault(name, []).append(score)

    df = pd.DataFrame.from_dict(merged).transpose().reset_index()
    df.columns = ['models', 'baseline', 'optimized']

    labels = _shred_suffix(df['models'].tolist())

    df.to_csv(os.path.join(self.path, f"dumbell_{metric_name}_data.csv"))

    # baseline values below -1 are drawn at -1
    baseline = np.where(df['baseline']<-1.0, -1.0, df['baseline'])

    _, ax = plt.subplots(figsize=figsize)
    ax = dumbbell_plot(
        baseline,
        df['optimized'],
        labels=labels,
        show=False,
        xlabel=metric_name,
        ylabel="Models",
        ax=ax
    )

    if save:
        plt.savefig(os.path.join(self.path, f"dumbell_{metric_name}"),
                    dpi=300, bbox_inches='tight')

    if show:
        plt.tight_layout()
        plt.show()

    return ax
def taylor_plot(
        self,
        x = None,
        y = None,
        data = None,
        test_data = None,
        fit_on_all_train_data: bool = True,
        plot_bias: bool = True,
        figsize: tuple = None,
        show: bool = True,
        save: bool = True,
        verbosity:int = 0,
        **kwargs
) -> plt.Figure:
    """
    Draw `Taylor`_'s plot from the data in ``taylor_plot_data_``.

    If no observations are available in ``taylor_plot_data_``, the best
    version of every model is first built, trained and evaluated with
    ``bfe_all_best_models``. The plotted data is also saved as
    ``taylor_data.csv`` in ``path``.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    fit_on_all_train_data : bool, optional (default=True)
        forwarded to ``bfe_all_best_models``
    plot_bias : bool, optional
        whether to plot the bias or not
    figsize : tuple, optional
        a tuple determining figure size
    show : bool, optional
        whether to show the plot or not
    save : bool, optional
        whether to save the plot or not
    verbosity : int, optional (default=0)
        determines the amount of print information
    **kwargs :
        any additional keyword arguments for taylor_plot function of
        `easy_mpl`_.

    Returns
    -------
    the object returned by taylor_plot function of `easy_mpl`_

    .. _easy_mpl:
        https://github.com/Sara-Iftikhar/easy_mpl#taylor_plot

    .. _Taylor:
        https://doi.org/10.1029/2000JD900719
    """
    plot_data = self.taylor_plot_data_

    if plot_data['observations']['test'] is None:
        # nothing has been evaluated yet, so populate the data first
        self.bfe_all_best_models(
            x=x,
            y=y,
            data=data,
            test_data=test_data,
            fit_on_all_train_data=fit_on_all_train_data,
            verbosity=verbosity)

    figure = taylor_plot(
        show=False,
        save=False,
        plot_bias=plot_bias,
        cont_kws={},
        grid_kws={},
        figsize=figsize,
        **self.taylor_plot_data_,  # simulations and observations
        **kwargs
    )

    figure.legend(loc=(1.01, 0.01))

    if save:
        plt.savefig(os.path.join(self.path, "taylor_plot"),
                    dpi=300, bbox_inches="tight")

    if show:
        plt.show()

    # save the plotted data as csv file, one column per model
    simulations = self.taylor_plot_data_['simulations']['test']
    stacked = np.column_stack([sim.reshape(-1, ) for sim in simulations.values()])
    frame = pd.DataFrame(stacked, columns=list(simulations.keys()))
    frame['observations'] = self.taylor_plot_data_['observations']['test']
    frame.to_csv(os.path.join(self.path, "taylor_data.csv"), index=False)

    return figure
def save_results(self)->None:
    """
    Write the results of optimization to ``path``.

    It is called automatically at the end of optimization. The following
    files are written

    - ``parent_suggestions.json`` the models and transformations tried at
      each iteration
    - ``errors.csv`` the monitored metrics together with the validation
      scores for each iteration
    - ``child_val_scores.csv`` the performance of models during child
      optimization iterations
    - ``child_seeds.csv`` and ``parent_seeds.csv`` the seeds used for
      child and parent iterations

    The attribute ``end_time_`` is also set by this method.

    Returns
    -------
    None
    """
    self.end_time_ = time.asctime()

    # tried pipelines
    suggestions = jsonize(self.parent_suggestions_)
    with open(os.path.join(self.path, "parent_suggestions.json"), "w") as fp:
        json.dump(suggestions, fp, sort_keys=True)

    # monitored metrics side by side with the validation scores
    val_scores = pd.DataFrame(self.val_scores_, columns=['val_scores'])
    errors = pd.concat([self.metrics_, val_scores], axis=1)
    errors.to_csv(os.path.join(self.path, "errors.csv"), index_label="iterations")

    # results of child iterations, one column per child iteration
    child_columns = [f'child_iter_{i}' for i in range(self.max_child_iters)]
    child_scores = pd.DataFrame(self.child_val_scores_, columns=child_columns)
    child_scores.to_csv(os.path.join(self.path, "child_val_scores.csv"))

    # seeds
    child_seeds = pd.DataFrame(self.child_seeds_, columns=['child_seeds'])
    child_seeds.to_csv(os.path.join(self.path, 'child_seeds.csv'), index=False)

    parent_seeds = pd.DataFrame(self.parent_seeds_, columns=['parent_seeds'])
    parent_seeds.to_csv(os.path.join(self.path, 'parent_seeds.csv'), index=False)

    return
def metric_report(self, metric_name: str) -> str:
    """
    Makes the report with respect to one performance metric.

    Parameters
    ----------
    metric_name : str
        name of the performance metric. It must be one of the metrics
        being monitored.

    Returns
    -------
    str
        text which states the best model, the best value of the metric
        (rounded to 4 decimal places), the iteration at which it was
        obtained and the path where the model is saved.
    """
    metric_val_ = self.get_best_metric(metric_name)

    # a single lookup provides the model, the iteration and the path
    pipeline = self.get_best_pipeline_by_metric(metric_name)
    best_model_name = list(pipeline['model'].keys())[0]

    return (
        f" With respect to {metric_name}, the best model was {best_model_name} which had "
        f"'{metric_name}' value of {round(metric_val_, 4)}. "
        f"This model was obtained at {pipeline['iter_num']} iteration "
        f"and is saved at {pipeline['path']} "
    )
def report(
        self,
        write: bool = True
) -> str:
    """
    Compose the textual report of optimization.

    The report states start and end time, the number of completed
    iterations and considered models, whether the optimization stopped
    early and the output of ``metric_report`` for every monitored metric.

    Parameters
    ----------
    write : bool
        whether to write the report as ``report.txt`` in ``path`` or not

    Returns
    -------
    str
        the text of report
    """
    st_time = self.start_time_
    en_time = getattr(self, "end_time_", time.asctime())
    num_models = len(self.models)

    parts = [
        f" The optization started at {st_time} and ended at {en_time} after "
        f"completing {self.parent_iter_} iterations. The optimization considered "
        f"{num_models} models. "
    ]

    if self.parent_iter_ < self.parent_iterations:
        parts.append(
            f" The given parent iterations were {self.parent_iterations} "
            f"but optimization stopped early"
        )

    # one paragraph for each metric being monitored
    for metric in self.monitor:
        parts.append(self.metric_report(metric))

    text = "".join(parts)

    if write:
        with open(os.path.join(self.path, "report.txt"), "w") as fp:
            fp.write(text)

    return text
def _runtime_attrs(self) -> dict:
    """
    Collect the attributes which only exist after a call to fit.

    Returns
    -------
    dict
        values of ``start_time_``, ``end_time_``, ``child_iter_`` and
        ``parent_iter_`` (None for those which are not set) and a ``data``
        entry which describes ``data_`` if it is available.
    """
    names = ('start_time_', 'end_time_', 'child_iter_', 'parent_iter_')
    config = {name: getattr(self, name, None) for name in names}

    data_config = {}
    if hasattr(self, 'data_'):
        data_config['type'] = self.data_.__class__.__name__
        # shape and columns are only recorded for DataFrames
        if isinstance(self.data_, pd.DataFrame):
            data_config['shape'] = self.data_.shape
            data_config['columns'] = self.data_.columns

    config['data'] = data_config
    return config

def _init_paras(self) -> dict:
    """
    Collect the current values of the arguments of ``__init__``.

    The value of every parameter in the signature of ``__init__`` except
    ``prefix`` is read from the attribute of the same name.
    """
    parameters = inspect.signature(self.__init__).parameters.values()
    return {
        para.name: getattr(self, para.name)
        for para in parameters
        if para.name not in ["prefix"]
    }

@staticmethod
def _version_info() -> dict:
    """
    Collect the versions of python and of the libraries being used.

    The optional libraries (xgboost, catboost, lightgbm and tensorflow)
    are reported as None if they can not be imported.
    """
    import ai4water
    import SeqMetrics
    import matplotlib
    import sklearn
    import easy_mpl
    from . import __version__

    versions = {
        'ai4water': ai4water.__version__,
        'SeqMetrics': SeqMetrics.__version__,
        'easy_mpl': easy_mpl.__version__,
        'numpy': np.__version__,
        'pandas': pd.__version__,
        'matplotlib': matplotlib.__version__,
        'sklearn': sklearn.__version__,
        'python': sys.version,
        'autotab': __version__,
    }

    # optional dependencies
    for lib_name in ('xgboost', 'catboost', 'lightgbm', 'tensorflow'):
        try:
            versions[lib_name] = __import__(lib_name).__version__
        except (ModuleNotFoundError, ImportError):
            versions[lib_name] = None

    return versions
def config(self) -> dict:
    """
    Gather the information which describes this object.

    Returns
    -------
    dict
        a dictionary with three keys

        - ``init_paras`` the output of ``_init_paras``
        - ``version_info`` the output of ``_version_info``
        - ``runtime_attrs`` the output of ``_runtime_attrs``
    """
    init_paras = self._init_paras()
    version_info = self._version_info()
    runtime_attrs = self._runtime_attrs()

    return {
        'init_paras': init_paras,
        'version_info': version_info,
        'runtime_attrs': runtime_attrs,
    }
@classmethod
def from_config_file(cls, config_file: str) -> "OptimizePipeline":
    """Builds the class from config file.

    Besides building the object, the results which are found in the
    folder of ``config_file`` (``parent_suggestions.json``, ``errors.csv``,
    ``taylor_data.csv``, ``parent_seeds.csv`` and
    ``baselines/results.json``) are loaded. The loaded results are set as
    attributes of the class (not of the returned instance).

    Parameters
    ----------
    config_file : str
        complete path of config file which has .json extension

    Returns
    -------
    an instance of OptimizePipeline class

    Raises
    ------
    ValueError
        if ``config_file`` is not an existing file
    """
    if not os.path.isfile(config_file):
        raise ValueError(
            f" config_file must be complete path of config file but it is "
            f"{config_file} of type {type(config_file)} "
        )

    with open(config_file, 'r') as fp:
        config = json.load(fp)

    # model_kwargs are stored as one entry but are given to __init__ as
    # individual keyword arguments
    model_kwargs = config['init_paras'].pop('model_kwargs', None) or {}

    # only the start time, and not all the runtime attributes
    cls.start_time_ = config['runtime_attrs'].get('start_time_')

    path = os.path.dirname(config_file)

    fpath = os.path.join(path, "parent_suggestions.json")
    if os.path.exists(fpath):
        with open(fpath, "r") as fp:
            parent_suggestions = json.load(fp)

        # json turns the iteration numbers into strings
        cls.parent_suggestions_ = {int(k): v for k, v in parent_suggestions.items()}
        cls.parent_iter_ = len(parent_suggestions)

    fpath = os.path.join(path, "errors.csv")
    if os.path.exists(fpath):
        errors = pd.read_csv(fpath, index_col="iterations")

        # don't put val_scores in metrics_
        cls.val_scores_ = errors.pop('val_scores').values
        cls.metrics_ = errors

    cls.taylor_plot_data_ = {
        'simulations': {"test": {}},
        'observations': {"test": None}
    }

    fpath = os.path.join(path, "taylor_data.csv")
    if os.path.exists(fpath):
        taylor_data = pd.read_csv(fpath)
        cls.taylor_plot_data_['observations']['test'] = taylor_data.pop('observations')
        # every remaining column holds the predictions of one model. Without
        # them taylor_plot would find observations but nothing to plot.
        cls.taylor_plot_data_['simulations']['test'] = {
            col: taylor_data[col].values for col in taylor_data.columns
        }

    cls.parent_prefix_ = os.path.basename(path)
    cls.path = path

    fpath = os.path.join(path, 'parent_seeds.csv')
    if os.path.exists(fpath):
        # the file has a single column; flatten so that indexing gives a
        # scalar seed instead of an array of length 1
        cls.parent_seeds_ = pd.read_csv(fpath).values.reshape(-1, )

    fpath = os.path.join(path, "baselines", "results.json")
    cls.baseline_results_ = None
    if os.path.exists(fpath):
        with open(fpath, 'r') as fp:
            cls.baseline_results_ = json.load(fp)

    return cls(**config['init_paras'], **model_kwargs)
@classmethod
def from_config(cls, config: dict) -> "OptimizePipeline":
    """Builds the class from config dictionary

    Parameters
    ----------
    config : dict
        a dictionary which contains `init_paras` key. If `init_paras`
        has a ``model_kwargs`` entry, its contents are given to the class
        as individual keyword arguments, in the same way as is done by
        ``from_config_file``. ``config`` itself is not modified.

    Returns
    -------
    an instance of OptimizePipeline class
    """
    # work on a copy so that the dictionary of caller stays intact
    init_paras = dict(config['init_paras'])
    model_kwargs = init_paras.pop('model_kwargs', None) or {}

    return cls(**init_paras, **model_kwargs)
def be_best_model_from_config(
        self,
        x=None,
        y=None,
        data=None,
        test_data=None,
        metric_name: str = None,
        model_name: str = None,
        verbosity = 1
)->Model:
    """Build and Evaluate the best model with respect to metric *from config*.

    The model is not trained. It is built from the ``config.json`` which
    is saved in the folder of the best pipeline and its weights are
    updated from the ``weights`` folder of that pipeline.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    metric_name : str
        the metric with respect to which the best model is fetched
        and then built/evaluated. If not given, the best model is
        built/evaluated with respect to evaluation metric.
    model_name : str, optional
        If given, the best version of this model will be fetched and built.
        The 'best' will be decided based upon `metric_name`
    verbosity : int, optional (default=1)
        determines the amount of print information

    Returns
    -------
    an instance of ai4water Model
    """
    train_data, test_data = verify_data(x,y,data, test_data, "test")

    metric_name = metric_name or self.eval_metric

    if not model_name:
        pipeline = self.get_best_pipeline_by_metric(metric_name=metric_name)
    else:
        _, pipeline = self.get_best_pipeline_by_model(model_name, metric_name)

    cpath = os.path.join(pipeline['path'], "config.json")

    if verbosity:
        print(f"building using config file from {cpath}")

    model = Model.from_config_file(cpath)
    model.config['verbosity'] = verbosity
    model.verbosity = verbosity

    # the file of weights is named after the model
    weight_file = list(pipeline['model'])[0]
    model.update_weights(os.path.join(pipeline['path'], "weights", weight_file))

    self._populate_results(model, train_data=train_data, test_data=test_data)
    return model
def bfe_model_from_scratch(
        self,
        iter_num: int,
        x = None,
        y = None,
        data = None,
        test_data=None,
        fit_on_all_train_data: bool = True,
)->Model:
    """
    Builds, trains and evaluates the model from a specific iteration.

    Parameters
    ----------
    iter_num : int
        iteration number from which to choose the model
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    fit_on_all_train_data : bool, optional (default=True)
        If true, the model is trained on (training+validation) data.
        If False, then model is trained only on training data.

    Returns
    -------
    an instance of trained ai4water Model
    """
    train_data, test_data = verify_data(x, y, data, test_data, "test")

    pipeline = self.parent_suggestions_[iter_num]
    prefix = f"{self.path}{SEP}results_from_scratch{SEP}iteration_{iter_num}"

    model_config = pipeline['model']
    if self.category == "DL":
        # as in bfe_best_model_from_scratch, the suggestion holds the name
        # of model and its parameters, from which the config is built
        dl_name = list(model_config.keys())[0]
        dl_kwargs = list(model_config.values())[0]
        model_config = DL_MODELS[dl_name](
            mode=self.mode,
            output_features=self.num_outputs,
            **dl_kwargs)

    model = self._build_and_eval_from_scratch(
        model=model_config,
        train_data=train_data,
        x_transformation=pipeline['x_transformation'],
        y_transformation=pipeline['y_transformation'],
        prefix=prefix,
        fit_on_all_train_data=fit_on_all_train_data,
        # The pipeline only has the key 'iter_num' if it was fetched earlier
        # through one of the get_best_pipeline_* methods, so the argument
        # is used instead.
        seed=self.parent_seeds_[int(iter_num)-1],
        test_data=test_data,
    )
    return model
def bfe_best_model_from_scratch(
        self,
        x = None,
        y = None,
        data = None,
        test_data=None,
        metric_name: str = None,
        model_name: str = None,
        fit_on_all_train_data: bool = True,
        verbosity:int = 1
)->Model:
    """
    Builds, Trains and Evaluates the **best model** with respect to metric
    from scratch.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    metric_name : str
        the metric with respect to which the best model is searched
        and then built/trained/evaluated. If None, the best model is
        chosen based on the evaluation metric.
    model_name : str, optional
        If given, the best version of this model will be found and built.
        The 'best' will be decided based upon `metric_name`
    fit_on_all_train_data : bool, optional (default=True)
        If true, the model is trained on (training+validation) data.
        If False, then model is trained only on training data.
    verbosity : int, optional (default=1)
        determines amount of information to be printed.

    Returns
    -------
    an instance of trained ai4water Model
    """
    train_data, test_data = verify_data(x, y, data, test_data, "test")

    metric_name = metric_name or self.eval_metric

    if not model_name:
        met_val = self.get_best_metric(metric_name)
        pipeline = self.get_best_pipeline_by_metric(metric_name=metric_name)
    else:
        met_val, pipeline = self.get_best_pipeline_by_model(
            model_name, metric_name)

    # the folder of results is named after metric, its value and the model
    met_val = round(met_val, 3)
    name_part = model_name or ''
    prefix = f"{self.path}{SEP}results_from_scratch{SEP}{metric_name}_{met_val}_{name_part}"

    model_config = pipeline['model']
    if self.category == "DL":
        # the suggestion holds the name of model and its parameters
        dl_name, dl_kwargs = list(model_config.items())[0]
        model_config = DL_MODELS[dl_name](
            mode=self.mode,
            output_features=self.num_outputs,
            **dl_kwargs)

    return self._build_and_eval_from_scratch(
        model=model_config,
        train_data=train_data,
        x_transformation=pipeline['x_transformation'],
        y_transformation=pipeline['y_transformation'],
        prefix=prefix,
        fit_on_all_train_data=fit_on_all_train_data,
        verbosity=verbosity,
        seed=self.parent_seeds_[int(pipeline['iter_num'])-1],
        test_data=test_data
    )
def _build_and_eval_from_scratch(
        self,
        model,
        train_data,
        x_transformation,
        y_transformation,
        prefix:str,
        model_name=None,
        verbosity:int = 1,
        fit_on_all_train_data:bool = True,
        seed:int = None,
        test_data=None,
) -> "Model":
    """builds, trains and evaluates the model from scratch.

    Parameters
    ----------
    model :
        model configuration for ``_build_model``
    train_data : dict
        keyword arguments for the fit method of model. It either has
        the key ``data`` or the x,y pairs.
    x_transformation :
        transformations for the input data
    y_transformation :
        transformations for the target data
    prefix : str
        forwarded to ``_build_model``
    model_name :
        If given, model's predictions are saved in 'taylor_plot_data_'
        dictionary
    verbosity : int
        forwarded to ``_build_model``
    fit_on_all_train_data : bool
        If True and ``data`` is given, the model is trained with
        ``fit_on_all_training_data``, otherwise with ``fit``.
    seed : int, optional
        If given, the model is seeded with it before training.
    test_data :
        forwarded to ``_populate_results``

    Returns
    -------
    the trained model
    """
    model = self._build_model(
        model=model,
        x_transformation=x_transformation,
        y_transformation=y_transformation,
        prefix=prefix,
        val_metric=self.eval_metric,
        verbosity=verbosity
    )

    # 0 is a valid seed, therefore only None means "do not seed"
    if seed is not None:
        model.seed_everything(int(seed))

    if 'data' in train_data:
        if fit_on_all_train_data:
            model.fit_on_all_training_data(**train_data)
        else:
            # the model must be trained in this case as well
            model.fit(**train_data)
        model.dh_.to_disk(model.path)
    else:
        model.fit(**train_data)
        # todo, save x,y in disk

    self._populate_results(model, train_data, test_data, model_name=model_name)

    return model

def _populate_results(
        self,
        model,
        train_data,
        test_data,
        model_name=None
) -> None:
    """evaluates/makes predictions from model on training/validation/test data.

    If model_name is given, the true and predicted values of the last
    prediction are saved in 'taylor_plot_data_' dictionary.
    """
    if 'data' in train_data:
        model.predict_on_training_data(**train_data, metrics="all")
    else:
        model.predict(**train_data, metrics="all")

    if test_data:
        t, p = model.predict(**test_data, return_true=True, metrics="all")
    elif model.config['train_fraction']<1.0:
        model.predict_on_validation_data(**train_data, metrics="all")
        t, p = model.predict_on_test_data(**train_data, metrics="all",
                                          return_true=True)
    else:
        # if data is split into 2 sets, we don't have test set.
        t, p = model.predict_on_validation_data(**train_data, metrics="all",
                                                return_true=True)

    if model_name:
        self.taylor_plot_data_['observations']['test'] = t
        self.taylor_plot_data_['simulations']['test'][model_name] = p

    return

def evaluate_model(
        self,
        model: "Model",
        x = None,
        y = None,
        data=None,
        metric_name: str = None,
)->float:
    """Evaluates the ai4water's Model on the data for the metric.

    Parameters
    ----------
    model :
        an instance of ai4water's Model class
    data :
        raw, unpreprocessed data from which x,y pairs are made
    metric_name : str, optional
        name of performance metric. If not given, evaluation metric
        is used.
    x :
        alternative to ``data``. Only required if ``data`` is not given.
    y :
        only required if x is given

    Returns
    -------
    float, the evaluation score of model with respect to ``metric_name``
    """
    metric_name = metric_name or self.eval_metric

    assert hasattr(model, 'predict')

    if x is not None:
        assert y is not None
        t, p = model.predict(x=x, y=y, process_results=False, return_true=True)
    else:
        t, p = model.predict(data=data, process_results=False, return_true=True)

    errors = self.Metrics(t, p, multiclass=model.is_multiclass)

    # use the same keyword arguments for the metric as baseline_results
    return getattr(errors, metric_name)(**METRICS_KWARGS.get(metric_name, {}))
def bfe_all_best_models(
        self,
        x = None,
        y = None,
        data = None,
        test_data=None,
        metric_name: str = None,
        fit_on_all_train_data: bool = True,
        verbosity:int = 0,
) -> None:
    """
    builds, trains and evaluates best versions of all the models.

    The models which were not used during optimization are skipped. The
    predictions of every model are saved in ``taylor_plot_data_``.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    metric_name : str
        the name of metric to determine best version of a model. If not
        given, the evaluation metric is used.
    fit_on_all_train_data : bool, optional (default=True)
        If true, the model is trained on (training+validation) data.
        If False, then model is trained only on training data.
    verbosity : int, optional (default=0)
        determines the amount of print information

    Returns
    -------
    None
    """
    train_data, test_data = verify_data(x, y, data, test_data, "test")

    met_name = metric_name or self.eval_metric

    for model in self.models:

        try:
            metric_val, pipeline = self.get_best_pipeline_by_model(model, met_name)
        except ModelNotUsedError:
            # this model never appeared during optimization
            continue

        results_dir = f"{met_name}_{metric_val}_{model}"
        prefix = f"{self.path}{SEP}results_from_scratch{SEP}{results_dir}"

        model_config = pipeline['model']
        if self.category == "DL":
            # the suggestion holds the name of model and its parameters
            dl_name, dl_kwargs = list(model_config.items())[0]
            model_config = DL_MODELS[dl_name](
                mode=self.mode,
                output_features=self.num_outputs,
                **dl_kwargs)

        self._build_and_eval_from_scratch(
            model=model_config,
            train_data=train_data,
            x_transformation=pipeline['x_transformation'],
            y_transformation=pipeline['y_transformation'],
            prefix=prefix,
            model_name=model,
            fit_on_all_train_data=fit_on_all_train_data,
            verbosity=verbosity,
            seed=self.parent_seeds_[int(pipeline['iter_num'])-1],
            test_data=test_data,
        )

    return
def post_fit(
        self,
        x = None,
        y = None,
        data = None,
        test_data:Union[list, tuple] = None,
        fit_on_all_train_data:bool = True,
        show:bool = True
) -> None:
    """post processing of results to draw dumbbell plot and taylor plot.

    The best version of every model is built, trained and evaluated and
    then the dumbbell plot is drawn. If more than one models were tried,
    taylor plot and the plots which compare the models are drawn as well.

    Parameters
    ----------
    x :
        the input data for training
    y :
        the target data for training
    data :
        raw unprepared and unprocessed data from which x,y pairs for both
        training and test will be prepared. It is only required if x, y
        are not provided.
    test_data :
        a tuple/list of length 2 whose first element is x and second value
        is y. It should only be given if ``data`` argument is not given.
    fit_on_all_train_data : bool, optional (default=True)
        If true, the model is trained on (training+validation) data.
        If False, then model is trained only on training data.
    show : bool, optional (default=True)
        whether to show the plots or not

    Returns
    -------
    None
    """
    self.bfe_all_best_models(x=x,
                             y=y,
                             data=data,
                             fit_on_all_train_data=fit_on_all_train_data,
                             test_data=test_data)

    self.dumbbell_plot(x=x,
                       y=y,
                       data=data,
                       test_data=test_data,
                       fit_on_all_train_data=fit_on_all_train_data,
                       metric_name=self.eval_metric,
                       show=show)

    # following plots only make sense if more than one models are tried
    if self._optimize_model:
        # the inputs are x and not y
        self.taylor_plot(x=x,
                         y=y,
                         data=data,
                         test_data=test_data,
                         fit_on_all_train_data=fit_on_all_train_data,
                         show=show)

        self.compare_models(show=show)
        self.compare_models(plot_type="bar_chart", show=show)

    return
def cleanup(
        self,
        dirs_to_exclude: Union[str, list] = None
) -> None:
    """Delete the sub-folders of ``self.path`` that are not worth keeping.

    The folder 'results_from_scratch' is always kept, together with any
    folder named by the user. Plain files inside ``self.path`` are never
    touched.

    Parameters
    ----------
    dirs_to_exclude : str, list, optional
        Name(s) of the folders inside ``self.path`` which must survive
        the cleanup.

    Returns
    -------
    None
    """
    # normalise the user input to a list of folder names
    if isinstance(dirs_to_exclude, str):
        dirs_to_exclude = [dirs_to_exclude]
    elif dirs_to_exclude is None:
        dirs_to_exclude = []

    for entry in os.listdir(self.path):
        candidate = os.path.join(self.path, entry)

        # only directories are candidates for removal
        if not os.path.isdir(candidate):
            continue

        if entry in ['results_from_scratch'] + dirs_to_exclude:
            continue

        shutil.rmtree(candidate)
    return
def compare_models(
        self,
        metric_name: str = None,
        plot_type: str = "circular",
        show: bool = False,
        **kwargs
) -> plt.Axes:
    """
    Draws a bar plot comparing the best score of every model for one metric.

    Models for which ``get_best_pipeline_by_model`` raises
    ``ModelNotUsedError`` are left out of the plot. The figure is saved
    inside ``self.path`` before it is (optionally) shown.

    Parameters
    ----------
    metric_name : str, optional
        The metric with respect to which to compare the models. When not
        given, ``self.eval_metric`` is used.
    plot_type : str, optional
        if "circular" then `easy_mpl.circular_bar_plot <https://easy-mpl.readthedocs.io/en/latest/#module-12>`_
        is drawn otherwise a simple bar_plot is drawn.
    show : bool, optional
        whether to show the plot or not
    **kwargs :
        keyword arguments for `easy_mpl.circular_bar_plot <https://easy-mpl.readthedocs.io/en/latest/#module-12>`_
        or `easy_mpl.bar_chart <https://easy-mpl.readthedocs.io/en/latest/#module-1>`_

    Returns
    -------
    matplotlib.pyplot.Axes
    """
    metric_name = metric_name or self.eval_metric

    # best score of each model which took part in the optimization
    scores = {}
    for candidate in self.models:
        try:
            best_score, _ = self.get_best_pipeline_by_model(candidate, metric_name)
        except ModelNotUsedError:
            continue
        scores[candidate] = best_score

    labels = _shred_suffix(list(scores))
    values = list(scores.values())

    # start from a clean slate so that earlier figures do not leak in
    plt.close('all')

    if plot_type == "circular":
        ax = circular_bar_plot(np.array(values), labels, sort=True, show=False, **kwargs)
    else:
        ax = bar_chart(values, labels, xlabel=metric_name, sort=True, show=False, **kwargs)

    plt.savefig(
        os.path.join(self.path, f"{plot_type}_plot_wrt_{metric_name}"),
        dpi=300,
        bbox_inches='tight',
    )

    if show:
        plt.tight_layout()
        plt.show()

    return ax
def _eval_model_manually(
        self,
        model,
        metric: str,
        eval_metrics=False
) -> float:
    """Evaluate ``model`` on validation data and return the score to minimize.

    Parameters
    ----------
    model :
        the model to evaluate. It must provide ``predict``,
        ``predict_on_validation_data`` and ``is_multiclass``.
    metric : str
        name of the method of the metrics object which calculates the
        score being optimized.
    eval_metrics : bool, optional (default=False)
        if True, every metric in ``self.monitor`` is also calculated and
        written into ``self.metrics_`` and, when it is an improvement,
        into ``self.metrics_best_`` at row ``self.parent_iter_``.

    Returns
    -------
    float
        the value to be minimized. For metrics whose type is not "min"
        this is ``1.0 - score``. A missing or non-finite score is
        replaced by 1.0.
    """
    # make prediction on validation data
    if self.val_data_:
        t, p = model.predict(**self.val_data_, return_true=True, process_results=False)
    else:
        t, p = model.predict_on_validation_data(**self.data_, return_true=True, process_results=False)

    if len(p) == p.size:
        p = p.reshape(-1, 1)  # TODO, for cls, Metrics do not accept (n,) array

    if self.mode == "classification":
        t = np.argmax(t, axis=1)
        p = np.argmax(p, axis=1)

    errors = self.Metrics(
        t,
        p,
        remove_zero=True,
        remove_neg=True,
        multiclass=model.is_multiclass)

    # NOTE(review): unlike the monitored metrics below, the optimized metric
    # is called without METRICS_KWARGS - confirm that this is intended.
    val_score = getattr(errors, metric)()

    metric_type = METRIC_TYPES.get(metric, 'min')

    # val_score can be None/nan/inf. This is checked before any arithmetic
    # because both `1.0 - None` and `math.isfinite(None)` raise TypeError.
    if val_score is None or not math.isfinite(val_score):
        val_score = 1.0
    elif metric_type != "min":
        # the optimization will always solve minimization problem so if
        # the metric is to be maximized change the val_score accordingly
        val_score = 1.0 - val_score

    if eval_metrics:
        # calculate all additional performance metrics which are being monitored
        for _metric in self.monitor:
            pm = getattr(errors, _metric)(**METRICS_KWARGS.get(_metric, {}))

            self.metrics_.at[self.parent_iter_, _metric] = pm

            # presumably compare_func1 returns the aggregator (min/max) for
            # this metric type and fill_val replaces a missing aggregate;
            # both helpers are defined elsewhere - verify there.
            func = compare_func1(METRIC_TYPES[_metric])
            best_so_far = func(self.metrics_best_.loc[:self.parent_iter_, _metric])
            best_so_far = fill_val(METRIC_TYPES[_metric], best_so_far)

            # record the value only if it compares favourably with the best so far
            func = compare_func(METRIC_TYPES[_metric])
            if func(pm, best_so_far):
                self.metrics_best_.at[self.parent_iter_, _metric] = pm

    return val_score
def verify_data( x=None, y=None, data=None, validation_data=None, label="validation" )->Tuple[dict, dict]: def num_examples(samples): if isinstance(samples, list): assert len(set(len(sample) for sample in samples)) == 1 return len(samples[0]) return len(samples) """verifies that either x,y and validation_data is given or only data is given.""" train_data = {} val_data = {} if x is None: assert y is None, f"y must only be given if x is given. x is {type(x)}" assert data is not None, f"if x is given, data must not be given" assert validation_data is None, f"{label} data must only be given if x is given" train_data['data'] = data else: train_data['x'] = x train_data['y'] = y assert y is not None, f"if x is given, corresponding y must also be given" assert isinstance(y, np.ndarray) assert validation_data is not None, f"if x is given, {label}_data must also be given" assert num_examples(x) == num_examples(y) if data is None: assert validation_data is not None, f"If data is not given, {label}_data must be given" else: assert validation_data is None, f"If data is given, {label} data must not be given" assert isinstance(data, pd.DataFrame), f"data must be dataframe, but it is {type(data)}" if validation_data is not None: assert isinstance(validation_data, (tuple, list)), f"{label} data must be of type tuple but it is {type(validation_data)}" assert len(validation_data) == 2, f"{label}_data tuple must have length 2 but it has {len(validation_data)}" assert isinstance(validation_data[1], np.ndarray), f"second value in {label}_data must be ndarray" assert num_examples(validation_data[0]) == num_examples(validation_data[1]) val_data['x'] = validation_data[0] val_data['y'] = validation_data[1] return train_data, val_data def _shred_suffix(labels:list)->list: new_labels = [] for label in labels: if label.endswith('Regressor'): label = label.replace('Regressor', '') elif label.endswith('Classifier'): label = label.replace('Classifier', '') new_labels.append(label) return 
new_labels class MetricNotMonitored(Exception): def __init__(self, metric_name, available_metrics): self.metric = metric_name self.avail_metrics = available_metrics def __str__(self): return f""" metric {self.metric} was not monitored. Please choose from {self.avail_metrics} """ class ModelNotUsedError(Exception): def __init__(self, model_name): self.model = model_name def __str__(self): return f"""model {self.model} is not used during optimization"""