Skip to content

Signal Generation

Excitation signal generation for PINN collocation points.

generate_excitation_signals

generate_excitation_signals(batch_size: int, seq_length: int, n_inputs: int = 1, dt: float = 0.01, device: str = 'cpu', signal_types: list | None = None, amplitude_range: tuple = (0.5, 2.0), frequency_range: tuple = (0.1, 3.0), input_configs: list | None = None, noise_probability: float = 0.0, noise_std_range: tuple = (0.05, 0.15), bias_probability: float = 0.0, bias_range: tuple = (-0.5, 0.5), synchronized_inputs: bool = False, seed: int | None = None) -> torch.Tensor

Generate standard excitation signals for PINN collocation points (vectorized).

Parameters:

Name Type Description Default
batch_size int

number of sequences in batch

required
seq_length int

length of each sequence

required
n_inputs int

number of input dimensions

1
dt float

time step

0.01
device str

device for tensors

'cpu'
signal_types list | None

signal types to use (None = all core types)

None
amplitude_range tuple

global amplitude range

(0.5, 2.0)
frequency_range tuple

global frequency range

(0.1, 3.0)
input_configs list | None

per-input configuration (list of dicts)

None
noise_probability float

probability of adding noise per sequence

0.0
noise_std_range tuple

noise std as fraction of amplitude

(0.05, 0.15)
bias_probability float

probability of adding DC bias per sequence

0.0
bias_range tuple

DC bias range

(-0.5, 0.5)
synchronized_inputs bool

if True, all inputs get same signal type

False
seed int | None

random seed

None

Returns:

Type Description
Tensor

Excitation signal tensor of shape [batch_size, seq_length, n_inputs].

Source code in tsfast/pinn/signals.py
def generate_excitation_signals(
    batch_size: int,
    seq_length: int,
    n_inputs: int = 1,
    dt: float = 0.01,
    device: str = "cpu",
    signal_types: list | None = None,
    amplitude_range: tuple = (0.5, 2.0),
    frequency_range: tuple = (0.1, 3.0),
    input_configs: list | None = None,
    noise_probability: float = 0.0,
    noise_std_range: tuple = (0.05, 0.15),
    bias_probability: float = 0.0,
    bias_range: tuple = (-0.5, 0.5),
    synchronized_inputs: bool = False,
    seed: int | None = None,
) -> torch.Tensor:
    """Generate standard excitation signals for PINN collocation points (vectorized).

    Args:
        batch_size: number of sequences in batch
        seq_length: length of each sequence
        n_inputs: number of input dimensions
        dt: time step
        device: device for tensors
        signal_types: signal types to use (None = all core types)
        amplitude_range: global amplitude range
        frequency_range: global frequency range
        input_configs: per-input configuration (list of dicts)
        noise_probability: probability of adding noise per sequence
        noise_std_range: noise std as fraction of amplitude
        bias_probability: probability of adding DC bias per sequence
        bias_range: DC bias range
        synchronized_inputs: if True, all inputs get same signal type
        seed: random seed

    Returns:
        Excitation signal tensor of shape [batch_size, seq_length, n_inputs].
    """
    if seed is not None:
        np.random.seed(seed)

    signal_types = signal_types or DEFAULT_SIGNAL_TYPES
    total_signals = batch_size * n_inputs

    # Build configuration for each signal
    configs = []
    for batch_idx in range(batch_size):
        for input_idx in range(n_inputs):
            if input_configs and input_idx < len(input_configs):
                input_cfg = input_configs[input_idx]
                configs.append(
                    {
                        "amplitude_range": input_cfg.get("amplitude_range", amplitude_range),
                        "frequency_range": input_cfg.get("frequency_range", frequency_range),
                        "signal_types": input_cfg.get("signal_types", signal_types),
                        "signal_params": input_cfg.get("signal_params", {}),
                    }
                )
            else:
                configs.append(
                    {
                        "amplitude_range": amplitude_range,
                        "frequency_range": frequency_range,
                        "signal_types": signal_types,
                        "signal_params": {},
                    }
                )

    # Pre-sample signal types (handling synchronized_inputs)
    signal_type_choices = np.empty(total_signals, dtype=object)
    if synchronized_inputs:
        for batch_idx in range(batch_size):
            sig_type = np.random.choice(signal_types)
            for input_idx in range(n_inputs):
                idx = batch_idx * n_inputs + input_idx
                signal_type_choices[idx] = sig_type
    else:
        for idx, cfg in enumerate(configs):
            signal_type_choices[idx] = np.random.choice(cfg["signal_types"])

    # Pre-sample amplitudes
    amplitudes = np.array([np.random.uniform(*cfg["amplitude_range"]) for cfg in configs])

    # Group signals by type
    result = np.zeros((total_signals, seq_length))
    for sig_type in set(signal_type_choices):
        indices = np.where(signal_type_choices == sig_type)[0]
        if len(indices) == 0:
            continue

        type_amps = amplitudes[indices]
        type_configs = [configs[i] for i in indices]

        # Generate based on signal type
        if sig_type == "sine":
            freqs = np.array(
                [
                    np.random.uniform(
                        *cfg["signal_params"].get("sine", {}).get("frequency_range", cfg["frequency_range"])
                    )
                    for cfg in type_configs
                ]
            )
            phases = np.array(
                [
                    np.random.uniform(
                        *cfg["signal_params"]
                        .get("sine", {})
                        .get("phase_range", DEFAULT_SIGNAL_PARAMS["sine"]["phase_range"])
                    )
                    for cfg in type_configs
                ]
            )
            result[indices] = _generate_sine(seq_length, dt, type_amps, freqs, phases)

        elif sig_type == "multisine":
            all_freqs = []
            for cfg in type_configs:
                n_comp = (
                    cfg["signal_params"]
                    .get("multisine", {})
                    .get("n_components", DEFAULT_SIGNAL_PARAMS["multisine"]["n_components"])
                )
                freq_range = cfg["signal_params"].get("multisine", {}).get("frequency_range", cfg["frequency_range"])
                all_freqs.append(np.random.uniform(*freq_range, size=n_comp))
            result[indices] = _generate_multisine(seq_length, dt, type_amps, all_freqs)

        elif sig_type == "step":
            time_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("step", {})
                        .get("time_range", DEFAULT_SIGNAL_PARAMS["step"]["time_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            step_times = np.random.uniform(time_ranges[:, 0], time_ranges[:, 1])
            step_indices = (step_times * seq_length).astype(int)
            result[indices] = _generate_step(seq_length, type_amps, step_indices)

        elif sig_type == "ramp":
            slope_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("ramp", {})
                        .get("slope_range", DEFAULT_SIGNAL_PARAMS["ramp"]["slope_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            start_time_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("ramp", {})
                        .get("start_time_range", DEFAULT_SIGNAL_PARAMS["ramp"]["start_time_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            slopes = np.random.uniform(slope_ranges[:, 0], slope_ranges[:, 1])
            start_times = np.random.uniform(start_time_ranges[:, 0], start_time_ranges[:, 1])
            start_indices = (start_times * seq_length).astype(int)
            result[indices] = _generate_ramp(seq_length, type_amps, slopes, start_indices)

        elif sig_type == "chirp":
            f0_ranges = np.array(
                [
                    [*cfg["signal_params"].get("chirp", {}).get("f0_range", DEFAULT_SIGNAL_PARAMS["chirp"]["f0_range"])]
                    for cfg in type_configs
                ]
            )
            f1_ranges = np.array(
                [
                    [*cfg["signal_params"].get("chirp", {}).get("f1_range", DEFAULT_SIGNAL_PARAMS["chirp"]["f1_range"])]
                    for cfg in type_configs
                ]
            )
            f0s = np.random.uniform(f0_ranges[:, 0], f0_ranges[:, 1])
            f1s = np.random.uniform(f1_ranges[:, 0], f1_ranges[:, 1])
            result[indices] = _generate_chirp(seq_length, dt, type_amps, f0s, f1s)

        elif sig_type == "noise":
            result[indices] = _generate_noise(seq_length, type_amps)

        elif sig_type == "prbs":
            switch_probs = np.array(
                [
                    cfg["signal_params"]
                    .get("prbs", {})
                    .get("switch_probability", DEFAULT_SIGNAL_PARAMS["prbs"]["switch_probability"])
                    for cfg in type_configs
                ]
            )
            result[indices] = _generate_prbs(seq_length, type_amps, switch_probs)

        elif sig_type == "square":
            freqs = np.array(
                [
                    np.random.uniform(
                        *cfg["signal_params"].get("square", {}).get("frequency_range", cfg["frequency_range"])
                    )
                    for cfg in type_configs
                ]
            )
            duty_cycle_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("square", {})
                        .get("duty_cycle_range", DEFAULT_SIGNAL_PARAMS["square"]["duty_cycle_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            duty_cycles = np.random.uniform(duty_cycle_ranges[:, 0], duty_cycle_ranges[:, 1])
            result[indices] = _generate_square(seq_length, dt, type_amps, freqs, duty_cycles)

        elif sig_type == "doublet":
            duration_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("doublet", {})
                        .get("duration_range", DEFAULT_SIGNAL_PARAMS["doublet"]["duration_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            start_time_ranges = np.array(
                [
                    [
                        *cfg["signal_params"]
                        .get("doublet", {})
                        .get("start_time_range", DEFAULT_SIGNAL_PARAMS["doublet"]["start_time_range"])
                    ]
                    for cfg in type_configs
                ]
            )
            durations = np.random.uniform(duration_ranges[:, 0], duration_ranges[:, 1])
            start_times = np.random.uniform(start_time_ranges[:, 0], start_time_ranges[:, 1])
            duration_indices = np.maximum(1, (durations * seq_length).astype(int))
            start_indices = (start_times * seq_length).astype(int)
            result[indices] = _generate_doublet(seq_length, type_amps, duration_indices, start_indices)

    # Vectorized composition: noise and bias
    if noise_probability > 0:
        noise_flags = np.random.rand(total_signals) < noise_probability
        if noise_flags.any():
            noise_stds = np.random.uniform(*noise_std_range, size=total_signals) * np.abs(amplitudes)
            noise_signals = _generate_noise(seq_length, noise_stds[noise_flags])
            result[noise_flags] += noise_signals

    if bias_probability > 0:
        bias_flags = np.random.rand(total_signals) < bias_probability
        if bias_flags.any():
            biases = np.random.uniform(*bias_range, size=bias_flags.sum())
            result[bias_flags] += biases[:, np.newaxis]

    # Reshape and convert to torch
    result = result.reshape(batch_size, n_inputs, seq_length).transpose(0, 2, 1)
    return torch.from_numpy(result).to(device).float()

generate_random_states

generate_random_states(batch_size: int, n_outputs: int, output_ranges: list, device: str = 'cpu', seed: int | None = None) -> torch.Tensor

Generate random physical states for PINN collocation points.

Parameters:

Name Type Description Default
batch_size int

number of states to generate

required
n_outputs int

number of output dimensions

required
output_ranges list

list of (min, max) tuples for each dimension

required
device str

device for tensor

'cpu'
seed int | None

random seed

None

Returns:

Type Description
Tensor

Random state tensor of shape [batch_size, n_outputs].

Source code in tsfast/pinn/signals.py
def generate_random_states(
    batch_size: int,
    n_outputs: int,
    output_ranges: list,
    device: str = "cpu",
    seed: int | None = None,
) -> torch.Tensor:
    """Generate random physical states for PINN collocation points.

    Args:
        batch_size: number of states to generate
        n_outputs: number of output dimensions
        output_ranges: list of (min, max) tuples for each dimension
        device: device for tensor
        seed: random seed

    Returns:
        Random state tensor of shape [batch_size, n_outputs].
    """
    if seed is not None:
        np.random.seed(seed)

    # Ensure output_ranges is a list
    if not isinstance(output_ranges, list):
        output_ranges = [output_ranges] * n_outputs

    states = np.zeros((batch_size, n_outputs))
    for i in range(n_outputs):
        min_val, max_val = output_ranges[i]
        states[:, i] = np.random.uniform(min_val, max_val, batch_size)

    return torch.from_numpy(states).to(device).float()