Add code samples for time partition pattern and name conflicts #1073

Merged: 1 commit, Feb 17, 2023
16 changes: 15 additions & 1 deletion docs/concepts/get-offline-features.md
@@ -81,6 +81,17 @@ If any of feature names provided by `Feature Query` conflict with column names o

The checking steps are:
1. Check whether `conflicts_auto_correction` in the `observation_settings` is set (default: None). If it is not None, Spark will handle checking and resolving these conflicts, and the Python client will submit the job to Spark directly; otherwise, the steps below apply. `conflicts_auto_correction` itself takes two parameters, `rename_features` and `suffix`. By default, Spark renames the conflicting dataset columns with the suffix "_1"; you can rename the feature names instead by setting `rename_features` to True and providing a customized suffix.

An example of `ObservationSettings` with auto correction enabled:
```
settings = ObservationSettings(
    observation_path="wasbs://...",
    event_timestamp_column="...",
    timestamp_format="yyyy-MM-dd HH:mm:ss",
    conflicts_auto_correction=ConflictsAutoCorrection(rename_features=True, suffix="test"))
```

2. Try to load the dataset without credentials and compare its column names with the feature names. This supports the case where the dataset is stored in public storage.
3. If the dataset cannot be loaded in the previous step, try to load it with credentials and compare its column names with the feature names. This only works for storage whose credentials are defined in your environment. For example, if your `spark_cluster` is `databricks`, it can only load datasets under the 'dbfs' path belonging to that Databricks workspace.
4. If the dataset cannot be loaded by either of the previous two steps, compare the feature names with the column names provided via the `dataset_column_names` parameter, if it is not empty (see the sketch below).
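
A minimal sketch of step 4, condensed from the sample linked below; the paths are placeholders, and it assumes a `location_id` TypedKey and a built `client` as in that sample. `dataset_column_names` supplies the column names the client cannot read on its own:
```
feature_query = FeatureQuery(feature_list=["trip_distance", "fare_amount"], key=location_id)
settings = ObservationSettings(
    observation_path="wasbs://...",
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")
client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path="wasbs://...",
                            dataset_column_names={"trip_distance", "fare_amount"})
```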
@@ -89,7 +100,10 @@ The checking steps are:

Workflow graph for the conflicts checking and handling:
![conflicts-check-and-handle](../images/conflicts-check-and-handle.png)


For more details, please check the code example as a reference:
[conflicts check and handle samples](../samples/feature_naming_conflicts_samples.py)

## Difference between `materialize_features` and `get_offline_features` API

It is easy to confuse "getting offline features" in this document with "[getting materialized features](./materializing-features.md)", given that they both seem to "get features and put them somewhere". However, there are some differences, and you should know when to use which:
10 changes: 9 additions & 1 deletion docs/how-to-guides/feathr-input-format.md
@@ -26,6 +26,14 @@ This pattern of path will be treated as 'timestamp' of the related data for both

For now, this pattern only works with aggregation features; it is not recognized in other cases.

An example of a data source using this pattern:
```
batch_source = HdfsSource(name="testTimePartitionSource",
                          path="data_source_path",
                          time_partition_pattern="yyyy/MM/dd",
                          postfix_path="postfix_path")
```

## How to control paths to visit
Normally, it's not necessary to visit all data sources that match the path pattern; a job may only need some of them. Feathr supports this differently for 'get_offline_features' and 'materialize_features'.
### For 'get_offline_features':
@@ -34,7 +42,7 @@ Which paths are visited is decided by your dataset and feature definitions.
We can set a time range with `BackfillTime` and `window` (in `WindowAggTransformation`) in the feature definition. E.g. if we have `backfill_time = datetime(2020, 5, 21)` and `window="3d"`, Feathr will try to visit data under the paths: ['{base_path}/2020/05/18', '{base_path}/2020/05/19', '{base_path}/2020/05/20'], as in the sketch below.
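
A rough sketch of that scenario, condensed from the linked sample below; the key and the feature/column names are placeholders taken from that sample:
```
# With window="3d" and backfill_time = 2020-05-21, Feathr only needs data under
# {base_path}/2020/05/18, {base_path}/2020/05/19 and {base_path}/2020/05/20.
feature = Feature(name="f_loc_avg_output",
                  key=[key],
                  feature_type=FLOAT,
                  transform=WindowAggTransformation(agg_expr="f_location_avg_fare",
                                                    agg_func="AVG",
                                                    window="3d"))
backfill_time = BackfillTime(start=datetime(2020, 5, 21),
                             end=datetime(2020, 5, 21),
                             step=timedelta(days=1))
```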

For more details, please check the code example as a reference:
[timePartitionPattern test cases](../../feathr_project/test/test_time_partition_pattern_e2e.py)
[timePartitionPattern code samples](../samples/time_partition_pattern_samples.py)
### Interval of time pattern
In terms of the interval or step between each time pattern, we only support 'DAILY' and 'HOURLY' for now.
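
For example, assuming a daily layout and an hourly layout like the ones below (the hourly pattern string 'yyyy/MM/dd/HH' is an assumption; adjust it to match how your data is actually partitioned):
```
# Daily partitions: one folder per day, e.g. {base_path}/2020/05/20/
daily_source = HdfsSource(name="dailySource",
                          path="data_source_path",
                          time_partition_pattern="yyyy/MM/dd")

# Hourly partitions: one folder per hour, e.g. {base_path}/2020/05/20/01/
hourly_source = HdfsSource(name="hourlySource",
                           path="data_source_path",
                           time_partition_pattern="yyyy/MM/dd/HH")
```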

110 changes: 110 additions & 0 deletions docs/samples/feature_naming_conflicts_samples.py
@@ -0,0 +1,110 @@
# Samples for checking and handling feature naming conflicts
import pytest

from feathr import (TypedKey, ValueType, FeatureQuery, ObservationSettings, HdfsSource,
                    Feature, WindowAggTransformation, FLOAT)
from feathr import (FeathrClient, FeatureAnchor, ConflictsAutoCorrection)
from feathr.utils.job_utils import get_result_df
from datetime import datetime

# Example for feature naming conflicts check from python client side
# with no 'auto-correction' enabled

# Replace with your own config path
client = FeathrClient("feathr_config.yaml")

location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")

feature_query = FeatureQuery(
feature_list=["trip_distance","fare_amount"], key=location_id)

# Defined feature names conflict with observation data set column names
settings = ObservationSettings(
observation_path="wasbs://[email protected]/sample_data/green_tripdata_2020-04_with_index.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
output_path = "wasbs://fake_path"
with pytest.raises(RuntimeError) as e:
    client.get_offline_features(observation_settings=settings,
                                feature_query=feature_query,
                                output_path=output_path)
assert str(e.value) == "Feature names exist conflicts with dataset column names: trip_distance,fare_amount"

# Defined feature names conflict with provided column names
settings = ObservationSettings(
observation_path="wasbs://public@fake_file",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
output_path = "wasbs://fakepath"
with pytest.raises(RuntimeError) as e:
    client.get_offline_features(observation_settings=settings,
                                feature_query=feature_query,
                                output_path=output_path,
                                dataset_column_names=set(('trip_distance', 'fare_amount')))
assert str(e.value) == "Feature names exist conflicts with dataset column names: trip_distance,fare_amount"

# Example for feature naming conflicts when auto-correction is enabled

# Replace with your own config path
client = FeathrClient(config_path='feathr_config.yaml', local_workspace_dir="conflicts_test")
batch_source = HdfsSource(name="nycTaxiBatchSource",
path="wasbs://[email protected]/sample_data/green_tripdata_2020-04_with_index.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")
pu_location_id = TypedKey(key_column="PULocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")

agg_features = [Feature(name="tip_amount",
key=[location_id, pu_location_id],
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="AVG",
window="3d")),
Feature(name="total_amount",
key=[location_id, pu_location_id],
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
agg_func="MAX",
window="3d")),
]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
source=batch_source,
features=agg_features)

client.build_features(anchor_list=[agg_anchor])

now = datetime.now()

# Feature names 'tip_amount' and 'total_amount' conflict with dataset columns;
# they will be renamed to 'tip_amount_test' and 'total_amount_test' in the result
feature_query = FeatureQuery(
feature_list=["tip_amount", "total_amount"], key=location_id)
settings = ObservationSettings(
observation_path="wasbs://[email protected]/sample_data/green_tripdata_2020-04_with_index.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss",
conflicts_auto_correction=ConflictsAutoCorrection(rename_features=True, suffix="test"))

# replace by your own output path
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"])

client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path
)
client.wait_job_to_finish(timeout_sec=500)

res_df = get_result_df(client, data_format="avro", res_url = output_path)
assert res_df.shape[0] > 0
108 changes: 108 additions & 0 deletions docs/samples/time_partition_pattern_samples.py
@@ -0,0 +1,108 @@
# Code samples for time partition pattern
from datetime import datetime, timedelta

from feathr import (FLOAT, FeatureQuery, ObservationSettings, Feature, FeatureAnchor,
                    HdfsSource, TypedKey, ValueType, WindowAggTransformation, BackfillTime,
                    MaterializationSettings, HdfsSink, Constants)
from feathr import FeathrClient
from feathr.utils.job_utils import get_result_df

# replace by your config path
config_path = "feathr_config.yaml"
# replace by your own data source path
data_source_path = 'dbfs:/timePartitionPattern_postfix_test/df0/daily/'
# replace by your own postfix path
postfix_path = "postfixPath"

# Example of a materialization job with 'time_partition_pattern'
client = FeathrClient(config_path=config_path)

batch_source = HdfsSource(name="testTimePartitionSource",
path=data_source_path,
time_partition_pattern="yyyy/MM/dd",
postfix_path=postfix_path
)
key = TypedKey(key_column="key0",
key_column_type=ValueType.INT32)
agg_features = [
Feature(name="f_loc_avg_output",
key=[key],
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="f_location_avg_fare",
agg_func="AVG",
window="3d")),
Feature(name="f_loc_max_output",
feature_type=FLOAT,
key=[key],
transform=WindowAggTransformation(agg_expr="f_location_max_fare",
agg_func="MAX",
window="3d")),
]

agg_anchor = FeatureAnchor(name="testTimePartitionFeatures",
source=batch_source,
features=agg_features)
client.build_features(anchor_list=[agg_anchor])

backfill_time_pf = BackfillTime(start=datetime(
2020, 5, 2), end=datetime(2020, 5, 2), step=timedelta(days=1))
now = datetime.now()
# replace by your own output path
output_path_pf = ''.join(['dbfs:/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
offline_sink_pf = HdfsSink(output_path=output_path_pf)
settings_pf = MaterializationSettings("nycTaxiTable",
sinks=[offline_sink_pf],
feature_names=[
"f_loc_avg_output", "f_loc_max_output"],
backfill_time=backfill_time_pf)
client.materialize_features(settings_pf)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

res_df = get_result_df(client, data_format="avro", res_url=output_path_pf + "/df0/daily/2020/05/02")
assert res_df.shape[0] > 0

# Example of a get_offline_features job with 'time_partition_pattern'
client = FeathrClient(config_path=config_path)

batch_source = HdfsSource(name="testTimePartitionSource",
path=data_source_path,
time_partition_pattern="yyyy/MM/dd",
postfix_path=postfix_path
)
tpp_key = TypedKey(key_column="f_location_max_fare",
key_column_type=ValueType.FLOAT)
tpp_features = [
Feature(name="key0",
key=tpp_key,
feature_type=FLOAT,
transform=WindowAggTransformation(agg_expr="key0",
agg_func="LATEST",
window="3d"
))
]
tpp_anchor = FeatureAnchor(name="tppFeatures",
source=batch_source,
features=tpp_features)
client.build_features(anchor_list=[tpp_anchor])

feature_query = FeatureQuery(feature_list=["key0"], key=tpp_key)
settings = ObservationSettings(
observation_path='wasbs://[email protected]/sample_data/tpp_source.csv',
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")
# replace by your own output path
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"])
client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path)
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

res_df = get_result_df(client, data_format="avro", res_url = output_path)
assert res_df.shape[0] > 0