Jun 4, 2020
Introduction to JOINs in Apache Druid
In Apache Druid 0.18/Imply 3.3, we added support for SQL Joins in Druid. This capability, which has long been demanded from Druid by the community, opens the door to a large number of possibilities in the future. In this blog I want to highlight some of the motivations behind us undertaking the effort and give you, the reader, an understanding of how it can be useful and where we’re going with it.
One of our largest customers, a top 10 global retailer, has been one of the biggest advocates for SQL Joins. They had three pain points that Joins would resolve: frequent dimension updates required full re-ingestion, 100s of dimension columns that could be summarized by a single key took 50% of the data volume, and finally they wanted to simplify the data model for their users. Let’s examine each of these.
Frequent Dimension Updates
Analytical data often comprises facts (metrics or other observations such as sale price for a bottle of soda at a certain store) and metadata about the objects involved in those facts (such as the department that the bottle of soda belongs to and the store’s geographic region). This kind of data lends itself very well to a form of schema called star schema.
In star schema, one or more fact tables reference dimension tables through a common key, and queries involving both fact and dimensions (such as to get all sales of soda bottles in a specific geographic region) would use a SQL join. Data warehouses are excellent at planning and executing complex join queries on large data efficiently.
In Druid, executing queries like this required denormalizing (i.e. flattening) the data, effectively joining the fact table with the dimension tables at ingestion time. This allowed Druid to execute extremely fast queries on this data, without having to materialize the join at query time.
Denormalizing the data in this way effectively explodes what was a single key (such as the item ID) into potentially a large number of dimension columns (item department, item department parent, item category, item packaging, country of origin, supplier, etc). At a large retailer with millions of items to sell, these dimensions change at least a few times a week.
So how do you deal with these dimension updates so you can execute queries with updated data? You have to redo the flattening of the data with the updated dimension tables and reingest the entire flattened data set. When you’re at tens of terabytes of data, this is a very slow and expensive process to execute a few times a week.
With Druid 0.18/Imply 3.3, this reingestion is no longer necessary. Druid now can handle star schema formats by allowing users to load dimension tables that can be joined with fact tables. This retailer can now keep the data normalized in Druid using a star schema, and only reingest the significantly smaller dimension tables when they are updated. This reduces the cost and time of a dimension update by 1000x, allowing them to always use the latest dimension data for their queries.
Large Storage Costs
By exploding the number of columns in each row, another issue that denormalized data causes is the significant storage cost. Instead of storing a single key in a row, now the database has to store potentially thousands of columns. While Druid is very efficient in storing dimension data, using bitmaps and compression to reduce the size, it continues to be expensive for large numbers of dimensions.
With joins available and data stored normalized into separate dimension tables with minimal redundancy, the space required for data storage was reduced by half. Instead of storing thousands of columns for each row, only 2 keys are stored for our customer.
Simplifying Data Modeling
Our customer wanted to use a popular BI tool with the data in Druid, but didn’t want to have to model the data differently when it is stored in Druid vs their data warehouse. That would require significant complexity for the user. By allowing them to keep the same schema in their data warehouse as in Druid, the customer was able to significantly reduce the cost of building tooling on top of the BI tools and to reduce the cognitive load on their users trying to make use of the data.
Under the Hood
In Druid 0.18, joins are implemented with a broadcast hash-join algorithm. With this approach, multiple joins can be stacked on top of each other, and each join stack starts with a “base” table. The algorithm works by first acquiring or building hash tables for each non-base table to join on. Then, it scans the base table straight-through, using those hash tables to evaluate the join condition.
Consider a join clause like this:
flights_fact JOIN lookup.airport_join
ON flights_fact.destairportid = lookup.airport_join.k
When Druid executes this join, the base table (flights_fact) is scanned straight through, and a hash table for lookup.airport_join is used to check the join condition for each row. This requires the airport_join table — in this case, a “lookup” table — to be broadcast to every server. This hash table building and broadcast work is done ahead of time, so no extra data transfer or compute needs to happen during the query. In other cases, such as when you are joining onto the results of a subquery, broadcasting and hash table building happens during query runtime.
This algorithm means the “base” table can be distributed and has no limits on its size. But, all other tables other than the base table must fit in memory. It also means that the join condition must be an equality, because Druid needs to be able to evaluate the join condition by using the left-hand side of an equality as a key into a hash table built from the right-hand side.
We expect that when you use this feature with a star schema, your base table will be a large fact table, and each of the other tables will be modestly-sized dimension tables that can fit on every server.
For best performance, these dimension tables should be lookup tables rather than regular Druid tables, because lookup tables are pre-broadcast and have pre-built joinable hash tables. One notable limitation here is that lookup tables currently only support a simple one-key-column, one-value-column schema. We’re currently working on support for wider tables with multiple key columns, and have an experimental version of this in Imply 3.3.
Today, Druid copies the entire dimension table into each historical, which means that the size of the dimension table must be limited to fit into the memory of a single historical. This makes data management simple because it doesn’t need to make sure that the dimension tables are partitioned to match the fact tables and it doesn’t need to shuffle dimensions around when executing joins. On the other hand, it can significantly reduce the number, depth and width of dimension tables loadable into Druid at a reasonable cost. Future work involves making dimension tables easily partitionable across historicals to match fact table partitioning using secondary keys.
Imply 3.3 has experimental support for wide dimension tables that map a single key to multiple values yet and support for multi-column keys. Both these are not yet in open-source Druid.
Druid can only do left and inner joins, and joining against other fact tables is currently not supported. We are planning on expanding these capabilities over time, but that is a significant amount of work.
Finally, joins are not free. The overhead we’ve measured so far against a denormalized data set is on the order of a 3x increase in query latency. Over time, this will be reduced significantly and we plan to build out capabilities for caching joins to reduce the query latency even further.
Where is Druid going?
The addition of joins allows users to move more of their use cases into Druid to benefit from the blazing fast queries it can execute. For our customers, it enables them to roll out self-service analytics using Pivot to more of their users over time and see productivity soar in the business.
As we continue to improve Druid’s and the Imply platform’s capabilities, we see ourselves continuing to speed up more workflows on top of existing data warehouses and creating new ones by empowering business users to explore and answer questions on their own without relying on an analyst.