Zero Downtime Elasticsearch Migrations
At intermix.io, Elasticsearch is a final destination for data that is processed through our data pipeline. Data gets loaded from Amazon Redshift into Elasticsearch via an indexing job. Elasticsearch then serves that data to the intermix.io dashboard, giving data engineers a view of the performance and health of their own data pipelines.
The challenge of using Elasticsearch as a serving layer is that migrating data can be painful and can require downtime. Many things require a data migration, including version upgrades and mapping changes. The recommended way of handling this is to snapshot and restore, but that approach is problematic for a real-time application.
In this post, we detail a method to do Elasticsearch migrations with zero downtime. We will share the reasons why we need to do migrations, and the exact procedures we’ve used to execute successful Elasticsearch migrations in our environment.
How we use Elasticsearch
The intermix.io data pipeline is quite complex, but for this discussion we will focus on the relevant pieces regarding Elasticsearch. At a very high-level, the pipeline works as follows:
- Load data from Amazon S3 into Redshift
- Run a DAG of SQL transformations in Redshift
- Run a job to index new rows from our final transformed Redshift tables into Elasticsearch
Elasticsearch indexing jobs do CDC (change data capture) and only append documents with newer timestamps since the last indexing run. Existing documents are never touched, and the indexing jobs are idempotent: on each run, they query Elasticsearch for the most recent document and use that value to query newer rows from Redshift.
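The incremental logic described above can be sketched in a few lines. This is a minimal illustration, not our actual indexing job: the function names, the `ts` field, and the sample rows are all assumptions made for the example, and the Redshift and Elasticsearch calls are replaced by in-memory stand-ins.

```python
from datetime import datetime

def latest_document_query(timestamp_field):
    """Search body that asks Elasticsearch for only the newest document."""
    return {
        "size": 1,
        "sort": [{timestamp_field: {"order": "desc"}}],
        "_source": [timestamp_field],
    }

def rows_to_index(rows, timestamp_field, high_water_mark):
    """Keep only the rows strictly newer than the high-water mark."""
    if high_water_mark is None:  # empty index: everything is new
        return list(rows)
    return [row for row in rows if row[timestamp_field] > high_water_mark]

# Stand-in for rows read from a final transformed Redshift table.
rows = [
    {"id": 1, "ts": datetime(2018, 3, 1)},
    {"id": 2, "ts": datetime(2018, 3, 2)},
    {"id": 3, "ts": datetime(2018, 3, 3)},
]

# In a real job this value would come from running latest_document_query().
high_water_mark = datetime(2018, 3, 2)
new_rows = rows_to_index(rows, "ts", high_water_mark)

# After indexing new_rows, the high-water mark moves forward, so an
# immediate second run finds nothing left to do.
next_mark = max(row["ts"] for row in new_rows)
second_run = rows_to_index(rows, "ts", next_mark)
```

Because the high-water mark is always read back from Elasticsearch rather than stored separately, rerunning the job after a failure picks up where the last successful write ended.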
The indexing jobs also use a JSON mapping file which defines the indices, fields, field types, and Redshift source columns that should be indexed. This JSON mapping file changes with new releases of our application as we add/remove fields or change data types.
No Schema, no Problem
One of the great things about Elasticsearch is how easy it is to get going. You can spin up a new cluster and immediately start pushing documents into it. Unlike RDBMSs, there’s no need to think through and define your data schema, or mapping, up front. By default, Elasticsearch will automatically try to determine the data type of new fields as you push them in and will set the data type accordingly.
This is great since it reduces the time it takes to get started writing your application code. But as with any NoSQL datastore, not enforcing a process to manage changes to a data schema can lead to headaches down the road. Fortunately, Elasticsearch provides a setting called dynamic which controls this default “anything goes” behavior. By setting dynamic to strict, we can instruct Elasticsearch to reject any documents which have unknown fields in them. This setting can be defined globally in the mapping for your index so that it applies to all documents in the index, or can be used within an inner object to override the setting for that object only.
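As a rough illustration of both placements, the sketch below builds a mapping body that is strict at the top level and relaxes the setting for one inner object. The field names are hypothetical, and where this body sits in the create-index request depends on the Elasticsearch version (under a mapping type name in 6.x, directly under `mappings` in later versions).

```python
import json

def strict_mapping(properties):
    """Wrap field definitions in a mapping that rejects unknown fields."""
    return {"dynamic": "strict", "properties": properties}

# Hypothetical fields, chosen only for this example.
mapping = strict_mapping({
    "query_id": {"type": "long"},
    "started_at": {"type": "date"},
    # Override for this inner object only: new sub-fields are accepted here.
    "labels": {"type": "object", "dynamic": True},
})

print(json.dumps(mapping, indent=2))
```

With a mapping like this, a document carrying an unexpected top-level field is rejected at index time, while new keys under `labels` are still mapped automatically.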
So while you can avoid defining and enforcing your schema up front, it’s generally not a good idea. And in cases where you need some dynamic fields, Elasticsearch provides a great dynamic templating feature.
So now that you have your mapping clearly defined and enforced with dynamic=strict, you may discover that Elasticsearch’s performance and flexibility come with a few constraints on how data mappings can be modified:
- You can’t directly change the type of an existing Elasticsearch field
- You can’t directly delete fields from Elasticsearch mappings
To handle both of these cases, you need to effectively reindex your data—either by using Elasticsearch’s _reindex or _update APIs, or by running a job to manually reindex and update documents.
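For reference, here is a sketch of the request body that the built-in `_reindex` API accepts when copying one index into another while dropping fields. The index and field names are placeholders, and the helper function is hypothetical. Note that, as explained later in this post, our own migrations do not currently use `_reindex`.

```python
def reindex_body(source_index, dest_index, fields_to_remove=()):
    """Build a body for POST _reindex, optionally removing fields."""
    body = {
        "source": {"index": source_index},
        "dest": {"index": dest_index},
    }
    if fields_to_remove:
        body["script"] = {
            "lang": "painless",
            # Runs once per document; ctx._source is the document body.
            "source": "for (def f : params.fields) { ctx._source.remove(f) }",
            "params": {"fields": list(fields_to_remove)},
        }
    return body

# Placeholder names, for illustration only.
body = reindex_body("events_v1", "events_v2", ["legacy_field"])
```

The destination index should be created with the new mapping before the request is sent, since `_reindex` copies documents but not the source index's settings or mappings.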
As we started thinking about how to manage Elasticsearch data schema changes, we identified the following challenges:
- How do we manage Elasticsearch field type changes?
- How do we delete unused Elasticsearch fields from the cluster?
- How do we handle other document changes, such as changes to document data?
- How do we manage and validate cluster upgrades?
- How do we make these changes with zero downtime?
Consider the following scenario we faced recently: We wanted to upgrade our Elasticsearch clusters from 5.3 to 6.2, copy over all data to the new cluster, replace the deprecated _type mapping type field with a new doc_type field in all documents, remove several unused fields, and then run the clusters in parallel in our pipeline while we compared and validated the new cluster against the old one and tested our application. Finally, after validation, we wanted to hot-swap the new cluster for the old.
With almost 1.5 billion documents (5 TB) spread across 20 nodes in four environments, we needed a system that would allow us to make these changes in a controlled and systematic way, allowing us to fully validate the result before making any changes to our production system.
To accomplish our goals, we developed a flexible system for making changes to our Elasticsearch clusters. The system adds an extra flow to our data pipeline, which we use only during migrations.
To update our Elasticsearch data schema or upgrade our Elasticsearch clusters we do the following:
1. Provision a new “Target” Elasticsearch cluster to replace an existing cluster
2. Copy the current most recent document from the old Elasticsearch cluster to the new Target cluster
3. Launch a new copy of our Elasticsearch indexing job which starts loading new data into the Target cluster using the new data mapping
4. Backfill the new cluster by copying documents from the old cluster, applying an optional migration to transform each document to match the new schema
5. Validate the new cluster against the old using a comparison script
6. Swap the clusters and stop the old Elasticsearch indexing job
Steps 1, 2 and 4 are managed via a single Python script, and step 5 is managed via our Elasticsearch cluster comparison script.
During migrations, we launch a new “Target” Elasticsearch cluster which is indexed using a new “Target” index job. This Target cluster will eventually replace the original Elasticsearch cluster. The Target index job may be running a new version of our indexing code with an updated JSON schema definition for the Target cluster (for example, if we are changing the logical structure of the Elasticsearch documents or need to make other changes to be compatible with a new version of Elasticsearch), and it runs in parallel with the legacy indexing job.
Since our index job loads all Redshift rows newer than the most recent document already in the Elasticsearch cluster, we first need to copy a single “anchor” document from each source cluster index into the Target cluster before starting our Target index job. Once this is done, we can fire up the Target index job. We’ll start seeing all new data indexed into the Target cluster immediately and at this point can start any other software development further up our stack knowing that fresh data is available in the new cluster for developer testing.
Note that we use an anchor document rather than allowing the Target indexing job to start loading all Redshift data for two reasons. First, we prune data from Redshift, so only recent data is available for indexing. Second, for development and testing purposes it is often more useful for us to have recent data available quickly so that we can start validation and development without having to wait for all historical data to be indexed. This is one of the reasons we currently don’t use the _reindex Elasticsearch API: if we did, we would need to wait for the entire cluster to be reindexed before starting our Target indexing job, since the reindexing order isn’t well defined.
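The anchor step can be illustrated with plain dictionaries standing in for the two clusters. This is a sketch under stated assumptions rather than our migration script: each cluster is modeled as a mapping of index name to documents keyed by id, and `copy_anchor_documents` and the `ts` field are hypothetical names.

```python
def copy_anchor_documents(source, target, timestamp_field, migrate=None):
    """Copy the newest document of every source index into the target.

    `source` and `target` map index name -> {doc_id: document} and stand
    in for the two clusters.
    """
    for index, docs in source.items():
        if not docs:
            continue  # nothing to anchor on in an empty index
        anchor_id = max(docs, key=lambda doc_id: docs[doc_id][timestamp_field])
        anchor = dict(docs[anchor_id])  # copy so the source is left untouched
        if migrate is not None:
            anchor = migrate(index, anchor)
        target.setdefault(index, {})[anchor_id] = anchor
    return target

source = {
    "queries": {"q1": {"ts": 1}, "q2": {"ts": 5}},
    "tables": {"t1": {"ts": 3}},
    "empty": {},
}
target = copy_anchor_documents(source, {}, "ts")
```

Once each index holds its anchor, an indexing job that loads only rows newer than the most recent document starts from the anchor's timestamp instead of from the beginning of history.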
The final step of the migration is to backfill data from the original Elasticsearch clusters into the Target cluster. This is done with a simple idempotent backfill job that copies documents from the source to the Target cluster, working backwards in time from the oldest document already in each Target cluster index. The backfill job can also apply an optional Python function to mutate each document as it is copied (much like the script that Elasticsearch’s _reindex API supports).
For example, we might apply the following function to all documents as they’re copied:
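# Placeholder: the names of the fields to drop are not listed here.
UNUSED_FIELDS = []

def migrate(index, doc):
    # copy the deprecated _type field into a new doc_type field
    doc['_source']['doc_type'] = doc['_type']
    doc['_type'] = '_doc'
    # remove these fields
    for field in UNUSED_FIELDS:
        doc['_source'].pop(field, None)
    # return the updated doc
    return doc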
This function is called on each source document, and the returned mutated document is loaded into the Target cluster. Note this migration is also applied when copying the first “anchor” document.
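To make the backfill loop concrete, here is a minimal in-memory sketch. It is an assumption-laden illustration, not our backfill job: `source` and `target` are dictionaries standing in for one index on each cluster, `backfill` and `add_doc_type` are hypothetical names, and integer timestamps replace real ones.

```python
import copy

def add_doc_type(index, doc):
    """Example migration: move the _type value into a doc_type field."""
    doc["_source"]["doc_type"] = doc["_type"]
    doc["_type"] = "_doc"
    return doc

def backfill(index, source, target, ts_field, migrate=None, batch_size=2):
    """Copy documents older than the target's oldest one, newest first.

    `target` must already contain the anchor document. Returns the
    number of documents copied during this run.
    """
    copied = 0
    while True:
        # The oldest document already in the target is the resume point.
        oldest = min(hit["_source"][ts_field] for hit in target.values())
        pending = [
            (doc_id, hit) for doc_id, hit in source.items()
            if doc_id not in target and hit["_source"][ts_field] <= oldest
        ]
        if not pending:
            return copied
        pending.sort(key=lambda item: item[1]["_source"][ts_field], reverse=True)
        for doc_id, hit in pending[:batch_size]:
            hit = copy.deepcopy(hit)  # never mutate the source's copy
            target[doc_id] = migrate(index, hit) if migrate else hit
            copied += 1

source = {
    doc_id: {"_type": "query", "_source": {"ts": ts}}
    for doc_id, ts in [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
}
# The anchor (newest document) is copied first, with the same migration.
target = {"d": add_doc_type("queries", copy.deepcopy(source["d"]))}

first_run = backfill("queries", source, target, "ts", migrate=add_doc_type)
second_run = backfill("queries", source, target, "ts", migrate=add_doc_type)
```

Deriving the resume point from the target on every pass is what makes the job safe to stop and restart: a second run finds nothing older than what is already there and copies nothing.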
Data validation plays a central role in any data pipeline architecture planning we do. Without proper validation tools and processes in place, development proceeds at a much slower pace since every change requires ad hoc testing.
For validating our Elasticsearch data, we developed a script which compares documents between two Elasticsearch clusters. The script detects missing documents and compares each document on a field-by-field basis. It is also able to skip fields we expect to be different, and is able to apply a migration function when comparing documents to take into account the differences between the source and Target document structure.
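The core of such a comparison might look like the following sketch. It is not our comparison script: `compare_indices` is a hypothetical name, each index is modeled as a dictionary of document id to field values, and the sample documents are invented for the example.

```python
def compare_indices(source, target, skip_fields=(), migrate=None):
    """Return ids missing from target and per-document field differences.

    A field that is absent and a field explicitly set to None are treated
    as equal in this simplified version.
    """
    missing, mismatched = [], {}
    for doc_id, source_doc in source.items():
        if doc_id not in target:
            missing.append(doc_id)
            continue
        # Bring the source document into the target's shape before comparing.
        expected = migrate(dict(source_doc)) if migrate else source_doc
        actual = target[doc_id]
        fields = (set(expected) | set(actual)) - set(skip_fields)
        diffs = {
            field: (expected.get(field), actual.get(field))
            for field in sorted(fields)
            if expected.get(field) != actual.get(field)
        }
        if diffs:
            mismatched[doc_id] = diffs
    return missing, mismatched

def rename_type(doc):
    """Example migration: the target stores `type` as `doc_type`."""
    doc["doc_type"] = doc.pop("type")
    return doc

source = {
    "1": {"user": "a", "type": "query", "indexed_at": 10},
    "2": {"user": "b", "type": "query", "indexed_at": 10},
    "3": {"user": "c", "type": "query", "indexed_at": 10},
}
target = {
    "1": {"user": "a", "doc_type": "query", "indexed_at": 11},
    "2": {"user": "x", "doc_type": "query", "indexed_at": 12},
}

missing, mismatched = compare_indices(
    source, target, skip_fields=["indexed_at"], migrate=rename_type
)
```

Here document 3 is reported as missing and document 2 as differing in `user`, while the `indexed_at` differences are ignored and the renamed type field does not count as a mismatch.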
Once we have backfilled our new Target cluster, validated the data, and tested our application against the new cluster, we can put it live by simply hot-swapping it out for the old cluster. We do this by:
- pointing our web application to the new cluster,
- stopping the original indexing job in our data pipeline, and then
- decommissioning the old cluster.