Capabilities
Feathr is a scalable platform; some of its capabilities are listed below:
Define Features with Transformation
# Features that are anchored on the same source are declared together.
features = [
    # Ingest feature data as-is
    Feature(
        name="f_trip_distance",
        feature_type=FLOAT,
    ),
    # SQL-like syntax to transform raw data into a feature
    Feature(
        name="f_is_long_trip_distance",
        feature_type=BOOLEAN,
        transform="cast_float(trip_distance)>30",
    ),
    # Built-in transformations are provided as well
    Feature(
        name="f_day_of_week",
        feature_type=INT32,
        transform="dayofweek(lpep_dropoff_datetime)",
    ),
]
# Anchor the feature list on its source
anchor = FeatureAnchor(
    name="request_features",
    source=batch_source,
    features=features,
)
Access Features
# Define the key for your feature
location_id = TypedKey(
    key_column="DOLocationID",
    key_column_type=ValueType.INT32,
    description="location id in NYC",
    full_name="nyc_taxi.location_id",
)
# Requested features to be joined, looked up by the key defined above
feature_query = FeatureQuery(
    feature_list=["f_location_avg_fare"],
    key=[location_id],
)
# Observation dataset settings
settings = ObservationSettings(
    # Path to your observation data
    observation_path="abfss://green_tripdata_2020-04.csv",
    # Event timestamp field for your data, optional
    event_timestamp_column="lpep_dropoff_datetime",
    # Event timestamp format, optional
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)
# Prepare training data by joining features to the input (observation) data.
# feature-join.conf and features.conf are detected and used automatically.
feathr_client.get_offline_features(
    observation_settings=settings,
    feature_query=feature_query,
    output_path="abfss://output.avro",
)
Deploy Features to Online (Redis) Store
client = FeathrClient()
# Sink describing the Redis table that receives the materialized features
redisSink = RedisSink(table_name="nycTaxiDemoFeature")
# Materialize two features into a redis table.
settings = MaterializationSettings(
    "nycTaxiMaterializationJob",
    sinks=[redisSink],
    feature_names=[
        "f_location_avg_fare",
        "f_location_max_fare",
    ],
)
client.materialize_features(settings)
To get features from the online store:
# Online reads must target the table the features were materialized into,
# i.e. the RedisSink table_name ("nycTaxiDemoFeature"), not an arbitrary name.
# Get features for a locationId (key)
client.get_online_features(
    feature_table="nycTaxiDemoFeature",
    key="265",
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
)
# Batch get for multiple locationIds (keys)
# NOTE(review): confirm whether the key-list keyword is `key` or `keys`
# in the installed Feathr client before copying this call.
client.multi_get_online_features(
    feature_table="nycTaxiDemoFeature",
    key=["239", "265"],
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
)
Define Window Aggregation Features
agg_features = [
    Feature(
        name="f_location_avg_fare",
        # Query/join key of the feature (group)
        key=location_id,
        feature_type=FLOAT,
        # Window aggregation: average of the expression over a 90-day window
        transform=WindowAggTransformation(
            agg_expr="cast_float(fare_amount)",
            agg_func="AVG",
            window="90d",
        ),
    ),
]
# Anchor the aggregation features on the batch source
agg_anchor = FeatureAnchor(
    name="aggregationFeatures",
    source=batch_source,
    features=agg_features,
)
Define Named Data Sources
batch_source = HdfsSource(
    # Source name to enrich your metadata
    name="nycTaxiBatchSource",
    # Path to your data
    path="abfss://green_tripdata_2020-04.csv",
    # Event timestamp for point-in-time correctness
    event_timestamp_column="lpep_dropoff_datetime",
    # Supports various formats including epoch
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)
Define features on top of other features - Derived Features
# A derived feature is computed on top of existing features
derived_feature = DerivedFeature(
    name="f_trip_time_distance",
    feature_type=FLOAT,
    key=trip_key,
    input_features=[f_trip_distance, f_trip_time_duration],
    transform="f_trip_distance * f_trip_time_duration",
)
# Another example: embedding similarity between a user and an item
user_embedding = Feature(
    name="user_embedding",
    feature_type=DENSE_VECTOR,
    key=user_key,
)
item_embedding = Feature(
    name="item_embedding",
    feature_type=DENSE_VECTOR,
    key=item_key,
)
user_item_similarity = DerivedFeature(
    name="user_item_similarity",
    feature_type=FLOAT,
    key=[user_key, item_key],
    input_features=[user_embedding, item_embedding],
    transform="cosine_similarity(user_embedding, item_embedding)",
)
Define Streaming Features
# Input data schema for the stream, given as an Avro JSON schema string
schema = AvroJsonSchema(schemaStr="""
{
"type": "record",
"name": "DriverTrips",
"fields": [
{"name": "driver_id", "type": "long"},
{"name": "trips_today", "type": "int"},
{
"name": "datetime",
"type": {"type": "long", "logicalType": "timestamp-micros"}
}
]
}
""")
# Kafka source configured with its brokers, topics and the schema above
stream_source = KafKaSource(
    name="kafkaStreamingSource",
    kafkaConfig=KafkaConfig(
        brokers=["feathrazureci.servicebus.windows.net:9093"],
        topics=["feathrcieventhub"],
        schema=schema,
    ),
)
# Key shared by the streaming features
driver_id = TypedKey(
    key_column="driver_id",
    key_column_type=ValueType.INT64,
    description="driver id",
    full_name="nyc driver id",
)
# Streaming features anchored on the Kafka source
kafkaAnchor = FeatureAnchor(
    name="kafkaAnchor",
    source=stream_source,
    features=[
        Feature(
            name="f_modified_streaming_count",
            feature_type=INT32,
            transform="trips_today + 1",
            key=driver_id,
        ),
        Feature(
            name="f_modified_streaming_count2",
            feature_type=INT32,
            transform="trips_today + 2",
            key=driver_id,
        ),
    ],
)