Truncated Backpropagation Through Time

Custom Dataloaders

PyTorch modules for training models on sequential data

The TBPTT dataloader needs to split each minibatch into several smaller sub-minibatches that are returned sequentially before the next full minibatch is created.
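Conceptually, the splitting works like the following sketch (an illustrative toy, not the actual TbpttDl implementation): a full batch of shape (bs, seq_len, features) is cut along the time axis into chunks of sub_seq_len, and each chunk carries a flag marking the start of a new sequence so the model knows when to reset its hidden state.

import torch

def split_batch(batch, sub_seq_len):
    '''Yield (sub_batch, is_first) chunks along the time axis (toy sketch).'''
    for start in range(0, batch.shape[1], sub_seq_len):
        # is_first signals that a new sequence begins -> reset hidden state
        yield batch[:, start:start + sub_seq_len], start == 0

full_batch = torch.randn(64, 1000, 2)     # (bs, seq_len, features)
chunks = list(split_batch(full_batch, 100))
len(chunks), chunks[0][0].shape           # 10 chunks of shape (64, 100, 2)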


source

TbpttDl

 TbpttDl (dataset, sub_seq_len=None, seq_len=None, shuffle=True,
          num_workers=2, bs:int=64, verbose:bool=False,
          do_setup:bool=True, pin_memory=False, timeout=0,
          batch_size=None, drop_last=False, indexed=None, n=None,
          device=None, persistent_workers=False, pin_memory_device='',
          wif=None, before_iter=None, after_item=None, before_batch=None,
          after_batch=None, after_iter=None, create_batches=None,
          create_item=None, create_batch=None, retain=None, get_idxs=None,
          sample=None, shuffle_fn=None, do_batch=None)

Transformed DataLoader

from nbdev.config import get_config
project_root = get_config().config_file.parent
f_path = project_root / 'test_data/WienerHammerstein'
tfm_lst = [DfHDFCreateWindows(win_sz=1000+1,stp_sz=1000,clm='u')]
seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
                 get_items=CreateDict(tfm_lst),
                 splitter=ApplyToDict(ParentSplitter()))
db = seq.dataloaders(get_hdf_files(f_path),dl_type=TbpttDl,sub_seq_len=99,num_workers=6)
UserWarning: Sequence length (1000) is not perfectly divisible by sub_seq_len (99). The last segment of each sequence in TbpttDl will be shorter.
db = seq.dataloaders(get_hdf_files(f_path),dl_type=TbpttDl,sub_seq_len=100,num_workers=6)
example_sequence = [array(x[-1][0,:,0].cpu()) for x in db.train]
plt.figure()
plt.plot(np.concatenate(example_sequence))

num_workers can now be greater than 1 thanks to the new synchronization procedure

TBPTT_Reset_Callback

A stateful model needs to reset its hidden state when a new sequence begins. The callback reads the reset flag and acts accordingly.
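A minimal sketch of the mechanism (assuming, as the callback does, that stateful modules expose a reset_state method):

import torch.nn as nn

class StatefulGRU(nn.Module):
    '''Toy stateful module that carries its hidden state across sub-batches.'''
    def __init__(self, n_in, n_hidden):
        super().__init__()
        self.rnn = nn.GRU(n_in, n_hidden, batch_first=True)
        self.h = None
    def forward(self, x):
        out, h = self.rnn(x, self.h)
        self.h = h.detach()  # truncate the gradient between sub-batches
        return out
    def reset_state(self):
        self.h = None        # called when a new sequence begins

def reset_state_sketch(model):
    '''Call reset_state on every submodule that defines it (illustrative).'''
    for m in model.modules():
        if hasattr(m, 'reset_state'): m.reset_state()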


source

reset_model_state

 reset_model_state (model)

source

TbpttResetCB

 TbpttResetCB (after_create=None, before_fit=None, before_epoch=None,
               before_train=None, before_batch=None, after_pred=None,
               after_loss=None, before_backward=None,
               after_cancel_backward=None, after_backward=None,
               before_step=None, after_cancel_step=None, after_step=None,
               after_cancel_batch=None, after_batch=None,
               after_cancel_train=None, after_train=None,
               before_validate=None, after_cancel_validate=None,
               after_validate=None, after_cancel_epoch=None,
               after_epoch=None, after_cancel_fit=None, after_fit=None)

Callback that resets the RNN model at every new sequence during TBPTT, calling reset_state on every module of the model

Example

from tsfast.learner import RNNLearner,SkipNLoss,fun_rmse
lrn = RNNLearner(db,num_layers=1,rnn_type='gru',stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse])
lrn.add_cb(TbpttResetCB())
<fastai.learner.Learner>
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch train_loss valid_loss fun_rmse fun_rmse time
0 0.092029 0.032849 0.041365 0.041402 00:01
db.train.sub_seq_len = 10
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch train_loss valid_loss fun_rmse fun_rmse time
0 0.016088 0.009301 0.012578 0.013134 00:01

Weighted Sampling Dataloader

A weighted sampling dataloader for nonuniformly distributed data. A factory method receives the base DataLoader class and returns the inherited weighted sampling dataloader class.


source

WeightedDL_Factory

 WeightedDL_Factory (cls)

Weighted DataLoader that provides control over sampling probabilities. wgts: probability array with a probability for every item; extracted from the pandas ‘p_sample’ column if given, otherwise uniform sampling is used.
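At its core, weighted sampling simply draws item indices with the given probabilities instead of uniformly, roughly like this (illustrative sketch, not the factory's actual code):

import numpy as np

wgts = np.array([2, 1] * 5, dtype=float)
p = wgts / wgts.sum()                              # normalize to probabilities
idxs = np.random.choice(len(p), size=len(p), p=p)  # heavy items drawn more often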

dl = WeightedDL_Factory(TfmdDL)([1,2]*5,bs=10,wgts=[2,1]*5)
lrn = RNNLearner(db,num_layers=1,rnn_type='gru',stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse])
lrn.fit_one_cycle(1,lr_max=3e-2)
dl.wgts
array([0.13333333, 0.06666667, 0.13333333, 0.06666667, 0.13333333,
       0.06666667, 0.13333333, 0.06666667, 0.13333333, 0.06666667])
dl.one_batch()
tensor([1, 2, 1, 2, 1, 2, 1, 2, 1, 2])

ItemLst Transform for weight calculation


source

uniform_p_of_category

 uniform_p_of_category (cat_name)

Scales sampling weights for an even distribution across all categories
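The idea: every category receives the same total probability mass, so each item's weight is the inverse of its category's size. A rough sketch with a hypothetical 'train' column:

import pandas as pd

df = pd.DataFrame({'train': [True, True, True, False]})  # 3:1 imbalance
counts = df.groupby('train')['train'].transform('count')
df['p_sample'] = (1 / counts) / (1 / counts).sum()       # each category sums to 0.5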


source

uniform_p_of_float

 uniform_p_of_float (var_name, bins=10)

Scales sampling weights for an even distribution of the continuous variable by creating equally sized bins
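For a continuous variable the same balancing is applied per bin. A rough sketch using pd.cut with equally wide bins (illustrative, not the library code):

import numpy as np
import pandas as pd

df = pd.DataFrame({'val': np.random.exponential(size=100)})  # skewed variable
bins = pd.cut(df['val'], bins=10)                            # 10 equally wide bins
counts = df.groupby(bins, observed=True)['val'].transform('count')
df['p_sample'] = (1 / counts) / (1 / counts).sum()           # each bin gets equal mass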


source

uniform_p_of_float_with_gaps

 uniform_p_of_float_with_gaps (var_name, bins=100)

Scales sampling weights for an even distribution of the continuous variable by creating equally sized bins

def train_valid(df):
    '''Test function that extracts the train/valid split from the path string.'''
    df['train'] = df.path.astype(str).str.contains('train',regex=False)
    return df
tfm_lst = [train_valid, DfHDFCreateWindows(win_sz=1000+1,stp_sz=1000,clm='u') ,uniform_p_of_category('train'),uniform_p_of_float('l_slc'),uniform_p_of_float_with_gaps('r_slc')]
apply_df_tfms(get_hdf_files(f_path),tfm_lst)
path train l_slc r_slc p_sample
0 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 False 0 1001 0.001219
0 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 False 1000 2001 0.001219
0 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 False 2000 3001 0.001170
0 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 False 3000 4001 0.002437
0 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/valid/WienerHammerstein_valid.hdf5 False 4000 5001 0.002340
... ... ... ... ... ...
2 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 True 74000 75001 0.007357
2 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 True 75000 76001 0.007357
2 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 True 76000 77001 0.007357
2 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 True 77000 78001 0.006180
2 /Users/daniel/Development/tsfast/test_data/WienerHammerstein/train/WienerHammerstein_train.hdf5 True 78000 79001 0.011124

185 rows × 5 columns

seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
                        SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
                 get_items=CreateDict(tfm_lst),
                 splitter=ApplyToDict(ParentSplitter()))
# db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=TbpttDl,sub_seq_len=200)
db = seq.dataloaders(get_hdf_files(f_path),dl_type=WeightedDL_Factory(TbpttDl),sub_seq_len=500)
db.train.wgts.shape
(79,)
db.train.wgts[:5],db.valid.wgts[:5]
(array([0.0034722 , 0.0034722 , 0.00333331, 0.0069444 , 0.00666662]),
 array([0.03704676, 0.03704676, 0.03556485, 0.07409344, 0.0711297 ]))
lrn = RNNLearner(db,num_layers=1,rnn_type='gru',stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse])
lrn.add_cb(TbpttResetCB())
lrn.fit_one_cycle(1,lr_max=3e-2)
epoch train_loss valid_loss fun_rmse fun_rmse time
0 0.207963 0.084617 0.108266 0.108489 00:01

Importance Sampling Callback

# #| export
# class ImportanceSampling(Callback):
#     #modify the dataloader weights to sample items with higher loss more often
#     def __init__(self, filter_criterion=nn.HuberLoss(reduction='none')):
#         self.filter_criterion = filter_criterion
#         self.loss_list = None #store loss of each item in the dataloader

#     def before_fit(self):
#         #empty the loss list at the beginning of each fit
#         self.loss_list = None

#     def after_pred(self):
#         # store loss of each item in the batch
#         if not self.training: return
#         losses = self.filter_criterion(self.learn.pred, *self.learn.yb)
#         if losses.ndim >= 2: losses = losses.mean(tuple(range(1,losses.ndim)))  # If loss is multi-dimensional, take the mean over all but the first dimension
#         #get the indices of each item in the batch from the dataloader

#         #if the loss_list is empty, initialize it with dataloader wgts
#         if self.loss_list is None:
#             self.loss_list = self.learn.dls.train.wgts.clone()
#             #scale the loss_list mean to the mean loss of the batch
#             self.loss_list *= self.loss_list.numel()/losses.mean()

#         #store the loss of each item in the loss_list
        
#     def after_epoch(self):
#         #modify the dataloader weights to sample items with higher loss more often
#         if not self.training: return
#         self.learn.dls.train.wgts = self.loss_list
#         self.learn.dls.train.wgts /= self.learn.dls.train.wgts.sum()
# lrn = RNNLearner(db,num_layers=1,stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse],cbs=ImportanceSampling())
# lrn.fit(1)
# tfm_lst = [train_valid, DfHDFCreateWindows(win_sz=500+1,stp_sz=500,clm='u')]
# seq = DataBlock(blocks=(SequenceBlock.from_hdf(['u','y'],TensorSequencesInput,clm_shift=[-1,-1]),
#                         SequenceBlock.from_hdf(['y'],TensorSequencesOutput,clm_shift=[1])),
#                  get_items=CreateDict(tfm_lst),
#                  splitter=ApplyToDict(ParentSplitter()))
# # db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=TbpttDl,sub_seq_len=200)
# db = seq.dataloaders(get_hdf_files('test_data/battery'),dl_type=WeightedDL_Factory(TfmdDL))

# RNNLearner(db,num_layers=1,stateful=False,metrics=[SkipNLoss(fun_rmse,1),fun_rmse],cbs=ImportanceSampling()).fit(1)

Mini Batch Limiter Dataloader

A dataloader that limits the number of minibatches per epoch. A factory method receives the base DataLoader class and returns the inherited batch-limited dataloader class.


source

BatchLimit_Factory

 BatchLimit_Factory (cls)

Batch-limited DataLoader that provides an upper limit for the number of minibatches per epoch. max_batches: upper limit for the minibatch count per epoch
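The factory pattern can be sketched like this: subclass the given DataLoader class and cap both the index list and the reported length at max_batches. A simplified illustration, assuming cls provides fastai's get_idxs and bs (not the exported implementation):

def batch_limit_sketch(cls):
    '''Illustrative factory: cap a DataLoader subclass at max_batches per epoch.'''
    class LimitedDL(cls):
        def __init__(self, *args, max_batches=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.max_batches = max_batches
        def get_idxs(self):
            idxs = list(super().get_idxs())
            if self.max_batches is not None:
                idxs = idxs[:self.max_batches * self.bs]  # keep first n batches
            return idxs
        def __len__(self):
            n = super().__len__()
            return n if self.max_batches is None else min(n, self.max_batches)
    return LimitedDL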

dl = BatchLimit_Factory(TfmdDL)([1,2]*5,bs=2,max_batches=3,shuffle=False)
len(dl)
3
dl.get_idxs()
[0, 1, 2, 3, 4, 5]
BatchLimit_Factory(TfmdDL)([1,2]*5,bs=2,max_batches=3,shuffle=True).get_idxs()
[3, 2, 7, 0, 6, 9]
[x for x in dl]
[tensor([1, 2]), tensor([1, 2]), tensor([1, 2])]
len(dl)
3