Building point-in-time correct, production-grade feature pipelines ā from raw Kafka events to online feature serving in milliseconds, using Spark Structured Streaming and the Databricks Feature Store.
Table of Contents
- The Feature Engineering Problem
- Architecture Overview
- Feature Store Concepts: ERD
- Environment Setup
- Streaming Feature Pipeline
- Point-in-Time Correct Training Dataset Generation
- Writing Features to the Online Store
- Serving Features at Inference Time
- Feature Table Reference
- References
The Feature Engineering Problem
Feature engineering is where most ML projects silently fail in production. Not because the model is wrong ā but because the features the model sees at training time are different from the features it sees at inference time. This is called training-serving skew, and it's the #1 silent killer of ML systems.
Three specific failure modes cause it:
- Online/offline inconsistency ā the batch pipeline that computes training features uses different logic than the real-time service that computes inference features
- Data leakage ā training features accidentally include information from the future (e.g. joining on a label that was created after the event)
- Feature staleness ā a model trained on 30-day rolling averages is served features that are 6 hours stale because the pipeline backfills are slow
The Databricks Feature Store ā now part of Unity Catalog as Feature Engineering in Unity Catalog ā solves all three by:
- Storing feature computation logic alongside the data (no drift between training and serving)
- Enforcing point-in-time lookups during training dataset creation
- Providing a unified API for both batch offline reads and low-latency online reads
Architecture Overview
Feature Store Concepts: ERD
Understanding the data model behind the Feature Store is essential for designing correct pipelines. Here's how the entities relate:
The critical relationship: a Model Version is bound to a Training Set, which records exactly which feature tables and which point-in-time lookups were used. This is how Databricks guarantees reproducibility ā you can always re-create the exact training data that produced any model version.
Environment Setup
# Databricks Runtime ML 13.x+ recommended
# Feature Engineering in Unity Catalog (formerly Feature Store)
%pip install databricks-feature-engineering==0.6.0 --quiet
dbutils.library.restartPython()
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from databricks.feature_engineering.entities.feature_serving_endpoint import (
ServedEntity, EndpointCoreConfig
)
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.types import (
StructType, StructField, StringType, LongType,
DoubleType, TimestampType, ArrayType
)
import mlflow
spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()
# Unity Catalog paths
CATALOG = "prod"
FEATURE_DB = f"{CATALOG}.feature_store"
EVENTS_TABLE = f"{CATALOG}.silver.events_clean"
KAFKA_BROKER = "kafka-broker.internal:9092"
KAFKA_TOPIC = "user-events"
# Checkpoint locations (ADLS / S3 / GCS)
CHECKPOINT_BASE = "abfss://checkpoints@storage.dfs.core.windows.net/features"
Streaming Feature Pipeline
The streaming pipeline reads from Kafka, computes windowed aggregations using Spark's stateful streaming engine, and writes features to the Feature Store via foreachBatch. This keeps the feature table continuously fresh.
# āā Streaming Feature Pipeline āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
# Step 1: Define the raw event schema from Kafka
event_schema = StructType([
StructField("user_id", StringType(), False),
StructField("event_type", StringType(), True),
StructField("product_id", StringType(), True),
StructField("revenue", DoubleType(), True),
StructField("session_id", StringType(), True),
StructField("platform", StringType(), True),
StructField("event_ts", TimestampType(), False),
])
# Step 2: Read from Kafka
raw_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_BROKER)
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.select(
F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
F.col("timestamp").alias("kafka_ts")
)
.select("data.*", "kafka_ts")
)
# Step 3: Apply watermark and compute windowed features
# Watermark: tolerate up to 10 minutes of late data
windowed_features = (
raw_stream
.withWatermark("event_ts", "10 minutes")
.groupBy(
F.col("user_id"),
F.window(F.col("event_ts"), "1 hour", "15 minutes").alias("window")
)
.agg(
F.count("*").alias("event_count_1h"),
F.sum(F.when(F.col("event_type") == "purchase", F.col("revenue"))
.otherwise(0)).alias("revenue_1h"),
F.countDistinct("session_id").alias("session_count_1h"),
F.countDistinct("product_id").alias("unique_products_1h"),
F.sum(F.when(F.col("event_type") == "purchase", 1)
.otherwise(0)).alias("purchase_count_1h"),
F.first("platform").alias("last_platform"),
)
# Flatten window struct to scalar columns
.withColumn("window_start", F.col("window.start"))
.withColumn("window_end", F.col("window.end"))
.withColumn("feature_ts", F.col("window.end")) # timestamp key for PIT lookup
.drop("window")
# Derived features
.withColumn("conversion_rate_1h",
F.when(F.col("event_count_1h") > 0,
F.col("purchase_count_1h") / F.col("event_count_1h"))
.otherwise(0.0))
.withColumn("avg_revenue_per_purchase_1h",
F.when(F.col("purchase_count_1h") > 0,
F.col("revenue_1h") / F.col("purchase_count_1h"))
.otherwise(0.0))
)
# Step 4: Write to Feature Store via foreachBatch
# foreachBatch gives us transactional writes per micro-batch
def write_to_feature_store(batch_df, batch_id):
"""
Called on each micro-batch. Merges feature data into the Feature Store
table using merge_on keys (user_id + feature_ts).
"""
if batch_df.isEmpty():
return
fe.write_table(
name=f"{FEATURE_DB}.user_activity_features",
df=batch_df,
mode="merge", # upsert: update existing, insert new
)
print(f"Batch {batch_id}: wrote {batch_df.count()} feature rows")
# Step 5: Create the feature table (idempotent ā safe to re-run)
try:
fe.create_table(
name=f"{FEATURE_DB}.user_activity_features",
primary_keys=["user_id"],
timestamp_keys=["feature_ts"],
schema=windowed_features.schema,
description=(
"Real-time user activity features computed from event stream. "
"1-hour sliding window, refreshed every 15 minutes. "
"Primary key: user_id. Timestamp key: feature_ts (window end)."
),
)
print("Feature table created.")
except Exception:
print("Feature table already exists ā continuing.")
# Step 6: Launch the streaming query
streaming_query = (
windowed_features.writeStream
.outputMode("update") # update mode for stateful aggregations
.option("checkpointLocation", f"{CHECKPOINT_BASE}/user_activity")
.trigger(processingTime="5 minutes") # micro-batch every 5 min
.foreachBatch(write_to_feature_store)
.start()
)
print(f"Streaming query '{streaming_query.name}' running...")
print(f"Status: {streaming_query.status}")
Point-in-Time Correct Training Dataset Generation
This is the most critical part of the Feature Store. When creating training data, we must join labels to features at the timestamp of the label event ā not the current time. This prevents data leakage.
# āā Point-in-Time Correct Training Dataset āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
# Step 1: Load the label dataset
# Each row = one prediction target event, with the exact timestamp
# at which a model would have needed to make a prediction.
labels_df = (
spark.table(f"{CATALOG}.gold.churn_labels")
.select(
"user_id",
"churn_label", # 0 = retained, 1 = churned
F.col("observation_ts").alias("event_timestamp"), # point-in-time anchor
"experiment_split" # train/val/test
)
.filter(F.col("observation_ts") >= "2024-01-01")
)
print(f"Label rows: {labels_df.count():,}")
labels_df.show(5)
# +----------+-----------+---------------------+-----------------+
# | user_id |churn_label| event_timestamp | experiment_split|
# +----------+-----------+---------------------+-----------------+
# | u_123456 | 0 | 2024-03-15 14:22:00 | train |
# | u_789012 | 1 | 2024-03-15 18:45:00 | train |
# Step 2: Define feature lookups
# as_of_timestamp=None ā use the label's event_timestamp (point-in-time)
# Databricks will join each label row to the feature values
# that were valid at event_timestamp ā not the latest values.
feature_lookups = [
# User activity features ā 1h window features from the streaming pipeline
FeatureLookup(
table_name=f"{FEATURE_DB}.user_activity_features",
feature_names=[
"event_count_1h",
"revenue_1h",
"session_count_1h",
"unique_products_1h",
"purchase_count_1h",
"conversion_rate_1h",
"avg_revenue_per_purchase_1h",
"last_platform",
],
lookup_key="user_id",
timestamp_lookup_key="event_timestamp", # ā PIT anchor
),
# User profile features ā slower-changing, from batch pipeline
FeatureLookup(
table_name=f"{FEATURE_DB}.user_profile_features",
feature_names=[
"account_age_days",
"lifetime_revenue",
"preferred_category",
"subscription_tier",
],
lookup_key="user_id",
timestamp_lookup_key="event_timestamp", # ā PIT anchor
),
# Transaction aggregates ā 30d and 90d rolling windows
FeatureLookup(
table_name=f"{FEATURE_DB}.transaction_features",
feature_names=[
"purchase_count_30d",
"purchase_count_90d",
"avg_order_value_30d",
"days_since_last_purchase",
"category_diversity_score",
],
lookup_key="user_id",
timestamp_lookup_key="event_timestamp",
),
]
# Step 3: Create training dataset (Feature Store handles the PIT join)
training_set = fe.create_training_set(
df=labels_df,
feature_lookups=feature_lookups,
label="churn_label",
exclude_columns=["observation_ts", "experiment_split"],
)
# The returned DataFrame has features + labels, PIT-correct
training_df = training_set.load_df()
print(f"Training rows: {training_df.count():,}")
print(f"Training cols: {len(training_df.columns)}")
training_df.show(3)
# Step 4: Train model and log via Feature Store (preserves lineage!)
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
train_pdf = (
training_df
.filter(F.col("experiment_split") == "train")
.drop("experiment_split", "user_id")
.fillna(0)
.toPandas()
)
X_train = train_pdf.drop(columns=["churn_label"])
y_train = train_pdf["churn_label"]
model = GradientBoostingClassifier(
n_estimators=300,
learning_rate=0.05,
max_depth=5,
subsample=0.8,
random_state=42,
)
with mlflow.start_run(run_name="churn-gbm-v1") as run:
model.fit(X_train, y_train)
# Log model via Feature Store ā this records the feature lineage
fe.log_model(
model=model,
artifact_path="churn_model",
flavor=mlflow.sklearn,
training_set=training_set, # ā binds model to its feature lookups
registered_model_name=f"{CATALOG}.ml.user_churn_model",
)
print(f"Logged model with feature lineage. Run: {run.info.run_id}")
Writing Features to the Online Store
For real-time inference, the model needs features in milliseconds ā not the seconds it takes to query Delta Lake. Databricks Feature Store can publish features to an online store (DynamoDB, Cosmos DB, MySQL, etc.) for low-latency reads.
# āā Publish Features to Online Store āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
# Online stores are configured per feature table.
# Here we publish user_activity_features to DynamoDB for <5ms lookups.
from databricks.feature_engineering.entities.feature_store_online_table import (
OnlineTable, OnlineTableSpec, TriggeredSchedulingPolicy
)
# Create an online table spec (backed by a serverless real-time compute layer)
online_table_spec = OnlineTableSpec(
primary_key_columns=["user_id"],
source_table_full_name=f"{FEATURE_DB}.user_activity_features",
run_triggered=OnlineTableSpec.TriggeredSchedulingPolicy(), # sync on-demand
# OR for continuous sync:
# run_continuous=OnlineTableSpec.ContinuousSchedulingPolicy()
)
# Create the online table (idempotent)
online_table = fe.create_online_table(spec=online_table_spec)
print(f"Online table: {online_table.name}")
print(f"Status: {online_table.status.detailed_state}")
# Trigger an initial sync from the offline Delta table to the online store
fe.refresh_online_table(name=f"{FEATURE_DB}.user_activity_features")
Serving Features at Inference Time
At inference time, the Feature Store SDK performs automatic feature lookups, joining the incoming request data with features from the online store before passing them to the model.
# āā Real-Time Feature Serving at Inference āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
import requests, json
WORKSPACE_URL = "https://<workspace>.azuredatabricks.net"
TOKEN = dbutils.secrets.get("prod-scope", "databricks-token")
# Option 1: Model Serving with automatic feature lookup
# When you logged the model with fe.log_model(), Databricks knows which
# features to fetch. You only send the lookup key (user_id) at inference time.
def predict_churn(user_ids: list) -> list:
"""
Send only user_id ā the serving endpoint fetches features automatically
from the online store and runs inference.
"""
payload = {
"dataframe_records": [
{"user_id": uid} for uid in user_ids
]
}
resp = requests.post(
f"{WORKSPACE_URL}/serving-endpoints/churn-predictor/invocations",
headers={
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
},
data=json.dumps(payload),
timeout=5,
)
resp.raise_for_status()
return resp.json()["predictions"]
# Example usage
predictions = predict_churn(["u_123456", "u_789012", "u_345678"])
for uid, pred in zip(["u_123456", "u_789012", "u_345678"], predictions):
print(f"{uid}: churn_probability = {pred:.4f}")
# u_123456: churn_probability = 0.0821
# u_789012: churn_probability = 0.7643
# u_345678: churn_probability = 0.1209
# Option 2: Direct feature lookup via the Feature Serving endpoint
# Useful when you want raw features without running inference
def get_features(user_ids: list) -> dict:
payload = {
"dataframe_records": [{"user_id": uid} for uid in user_ids]
}
resp = requests.post(
f"{WORKSPACE_URL}/serving-endpoints/user-features-serving/invocations",
headers={
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
},
data=json.dumps(payload),
timeout=5,
)
return resp.json()
# Option 3: Batch scoring (offline) ā uses Delta offline store
# No online store needed; reads directly from the feature table with PIT lookup
batch_labels = spark.table(f"{CATALOG}.gold.users_to_score_today") \
.select("user_id", F.current_timestamp().alias("event_timestamp"))
batch_predictions = fe.score_batch(
model_uri=f"models:/{CATALOG}.ml.user_churn_model@champion",
df=batch_labels,
result_type="double",
)
batch_predictions.select("user_id", "prediction") \
.write.format("delta").mode("overwrite") \
.saveAsTable(f"{CATALOG}.gold.churn_scores_daily")
Feature Table Reference
A summary of the feature tables in our pipeline, their update cadence, and their role in the ML lifecycle:
| Feature Table | Primary Key | Timestamp Key | Update Method | Latency | Used In |
|---|---|---|---|---|---|
user_activity_features |
user_id |
feature_ts |
Spark Structured Streaming | ~5 min | Real-time churn, recommendation |
transaction_features |
user_id |
feature_ts |
Scheduled batch (hourly) | ~60 min | Churn, LTV prediction |
user_profile_features |
user_id |
updated_at |
CDC from OLTP (near real-time) | ~2 min | All models |
product_features |
product_id |
feature_ts |
Scheduled batch (daily) | ~24 hr | Recommendation, search ranking |
session_features |
session_id |
session_end_ts |
Streaming (micro-batch) | ~1 min | Click-through rate, abandon prediction |
cohort_features |
cohort_id |
computed_at |
Weekly batch | ~7 days | Segmentation, A/B analysis |
Freshness vs cost tradeoff: Streaming features are ~10Ć more expensive to compute than batch features (continuous cluster vs scheduled job). Only promote a feature to streaming if your model's performance degrades meaningfully with stale data ā validate this with an offline ablation study first.
Key Takeaways
- Training-serving skew is the silent killer of production ML ā the Feature Store eliminates it by encoding feature computation logic once and using it in both training and serving paths.
-
Point-in-time correct joins via
timestamp_lookup_keyare non-negotiable for any model trained on time-series data. A missingevent_timestampin your label table is a data leakage bug waiting to happen. -
fe.log_model()is the right model logging call, notmlflow.sklearn.log_model(). It records feature lineage, enabling reproducible re-training and automatic feature lookup at serving time. - Watermarks in Structured Streaming are critical for stateful aggregations ā without them, Spark accumulates state indefinitely and the job eventually OOMs. Set them to the maximum tolerable late-data window.
- Online stores are only worth the operational cost when your SLA is under ~100ms. For batch scoring jobs or APIs with >500ms budgets, read directly from the offline Delta table.
-
fe.score_batch()is the cleanest way to run periodic batch inference ā it handles PIT feature lookups automatically, keeps inference logic DRY, and logs results to Delta for downstream consumers.
References
Databricks ā Feature Engineering in Unity Catalog (Overview)
š https://docs.databricks.com/en/machine-learning/feature-store/uc/feature-tables-uc.htmlDatabricks ā Create and Manage Online Tables
š https://docs.databricks.com/en/machine-learning/feature-store/online-tables.htmlDatabricks ā Point-in-Time Feature Lookups
š https://docs.databricks.com/en/machine-learning/feature-store/time-series.htmlApache Spark ā Structured Streaming Programming Guide
š https://spark.apache.org/docs/latest/structured-streaming-programming-guide.htmlApache Spark ā Streaming Watermarks for Late Data Handling
š https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarkingDatabricks ā Feature Store Python API Reference
š https://docs.databricks.com/en/machine-learning/feature-store/python-api.htmlDatabricks ā Score Batch with Feature Store
š https://docs.databricks.com/en/machine-learning/feature-store/score-batch.html"Feature Stores for ML" ā Feast Documentation (open-source reference)
š https://docs.feast.dev/"Rethinking Feature Stores" ā Chip Huyen (huyenchip.com)
š https://huyenchip.com/2023/01/08/feature-store.htmlDatabricks ā Model Serving with Automatic Feature Lookup
š https://docs.databricks.com/en/machine-learning/model-serving/feature-store-model-serving.html"Building Machine Learning Pipelines" ā Hannes Hapke & Catherine Nelson (O'Reilly)
š https://www.oreilly.com/library/view/building-machine-learning/9781492053187/
This concludes the 4-part Databricks series:
- ā Part 1: Building Production-Grade Delta Lake Pipelines with Apache Spark
- ā Part 2: Fine-Tuning LLMs at Scale with Databricks MLflow and Spark
- ā Part 3: Apache Spark Query Optimization ā Catalyst, AQE, and Photon Engine
- ā Part 4: Real-Time AI Feature Engineering with Spark Structured Streaming












