Today we’re really excited to be writing about the launch of the new Amazon Redshift RA3 instance type. The launch of this new node type is very significant for several reasons:
This is the first feature where Amazon Redshift can credibly claim “separation of storage and compute”. And because an ra3.16xlarge cluster must have at least two nodes, the minimum cluster size is a whopping 128 TB. This number is so high that it effectively makes storage a non-issue.
In this post, we’re going to explore the performance of the new ra3.16xlarge instance type and compare it to the next largest instance type, the ds2.8xlarge. Since the ra3.16xlarge is significantly larger than the ds2.8xlarge, we’re going to compare a 2-node ra3.16xlarge cluster against a 4-node ds2.8xlarge cluster to see how it stacks up.
On paper, the ra3.16xlarge nodes are around 1.5 times larger than ds2.8xlarge nodes in terms of CPU and Memory, 2.5 times larger in terms of I/O performance, and 4 times larger in terms of storage capacity:
On to the tests!
A reported improvement for the RA3 instance type is a bigger pipe for moving data into and out of Redshift. Since loading data from a storage layer like S3 or DynamoDB to compute is a common workflow, we wanted to test this transfer speed.
We used Redshift’s COPY command to read and load data files from S3, which had been unloaded from a source table with 3.8 billion rows. The target table was dropped and recreated between each copy. We followed best practices for loading data into Redshift, such as using a manifest file to define the data files being loaded and defining a distribution style on the target table.
While the DS2 cluster averaged 2h 9m 47s to COPY data from S3 to Redshift, the RA3 cluster performed the same operation at an average of 1h 8m 21s:
The test demonstrated that improved network I/O on the ra3.16xlarge cluster loaded identical data nearly 2x faster than the ds2.8xlarge cluster.
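As a rough illustration of the load pattern described above, the following Python sketch builds a manifest document and the matching COPY statement. The bucket, table, and IAM role names are hypothetical, and file-format options are left out because the post does not state them; this is a sketch under those assumptions, not the exact commands we ran.

```python
import json


def build_manifest(urls):
    """Return a COPY manifest document listing each S3 data file."""
    return {"entries": [{"url": url, "mandatory": True} for url in urls]}


def build_copy_statement(table, manifest_url, iam_role):
    """Return a COPY statement that loads the files named in the manifest."""
    return (
        f"COPY {table}\n"
        f"FROM '{manifest_url}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "MANIFEST;"
    )


# Hypothetical bucket, table, and role names, for illustration only.
data_files = [
    "s3://example-bucket/unload/events_0000_part_00",
    "s3://example-bucket/unload/events_0001_part_00",
]
manifest_json = json.dumps(build_manifest(data_files), indent=2)
copy_sql = build_copy_statement(
    "events_target",
    "s3://example-bucket/unload/events.manifest",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(manifest_json)
print(copy_sql)
```

Marking each entry as mandatory makes the load fail loudly if a listed file is missing, which is usually what you want in a repeatable benchmark.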
One of the things we were particularly interested in benchmarking is the advertised benefits of improved I/O, both in terms of network and storage. In our experience, I/O is most often the cause of slow query performance. These benefits should supposedly improve the performance not only of getting data into and out of Redshift from S3, but also the performance of transferring data between nodes (for example, when data needs to be redistributed for queries that join on non-distkey table columns), and of storing intermediate results during query execution.
To compare relative I/O performance, we looked at the execution time of a deep copy of a large table to a destination table that uses a different distkey. This should force Redshift to redistribute the data between the nodes over the network, as well as exercise the disk I/O for reads and writes.
For this test, we used a 244 GB test table consisting of 3.8 billion rows which was distributed fairly evenly using a DISTKEY. We created an empty target table which differed from the source table in that it used a DISTSTYLE of EVEN, and then did an INSERT INTO … SELECT * from the source table into the target table several times. To make it easy to track the performance of the SQL queries, we annotated each query with the task benchmark-deep-copy and then used the Intermix dashboard to view the performance on each cluster for all SQL queries in that task.
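A minimal sketch of what such a deep copy can look like, written as Python that assembles the SQL. The table names, the columns, and the comment-based annotation format are all hypothetical stand-ins; the real schema and the exact annotation syntax are not shown in this post.

```python
def annotate(sql, task):
    """Prefix a statement with a comment naming the task it belongs to."""
    return f"/* task: {task} */\n{sql}"


# Hypothetical table and column names; the real schema is not shown in the post.
create_target = (
    "CREATE TABLE events_even (\n"
    "    event_id BIGINT,\n"
    "    account_id BIGINT,\n"
    "    created_at TIMESTAMP\n"
    ")\n"
    "DISTSTYLE EVEN;"
)
deep_copy = "INSERT INTO events_even SELECT * FROM events_by_key;"

statements = [annotate(s, "benchmark-deep-copy") for s in (create_target, deep_copy)]
for statement in statements:
    print(statement)
    print()
```

Because the target uses a different distribution style than the source, every row has to be re-assigned to a slice, which is what forces the network and disk traffic the test is meant to measure.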
The test showed that the DS2 cluster performed the deep copy on average in about 1h 58m 36s:
while the RA3 cluster performed almost twice the number of copies in the same amount of time, clocking in at 1h 2m 55s on average per copy:
This indicated an improvement of almost 2x in performance for queries which are heavy in network and disk I/O.
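The "almost 2x" figures can be checked directly from the average durations reported above. The helper names below are ours, introduced only for this illustration.

```python
import re


def to_seconds(duration):
    """Convert a string such as '2h 9m 47s' into a number of seconds."""
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(n) * units[u] for n, u in re.findall(r"(\d+)\s*([hms])", duration))


def speedup(slow, fast):
    """Return how many times faster the second duration is than the first."""
    return to_seconds(slow) / to_seconds(fast)


copy_speedup = speedup("2h 9m 47s", "1h 8m 21s")
deep_copy_speedup = speedup("1h 58m 36s", "1h 2m 55s")
print(f"COPY from S3: {copy_speedup:.2f}x")
print(f"Deep copy:    {deep_copy_speedup:.2f}x")
```

Both ratios come out at roughly 1.9, which is where the "nearly 2x" wording comes from.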
Benchmarks are great to get a rough sense of how a system might perform in the real-world, but all benchmarks have their limitations. So in the end, the best way to evaluate performance is with real-world code running on real-world data.
To compare the 2-node ra3.16xlarge and 4-node ds2.8xlarge clusters, we set up our internal data pipeline for each cluster. We copied a large dataset into the ds2.8xlarge, paused all loads so the cluster data would remain fixed, and then snapshotted that cluster and restored it to a 2-node ra3.16xlarge cluster.
We then started our data product pipeline and fired up our intermix dashboard to quantitatively monitor performance and characteristics of the two clusters.
Our primary Redshift data product pipeline consists of batch ETL jobs that reduce raw data loaded from S3 (aka “ELT”). The ETL transformations start with around 50 primary tables, and go through several transformations to produce around 30 downstream tables. These 30 tables are then combined and loaded into serving databases (such as Elasticsearch) for serving. So this all translates to a heavy read/write set of ETL jobs, combined with regular reads to load the data into external databases.
While our pipeline also includes some external jobs that occur in platforms outside of Redshift, we’ve excluded the performance of those jobs from this post, since it is not relevant to the ra3.16xlarge to ds2.8xlarge comparison.
To start with, we looked at the overall query performance of our pipeline running on the identical data on the ds2.8xlarge cluster and the ra3.16xlarge cluster. Our Intermix dashboards reported a P95 latency of 1.1 seconds and a P99 latency of 34.2 seconds for the ds2.8xlarge cluster:
The ra3.16xlarge cluster showed noticeably improved overall performance:
P95 latency was 36% faster at 0.7s, and P99 latency was 19% faster–a significant improvement.
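For readers less familiar with tail-latency metrics, here is a small sketch of how a P95 or P99 value is derived from a set of query latencies, using the nearest-rank definition. The sample latencies are made up and chosen only so that the result lands on the ds2.8xlarge figures quoted above; the dashboard may use a different percentile method.

```python
import math


def percentile(values, pct):
    """Return the nearest-rank percentile of a non-empty list of numbers."""
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[max(rank, 1) - 1]


# Made-up latencies in seconds: mostly fast queries plus a slow tail.
latencies = [0.1] * 90 + [0.5] * 4 + [1.1] * 4 + [34.2] * 2
p95 = percentile(latencies, 95)
p99 = percentile(latencies, 99)
print(p95, p99)
```

The gap between the two values shows why both are worth tracking: a handful of slow queries barely moves P95 but dominates P99.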
Viewing our query pipeline at a high-level told us that throughput had on average improved significantly on the ra3.16xlarge cluster. But the performance of data product pipelines is often limited by the worst-performing queries in the pipeline. So next we looked at the performance of the slowest queries in the clusters.
Since we tag all queries in our data pipeline with SQL query annotations, it is trivial to quickly identify the steps in our pipeline that are slowest by plotting max query execution time in a given time range and grouping by the SQL query annotation:
Each series in this report corresponds to a task (typically one or more SQL queries or transactions) which runs as part of an ETL DAG (in this case, an internal transformation process we refer to as sheperd).
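The grouping behind that report can be sketched in a few lines of Python. The task names and timings below are invented for illustration, and the real report is built by the dashboard rather than by code like this.

```python
from collections import defaultdict

# Made-up query log rows: (task annotation, execution time in seconds).
query_log = [
    ("build_sessions", 2358),
    ("build_sessions", 2290),
    ("load_raw_events", 120),
    ("load_raw_events", 95),
    ("refresh_rollups", 310),
]


def slowest_by_task(rows):
    """Return (task, max execution seconds) pairs, slowest task first."""
    worst = defaultdict(int)
    for task, seconds in rows:
        worst[task] = max(worst[task], seconds)
    return sorted(worst.items(), key=lambda item: item[1], reverse=True)


ranking = slowest_by_task(query_log)
for task, seconds in ranking:
    print(f"{task}: {seconds}s")
```

Taking the maximum rather than the mean per task is deliberate: a pipeline finishes only when its slowest step does.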
The slowest task on both clusters in this time range was get_samples-query, which is a fairly complex SQL transformation that joins, processes, and aggregates 11 tables. On the 4-node ds2.8xlarge, this task took on average 39 minutes and 18 seconds:
This same task running on the 2-node ra3.16xlarge took on average 32 minutes and 25 seconds, an 18% improvement!
This result is pretty exciting: For roughly the same price as a larger ds2.8xlarge cluster, we can get a significant boost in data product pipeline performance, while getting twice the storage capacity.
Moving on to the next-slowest-query in our pipeline, we saw average query execution improve from 2 minutes on the ds2.8xlarge down to 1 minute and 20 seconds on the ra3.16xlarge–a 33% improvement!
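The percentages quoted in this section follow from the raw timings. A short check, with a helper name of our own choosing:

```python
def improvement(before, after):
    """Return the percentage reduction from before to after."""
    return (before - after) / before * 100


# All inputs are in seconds and come from the figures quoted in the text.
p95_gain = improvement(1.1, 0.7)
slowest_task_gain = improvement(39 * 60 + 18, 32 * 60 + 25)
next_slowest_gain = improvement(2 * 60, 1 * 60 + 20)

for label, value in [
    ("P95 latency", p95_gain),
    ("Slowest task", slowest_task_gain),
    ("Next-slowest task", next_slowest_gain),
]:
    print(f"{label}: {value:.1f}% faster")
```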
The launch of the new RA3 instances addresses one of the biggest pain-points we’ve seen our customers have with administering an Amazon Redshift cluster: managing storage. While seemingly straightforward, dealing with storage in Redshift causes several headaches:
We’ve seen variations of these problems over and over with our customers, and expect to see this new RA3 instance type greatly reduce or eliminate the need to scale Redshift clusters just to add storage. In practice, we expect that workloads will likely always become CPU, Memory, or I/O bound before they become storage bound, making the decision to add a node (vs scale back or optimize the data product pipeline) much simpler.
And then there’s also Amazon Redshift Spectrum, to join data in your RA3 instance with data in S3 as part of your data lake architecture, to independently scale storage and compute.
The performance boost of this new node type (a big part of which comes from improvements in network and storage I/O) gives RA3 a significantly better bang-for-the-buck compared to previous generation clusters.
We’ve also received confirmation from AWS that they will be launching another RA3 instance type, ra3.4xlarge, so you’ll be able to get all the benefits of this node type even if your workload doesn’t require quite as much horsepower.
We highly recommend giving this new node type a try–we’re planning on moving our workloads to it!
As always, we’d love your feedback on our results and to hear your experiences with the new RA3 node type. Feel free to get in touch directly, or join our Redshift community on Slack.
The AWS team recently released a new feature of their Automatic WLM that we’re really excited about: Query Priorities. This feature aims to address the main limitation of Auto WLM head-on, i.e. it allows you to prioritize some queries over other queries. This is a crucial performance enhancement that is needed to achieve Data SLAs.
We first covered Redshift’s new Automatic WLM feature on our blog before Query Priority was fully released, and found mixed results: it was highly effective (maybe too effective!) at reducing the percentage of disk-based queries, but had the side effect of increasing overall queuing on our clusters since big queries consumed most of the memory for the cluster. For our use-case, this tradeoff didn’t work–the additional queueing caused unacceptable delays to our data pipeline. Since writing that blog post, we ended up reverting all of our clusters to our well-tuned Manual WLM.
We’ve already covered the basics of setting up Automatic WLM with Query Priorities from a very high-level, and in this post are going to give a first look at the performance we’re seeing with this new feature.
As a reminder, Automatic WLM dynamically assigns cluster resources to queries as they run. This is in contrast to Manual WLM, where you need to manually choose a memory and concurrency value for each queue that queries run in. The benefit of having Automatic WLM choose these values for you is less about making it easier (optimally configuring Manual WLM is pretty easy), and more about making it dynamic–in theory each query should get the right amount of cluster resources, no more, no less. This should lead to a more optimal use of your cluster, resulting in a higher query throughput and less wasted memory.
In practice however, Automatic WLM (without Query Priority) has no way of knowing the impact additional queue time will have on your data SLAs and hence your business. So the net result of Automatic WLM on your data SLA might be negative, even if the cluster’s use of resources is more efficient by some measure.
This is exactly the effect we saw in our earlier post: long-running queries ran up to 3x faster (and fewer went to disk), since big queries were allocated more resources. But this came at the expense of additional queue time for queries overall. The net result was a decrease in overall throughput in our data pipeline. Manual WLM allows us to tune that tradeoff, letting some queries go disk-based if it means keeping the overall latency of our pipeline below our data SLA.
Automatic WLM with Query Priorities intends to solve this exact problem: it allows you to let Amazon Redshift know how to prioritize queries when allocating cluster resources. Query Priorities are managed via Automatic WLM query queues, with each queue getting a priority:
As with Manual WLM, queries are assigned to queues using either database user groups or “query groups” (a label you can set in the database session using the SET query_group TO statement before you submit a query). The idea is that you simply rank your workloads in terms of priority, and Redshift handles allocating cluster resources optimally.
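As a small sketch of the query group mechanism, the helper below wraps a query with the session statements that label it. The helper name, the label, and the table are hypothetical; the label only has an effect if it matches a query group configured on one of your queues.

```python
def with_query_group(sql, query_group):
    """Return the statements that run one query under a query group label."""
    return [
        f"SET query_group TO '{query_group}';",
        sql,
        "RESET query_group;",
    ]


# 'dashboard' is a hypothetical label; it must match a query group on a queue.
statements = with_query_group("SELECT COUNT(*) FROM events;", "dashboard")
print("\n".join(statements))
```

Resetting the label afterwards keeps later queries in the same session from being routed to the same queue by accident.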
Redshift uses these query priorities in three ways:
Following the guidelines in our Automatic WLM configuration post, we looked at one of our development Redshift clusters and first enumerated the workloads we have running. We then grouped their users into groups based on workload:
| Workload | Database User Group |
|---|---|
| Automated queries from other parts of our system | intermix_app |
| Data loads and Transformations | intermix_write |
| Developer Ad-hoc queries | intermix |
Next, we assigned a priority to each workload and decided if any should have Concurrency Scaling enabled:
| Workload | Database User Group | Priority | Concurrency Scaling |
|---|---|---|---|
| Data loads and Transformations | intermix_write | Highest | No |
| Automated queries from other parts of our system | intermix_app | High | Yes |
| Developer Ad-hoc queries | intermix | Low | Yes |
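The mapping in the table above can be modelled as plain data, which is handy for reviewing a configuration before applying it. This is only an illustrative model in Python; the class and function names are ours, and it is not the format Redshift expects for its WLM configuration.

```python
from dataclasses import dataclass

# Priority levels available to Auto WLM queues, from most to least urgent.
PRIORITY_ORDER = ["highest", "high", "normal", "low", "lowest"]


@dataclass(frozen=True)
class Queue:
    workload: str
    user_group: str
    priority: str
    concurrency_scaling: bool


QUEUES = [
    Queue("Data loads and Transformations", "intermix_write", "highest", False),
    Queue("Automated queries from other parts of our system", "intermix_app", "high", True),
    Queue("Developer Ad-hoc queries", "intermix", "low", True),
]


def queue_for(user_group):
    """Return the queue a user group maps to, or None for the default queue."""
    return next((q for q in QUEUES if q.user_group == user_group), None)


def by_priority(queues):
    """Return queues ordered from highest to lowest priority."""
    return sorted(queues, key=lambda q: PRIORITY_ORDER.index(q.priority))


for q in by_priority(QUEUES):
    print(q.user_group, q.priority, q.concurrency_scaling)
```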
Finally, we rebooted and started checking our Intermix.io dashboard for results.
At a high-level, our results looked markedly improved compared to our previous Automatic WLM tests. We saw a significant improvement in average execution time (light blue) accompanied by a corresponding increase in average queue time (dark blue):
The net result was a small (14%) decline in overall query throughput.
As before, these changes were driven by a dramatic decrease in execution times for our slow, disk-based queries:
Next, we looked at a breakdown of execution time by workload using the Insights feature of our dashboard:
This again showed a dramatic improvement in average execution times, though the overall count of queries (i.e. query throughput) went down slightly:
We also saw an increase in queue times for all users, which is expected given that Redshift must queue lower priority queries in order to allocate resources for higher priority queries:
Finally, we saw an expected increase in aborted queries for lower priority workloads (which includes queries that are evicted and re-queued to make room for higher priority queries):
We suspect that we didn’t see a significant improvement when using Automatic WLM with Query Priority because our cluster workloads are dominated by a single very resource-intensive batch processing pipeline (executed by the intermix_write group). Once data is processed and reduced by this workload, it gets offloaded into other databases for serving (either Elasticsearch or RDS via DBLink). So while our clusters do run a mix of workloads, the bulk of the work comes from this single resource-intensive transformation pipeline, which means there may simply not be enough wiggle-room in terms of workloads to move around to give Redshift enough parameters to optimize.
Further, with Manual WLM we’re able to tune our clusters for optimal performance given this dominant transformation workload, which itself is a complex mix of queries. Our data SLA metric isn’t simply “minimize disk-based queries” or “minimize query execution time for this group of queries”–it is “decrease the final latency at the end of this complex data pipeline”.
Finally, since our workload is dominated by write queries, we don’t get much gain from enabling concurrency scaling or SQA to offload read-only queries.
Despite the fact that Automatic WLM was slightly slower than our Manual WLM configuration, we think that it is the right way to approach WLM: it allows you as a Redshift Administrator to prioritize workloads to align with your business needs, as opposed to tuning lower-level knobs (concurrency and memory) to achieve the same result. And as the AWS Redshift team improves their WLM tuning algorithms, we expect that the performance of Automatic WLM will continue to improve.
We recommend that most Redshift admins try Automatic WLM, especially if any of the following are true:
If you do use Automatic WLM, we strongly recommend you enable Query Priorities if you have any workloads for which consistent performance is important (and who doesn’t?). We also recommend enabling SQA for your cluster, and enabling Concurrency Scaling for any Priority Queues which run eligible queries. Both of these will reduce and maybe even eliminate queuing for those workloads, especially during times of increased query volume (think BI tools where your users are querying your warehouse in real-time).
We’d love to hear your thoughts and if Automatic WLM with Query Priorities has worked for you–reach out to us in our Redshift community slack channel and share your experience with the community!
At intermix.io, we work with companies that build data pipelines and data lakes in the cloud. Some start “cloud-native”, others migrate from on-premise solutions like Oracle or Teradata. What they all have in common though is the one question they ask us at the very beginning:
“How do other companies build their data pipelines?”
And so that’s why we decided to compile and publish a list of publicly available blog posts about how companies build their data pipelines. In those posts, the companies talk in detail about how they’re using data in their business, and how they’ve become “data-centric”.
Here’s the short list of the 14 companies:
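1. Simple
2. Clearbit
3. 500px
4. Netflix
5. Yelp
6. Gusto
7. Cloudflare
8. Teads
9. Remind
10. Robinhood
11. Dollar Shave Club
12. Coursera
13. Wish
14. Blinkist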
And we’re sorry if we missed your post – we’re happy to include it, just fill out this form (it takes less than a minute). And with that – please meet the 14 examples of data pipelines from the world’s most data-centric companies.
Getting data-driven is the main goal for Simple. It’s important for the entire company to have access to data internally. Instead of having the analytics and engineering teams jump from one problem to another, a unified data architecture spreading across all departments in the company allows building a unified way of doing analytics. The main problem then is how to ingest data from multiple sources, process it, store it in a central data warehouse, and present it to staff across the company. Similar to many solutions nowadays, data is ingested from multiple sources into Kafka, before passing it to compute and storage systems. The warehouse of choice is Redshift, selected because of its SQL interfaces and the ease of processing petabytes of data. Finally, reports, analytics and visualizations are powered using Periscope Data. In this way, the data is easily spread across different teams, allowing them to make decisions based on data.
Clearbit was a rapidly growing, early-stage startup when it started thinking of expanding its data infrastructure and analytics. After trying out a few out-of-the-box analytics tools, each of which failed to satisfy the company’s demands, they took building the infrastructure into their own hands. Their efforts converged into a trio of providers: Segment, Redshift, and Mode. Segment is responsible for ingesting all kinds of data, combining it, and syncing it daily into a Redshift instance. The main data storage is left to Redshift, with backups into AWS S3. Finally, since Redshift supports SQL, Mode is a well-suited tool for running queries (while using Redshift’s powerful data processing abilities) and creating data insights.
Mode makes it easy to explore, visualize and share that data across your organization.
But as data volume grows, data warehouse performance goes down. With ever-increasing calls to your data from analysts, your cloud warehouse becomes the bottleneck. Uncertainty about why queries take longer and longer to complete frustrates analysts and engineers alike.
That’s why we’ve built intermix.io, so that Mode users get all the tools they need to optimize their queries running on Amazon Redshift. Here is one of our dashboards that shows how you can track queries from Mode down to the single user:
The whole data architecture at 500px is mainly based on two tools: Redshift – for data storage, and Periscope – for analytics, reporting, and visualization. From a customer-facing side, the company’s web and mobile apps run on top of a few API servers, backed by several databases – mostly MySQL. Data in these DBs is then processed through a Luigi ETL, before storing it to S3 and Redshift. Splunk here does a great job in querying and summarizing text-based logs. Periscope Data is responsible for building data insights and sharing them across different teams in the company. All in all, this infrastructure supports around 60 people distributed across a couple of teams within the company, as of 2015.
The data infrastructure at Netflix is certainly one of the most complex ones, given that they serve over 550 billion events per day, equaling roughly 1.3 petabytes of data. In general, Netflix’s architecture is broken down into smaller systems, such as systems for data ingestion, analytics, predictive modeling etc. The data stack employed at the core of Netflix is mainly based on Apache Kafka for real-time (sub-minute) processing of events and data. Data needed in the long term is sent from Kafka to AWS’s S3 and EMR for persistent storage, but also to Redshift, Hive, Snowflake, RDS and other services that provide storage for different sub-systems. Metacat is built to make sure the data platform can interoperate across these data sets as one “single” data warehouse. Its task is to connect different data sources (RDS, Redshift, Hive, Snowflake, Druid) with different compute engines (Spark, Hive, Presto, Pig). Other Kafka outputs lead to a secondary Kafka sub-system, predictive modeling with Apache Spark, and Elasticsearch. Operational metrics don’t flow through the data pipeline but through a separate telemetry system named Atlas.
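To put those volumes in perspective, a quick back-of-the-envelope calculation, assuming decimal petabytes and an even spread of events over the day (neither of which the source posts specify):

```python
EVENTS_PER_DAY = 550e9          # events per day, from the text
BYTES_PER_DAY = 1.3e15          # 1.3 PB, assuming decimal petabytes
SECONDS_PER_DAY = 24 * 60 * 60

events_per_second = EVENTS_PER_DAY / SECONDS_PER_DAY
bytes_per_event = BYTES_PER_DAY / EVENTS_PER_DAY
print(f"{events_per_second / 1e6:.1f} million events per second")
print(f"{bytes_per_event / 1e3:.1f} kB per event on average")
```

That works out to several million events per second at a couple of kilobytes each, which helps explain why ingestion is handled by a streaming system rather than batch loads.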
The tech world has changed dramatically since Yelp was launched back in 2004. Eight years later, Yelp started its own change. It transformed from running a huge monolithic application on-premises to one built on microservices running in the AWS cloud. By the end of 2014, there were more than 150 production services running, with over 100 of them owning data. The main part of its cloud stack is better known as PaaSTA, based on Mesos and Docker, offloading data to warehouses such as Redshift, Salesforce and Marketo. Data enters the pipeline through Kafka, which in turn receives it from multiple different “producer” sources.
Gusto, founded in 2011, is a company that provides a cloud-based payroll, benefits and workers’ compensation solution for businesses. Their business has grown steadily over the years, currently reaching around 60 thousand customers. By early 2015, there was a growing demand within the company for access to data. Up until then, the engineering team and product managers were running their own ad-hoc SQL scripts on production databases. There was obviously a need to build a data-informed culture, both internally and for their customers. When faced with the choice between building a data science team or a data engineering team, Gusto seems to have made the right choice: first build a data infrastructure, which would then support analysts in generating insights and building prediction models.
The first step for Gusto was to replicate and pipe all of their major data sources into a single warehouse. The warehouse choice landed on an AWS Redshift cluster, with S3 as the underlying data lake. Moving data from production app databases into Redshift was then facilitated with Amazon’s Database Migration Service. On the other side of the pipeline, Looker is used as a BI front-end that teams throughout the company can use to explore data and build core dashboards. Aleph is a shared web-based tool for writing ad-hoc SQL queries. Finally, monitoring (in the form of event tracking) is done by Snowplow, which can easily integrate with Redshift, and as usual, Airflow is used to orchestrate the work through the pipeline.
Building such a pipeline massively simplified data access and manipulation across departments. For instance, analysts can simply build their own datasets as part of an Airflow task, and expose them to Looker to use in dashboards and further analyses.
Cloudflare is a web performance and security company that provides online services to protect and accelerate websites online. Online content distribution, web optimization, web security, and analytics are a few examples of the company’s business range.
While different services may require different data stacks to work on, they are all built on top of Cloudflare’s core infrastructure. At the core of their data stack there are Kafka clusters as a streaming platform, and CitusDB as a data warehouse – a scaled up version of PostgreSQL. Data is ingested through Cloudflare’s edge services using HTTP requests, then passed on to Kafka clusters, before getting stored in the CitusDB warehouse. A nice example of a service working on top of this infrastructure is the DNS Analysis – a service which processes around 1 million DNS queries per second! The DNS edge service pre-processes and aggregates data, before sending it encrypted to one of Cloudflare’s data centers. Within the data center, data is de-multiplexed and pushed into several Apache Kafka clusters, which in turn push data to consumers grouped by Kafka topic. Consumers can store processed information into corresponding DBs, which are later queried by the company’s API services to deliver information to customers.
Cloudflare provides its services to millions of websites around the world, processing and storing hundreds of terabytes of data daily. Interestingly, Cloudflare is not a fan of commercial cloud technologies; instead, they run their own data centers across the world, 152 in total as of this writing.
Teads is a video advertising marketplace, often ranked as the number 1 video platform in the world. Working with data-heavy videos must be supported by a powerful data infrastructure, but that’s not the end of the story. Teads’ business needs to log user interactions with their videos through the browser (like play, pause, resume, complete…), which count up to 10 million events per day. Another source of data is video auctions (real-time bidding processes) which generate another 60 million events per day. To build their complex data infrastructure, Teads has turned to both Google and Amazon for help.
Originally the data stack at Teads was based on a lambda architecture, using Storm, Spark and Cassandra. This architecture couldn’t scale well, so the company turned toward Google’s BigQuery in 2016. They already had their Kafka clusters on AWS, which was also running some of their ad delivery components, so the company chose a multi-cloud infrastructure. Transferring data between different cloud providers can get expensive and slow. To address the second part of this issue, Teads placed their AWS and GCP clouds as close as possible and connected them with managed VPNs.
So what does their complex multi-cloud data stack look like? Well, first of all, data coming from users’ browsers and data coming from ad auctions is enqueued in Kafka topics in AWS. Then, using an inter-cloud link, data is passed over to GCP’s Dataflow, which is then paired with BigQuery in the next step. Having all data in a single warehouse means half of the work is done. The next step would be to deliver data to consumers, and Analytics is one of them. The Analytics service at Teads is a Scala-based app that queries data from the warehouse and stores it to tailored data marts. Interestingly, the data marts are actually AWS Redshift servers. In the final step, data is presented in intra-company dashboards and the users’ web apps.
Remind’s data engineering team strives to provide the whole company with access to the data they need, as much as 10 million daily events, and to empower them to make decisions directly. They initially started with Redshift as their source of truth for data, and AWS S3 to optimize for cost. While S3 is used for long-term storage of historical data in JSON format, Redshift only stores the most valuable data, not older than 3 months. The company uses Interana to run custom queries on their JSON files on S3, but they’ve also recently started using AWS Athena, a fully managed Presto system, to query both S3 and Redshift databases. The move to Athena also triggered a change in the data format, from JSON to Parquet, which they say was the hardest step in building up their data platform. An EMR/Hive system is responsible for doing the needed data transformations between S3 and Athena. On the data ingestion side, Remind gathers data through their APIs from both mobile devices and personal computers, as the company’s business targets schools, parents and students. This data is then passed to a streaming Kinesis Firehose system, before streaming it out to S3 and Redshift.
Remind’s future plans are probably focused on facilitating data format conversions using AWS Glue. This step would allow them to remove EMR/Hive from their architecture and use Spark SQL instead of Athena for diverse ETL tasks.
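The hot/cold split described above (full history in S3, only recent data in Redshift) can be sketched as a simple routing rule. The function name and the 90-day approximation of "3 months" are our assumptions, and the "most valuable data" filter is left out because the source does not define it.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=90)  # assumption: "3 months" approximated as 90 days


def destinations(event_time, now):
    """Return the stores that should hold an event with this timestamp."""
    stores = ["s3"]  # S3 keeps the full history
    if now - event_time <= RETENTION:
        stores.append("redshift")  # Redshift keeps only recent data
    return stores


now = datetime(2018, 6, 1)
recent = destinations(datetime(2018, 5, 1), now)
old = destinations(datetime(2017, 12, 1), now)
print(recent, old)
```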
Robinhood is a stock brokerage application that democratizes access to the financial markets, which enables its customers to buy and sell U.S. listed stocks and ETFs with zero commission. The company debuted with a waiting list of nearly 1 million people, which means they had to pay attention to scale from the very beginning.
Robinhood’s data stack is hosted on AWS, and the core technology they use is ELK (Elasticsearch, Logstash, and Kibana), a stack for powering search and analytics. Logstash is responsible for collecting, parsing and transforming logs, before passing them on to Elasticsearch, while data is visualized through Kibana. They grew from a single ELK cluster with a few GBs of data to three clusters with over 15 TBs. Before data goes to ELK clusters, it is buffered in Kafka, as the rates at which documents enter vary significantly between different data sources. Kafka also shields the system from failures and communicates its state with data producers and consumers. As with many other companies, Robinhood uses Airflow to schedule various jobs across the stack, beating competition such as Pinball, Azkaban and Luigi. Robinhood’s data science team uses Amazon Redshift to help identify possible instances of fraud and money laundering.
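The buffering idea can be illustrated with nothing more than a bounded in-memory queue. This is a toy stand-in written with the Python standard library, not Kafka and not Robinhood's code; the source names and document counts are invented.

```python
import queue
import threading

buffer = queue.Queue(maxsize=100)  # bounded, so producers block when it is full
indexed = []


def producer(source, count):
    """Emit count log documents from one source into the shared buffer."""
    for i in range(count):
        buffer.put(f"{source}-{i}")


def consumer():
    """Drain the buffer at the consumer's own pace until a None sentinel arrives."""
    while True:
        doc = buffer.get()
        if doc is None:
            break
        indexed.append(doc)


worker = threading.Thread(target=consumer)
worker.start()
producers = [
    threading.Thread(target=producer, args=("api", 500)),   # bursty source
    threading.Thread(target=producer, args=("batch", 20)),  # quiet source
]
for p in producers:
    p.start()
for p in producers:
    p.join()
buffer.put(None)  # signal the consumer that no more documents are coming
worker.join()
print(len(indexed))
```

The consumer never sees the difference in arrival rates between the two sources; it simply reads from the buffer, which is the decoupling a system like Kafka provides at much larger scale and with durability.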
Dollar Shave Club (DSC) is a lifestyle brand and e-commerce company that’s revolutionizing the bathroom by inventing smart, affordable products. Don’t be fooled by their name, they have a pretty cool data architecture, for a company in the shaving business. Their business model works with online sales through a subscription service. Currently, they serve around 3 million subscribed customers.
DSC’s web applications, internal services, and data infrastructure are 100% hosted on AWS. A Redshift cluster serves as the central data warehouse, receiving data from various systems. Data movement is facilitated with Apache Kafka and can move in different directions – from production DBs into the warehouse, in between different apps and in between internal pipeline components. There’s also Snowplow which collects data from the web and mobile clients. Once data reaches Redshift, it is accessed through various analytics platforms for monitoring, visualization, and insights. The main tool for the job is, of course, Apache Spark, which is mainly used to build predictive models, such as recommender systems for future sales.
Coursera is an education company that partners with the top universities and organizations in the world to offer online courses. They started building their data architecture somewhere around 2013, as both numbers of users and available courses increased. As of late 2017, Coursera provides courses to 27 million worldwide users.
Coursera collects data from their users through API calls coming from mobile and web apps, their production DBs, and logs gathered from monitoring. A backend service called “eventing” periodically uploads all received events to S3 and continuously publishes events to Kafka. The engineering team selected Redshift as a central warehouse, offering much lower operational cost when compared with Spark or Hadoop at the time.
On the analytics end, the engineering team created an internal web-based query page where people across the company can write SQL queries to the warehouse and get the needed information. Of course, there are analytics dashboards across the company which are refreshed on a daily basis. Finally, many decisions made at Coursera are based on machine learning algorithms, such as A/B testing, course recommendations, understanding student dropouts and others.
Wish is a mobile commerce platform. It provides online services that include media sharing and communication tools, personalized and other content, as well as e-commerce. During the last few years, it grew up to 500 million users, making their data architecture out of date.
The data architecture at Wish, before scaling up, had two different production databases: a MongoDB one storing user data, and a Hive/Presto cluster for logging data. Data engineers had to manually query both to respond to ad-hoc data requests, and this took weeks at some points. Another small pipeline orchestrated by Python crons, also queried both DBs and generated Email reports.
After rethinking their data architecture, Wish decided to build a single warehouse using Redshift. Data from both production DBs flowed through the data pipeline into Redshift. BigQuery is also used for some types of data. It feeds data into secondary tables needed for analytics. Finally, analytics and dashboards are created with Looker.
Blinkist transforms the big ideas from the world’s best nonfiction books into powerful little packs users can read or listen to in 15 minutes. At first, they started selling their services through a pretty basic website, and monitored statistics through Google Analytics. Unfortunately, visitor statistics gathered from Google Analytics didn’t match the ones engineers computed. This is one of the reasons why Blinkist decided to move to the AWS cloud.
They chose a central Redshift warehouse where data flows in from user apps, the backend, and the web frontend (for visitor tracking). To get data to Redshift, they stream it with Kinesis Firehose, also using Amazon CloudFront, Lambda and Pinpoint. The engineering team at Blinkist is working on a newer pipeline where ingested data comes to Alchemist before passing to a central Kinesis system and onwards to the warehouse.
We hope this post along with its 14 examples gives you the inspiration to build your own data pipelines in the cloud.
If you don’t have any data pipelines yet, it’s time to start building them. Begin with baby steps: spin up an Amazon Redshift cluster, ingest your first data set, and run your first SQL queries.
After that, you can look at expanding by adding a dashboard for data visualization, and schedule a workflow, to build your first true data pipeline. And once data is flowing, it’s time to understand what’s happening in your data pipelines.
That’s why we built intermix.io. We give you a single dashboard to understand when & why data is slow, stuck, or unavailable.
Our customers have the confidence to handle all the raw data their companies need to be successful. What you get is a real-time analytics platform that collects metrics from your data infrastructure and transforms them into actionable insights about your data pipelines, apps, and users who touch your data.
Setting up intermix.io takes less than 10 minutes, and because you can leverage our intermix.io experts, you can say goodbye to paying for a team of experts with expensive and time-consuming consulting projects. We can help you plan your architecture, build your data lake and cloud warehouse, and verify that you’re doing the right things.
It’s easy – start now by scheduling a call with one of our experts.
When you’re building or improving your data pipeline, batch data processing seems simple. Pull data from a source, apply some business logic to it, and load it for later use. When done well, automating these jobs is a huge win. It saves time and empowers decision-makers with fresh and accurate data.
But this kind of ETL (Extract, Transform, and Load) process gets very tricky, very quickly. A failure or bug in a single step of an ETL process has cascading effects which can require hours of manual intervention and cleanup. These kinds of issues can be a gigantic time sink for the analysts or engineers that have to clean up the mess. They negatively impact data quality. They erode end-user confidence. And they can make teams terrified of pushing out even the most innocuous changes to a data pipeline.
To avoid falling into this kind of ETL hell, analysts and engineers need to make data pipelines that:
In short, they need to make data pipelines that are idempotent. Idempotent is a mathematical term describing an operation which can be applied arbitrarily many times without changing the result. Idempotent data pipelines are fault-tolerant, reliable, and easy to troubleshoot. And they’re much simpler to build than you might expect.
Here are 3 simple steps you can take to make your data pipeline idempotent.
The easiest way to be sure that a buggy or failed ETL run doesn’t dirty up your data is to isolate the data that you deal with in a given run.
If you’re working with time-series data this is fairly straightforward. When pulling data from your initial source, simply filter it by time. Operate on an hour, a day, or a month, depending on the volume of your data. By operating on data in time-windowed batches like this, you can be confident that any bugs or failures aren’t impacting your data quality beyond a single window.
If you’re working with cross-sectional data, try to deal with some explicitly-defined subset of that data in a given run. Knowing exactly what data has been impacted by a bug or failure makes clean up and remediation of data pipeline issues relatively straightforward. Just delete the data that has been impacted from your final destination and then run again. And if you isolate your data correctly, you never have to worry about duplicated or double-counted data points.
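As a rough illustration of time-windowed isolation, the sketch below builds a query that reads exactly one day of data. The table name `events`, the column `created_at`, and the `?` placeholder style are assumptions made for the example, not part of any particular pipeline.

```python
from datetime import date, timedelta
from typing import Tuple


def day_window(run_date: date) -> Tuple[str, str]:
    """Return the half-open [start, end) bounds for a one-day batch."""
    start = run_date
    end = run_date + timedelta(days=1)
    return start.isoformat(), end.isoformat()


def extract_query(table: str, ts_column: str, run_date: date) -> Tuple[str, Tuple[str, str]]:
    """Build a parameterized query that reads only one day's rows.

    `table` and `ts_column` are hypothetical names supplied by trusted code.
    """
    sql = f"SELECT * FROM {table} WHERE {ts_column} >= ? AND {ts_column} < ?"
    return sql, day_window(run_date)


sql, params = extract_query("events", "created_at", date(2018, 7, 20))
```

Using a half-open window (inclusive start, exclusive end) means adjacent runs never overlap, so a row belongs to exactly one batch.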
In order to avoid time-consuming cleanup of failed or buggy data pipeline runs, it’s important to build your pipeline in such a way that it either loads everything or it loads nothing at all. That is to say, it should be atomic.
If you’re loading into a SQL database, achieving atomicity for your data pipeline is as simple as wrapping all of your loading statements–inserts, copies, creates, or updates–in a transaction with a BEGIN statement at the beginning and an END or COMMIT statement at the end. If the final destination for your data is not a SQL database–say an Elasticsearch cluster or an Amazon S3 bucket–building atomicity into your pipeline is a little more tricky. But it’s not impossible. You’ll simply need to implement your own “rollback” logic within the load step.
For instance, if you’re writing CSV files to an Amazon S3 bucket from a Python script, maintain a list of the resource identifiers for the files you’ve written. Wrap your loading logic in a try-except statement which deletes all of the objects you’ve written if the script errors out. By making your data pipeline atomic, you completely obviate the need for time-consuming cleanup after failed runs.
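A minimal sketch of that rollback logic might look like the following. It assumes `s3_client` behaves like a boto3 S3 client (with `put_object` and `delete_object`); the function name `write_batch_atomically` and its arguments are hypothetical.

```python
def write_batch_atomically(s3_client, bucket, files):
    """Upload every (key, body) pair, or leave nothing behind on failure.

    `s3_client` is assumed to expose boto3-style put_object/delete_object.
    """
    written_keys = []
    try:
        for key, body in files:
            s3_client.put_object(Bucket=bucket, Key=key, Body=body)
            written_keys.append(key)
    except Exception:
        # Roll back: remove every object this run managed to write.
        for key in written_keys:
            s3_client.delete_object(Bucket=bucket, Key=key)
        # Re-raise so the scheduler still sees the run as failed.
        raise
    return written_keys
```

The cleanup lives in the `except` branch rather than a `finally` branch, so objects are deleted only when the run fails.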
Now that you’ve explicitly isolated the data your pipeline deals with in a given run and implemented transactions, having your pipeline clean up after itself is a breeze. You just need to delete before you insert. This is the most efficient way to avoid loading duplicate data.
Say your data pipeline is loading all of your company’s sales data from the previous month into a SQL database. Before you load it, delete any data from last month that’s already in the database. Be sure to do this within the same transaction as your load statement. This may seem scary. But if you’ve implemented your transaction correctly, your pipeline will never delete data without also replacing it.
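The delete-before-insert pattern can be sketched as follows. This uses Python’s built-in sqlite3 module as a local stand-in for a real warehouse; the `sales` schema, the `sale_date` column, and the `reload_month` helper are assumptions made for illustration.

```python
import sqlite3


def reload_month(conn, month, rows):
    """Replace all sales rows for `month` (e.g. '2018-07') in one transaction."""
    # `with conn` commits on success and rolls back if anything raises,
    # so the delete is never applied without the matching insert.
    with conn:
        conn.execute("DELETE FROM sales WHERE substr(sale_date, 1, 7) = ?", (month,))
        conn.executemany(
            "INSERT INTO sales (contract_number, annual_revenue, sale_date) VALUES (?, ?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (contract_number INTEGER, annual_revenue INTEGER, sale_date TEXT)")
july = [(5439, 50000, "2018-07-20"), (5440, 12000, "2018-07-22")]
reload_month(conn, "2018-07", july)
reload_month(conn, "2018-07", july)  # rerun: same end state, no duplicates
row_count = conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
```

Because the delete and the insert share one transaction, a failed insert rolls the delete back and the previous month’s data stays in place.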
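Many SQL dialects have native support for this type of operation, which is typically referred to as an upsert. For instance, in PostgreSQL 9.5 and later (Amazon Redshift’s SQL dialect is based on PostgreSQL, but Redshift itself does not support this clause and needs a staging-table merge instead), you could prevent loading duplicate data with the following, assuming contract_number has a unique constraint: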
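-- The column list must match the four values supplied
INSERT INTO sales (employee, contract_number, annual_revenue, date)
VALUES ('Janet', 5439, 50000, '2018-07-20')
-- If contract 5439 already exists, update it instead of inserting a duplicate
ON CONFLICT (contract_number)
DO UPDATE SET employee = excluded.employee;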
This SQL statement will attempt to insert a new row into the database. If a row already exists with contract number 5439, the existing row will be updated with the new employee name. In this way, duplicate rows for the same contract will never be loaded into the database.
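The rerun behavior can be checked locally. The sketch below uses Python’s built-in sqlite3 module as a stand-in, since SQLite (3.24 or later) accepts the same ON CONFLICT ... DO UPDATE syntax; the table definition and the second employee name 'Maria' are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The PRIMARY KEY gives ON CONFLICT a unique constraint to detect duplicates on.
conn.execute(
    "CREATE TABLE sales (employee TEXT, contract_number INTEGER PRIMARY KEY, "
    "annual_revenue INTEGER, date TEXT)"
)

UPSERT = """
INSERT INTO sales (employee, contract_number, annual_revenue, date)
VALUES (?, ?, ?, ?)
ON CONFLICT (contract_number)
DO UPDATE SET employee = excluded.employee
"""

with conn:
    conn.execute(UPSERT, ("Janet", 5439, 50000, "2018-07-20"))
    # Loading the same contract again updates the row instead of duplicating it.
    conn.execute(UPSERT, ("Maria", 5439, 50000, "2018-07-20"))

rows = conn.execute("SELECT employee, contract_number FROM sales").fetchall()
```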
By having your pipeline clean up after itself and prevent the loading of duplicate data, backfilling data becomes as simple as rerunning. If you discover a bug in your business logic or you want to add a new dimension to your data, you can push out a change and backfill your data without any manual intervention whatsoever.
ETL pipelines can be fragile, difficult to troubleshoot, and labor-intensive to maintain. They’re inevitably going to have bugs. They’re going to fail every once in a while. The key to building a stable, reliable, fault-tolerant data pipeline is not to build it so that it always runs perfectly. The key is to build it in such a way that recovering from bugs and failures is quick and easy. The best data pipelines can recover from bugs and failures without ever requiring manual intervention. By applying the three simple steps outlined above, you’re well on your way to achieving a more fault-tolerant, idempotent data pipeline.
One of the core challenges of using any data warehouse is the process of moving data to a place where the data can be queried. Amazon Redshift provides two methods to access data:
1- copy data into Redshift local storage by using the COPY command
2- use Amazon Redshift Spectrum to query S3 data directly (no need to copy it in)
This post highlights an optimization that can be made when copying data into Amazon Redshift.
The Amazon Redshift COPY command loads data into a table. The files can be located in an S3 bucket, an Amazon EMR cluster, or a remote host that is accessed using SSH. The maximum size of a single input row from any source is 4 MB. Amazon Redshift Spectrum external tables are read-only. You can’t COPY to an external table.
The COPY command appends the new data to the table. In Amazon Redshift, primary keys are not enforced. This means that deduplication must be handled by your application.
The recommended way of de-duplicating records in Amazon Redshift is to use the UPSERT process.
UPSERT is a method of de-duplicating data when copying into Amazon Redshift. The UPSERT operation merges new records with existing records using primary keys. While some RDBMSs support a single “UPSERT” statement, Amazon Redshift does not support it. Instead, you should use a staging table for merging records.
Here is an example of an “UPSERT” statement for Amazon Redshift.
-- Start transaction
BEGIN;
-- Create a staging table
CREATE TABLE my_staging (LIKE cities);
-- Load data into the staging table
-- (the source and credentials are not given in the original; placeholders shown)
COPY my_staging (name, zipcode, state)
FROM '<s3-path>' IAM_ROLE '<iam-role-arn>';
-- Update records that already exist in the target table
UPDATE cities
SET name = s.name, zipcode = s.zipcode
FROM my_staging s
WHERE cities.id = s.id;
-- Insert records that exist only in the staging table
INSERT INTO cities
SELECT s.* FROM my_staging s LEFT JOIN cities
ON s.id = cities.id
WHERE cities.id IS NULL;
-- Drop the staging table
DROP TABLE my_staging;
-- End transaction
END;
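To see the update-then-insert merge in action without a cluster, here is a small sketch that uses Python’s built-in sqlite3 module as a stand-in for Redshift. The sample rows are invented, the COPY step is replaced by plain INSERTs, and the UPDATE uses correlated subqueries instead of Redshift’s UPDATE ... FROM form, so treat it as an illustration of the logic rather than Redshift syntax.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE cities (id INTEGER, name TEXT, zipcode TEXT);
INSERT INTO cities VALUES (1, 'Old Name', '00000');
CREATE TABLE my_staging (id INTEGER, name TEXT, zipcode TEXT);
INSERT INTO my_staging VALUES (1, 'Springfield', '62701'), (2, 'Portland', '97201');
""")

with conn:  # one transaction: commit on success, roll back on error
    # Update rows that already exist in the target table.
    conn.execute("""
        UPDATE cities
        SET name = (SELECT s.name FROM my_staging s WHERE s.id = cities.id),
            zipcode = (SELECT s.zipcode FROM my_staging s WHERE s.id = cities.id)
        WHERE id IN (SELECT id FROM my_staging)
    """)
    # Insert rows that are present only in the staging table.
    conn.execute("""
        INSERT INTO cities
        SELECT s.* FROM my_staging s LEFT JOIN cities c ON s.id = c.id
        WHERE c.id IS NULL
    """)
    conn.execute("DROP TABLE my_staging")

merged = conn.execute("SELECT id, name, zipcode FROM cities ORDER BY id").fetchall()
```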
By default, the Redshift COPY command automatically runs two commands as part of the COPY transaction: “COPY ANALYZE” and “ANALYZE COMPRESSION”.
Amazon Redshift runs these commands to determine the correct encoding for the data being copied. This may be useful when a table is empty. But in the following cases the extra queries are useless and thus should be eliminated:
In the below example, a single COPY command generates 18 ‘analyze compression’ commands and a single ‘copy analyze’ command.
Extra queries can create performance issues for other queries running on Amazon Redshift. Extra queries may saturate the number of slots in a WLM queue, thus causing all other queries to have queue wait times.
The solution is to adjust the COPY command parameters to add “COMPUPDATE OFF” and “STATUPDATE OFF”. These parameters will disable these features during “UPSERT”s. Here is an example of a “COPY” command with these options set.
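-- Load data into the staging table
-- (the source and credentials are not given in the original; placeholders shown)
COPY users_staging (id, name, city)
FROM '<s3-path>' IAM_ROLE '<iam-role-arn>'
-- Skip the automatic compression analysis and statistics update
COMPUPDATE OFF STATUPDATE OFF;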
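If your pipeline generates COPY statements from code, one way to make sure these options are never forgotten is to add them in a single helper. The sketch below is only an illustration: the function name `build_staging_copy` and its arguments are hypothetical, and it assumes the table, columns, source path, and role come from trusted configuration rather than user input.

```python
def build_staging_copy(table, columns, source, iam_role):
    """Return a COPY statement with compression analysis and stats updates disabled."""
    column_list = ", ".join(columns)
    return (
        f"COPY {table} ({column_list})\n"
        f"FROM '{source}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "COMPUPDATE OFF STATUPDATE OFF;"
    )


statement = build_staging_copy(
    "users_staging",
    ["id", "name", "city"],
    "<s3-path>",       # placeholder, not a real location
    "<iam-role-arn>",  # placeholder, not a real role
)
```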