Hyperparameter Optimization Module

PyTorch Models for Sequential Data
from nbdev.config import get_config
project_root = get_config().config_file.parent
f_path = project_root / 'test_data/WienerHammerstein'
# use every Wiener-Hammerstein file except the held-out test set
hdf_files = L([f for f in get_hdf_files(f_path) if '_test.hdf5' not in str(f)])
# cut the sequences into windows of 400 samples with a stride of 100
tfm_src = CreateDict([DfHDFCreateWindows(win_sz=400,stp_sz=100,clm='u')])
dls = DataBlock(blocks=(SequenceBlock.from_hdf(['u'],TensorSequencesInput),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput)),
                get_items=tfm_src,
                splitter=ApplyToDict(FuncSplitter(lambda o: 'valid' in str(o)))).dataloaders(hdf_files)

Optimizer Core

First we need a log-uniform distribution for variables whose plausible values span several orders of magnitude.


source

log_uniform

 log_uniform (min_bound, max_bound, base=10)

uniform sampling in an exponential range

[log_uniform(1e-8, 1e-2)() for _ in range(5)]
[4.591524992137234e-08,
 0.003755410605938488,
 2.920688605923387e-07,
 3.4750213799838236e-06,
 2.1312097874133118e-08]
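
For reference, a sampler of this kind can be sketched as follows. This is a minimal sketch of the idea, not necessarily the exact implementation used by the library:

import numpy as np

def log_uniform_sketch(min_bound, max_bound, base=10):
    "Return a callable that samples uniformly in log space between `min_bound` and `max_bound`"
    lo, hi = np.log(min_bound)/np.log(base), np.log(max_bound)/np.log(base)
    return lambda: float(base**np.random.uniform(lo, hi))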

source

LearnerTrainable

 LearnerTrainable (config:Dict[str,Any]=None,
                   logger_creator:Callable[[Dict[str,Any]],ForwardRef('Logger')]=None,
                   storage:Optional[ray.train._internal.storage.StorageContext]=None)

*Abstract class for trainable models, functions, etc.

A call to train() on a trainable will execute one logical iteration of training. As a rule of thumb, the execution time of one train call should be large enough to avoid overheads (i.e. more than a few seconds), but short enough to report progress periodically (i.e. at most a few minutes).

Calling save() should save the training state of a trainable to disk, and restore(path) should restore a trainable to the given state.

Generally you only need to implement setup, step, save_checkpoint, and load_checkpoint when subclassing Trainable.

Other implementation methods that may be helpful to override are log_result, reset_config, cleanup, and _export_model.

Tune will convert this class into a Ray actor, which runs on a separate process. By default, Tune will also change the current working directory of this process to its corresponding trial-level log directory self.logdir. This is designed so that different trials that run on the same physical node won’t accidentally write to the same location and overstep each other.

The behavior of changing the working directory can be disabled by setting the RAY_CHDIR_TO_TRIAL_DIR=0 environment variable. This allows access to files in the original working directory, but relative paths should be used for read only purposes, and you must make sure that the directory is synced on all nodes if running on multiple machines.

The TUNE_ORIG_WORKING_DIR environment variable was the original workaround for accessing paths relative to the original working directory. This environment variable is deprecated, and the RAY_CHDIR_TO_TRIAL_DIR environment variable described above should be used instead.

This class supports checkpointing to and restoring from remote storage.*

                 Type      Default  Details
config           Dict      None
logger_creator   Callable  None     Deprecated (2.7)
storage          Optional  None
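
If trials need to read files relative to the original working directory, the directory change described above can be disabled via the environment variable, typically set before Ray is started so it propagates to the trial workers:

import os
# keep the original working directory inside trials (see the note above)
os.environ['RAY_CHDIR_TO_TRIAL_DIR'] = '0'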

source

stop_shared_memory_managers

 stop_shared_memory_managers (obj)

Iteratively finds and stops all SharedMemoryManager instances contained within the provided object.
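
A typical use, shown here as a hypothetical example, is to release shared memory held by an object such as the dataloaders once a trial has finished:

# hypothetical example: shut down any SharedMemoryManager held inside `dls`
stop_shared_memory_managers(dls)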


source

learner_optimize

 learner_optimize (config)
class TrainSpecificEpoch(Callback):
    "Train only the epoch with index `epoch`: skip earlier epochs and cancel the fit afterwards"
    order = 70

    def __init__(self, epoch:int):
        self._skip_to = epoch

    def before_epoch(self):
        if self.epoch < self._skip_to:
            raise CancelEpochException
        if self.epoch > self._skip_to:
            raise CancelFitException
class TrainableModel(tune.Trainable):
    def setup(self, config):
        # create_lrn and dls are passed as Ray object references in the config
        self.create_lrn = ray.get(config['create_lrn'])
        self.dls = ray.get(config['dls'])
        self.config = config

        self.lrn = self.create_lrn(self.dls, config)

        self.lrn.lr = config['lr'] if 'lr' in config else 3e-3
        if 'wd' in config: self.lrn.wd = config['wd']

        # report progress to Ray Tune after every epoch
        if 'reporter' not in self.config:
            self.lrn.add_cb(CBRayReporter())
        else:
            self.lrn.add_cb(ray.get(self.config['reporter'])())

        # build a flat + cosine lr schedule over the whole training run
        if self.lrn.opt is None: self.lrn.create_opt()
        self.lrn.opt.set_hyper('lr', self.lrn.lr)
        lr = np.array([h['lr'] for h in self.lrn.opt.hypers])
        pct_start = config['pct_start'] if 'pct_start' in config else 0.3
        div_final = config['div_final'] if 'div_final' in config else 1e5
        self.n_epoch = config['n_epoch'] if 'n_epoch' in config else 10
        self.lr_scheds = {'lr': combined_cos(pct_start, lr, lr, lr/div_final)}
        self.steps = 0

    def step(self):
        # train exactly one epoch of the schedule per Ray Tune step
        cbs = [TrainSpecificEpoch(self.steps), ParamScheduler(self.lr_scheds)]
        self.lrn.fit(self.n_epoch, cbs=cbs)
        self.steps += 1

        scores = self.lrn.recorder.values[-1]
        metrics = {
            'train_loss': scores[0],
            'valid_loss': scores[1]
        }
        for metric, value in zip(self.lrn.metrics, scores[2:]):
            m_name = metric.name if hasattr(metric, 'name') else str(metric)
            metrics[m_name] = value
        return metrics

    def save_checkpoint(self, checkpoint_dir):
        file = os.path.join(checkpoint_dir, 'model.pth')
        save_model(file, self.lrn.model, opt=None)

    def load_checkpoint(self, checkpoint_path):
        self.lrn.model.load_state_dict(torch.load(os.path.join(checkpoint_path, 'model.pth')))
class TrainableModel(tune.Trainable):
    def setup(self, config):
        # create_lrn and dls are passed as Ray object references in the config
        self.create_lrn = ray.get(config['create_lrn'])
        self.dls = ray.get(config['dls'])
        self.config = config
        self.lrn_kwargs = {'n_epoch': 100, 'pct_start': 0.5}

        for attr in ['n_epoch', 'pct_start']:
            if attr in config:
                self.lrn_kwargs[attr] = config[attr]

        self.lrn = self.create_lrn(self.dls, config)
        self.lrn.lr = config['lr'] if 'lr' in config else 3e-3

    def step(self):
        # run only the epoch with index `self.iteration` of the full schedule
        with self.lrn.no_bar():
            self.lrn.fit_flat_cos(self.lrn_kwargs['n_epoch'],
                                  pct_start=self.lrn_kwargs['pct_start'],
                                  cbs=TrainSpecificEpoch(self.iteration))

        scores = self.lrn.recorder.values[-1]
        metrics = {
            'train_loss': scores[0],
            'valid_loss': scores[1],
            # signal Ray Tune that training is finished after the last epoch
            tune.result.DONE: self.iteration >= self.lrn_kwargs['n_epoch'] - 1
        }
        for metric, value in zip(self.lrn.metrics, scores[2:]):
            m_name = metric.name if hasattr(metric, 'name') else str(metric)
            metrics[m_name] = value
        return metrics

    def save_checkpoint(self, checkpoint_dir):
        file = os.path.join(checkpoint_dir, 'model.pth')
        save_model(file, self.lrn.model, opt=None)

    def load_checkpoint(self, checkpoint_path):
        self.lrn.model.load_state_dict(torch.load(os.path.join(checkpoint_path, 'model.pth')))

The mutation config dictionary consists of functions that each sample from a distribution. To retrieve a dictionary with one concrete realisation of these distributions, we use the function sample_config, as illustrated below.


source

sample_config

 sample_config (config)
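
For illustration, sampling one concrete configuration from a mutation dictionary could look like this (the returned values are random; the ones shown are purely illustrative):

mut_conf = {'lr': log_uniform(1e-8, 1e-2),
            'weight_p': lambda: np.random.uniform(0, 0.5)}
sample_config(mut_conf)
# e.g. {'lr': 3.2e-05, 'weight_p': 0.18}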

source

CBRayReporter

 CBRayReporter (after_create=None, before_fit=None, before_epoch=None,
                before_train=None, before_batch=None, after_pred=None,
                after_loss=None, before_backward=None,
                after_cancel_backward=None, after_backward=None,
                before_step=None, after_cancel_step=None, after_step=None,
                after_cancel_batch=None, after_batch=None,
                after_cancel_train=None, after_train=None,
                before_validate=None, after_cancel_validate=None,
                after_validate=None, after_cancel_epoch=None,
                after_epoch=None, after_cancel_fit=None, after_fit=None)

Callback that reports training progress to Ray Tune after every epoch
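
A reporter callback of this kind can be sketched roughly as follows. This is a simplified sketch that assumes reporting via ray.train.report; the library's actual implementation may differ:

from fastai.callback.core import Callback
from ray import train

class RayReporterSketch(Callback):
    "Report the last recorded losses to Ray Tune after every epoch"
    order = 70  # run after Recorder so `recorder.values` is up to date
    def after_epoch(self):
        scores = self.learn.recorder.values[-1]
        train.report({'train_loss': float(scores[0]),
                      'valid_loss': float(scores[1])})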


source

HPOptimizer

 HPOptimizer (create_lrn, dls)

Hyperparameter optimizer that combines a learner factory create_lrn and dataloaders dls with Ray Tune search strategies such as Population Based Training.

Test Population Based Training

def create_lrn(dls, config):
    "Create a learner from `dls` using the hyperparameters in `config`"
    lr = config['lr']
    alpha = config['alpha']        # regularization hyperparameters; in a real
    beta = config['beta']          # setup these would be passed on to the
    weight_p = config['weight_p']  # learner / its regularization callbacks

    lrn = RNNLearner(dls)
    lrn.lr = lr
    return lrn
config={    # initial search space sampled by Ray Tune
            "lr": tune.loguniform(1e-4, 1e-2),
            "alpha": tune.loguniform(1e-5, 10),
            "beta": tune.loguniform(1e-5, 10),
            "weight_p": tune.uniform(0, 0.5)}
mut_conf = {# distribution for resampling
            "lr": log_uniform(1e-8, 1e-2),
            "alpha": log_uniform(1e-5, 10),
            "beta": log_uniform(1e-5, 10),
            "weight_p": lambda: np.random.uniform(0, 0.5)}

hp_opt = HPOptimizer(create_lrn,dls)
# hp_opt.start_ray()
# hp_opt.optimize_pbt('pbt_test',4,config,mut_conf,perturbation_interval=1,
#                  stop={"training_iteration": 1 },
#                  resources_per_trial={"gpu": 0.5},
#                  storage_path=str(Path.home() / 'ray_results'))#no cpu count is necessary
#hp_opt.best_model()