Simple forecast pipeline example #124

Open
@matthias-wende-frequenz

Description

What's needed?

Write a simple example that shows how the SDK's internal
tools can be used to create a simple forecasting pipeline.

A forecasting pipeline is a data pipeline whose goal is
to transform data so that it can be passed directly
into an ML model's prediction function.
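
To make that goal concrete, here is a minimal sketch of the last stage of such a pipeline. `build_features`, `Predictor` and `MeanModel` are hypothetical names, not SDK classes; the stub only stands in for any object with a `predict` method, such as a trained model.

```python
from statistics import fmean
from typing import Protocol, Sequence


class Predictor(Protocol):
    """Anything with a predict method, e.g. a trained ML model (assumption)."""

    def predict(self, features: Sequence[float]) -> float: ...


class MeanModel:
    """Stub model for illustration only: predicts the mean of its features."""

    def predict(self, features: Sequence[float]) -> float:
        return fmean(features)


def build_features(current_samples: Sequence[float]) -> list[float]:
    """Reduce a bounded batch of samples to single feature values."""
    return [fmean(current_samples), max(current_samples), min(current_samples)]


model: Predictor = MeanModel()
features = build_features([1.0, 2.0, 3.0])
prediction = model.predict(features)  # the features go straight into predict()
```

Everything before `model.predict` is what this issue calls the pipeline; the model itself is out of scope.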

As the starting point of the pipeline we define a logical meter. That
means the data itself has already been processed, i.e. resampled and
possibly combined from other sources into the desired source.

Proposed solution

For now, our anticipated features are single feature values derived from
time series data such as current, voltage or power, as well as bounded
subsets of such time series (again current, voltage or power).
Note that the logical meter presents us with an unbounded time series.
Other input data is conceivable as well, for example price data for
price predictions.
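
The two kinds of features can be illustrated with a small, SDK-independent sketch. `unbounded_current` is a hypothetical stand-in for the logical meter, and `bounded_windows` is an assumed helper that cuts a bounded subset out of the endless stream; neither is part of the SDK.

```python
from collections import deque
from itertools import count
from statistics import fmean
from typing import Iterable, Iterator


def unbounded_current() -> Iterator[float]:
    """Hypothetical stand-in for the logical meter: an endless stream."""
    for second in count():
        yield float(second % 10)


def bounded_windows(stream: Iterable[float], size: int) -> Iterator[list[float]]:
    """Yield the latest `size` samples each time a new sample arrives."""
    window: deque[float] = deque(maxlen=size)  # old samples drop out automatically
    for sample in stream:
        window.append(sample)
        if len(window) == size:
            yield list(window)


windows = bounded_windows(unbounded_current(), size=5)
first = next(windows)        # a bounded subset of the time series
single_value = fmean(first)  # a single feature value derived from it
```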

Examples

Example 1

In this example we sketch how the average current of a certain weekday
can be obtained.

graph LR;
    LogicalMeter-->SlidingWindow;
    SlidingWindow-->TimeseriesAvg;
    TimeseriesAvg-->Mean;
    Mean-->predict;

As a first step, we create a bounded subset by using a
sliding window operator.
This data batch can then be used to create a profile of the current for one weekday,
averaged over the last n weeks, using the TimeseriesAvg.
As the last step, we calculate the mean of this averaged load profile. The result
is the average current that flows through the observed point on a specific weekday.
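
Independently of the SDK classes, the averaging step can be sketched in plain Python. The function below is an assumed reading of the proposed TimeseriesAvg parameters (slice length, gap between slices, offset from the newest sample), not its actual implementation, and the toy numbers use a "day" of two samples to keep the example small.

```python
from statistics import fmean
from typing import Sequence


def averaged_profile(
    samples: Sequence[float],
    window_size: int,
    window_distance: int,
    offset: int,
) -> list[float]:
    """Average equally long slices of `samples` element-wise.

    The newest slice starts `offset` samples before the end; each older
    slice starts `window_size + window_distance` samples earlier.
    """
    if offset < window_size:
        raise ValueError("offset must cover at least one full slice")
    stride = window_size + window_distance
    start = len(samples) - offset
    slices = []
    while start >= 0:
        slices.append(samples[start : start + window_size])
        start -= stride
    if not slices:
        raise ValueError("not enough samples for a single slice")
    # zip(*slices) groups the samples that share a position within the slice
    return [fmean(position) for position in zip(*slices)]


samples = [float(i) for i in range(14)]
profile = averaged_profile(samples, window_size=2, window_distance=4, offset=3)
mean_of_profile = fmean(profile)
```

Here the slices are `samples[11:13]` and `samples[5:7]`; their element-wise average is the profile, and its mean is the single feature value.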

With the SDK tools, the code might look as follows.

# Create an instance of the logical meter and acquire the first phase of the current.
# Note that the formulas for power and current are the same.
lm = LogicalMeter(channel_registry, resampler_subscription_sender)
current_ph1_rx = lm.start_formula(consumption_load_formula, ComponentMetricId.CURRENT_PHASE_1)

# ring buffer that is updated with the latest streamed data from the microgrid;
# it holds n_days of samples at a sample rate of 1Hz
window = SlidingWindow(n_days * 3600 * 24, current_ph1_rx)

# get an averaged day of the last n weeks with start day yesterday
ts_avg = TimeseriesAvg(
    window,
    window_size_s=3600 * 24,                # one day of data with sample rate 1Hz
    window_distance_s=3600 * 24 * 6,        # six days distance between the days
    offset_s=3600 * 24 + no_seconds_today,  # start day is yesterday
    margin=0,
)

# calculate the mean and the standard deviation of the averaged profile
mean = ts_avg.mean()
std = ts_avg.std()
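
The snippet above uses `no_seconds_today` without defining it. One plausible reading, assumed here, is the number of seconds elapsed since midnight; `seconds_since_midnight` is a hypothetical helper, and the fixed UTC timestamp only keeps the example reproducible.

```python
from datetime import datetime, timezone


def seconds_since_midnight(now: datetime) -> int:
    """Return the whole seconds elapsed since 00:00 of `now`'s own day."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return int((now - midnight).total_seconds())


now = datetime(2023, 1, 10, 6, 30, 15, tzinfo=timezone.utc)
no_seconds_today = seconds_since_midnight(now)
# counted back from `now`, this offset lands on the start of yesterday
offset_s = 3600 * 24 + no_seconds_today
```

In a local time zone with daylight saving changes, a day is not always 86400 seconds long, so a real implementation would need to decide how to handle those days.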

Example 2

In the next example we want to resample the incoming data stream before creating a window from it.

graph LR;
    LogicalMeter-->Resampler;
    Resampler-->SlidingWindow;
    SlidingWindow-->TimeseriesAvg;
    TimeseriesAvg-->sth[do Something];
The code might look as follows.

# Create an instance of the logical meter and acquire the first phase of the current.
# Note that the formulas for power and current are the same.
lm = LogicalMeter(channel_registry, resampler_subscription_sender)
current_ph1_rx = lm.start_formula(consumption_load_formula, ComponentMetricId.CURRENT_PHASE_1)

# create a channel for the resampled current
resampled_current_chan = Broadcast[Sample]("current_ph1")
resampled_current_rx = resampled_current_chan.new_receiver()
resampled_current_tx = resampled_current_chan.new_sender()

# the sink must be a coroutine function because it awaits the send
async def sink_adapter(sample: Sample) -> None:
    await resampled_current_tx.send(sample)

# Instantiate the resampler with a 3s sampling period (sample rate 1/3 Hz)
resampler = Resampler(ResamplerConfig(sample_rate=1 / 3))
resampler.add_timeseries(current_ph1_rx, sink_adapter)

# ring buffer that is updated with the resampled data;
# at a 3s period one day holds 1200 * 24 samples
window = SlidingWindow(n_days * 1200 * 24, resampled_current_rx)

# get an averaged day of the last n weeks with start day yesterday
ts_avg = TimeseriesAvg(
    window,
    window_size_s=3600 * 24,                # one day of data, in seconds
    window_distance_s=3600 * 24 * 6,        # six days distance between the days
    offset_s=3600 * 24 + no_seconds_today,  # start day is yesterday
    margin=0,
)

# do sth with the average ...
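
As a rough illustration of what resampling from 1Hz to a 3s period does to the data and to the buffer size, here is a plain-Python sketch. Averaging each group of samples is an assumption made for this example; the SDK resampler may use a different resampling function, and `downsample_mean` is a hypothetical helper.

```python
from statistics import fmean
from typing import Sequence


def downsample_mean(samples: Sequence[float], factor: int) -> list[float]:
    """Average each complete group of `factor` consecutive samples."""
    usable = len(samples) - len(samples) % factor  # drop a trailing partial group
    return [fmean(samples[i : i + factor]) for i in range(0, usable, factor)]


one_hz = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
every_3s = downsample_mean(one_hz, factor=3)

# One day at a 3s period needs a third of the 1Hz buffer size.
samples_per_day_1hz = 3600 * 24
samples_per_day_3s = samples_per_day_1hz // 3
```

This is where the factor `1200 * 24` in the window size comes from: 86400 seconds per day divided by the 3s period.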

Example 3 (OPTIONAL)

graph LR;
    LogicalMeter-->SlidingWindow;
    SlidingWindow-->Resampler;
    Resampler-->sth[do Something];

In the next example we want to resample the data window after it has been created.
NOTE: We are not planning to implement this. As an alternative, we plan to branch the data source at an earlier stage of the pipeline (e.g. after the LogicalMeter, create a second-stage ResamplingActor that forwards into its own SlidingWindow).
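
The branching alternative can be sketched with plain `asyncio` queues. This is only an illustration of the fan-out idea under stated assumptions: `fan_out` and `fill_window` are hypothetical helpers, `None` is used as an end-of-stream marker, and keeping every third sample is a crude stand-in for a second-stage ResamplingActor.

```python
import asyncio
from collections import deque


async def fan_out(source: asyncio.Queue, sinks: list[asyncio.Queue]) -> None:
    """Forward every sample from `source` to all sinks until None arrives."""
    while (sample := await source.get()) is not None:
        for sink in sinks:
            await sink.put(sample)
    for sink in sinks:
        await sink.put(None)  # propagate the end-of-stream marker


async def fill_window(queue: asyncio.Queue, size: int, every: int = 1) -> deque:
    """Keep every `every`-th sample in a bounded window."""
    window: deque = deque(maxlen=size)
    index = 0
    while (sample := await queue.get()) is not None:
        if index % every == 0:
            window.append(sample)
        index += 1
    return window


async def main() -> tuple[list[float], list[float]]:
    source, raw_q, coarse_q = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for value in [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]:
        source.put_nowait(value)
    source.put_nowait(None)
    _, raw, coarse = await asyncio.gather(
        fan_out(source, [raw_q, coarse_q]),
        fill_window(raw_q, size=4),              # branch 1: original rate
        fill_window(coarse_q, size=4, every=3),  # branch 2: reduced rate
    )
    return list(raw), list(coarse)


raw_window, coarse_window = asyncio.run(main())
```

Each branch owns its window, so the window at the original rate never has to be resampled after the fact.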

# Create an instance of the logical meter and acquire the first phase of the current.
# Note that the formulas for power and current are the same.
lm = LogicalMeter(channel_registry, resampler_subscription_sender)
current_ph1_rx = lm.start_formula(consumption_load_formula, ComponentMetricId.CURRENT_PHASE_1)

# ring buffer that is updated with the latest streamed data from the microgrid;
# it holds n_days of samples at a sample rate of 1Hz
window = SlidingWindow(n_days * 3600 * 24, current_ph1_rx)

# resample the window to 5Hz
window.resample(sample_rate=5)

# ...

Use cases

A simple, concise and efficient way to write forecast pipelines for feature generation.

Alternatives and workarounds

Using pandas DataFrames or Arrow are alternatives. Pandas DataFrames have been explored and are considered too inefficient.
The Arrow alternative has not been explored further so far.
Both are supported as input data by xgboost as well as tensorflow.

Metadata

Labels

part:data-pipeline (Affects the data pipeline)
part:docs (Affects the documentation)
type:enhancement (New feature or enhancement visible to users)
