Hyperparameter Optimization Module

from nbdev.config import get_config
project_root = get_config().config_file.parent
f_path = project_root / 'test_data/WienerHammerstein'
hdf_files = L([f for f in get_hdf_files(f_path) if '_test.hdf5' not in str(f)])
tfm_src = CreateDict([DfHDFCreateWindows(win_sz=400,stp_sz=100,clm='u')])
dls = DataBlock(blocks=(SequenceBlock.from_hdf(['u'],TensorSequencesInput),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput)),
                get_items=tfm_src,
                splitter=ApplyToDict(FuncSplitter(lambda o: 'valid' in str(o)))).dataloaders(hdf_files)
optimizer core
First we need a log-uniform distribution for variables with vast value ranges.
log_uniform
log_uniform (min_bound, max_bound, base=10)
uniform sampling in an exponential range
[log_uniform(1e-8, 1e-2)() for _ in range(5)]
[4.591524992137234e-08,
0.003755410605938488,
2.920688605923387e-07,
3.4750213799838236e-06,
2.1312097874133118e-08]
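For reference, a sampler of this kind can be built in a few lines. The sketch below is only an illustration of the idea, not necessarily the library's exact implementation (the name log_uniform_sketch is hypothetical): the exponent is drawn uniformly between the base-`base` logarithms of the bounds, and a zero-argument sampling function is returned, matching the call pattern log_uniform(1e-8, 1e-2)() used above.

import numpy as np

def log_uniform_sketch(min_bound, max_bound, base=10):
    "Illustrative sketch: return a function that samples log-uniformly between the bounds"
    lo, hi = np.log(min_bound)/np.log(base), np.log(max_bound)/np.log(base)
    return lambda: base ** np.random.uniform(lo, hi)

samples = [log_uniform_sketch(1e-8, 1e-2)() for _ in range(5)]  # mirrors the cell above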
LearnerTrainable
LearnerTrainable (config:Dict[str,Any]=None, logger_creator:Callable[[Dict[str,Any]],ForwardRef('Logger')]=None, storage:Optional[ray.train._internal.storage.StorageContext]=None)
*Abstract class for trainable models, functions, etc.

A call to train() on a trainable will execute one logical iteration of training. As a rule of thumb, the execution time of one train call should be large enough to avoid overheads (i.e. more than a few seconds), but short enough to report progress periodically (i.e. at most a few minutes).

Calling save() should save the training state of a trainable to disk, and restore(path) should restore a trainable to the given state.

Generally you only need to implement setup, step, save_checkpoint, and load_checkpoint when subclassing Trainable. Other implementation methods that may be helpful to override are log_result, reset_config, cleanup, and _export_model.

Tune will convert this class into a Ray actor, which runs on a separate process. By default, Tune will also change the current working directory of this process to its corresponding trial-level log directory self.logdir. This is designed so that different trials that run on the same physical node won’t accidentally write to the same location and overstep each other.

The behavior of changing the working directory can be disabled by setting the RAY_CHDIR_TO_TRIAL_DIR=0 environment variable. This allows access to files in the original working directory, but relative paths should be used for read-only purposes, and you must make sure that the directory is synced on all nodes if running on multiple machines.

The TUNE_ORIG_WORKING_DIR environment variable was the original workaround for accessing paths relative to the original working directory. This environment variable is deprecated, and the RAY_CHDIR_TO_TRIAL_DIR environment variable described above should be used instead.

This class supports checkpointing to and restoring from remote storage.*
| | Type | Default | Details |
|---|---|---|---|
| config | Dict | None | |
| logger_creator | Callable | None | Deprecated (2.7) |
| storage | Optional | None | |
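As a rough usage sketch: a Trainable subclass such as LearnerTrainable is normally handed to tune.Tuner, which turns it into Ray actors and calls step() repeatedly. The snippet below assumes Ray 2.x, assumes the config carries create_lrn and dls as object-store references (as the experimental setup code further below expects), and assumes a reported metric named valid_loss; HPOptimizer, documented below, is the module's own wrapper around this kind of run.

import ray
from ray import tune

ray.init(ignore_reinit_error=True)
param_space = {
    'create_lrn': ray.put(create_lrn),   # factory returning a fastai Learner (assumption)
    'dls': ray.put(dls),                 # the dataloaders built above (assumption)
    'lr': tune.loguniform(1e-4, 1e-2),
}
tuner = tune.Tuner(
    LearnerTrainable,
    param_space=param_space,
    tune_config=tune.TuneConfig(num_samples=4, metric='valid_loss', mode='min'),
)
# results = tuner.fit()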
learner_optimize
learner_optimize (config)
class TrainSpecificEpoch(Callback):
    "Skip training up to `epoch`"
    order = 70
    def __init__(self, epoch:int):
        self._skip_to = epoch
    def before_epoch(self):
        print(self.epoch)
        # if self.epoch < self._skip_to:
        #     raise CancelEpochException
        # if self.epoch > self._skip_to:
        #     raise CancelFitException
class TrainableModel(tune.Trainable):
    def setup(self, config):
        # Assuming create_lrn and dls are accessible here or passed in config
        self.create_lrn = ray.get(config['create_lrn'])
        self.dls = ray.get(config['dls'])
        self.config = config
        self.lrn = self.create_lrn(self.dls, config)
        self.lrn.lr = config['lr'] if 'lr' in config else 3e-3
        if 'wd' in config: self.lrn.wd = config['wd']
        self._setup_callbacks()  # helper assumed to exist elsewhere (e.g. on the exported LearnerTrainable)
        if 'reporter' not in self.config:
            self.lrn.add_cb(CBRayReporter())
        else:
            self.lrn.add_cb(ray.get(self.config['reporter'])())
        if self.lrn.opt is None: self.lrn.create_opt()
        self.lrn.opt.set_hyper('lr', self.lrn.lr)
        lr = np.array([h['lr'] for h in self.lrn.opt.hypers])
        pct_start = config['pct_start'] if 'pct_start' in config else 0.3
        self.n_epoch = config['n_epoch'] if 'n_epoch' in config else 10
        div_final = config['div_final'] if 'div_final' in config else 1e5  # assumed: final lr divisor, not defined in the original cell
        self.lr_scheds = {'lr': combined_cos(pct_start, lr, lr, lr/div_final)}
        self.steps = 0

    def step(self):
        # one tune iteration trains one epoch of the overall schedule
        self.lrn.fit(self.n_epoch, cbs=[TrainSpecificEpoch(self.steps), ParamScheduler(self.lr_scheds)], wd=self.lrn.wd)
        self.steps += 1
        scores = self.lrn.recorder.values[-1]
        metrics = {
            'train_loss': scores[0],
            'valid_loss': scores[1]
        }
        for metric,value in zip(self.lrn.metrics,scores[2:]):
            m_name = metric.name if hasattr(metric,'name') else str(metric)
            metrics[m_name] = value
        return metrics

    def save_checkpoint(self, checkpoint_dir):
        file = os.path.join(checkpoint_dir,'model.pth')
        save_model(file, self.lrn.model, opt=None)

    def load_checkpoint(self, checkpoint_path):
        self.lrn.model.load_state_dict(torch.load(os.path.join(checkpoint_path,'model.pth')))
class TrainableModel(tune.Trainable):
    def setup(self, config):
        # Assuming create_lrn and dls are accessible here or passed in config
        self.create_lrn = ray.get(config['create_lrn'])
        self.dls = ray.get(config['dls'])
        self.config = config
        self.lrn_kwargs = {'n_epoch': 100, 'pct_start': 0.5}
        for attr in ['n_epoch', 'pct_start']:
            if attr in config:
                self.lrn_kwargs[attr] = config[attr]
        self.lrn = self.create_lrn(self.dls, config)
        self.lrn.lr = config['lr'] if 'lr' in config else 3e-3

    def step(self):
        print(self.iteration)
        # fit_kwargs = {**self.lrn_kwargs,**{'cbs':TrainSpecificEpoch(self.iteration)}}
        # fit_kwargs = {**self.lrn_kwargs,**{'cbs':SkipToEpoch(self.iteration)}}
        # fit_kwargs = self.lrn_kwargs
        with self.lrn.no_bar():
            # ray.get(self.config['fit_method'])(self.lrn,**fit_kwargs)
            # self.lrn.fit_flat_cos(**fit_kwargs)
            self.lrn.fit_flat_cos(self.lrn_kwargs['n_epoch'], cbs=TrainSpecificEpoch(self.iteration))
        metrics = {
            'train_loss': 1,  # scores[0],
            'valid_loss': 1,  # scores[1],
            tune.result.DONE: self.iteration >= self.lrn_kwargs['n_epoch']-1
        }
        # scores = self.lrn.recorder.values[-1]
        # metrics = {
        #     'train_loss': scores[0],
        #     'valid_loss': scores[1],
        #     tune.result.DONE: self.epoch_iter >= self.lrn_kwargs['n_epoch']
        # }
        # for metric,value in zip(self.lrn.metrics,scores[2:]):
        #     m_name = metric.name if hasattr(metric,'name') else str(metric)
        #     metrics[m_name] = value
        return metrics

    def save_checkpoint(self, checkpoint_dir):
        file = os.path.join(checkpoint_dir,'model.pth')
        save_model(file, self.lrn.model, opt=None)

    def load_checkpoint(self, checkpoint_path):
        self.lrn.model.load_state_dict(torch.load(os.path.join(checkpoint_path,'model.pth')))
dls = DataBlock(blocks=(SequenceBlock.from_hdf(['u'],TensorSequencesInput),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput)),
                get_items=tfm_src,
                splitter=ApplyToDict(FuncSplitter(lambda o: 'valid' in str(o)))).dataloaders(hdf_files)
The mutation config dictionary consists of functions that sample from a distribution. To retrieve a dictionary with one realisation of each distribution, we use the function sample_config.
sample_config
sample_config (config)
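Conceptually, drawing one realisation just means calling every sampling function in the dictionary and passing plain values through unchanged. A minimal sketch of that behaviour (the exported sample_config may differ in details) looks like:

def sample_config_sketch(config):
    "Illustrative: call each zero-argument sampler once, keep plain values as they are"
    return {k: (v() if callable(v) else v) for k, v in config.items()}

# Applied to a mutation config such as mut_conf below, this yields a dictionary
# of concrete hyperparameter values for one trial.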
CBRayReporter
CBRayReporter (after_create=None, before_fit=None, before_epoch=None, before_train=None, before_batch=None, after_pred=None, after_loss=None, before_backward=None, after_cancel_backward=None, after_backward=None, before_step=None, after_cancel_step=None, after_step=None, after_cancel_batch=None, after_batch=None, after_cancel_train=None, after_train=None, before_validate=None, after_cancel_validate=None, after_validate=None, after_cancel_epoch=None, after_epoch=None, after_cancel_fit=None, after_fit=None)
A Callback that reports progress after every epoch to the Ray Tune logger.
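As an illustration of what such a reporter amounts to (not the exported implementation), a fastai callback can read the last row of the Recorder after each epoch and forward it to Ray. The sketch assumes the reporting API of recent Ray versions (ray.train.report, Ray >= 2.7); older releases used tune.report instead.

from fastai.callback.core import Callback
from ray import train

class RayReporterSketch(Callback):
    "Illustrative sketch: push the recorder's latest losses to Ray after each epoch"
    order = 70  # chosen so it runs after the Recorder has logged the epoch
    def after_epoch(self):
        scores = self.learn.recorder.values[-1]
        train.report({'train_loss': float(scores[0]), 'valid_loss': float(scores[1])})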
HPOptimizer
HPOptimizer (create_lrn, dls)
Hyperparameter optimizer built around a learner factory create_lrn and dataloaders dls, used below for population based training and grid search.
Test Population Based Training
def create_lrn(dls,config):
    lr = config['lr']
    alpha = config['alpha']
    beta = config['beta']
    weight_p = config['weight_p']

    lrn = RNNLearner(dls)
    lrn.lr = lr
    return lrn
config = {
    "lr": tune.loguniform(1e-4, 1e-2),
    "alpha": tune.loguniform(1e-5, 10),
    "beta": tune.loguniform(1e-5, 10),
    "weight_p": tune.uniform(0, 0.5)}

mut_conf = {  # distributions for resampling
    "lr": log_uniform(1e-8, 1e-2),
    "alpha": log_uniform(1e-5, 10),
    "beta": log_uniform(1e-5, 10),
    "weight_p": lambda: np.random.uniform(0, 0.5)}
hp_opt = HPOptimizer(create_lrn,dls)
# hp_opt.start_ray()
# hp_opt.optimize_pbt('pbt_test',4,config,mut_conf,perturbation_interval=1,
#                     stop={"training_iteration": 1},
#                     resources_per_trial={"gpu": 0.5},
#                     storage_path=str(Path.home() / 'ray_results'))  # no cpu count is necessary
# hp_opt.best_model()
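For orientation, population based training in Ray Tune is driven by the PopulationBasedTraining scheduler, and mut_conf plays the role of its hyperparam_mutations. A hedged sketch of the kind of scheduler optimize_pbt presumably assembles from these arguments (the exact wiring inside HPOptimizer may differ):

from ray.tune.schedulers import PopulationBasedTraining

pbt_sched = PopulationBasedTraining(
    time_attr='training_iteration',
    metric='valid_loss',            # assumed metric name, matching the reporter above
    mode='min',
    perturbation_interval=1,
    hyperparam_mutations=mut_conf,  # resampling distributions defined above
)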
Test Grid Search
# dls.cpu()
def create_lrn(dls,config):
    dls = DataBlock(blocks=(SequenceBlock.from_hdf(['u'],TensorSequencesInput),
                            SequenceBlock.from_hdf(['y'],TensorSequencesOutput)),
                    get_items=tfm_src,
                    splitter=ApplyToDict(FuncSplitter(lambda o: 'valid' in str(o)))).dataloaders(hdf_files)
    lrn = RNNLearner(dls,hidden_size=config['hidden_size'],metrics=[fun_rmse,mse])
    return lrn

hp_opt = HPOptimizer(create_lrn,None)
search_space = {
    "hidden_size": tune.grid_search([10,20,50,100]),
    'n_epoch': 10
}
# hp_opt.optimize(optimize_func=TrainableModel,
# resources_per_trial={"cpu": 4},
# config=search_space)
# hp_opt.analysis.get_best_config('mean_loss',mode='min')