Real-Time Machine Learning at Headspace
Author: Yu Chen, Senior Software Engineer / Co-author: Koyuki Nakamori, Senior Engineering Manager
Data is often most valuable when it can be immediately leveraged to make decisions in the moment, but traditionally consumer data is ingested, transformed, persisted, and sits dormant for lengthy periods of time before being used by machine learning and analytics teams.
Headspace’s core products are iOS, Android, and web-based apps that focus on improving the health and happiness of its users through mindfulness, meditation, sleep, exercise, and focus content. Machine learning models are core to our user experience: they offer recommendations that engage users with new, relevant, personalized content and help build consistent habits over their lifelong journey.
Finding a way to leverage user data to generate real-time insights and decisions means that consumer-facing products like the Headspace app can dramatically shorten the end-to-end user feedback loop: actions that users performed a few seconds or minutes ago can be incorporated into generating a more relevant, personalized, and context-specific content recommendation for the user.
This means our ML models could incorporate dynamic features that update throughout the course of a user’s day, or even an individual session:
- Current session bounce rates for sleep content
- Semantic embeddings for recent user search terms (if a user recently searched for “preparing for big exam”, the ML model can assign more weight to Focus-themed meditations)
- Users’ biometric data (e.g., if step counts and heart rate have been increasing over the last 10 minutes, we can recommend Move or Exercise content)
The Headspace Machine Learning team architected a solution by decomposing the infrastructure systems into modular Publishing, Receiver, Orchestration, and Serving layers, leveraging Apache Spark Structured Streaming on Databricks, AWS SQS, Lambda, and Sagemaker to deliver real-time inference capabilities for our machine learning models.
Real-Time Inference Requirements
To facilitate real-time inference that personalizes our users’ content recommendations, we need to:
- Ingest, process, and forward along relevant events (actions) that our users perform on our client apps (iOS, Android, web).
- Quickly compute, store, and fetch online features (millisecond latency) that enrich the feature set used by a real-time inference model.
- Serve and reload the real-time inference model in a way that synchronizes the served model with online feature stores while minimizing (and ideally avoiding) any downtime.
Our ballpark end-to-end latency target (from the moment a user event is forwarded to the Kinesis stream to the moment a real-time inference prediction is available) was 30 seconds.
Challenges Adapting the Traditional ML Model Workflow
The requirements we listed are problems that are often not solved (and don’t need to be solved) with offline models that serve daily batch predictions. ML models that make inferences from records pulled and transformed from an ELT / ETL data pipeline usually have lead times of multiple hours for raw event data. Traditionally, an ML model’s training and serving workflow would involve the following steps, executed via periodic jobs that run every few hours or daily:
1. Pull relevant raw data from upstream data stores: For Headspace, this involves using Spark SQL to query from the upstream data lake maintained by our Data Engineering team.
For real-time inference: We handle hundreds to thousands of prediction requests per second, so using SQL to query a backend database would introduce unacceptable latency. While model training requires pulling complete data sets, real-time inference often involves only small, per-user slices of this same data. Therefore, we use AWS Sagemaker Online Feature Groups, which can fetch and write individual user features with single-digit millisecond response times (Step 3 in diagram).
2. Perform data preprocessing (feature engineering, feature extraction, etc.) using a mix of SQL and Python.
For real-time inference: We enrich Spark Structured Streaming micro-batches of raw event data with real-time features from Sagemaker Feature Store Groups.
3. Train the model and log relevant experiment metrics: with MLflow, we can register models and then log their performance across different experiment runs from within the Databricks Notebook interface.
4. Persist the model to disk: When MLflow logs a model, it serializes the model using the ML library’s native format. For instance, scikit-learn models are serialized using the pickle library.
5. Make predictions on the relevant inference dataset: In this case, we use our newly trained recommendation model to generate fresh content recommendations for our user base.
6. Persist the predictions to be served to users. This depends on the access patterns in production to deliver an ML prediction to an end user.
For real-time inference: We can either register predictions to our Prediction Service so that end users who navigate to ML-powered tabs can pull down predictions, or we can forward the predictions to another SQS queue, which will end up sending content recommendations to the user via iOS/Android push notifications.
7. Orchestration: Traditional batch-inference models utilize tools like Airflow or Kubeflow to schedule and coordinate the different stages/steps.
For real-time inference: We use lightweight Lambda functions to unpack/pack data in the appropriate messaging formats, invoke the actual Sagemaker endpoints, and perform any required post-processing and persistence.
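The per-user feature lookup described in the first step above can be sketched as follows. This is a minimal illustration, not the team's actual code: it assumes a boto3 `sagemaker-featurestore-runtime` client is passed in, and the function name and feature names are hypothetical.

```python
def fetch_online_features(featurestore_runtime, feature_group_name, user_id):
    """Return one user's online features as a plain dict of strings.

    `featurestore_runtime` is assumed to be
    boto3.client("sagemaker-featurestore-runtime"); it is injected so the
    function can be exercised with a stub.
    """
    response = featurestore_runtime.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString=str(user_id),
    )
    # The "Record" key is absent when the user has no row in the online store.
    return {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }
```

Because the lookup is keyed on a single record identifier, it reads one user's row rather than scanning a table, which is what makes it suitable for the request path.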
High Level Architecture Overview
Users generate events by performing actions inside of their Headspace app — these are ultimately forwarded to our Kinesis streams to be processed by Spark Structured Streaming. User apps fetch predictions by making RESTful HTTP requests to our backend services, passing along their user IDs and feature flags to indicate which type of ML recommendations to send back. The other components of the architecture will be described in more detail below.
The Publishing and Serving Layer: Model Training and Lifecycle Deployments
ML models are developed in Databricks notebooks and evaluated via MLflow experiments on core offline metrics (for example, recall at k for recommendation systems). The Headspace ML team has written wrapper classes that extend the base Python Function Model Flavor class in MLflow.
The Headspace ML team’s model wrapper class invokes MLflow’s own save_model method to perform much of the implementation logic, creating a directory in our ML Models S3 bucket that contains the metadata, dependencies, and model artifacts needed to build an MLflow model Docker image:
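A minimal sketch of such a wrapper and its save step, under stated assumptions: the class name, the `recommender` artifact key, and the dict-based stand-in model are hypothetical, and the upload of the saved directory to S3 is not shown. The fallback base class exists only so the sketch can be imported where MLflow is not installed.

```python
import pickle

try:
    import mlflow.pyfunc as pyfunc
    PythonModel = pyfunc.PythonModel
except ImportError:  # keeps the sketch importable without MLflow
    pyfunc, PythonModel = None, object


class RecommenderWrapper(PythonModel):
    """Hypothetical wrapper exposing a pickled recommender via the pyfunc flavor."""

    def load_context(self, context):
        # MLflow hands over local paths of the saved artifacts in context.artifacts.
        with open(context.artifacts["recommender"], "rb") as f:
            self.recommender = pickle.load(f)

    def predict(self, context, model_input, params=None):
        # model_input is assumed to expose a "user_id" column
        # (for example a pandas DataFrame or a dict of lists).
        return [self.recommender.get(user_id, []) for user_id in model_input["user_id"]]

    def save(self, path, recommender_path, pip_requirements=None):
        # Requires MLflow. Writes the model metadata, dependency files, and
        # artifacts into the directory `path`.
        pyfunc.save_model(
            path=path,
            python_model=self,
            artifacts={"recommender": recommender_path},
            pip_requirements=pip_requirements,
        )
```

Delegating to `save_model` means the wrapper only has to describe how to load and call the model; the directory layout used later to build the image comes from MLflow itself.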
We can then create a formal GitHub Release that points to the model we just saved in S3. This release can be picked up by CI/CD tools such as CircleCI, which test and build MLflow model images that are ultimately pushed to AWS ECR and deployed onto Sagemaker model endpoints.
Updating and Reloading Real-Time Models
We retrain our models frequently, but updating a real-time inference model in production is tricky. AWS has a variety of deployment patterns (gradual rollout, canary, etc.) that we can leverage to update the actual served Sagemaker model. However, real-time models also require in-sync online feature stores, which, given the size of Headspace’s user base, can take up to 30 minutes to fully update. Because we do not want downtime each time we update our model images, we need to ensure that the feature store stays synchronized with the model image.
Take, for example, a model that maps a Headspace user ID to a user sequence ID as part of a collaborative filtering model: our feature stores must contain the most up-to-date mapping of user ID to sequence ID. Unless the user population remains completely static, updating only the model means user IDs will be mapped to stale sequence IDs at inference time, and the model will generate a prediction for what is effectively a random user instead of the target user.
Blue Green Architecture
To address this issue, we can adopt a blue-green architecture that follows from the DevOps practice of blue-green deployments. The workflow is illustrated below:
- Maintain two parallel pieces of infrastructure (two copies of feature stores, in this case).
- Designate one as the production environment (let’s call it the “green” environment, to start) and route requests for features and predictions towards it via our Lambda.
- Every time we wish to update our model, we use a batch process/script to update the complementary infrastructure (the “blue” environment) with the latest features. Once this update is complete, switch the Lambda to point towards the blue production environment for features/predictions.
- Repeat this each time we want to update the model (and its corresponding feature store).
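The switch-over in the steps above can be sketched in plain Python. This is an illustration only: the class, the resource names, and the idea of passing in a "standby is synced" flag are assumptions, and in practice the live pointer would have to be persisted somewhere the Lambda can read it.

```python
from dataclasses import dataclass


@dataclass
class BlueGreenRouter:
    """Tracks which of two parallel environments currently serves traffic."""

    environments: dict  # color -> resource names for that environment
    live: str = "green"

    @property
    def standby(self):
        return "blue" if self.live == "green" else "green"

    def target(self):
        """Resources the Lambda should use for features and predictions."""
        return self.environments[self.live]

    def promote_standby(self, standby_is_synced):
        """Flip traffic only once the standby feature store is fully refreshed."""
        if not standby_is_synced:
            raise RuntimeError(f"{self.standby} feature store is not fully updated")
        self.live = self.standby
        return self.live


router = BlueGreenRouter({
    "green": {"feature_group": "user-features-green"},  # hypothetical names
    "blue": {"feature_group": "user-features-blue"},
})
```

The batch refresh always writes to `router.standby`, so the environment serving requests is never half-updated; the flip itself is a single pointer change.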
The Receiver Layer: Event Stream Ingestion with Apache Spark Structured Streaming Scheduled Job
Headspace users’ event actions (logging into the app, playing a specific piece of content, renewing a subscription, searching for content, etc.) are aggregated and forwarded onto Kinesis Streams (Step 1 in diagram). We leverage the Spark Structured Streaming framework on top of Databricks to consume from these Kinesis Streams. Structured Streaming offers several benefits:
- It leverages the same unified language (Python/Scala) and framework (Spark) shared by data scientists, data engineers, and analytics team members, allowing multiple Headspace teams to reason about user data using the familiar Dataset / DataFrame APIs and abstractions.
- It allows our teams to implement custom micro-batching logic to meet business requirements (for example, we could trigger and define micro batches based on custom event-time windows and session watermarks logic on a per-user basis).
- It comes with existing Databricks infrastructure tools (Scheduled Jobs, automatic retries, efficient DBU credit pricing, email notifications for process failure events, built-in Spark Streaming dashboards, ability to quickly auto-scale to meet spikes in user app event activity) that significantly reduce the infrastructure administration burden on ML engineers.
Structured Streaming uses micro-batching to break up the continuous stream of events into discrete chunks, processing incoming events in small micro-batch dataframes.
When working with streaming data, you’ll have to differentiate between event-time (when the event actually occurs on the client device) and processing-time (when the data is seen by your compute machines) — due to network partitions, client-side buffering, and a whole host of other reasons, you’ll often see non-trivial discrepancies between these two timestamps. You can customize this logic relatively easily within the Structured Streaming API:
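As a rough sketch of that distinction, the function below annotates a streaming DataFrame with its processing time and lag, then declares how late events may arrive. It assumes the DataFrame already has a parsed `event_time` timestamp column; the function name, column names, and the 10-minute default are illustrative.

```python
def add_processing_lag(events, max_lateness="10 minutes"):
    """Annotate events with processing time and bound how late they may arrive.

    `events` is assumed to be a streaming Spark DataFrame with a timestamp
    column named `event_time` (a hypothetical name).
    """
    return (
        events
        .selectExpr(
            "*",
            # When the cluster sees the row, as opposed to when it happened.
            "current_timestamp() AS processing_time",
            "unix_timestamp(current_timestamp()) - unix_timestamp(event_time) "
            "AS lag_seconds",
        )
        # Downstream stateful operators (windowed aggregations, deduplication)
        # may discard state for events older than the latest event_time seen
        # minus max_lateness.
        .withWatermark("event_time", max_lateness)
    )
```

The watermark on its own drops nothing; it only takes effect once a stateful operation such as an event-time window is applied to the result.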
We configure the Structured Streaming Job with the following parameters:
- 1 Maximum Concurrent Runs
- Unlimited retries
- New Scheduled Job clusters (as opposed to an All-Purpose cluster)
Using Scheduled Job clusters significantly reduces compute DBU costs while also mitigating the likelihood of correlated infrastructure failures. Jobs that run on a faulty cluster (perhaps with missing/incorrect dependencies, instance profiles, overloaded availability zones, etc.) will fail until the underlying cluster issue is fixed, but separating jobs across clusters prevents interference.
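One way to express these three settings is as a job definition payload. The sketch below follows my reading of the Databricks Jobs API 2.0 field names (`max_concurrent_runs`, `max_retries`, `new_cluster`); treat the field names as an assumption to check against current Databricks documentation, and the function and argument names as hypothetical.

```python
def build_streaming_job_settings(name, notebook_path, spark_version,
                                 node_type_id, num_workers=2):
    """Build a job definition for a long-running Structured Streaming job."""
    return {
        "name": name,
        # Never run two copies of the same stream at once.
        "max_concurrent_runs": 1,
        # -1 is assumed to mean "retry indefinitely" in the Jobs API.
        "max_retries": -1,
        # A fresh job cluster per run instead of a shared all-purpose cluster.
        "new_cluster": {
            "spark_version": spark_version,
            "node_type_id": node_type_id,
            "num_workers": num_workers,
        },
        "notebook_task": {"notebook_path": notebook_path},
    }
```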
We then point the stream query to read from a specially configured Amazon Kinesis Stream that aggregates user client-side events (Step 2 of diagram). The stream query can be configured using the following logic:
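A minimal sketch of such a stream query, assuming the Databricks Kinesis source and its `streamName`, `region`, and `initialPosition` options. The function name and arguments are hypothetical, and `process_batch` stands for whatever per-batch function the job supplies.

```python
def start_event_stream(spark, stream_name, region, checkpoint_location, process_batch):
    """Read client events from Kinesis and hand each micro-batch to process_batch."""
    events = (
        spark.readStream
        .format("kinesis")                     # Databricks Kinesis connector
        .option("streamName", stream_name)
        .option("region", region)
        .option("initialPosition", "latest")   # start from new events only
        .load()
    )
    return (
        events.writeStream
        .outputMode("append")                  # only rows added since the last batch
        .option("checkpointLocation", checkpoint_location)
        .foreachBatch(process_batch)           # called as process_batch(batch_df, batch_id)
        .start()
    )
```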
Here, outputMode defines the policy for how data is written to a streaming sink, and can take on three values: append, complete, and update. Since our Structured Streaming Job is concerned with handling incoming events, we select append to only process “new” rows.
It is a good idea to configure a checkpoint location to gracefully restart a failed streaming query, allowing you to “replay” or pick back up your processing if necessary.
Depending on the business use case, we can also choose to reduce latency by setting the argument processingTime = “0 seconds”, which starts each micro-batch as soon as possible:
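As a small sketch, the trigger is set on the stream writer before the query is started. The helper name is hypothetical; `trigger(processingTime=...)` is the standard Structured Streaming writer call.

```python
def with_low_latency_trigger(stream_writer, interval="0 seconds"):
    """Apply a processing-time trigger to a DataStreamWriter.

    With "0 seconds", Spark starts the next micro-batch as soon as the
    previous one finishes instead of waiting for a fixed interval.
    """
    return stream_writer.trigger(processingTime=interval)
```

A longer interval (for example "30 seconds") trades latency for fewer, larger micro-batches.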
In addition, our Spark Structured Streaming job cluster assumes a special EC2 Instance Profile with the appropriate IAM policies to interact with AWS Sagemaker Feature Groups and put messages onto our prediction job SQS queue.
Ultimately, since each Structured Streaming job incorporates different business logic, each one needs its own micro-batch processing function, which is invoked once per micro-batch.
In our case, we’ve implemented a process_batch method that first computes/updates online features on AWS Sagemaker Feature Store, and then forwards user events to the job queue (Step 3):
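A simplified sketch of what such a function could look like. It assumes each row of the micro-batch already carries the feature values to store (including the feature group's record identifier and event-time columns), and that boto3 `sagemaker-featurestore-runtime` and `sqs` clients are injected; the factory name and message shape are hypothetical.

```python
import json


def make_process_batch(featurestore_runtime, sqs, feature_group_name, queue_url):
    """Build a foreachBatch callback bound to the given AWS clients."""

    def process_batch(batch_df, batch_id):
        # Iterating on the driver is acceptable for a sketch; a production job
        # would likely write from the executors instead.
        for row in batch_df.toLocalIterator():
            event = row.asDict()
            # 1) Write the user's latest feature values to the online store.
            featurestore_runtime.put_record(
                FeatureGroupName=feature_group_name,
                Record=[
                    {"FeatureName": name, "ValueAsString": str(value)}
                    for name, value in event.items()
                    if value is not None
                ],
            )
            # 2) Enqueue a prediction job for this user.
            sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=json.dumps(
                    {"user_id": event["user_id"], "event": event}, default=str
                ),
            )

    return process_batch
```

Writing features before enqueueing the job means that by the time a Lambda picks up the message, the online store already reflects the event that triggered it.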
The Orchestration Layer: Decoupled Event Queues and Lambdas as Feature Transformers
Headspace users produce events that our real-time inference models consume downstream to make fresh recommendations. However, user event activity volume is not uniformly distributed. There are various peaks and valleys — our users are often most active during specific times of day (early morning, mid-afternoon break, evening before bedtime).
Messages that are placed into the SQS prediction job queue are processed by AWS Lambda functions (Step 4 in above diagram), which perform the following steps:
- Unpack the message and fetch the corresponding online and offline features for the user for whom we want to make a recommendation (Step 5 in above diagram). For instance, we may augment the event’s temporal/session-based features with user tenure level, gender, locale, etc.
- Perform any final pre-processing business logic (for instance, mapping our Headspace user IDs to user sequence IDs to fit the interface expected by certain collaborative filtering models that we deploy).
- Select the appropriate served Sagemaker model and invoke it with the input features (Step 6 in above diagram).
- Forward the recommendation to its downstream destination (Step 7 in above diagram). The actual location depends on whether we want users to pull down content recommendations or push recommendations out to users:
- Pull: Persisting the final recommended content to our internal Prediction Service, which is responsible for ultimately supplying users with their updated personalized content for many of the Headspace app’s tabs upon client app request. One example experiment uses the real-time inference infrastructure to let users fetch personalized recommendations from the Headspace app’s Today tab.
- Push: Placing the recommendation onto another SQS push notifications queue that results in a push notification or in-app modal content recommendation being pushed to users. Examples include an in-app modal recommendation triggered by a user’s recent search for sleep content and an iOS push notification triggered by a user’s recent content completion.
Within minutes of completing a specific meditation or performing a search, these push notifications can serve a relevant next piece of content while the context is still top of mind for the user.
In addition, utilizing this event queue allows prediction job requests to be retried — a small visibility timeout window (10–15 seconds) for the SQS queue can be set so that if a prediction job is not completed within that time window, another Lambda function is invoked to retry.
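The Lambda steps above, on the push path, can be sketched as a handler built around injected clients. This is a simplified illustration, not the production function: `fetch_features` is a hypothetical callable that returns a user's features, the payload shapes are assumptions, and the clients are assumed to be boto3 `sagemaker-runtime` and `sqs` clients.

```python
import json


def make_handler(fetch_features, sagemaker_runtime, sqs, endpoint_name, push_queue_url):
    """Build a Lambda handler for messages from the prediction job queue."""

    def handler(event, context):
        # SQS delivers messages to Lambda in batches under "Records".
        for record in event["Records"]:
            job = json.loads(record["body"])
            features = fetch_features(job["user_id"])
            response = sagemaker_runtime.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="application/json",
                Body=json.dumps({"user_id": job["user_id"], "features": features}),
            )
            recommendation = json.loads(response["Body"].read())
            # Push path: hand the recommendation to the notifications queue.
            sqs.send_message(
                QueueUrl=push_queue_url,
                MessageBody=json.dumps(
                    {"user_id": job["user_id"], "recommendation": recommendation}
                ),
            )
        # An uncaught exception above leaves the messages on the queue, so they
        # become visible again after the visibility timeout and are retried.
        return {"processed": len(event["Records"])}

    return handler
```

Letting failures propagate, rather than catching them, is what ties the handler to the queue's retry behavior described above.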
From an infrastructure and architecture perspective, a key lesson is to prioritize designing flexible hand-off points between different services (in our case, the Publishing, Receiver, Orchestration, and Serving layers). For instance:
- What format should the message payload that our Structured Streaming jobs send to the prediction SQS queue use?
- What is in the model signature and HTTP POST payload that each Sagemaker model expects?
- How do we synchronize the model image and the online feature stores so that we can safely and reliably update retrained models once in production?
Proactively addressing these questions will help decouple the various components of a complex ML architecture into smaller, modular sets of infrastructure.
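As one illustration of pinning down a hand-off point, the message payload between the streaming job and the Lambda could be given an explicit, versioned schema. The field names and the versioning scheme below are assumptions made for the sketch, not the format Headspace uses.

```python
import json
from dataclasses import asdict, dataclass

SCHEMA_VERSION = 1


@dataclass(frozen=True)
class PredictionJob:
    """Hypothetical payload placed on the prediction job queue."""

    user_id: str
    event_type: str
    event_time: str  # ISO 8601 timestamp recorded on the client
    model_name: str
    schema_version: int = SCHEMA_VERSION

    def to_message(self):
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_message(cls, body):
        payload = json.loads(body)
        # Reject payloads written by a producer on a different schema version.
        if payload.get("schema_version") != SCHEMA_VERSION:
            raise ValueError(
                f"unsupported schema_version: {payload.get('schema_version')}"
            )
        return cls(**payload)
```

With the version carried in every message, the producer and consumer can be deployed independently and a mismatch fails loudly instead of yielding a malformed prediction request.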
The Headspace ML team is still rolling out production use cases for this infrastructure, but initial A/B tests and experiments have seen strong lifts in content start rates, content completion rates, and (for push notifications) direct/total push open rates relative to both other Headspace initiatives and industry benchmarks.
By leveraging models capable of real-time inference, Headspace significantly reduces the end-to-end lead time between user actions and personalized content recommendations. Events streamed within the current session (recent searches, content starts/exits/pauses, in-app navigation actions, even biometric data) can all be leveraged to continually update the recommendations we serve to users while they are still interacting with the Headspace application.