Custom Dataloaders
PyTorch modules for training models on sequential data
Truncated Backpropagation Through Time
The TBPTT dataloader needs to split each mini-batch it creates into several smaller mini-batches that are returned sequentially before the next full mini-batch is created.
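Conceptually, the splitting works like the following sketch (the function and tensor names are illustrative assumptions, not part of TbpttDl):

# Hypothetical sketch of the TBPTT splitting idea: a full mini-batch of shape
# (bs, seq_len, features) is cut along the time axis into chunks of sub_seq_len
# that are yielded one after another.
import torch

def split_into_sub_sequences(batch, sub_seq_len):
    "Yield (sub_batch, is_first) pairs; is_first marks the start of a new sequence."
    seq_len = batch.shape[1]
    for start in range(0, seq_len, sub_seq_len):
        yield batch[:, start:start+sub_seq_len], start == 0

full_batch = torch.randn(64, 1000, 2)                 # (bs, seq_len, features)
chunks = list(split_into_sub_sequences(full_batch, 100))
assert len(chunks) == 10 and chunks[0][1]

The flag marking the first chunk is what a stateful model can use to know when to reset its hidden state (see the reset callback below).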
TbpttDl
TbpttDl (dataset, sub_seq_len=None, seq_len=None, shuffle=True, num_workers=2, bs:int=64, verbose:bool=False, do_setup:bool=True, pin_memory=False, timeout=0, batch_size=None, drop_last=False, indexed=None, n=None, device=None, persistent_workers=False, pin_memory_device='', wif=None, before_iter=None, after_item=None, before_batch=None, after_batch=None, after_iter=None, create_batches=None, create_item=None, create_batch=None, retain=None, get_idxs=None, sample=None, shuffle_fn=None, do_batch=None)
Transformed DataLoader
from nbdev.config import get_config

project_root = get_config().config_file.parent
f_path = project_root / 'test_data/WienerHammerstein'
tfm_lst = [DfHDFCreateWindows(win_sz=1000+1,stp_sz=1000,clm='u')]
seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
                get_items=CreateDict(tfm_lst),
                splitter=ApplyToDict(ParentSplitter()))
db = seq.dataloaders(get_hdf_files(f_path),dl_type=TbpttDl,sub_seq_len=99,num_workers=6)
/var/folders/pc/13zbh_m514n1tp522cx9npt00000gn/T/ipykernel_57530/2691761175.py:21: UserWarning: Sequence length (1000) is not perfectly divisible by sub_seq_len (99). The last segment of each sequence in TbpttDl will be shorter.
warnings.warn(
db = seq.dataloaders(get_hdf_files(f_path),dl_type=TbpttDl,sub_seq_len=100,num_workers=6)
example_sequence = [array(x[-1][0,:,0].cpu()) for x in db.train]
plt.figure()
plt.plot(np.concatenate(example_sequence))
num_workers can be > 1 with the new synchronization procedure
TBPTT_Reset_Callback
The stateful model needs to reset its hidden state when a new sequence begins. The callback reads the reset flag and acts accordingly.
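For illustration, a minimal stateful module exposing the reset_state hook that reset_model_state looks for might be sketched like this (StatefulGRU is a made-up example, not a tsfast model; the library's own RNN modules ship with their own reset logic):

# Hedged sketch: a stateful GRU wrapper with a reset_state method, the kind of
# hook the reset callback can trigger when a new sequence starts.
import torch
from torch import nn

class StatefulGRU(nn.Module):
    def __init__(self, n_in, n_hidden):
        super().__init__()
        self.gru = nn.GRU(n_in, n_hidden, batch_first=True)
        self.h = None                     # hidden state carried across sub-sequences

    def reset_state(self):
        self.h = None                     # drop the carried state at a sequence boundary

    def forward(self, x):
        out, self.h = self.gru(x, self.h)
        self.h = self.h.detach()          # truncate the gradient between sub-sequences
        return out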
reset_model_state
reset_model_state (model)
TbpttResetCB
TbpttResetCB (after_create=None, before_fit=None, before_epoch=None, before_train=None, before_batch=None, after_pred=None, after_loss=None, before_backward=None, after_cancel_backward=None, after_backward=None, before_step=None, after_cancel_step=None, after_step=None, after_cancel_batch=None, after_batch=None, after_cancel_train=None, after_train=None, before_validate=None, after_cancel_validate=None, after_validate=None, after_cancel_epoch=None, after_epoch=None, after_cancel_fit=None, after_fit=None)
Callback
Resets the RNN model with every new sequence for TBPTT; calls reset_state in every module of the model
Example
from tsfast.learner import RNNLearner,SkipNLoss,fun_rmse
lrn = RNNLearner(db,num_layers=1,rnn_type='gru',stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse])
lrn.add_cb(TbpttResetCB())
<fastai.learner.Learner>
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch | train_loss | valid_loss | fun_rmse | fun_rmse | time |
---|---|---|---|---|---|
0 | 0.092029 | 0.032849 | 0.041365 | 0.041402 | 00:01 |
db.train.sub_seq_len = 10
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch | train_loss | valid_loss | fun_rmse | fun_rmse | time |
---|---|---|---|---|---|
0 | 0.016088 | 0.009301 | 0.012578 | 0.013134 | 00:01 |
Weighted Sampling Dataloader
A weighted sampling dataloader for nonuniformly distributed data. A factory method receives the base dataloader class and returns the inherited weighted sampling dataloader class.
WeightedDL_Factory
WeightedDL_Factory (cls)
Weighted dataloader that provides control over sampling probabilities. wgts: probability array with a probability for every item; it is extracted from the pandas 'p_sample' column if given, otherwise uniform sampling is used.
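Roughly, such a factory can be pictured like this (a hedged sketch of the pattern, not the actual WeightedDL_Factory source; it assumes a fastai-style DataLoader base class with n and get_idxs):

# Sketch: wrap an arbitrary dataloader class in a subclass that draws item
# indices according to normalized sampling weights.
import numpy as np

def weighted_dl_factory(cls):
    class WeightedDL(cls):
        def __init__(self, dataset=None, wgts=None, **kwargs):
            super().__init__(dataset=dataset, **kwargs)
            wgts = np.ones(self.n) if wgts is None else np.asarray(wgts, dtype=float)
            self.wgts = wgts / wgts.sum()                       # normalize to probabilities
        def get_idxs(self):
            return list(np.random.choice(self.n, self.n, p=self.wgts))
    return WeightedDL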
dl = WeightedDL_Factory(TfmdDL)([1,2]*5,bs=10,wgts=[2,1]*5)
dl.wgts
array([0.13333333, 0.06666667, 0.13333333, 0.06666667, 0.13333333,
0.06666667, 0.13333333, 0.06666667, 0.13333333, 0.06666667])
dl.one_batch()
tensor([1, 2, 1, 2, 1, 2, 1, 2, 1, 2])
ItemLst Transform for weight calculation
uniform_p_of_category
uniform_p_of_category (cat_name)
Scales sampling weights for an even distribution across all categories
uniform_p_of_float
uniform_p_of_float (var_name, bins=10)
Scales sampling weights for an even distribution of the continuous variable by creating equally sized bins
uniform_p_of_float_with_gaps
uniform_p_of_float_with_gaps (var_name, bins=100)
Scales sampling weights for an even distribution of the continuous variable by creating equally sized bins
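The underlying weight calculation can be illustrated with a small, hypothetical pandas snippet (not the library code): each item receives the inverse of its category's frequency, so every category ends up with the same total sampling probability.

# Illustrative sketch of the weighting idea behind uniform_p_of_category:
import pandas as pd

df = pd.DataFrame({'train': [True, True, True, False]})
counts = df['train'].map(df['train'].value_counts())   # per-row size of the row's category
p = 1 / counts                                          # rarer categories get larger weights
df['p_sample'] = p / p.sum()                            # normalize: False row 0.5, each True row ~0.167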
def train_valid(df):
    '''test function that extracts valid and train from the path string'''
    df['train'] = df.path.astype(str).str.contains('train',regex=False)
    return df
# %%time
tfm_lst = [train_valid, DfHDFCreateWindows(win_sz=1000+1,stp_sz=1000,clm='u'), uniform_p_of_category('train'), uniform_p_of_float('l_slc'), uniform_p_of_float_with_gaps('r_slc')]
apply_df_tfms(get_hdf_files(f_path),tfm_lst)
 | path | train | l_slc | r_slc | p_sample |
---|---|---|---|---|---|
0 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 | False | 0 | 1001 | 0.001219 |
0 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 | False | 1000 | 2001 | 0.001219 |
0 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 | False | 2000 | 3001 | 0.001170 |
0 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 | False | 3000 | 4001 | 0.002437 |
0 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 | False | 4000 | 5001 | 0.002340 |
... | ... | ... | ... | ... | ... |
2 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 | True | 74000 | 75001 | 0.007357 |
2 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 | True | 75000 | 76001 | 0.007357 |
2 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 | True | 76000 | 77001 | 0.007357 |
2 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 | True | 77000 | 78001 | 0.006180 |
2 | /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 | True | 78000 | 79001 | 0.011124 |
185 rows × 5 columns
seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
                get_items=CreateDict(tfm_lst),
                splitter=ApplyToDict(ParentSplitter()))
# db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=TbpttDl,sub_seq_len=200)
db = seq.dataloaders(get_hdf_files(f_path),dl_type=WeightedDL_Factory(TbpttDl),sub_seq_len=500)
db.train.wgts.shape
(79,)
db.train.wgts[:5],db.valid.wgts[:5]
(array([0.0034722 , 0.0034722 , 0.00333331, 0.0069444 , 0.00666662]),
array([0.03704676, 0.03704676, 0.03556485, 0.07409344, 0.0711297 ]))
lrn = RNNLearner(db,num_layers=1,rnn_type='gru',stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse])
lrn.add_cb(TbpttResetCB())
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch | train_loss | valid_loss | fun_rmse | fun_rmse | time |
---|---|---|---|---|---|
0 | 0.207963 | 0.084617 | 0.108266 | 0.108489 | 00:01 |
Importance Sampling Callback
# #| export
# class ImportanceSampling(Callback):
# #modify the dataloader weights to sample items with higher loss more often
# def __init__(self, filter_criterion=nn.HuberLoss(reduction='none')):
# self.filter_criterion = filter_criterion
# self.loss_list = None #store loss of each item in the dataloader
# def begin_fit(self):
# #empty the loss list at the beginning of each fit
# self.loss_list = None
# def after_pred(self):
# # store loss of each item in the batch
# if not self.training: return
# losses = self.filter_criterion(self.learn.pred, *self.learn.yb)
# if losses.ndim >= 2: losses = losses.mean(tuple(range(1,losses.ndim))) # If loss is multi-dimensional, take the mean over all but the first dimension
# #get the indices of each item in the batch from the dataloader
# # import pdb; pdb.set_trace()
# #if the loss_list is empty, initialize it with dataloader wgts
# if self.loss_list is None:
# self.loss_list = self.learn.dls.train.wgts.clone()
# #scale the loss_list mean to the mean loss of the batch
# self.loss_list *= self.loss_list.numel()/losses.mean()
# #store the loss of each item in the loss_list
# def after_epoch(self):
# #modify the dataloader weights to sample items with higher loss more often
# if not self.training: return
# self.learn.dls.train.wgts = self.loss_list
# self.learn.dls.train.wgts /= self.learn.dls.train.wgts.sum()
# lrn = RNNLearner(db,num_layers=1,stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse],cbs=ImportanceSampling())
# lrn.fit(1)
# tfm_lst = [train_valid, DfHDFCreateWindows(win_sz=500+1,stp_sz=500,clm='u')]
# seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
# SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
# get_items=CreateDict(tfm_lst),
# splitter=ApplyToDict(ParentSplitter()))
# # db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=TbpttDl,sub_seq_len=200)
# db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=WeightedDL_Factory(TfmdDL))
# RNNLearner(db,num_layers=1,stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse],cbs=ImportanceSampling()).fit(1)
Mini Batch Limiter Dataloader
A batch limiting dataloader that caps the number of mini-batches per epoch. A factory method receives the base dataloader class and returns the inherited batch limited dataloader class.
BatchLimit_Factory
BatchLimit_Factory (cls)
Batch limited dataloader that provides an upper limit for the number of mini-batches per epoch. max_batches: upper limit for mini-batch count per epoch
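A rough sketch of how such a limit can be enforced (hedged; not the actual BatchLimit_Factory source, and it assumes a fastai-style base class with bs and get_idxs):

# Sketch: cap the reported length and truncate the drawn indices so at most
# max_batches mini-batches are produced per epoch.
def batch_limit_factory(cls):
    class BatchLimitDL(cls):
        def __init__(self, dataset=None, max_batches=None, **kwargs):
            super().__init__(dataset=dataset, **kwargs)
            self.max_batches = max_batches
        def get_idxs(self):
            idxs = list(super().get_idxs())
            if self.max_batches is not None:
                idxs = idxs[:self.max_batches * self.bs]   # keep enough items for max_batches batches
            return idxs
        def __len__(self):
            n = super().__len__()
            return n if self.max_batches is None else min(n, self.max_batches)
    return BatchLimitDL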
dl = BatchLimit_Factory(TfmdDL)([1,2]*5,bs=2,max_batches=3,shuffle=False)
len(dl)
3
dl.get_idxs()
[0, 1, 2, 3, 4, 5]
BatchLimit_Factory(TfmdDL)([1,2]*5,bs=2,max_batches=3,shuffle=True).get_idxs()
[3, 2, 7, 0, 6, 9]
[x for x in dl]
[tensor([1, 2]), tensor([1, 2]), tensor([1, 2])]
len(dl)
3