The Journey to Better Search
by Jess Monroe, Nuria Ruiz and Parima Shah
This is part one of a series of blog posts about our work on the search functionality at outschool.com. This first installment describes the limitations of our initial search and why we chose to build our own search pipeline. It also gives a high-level description of the pieces we used to build it: Change Data Capture, Debezium, Kafka, and ElasticSearch.
We had known for a while that we needed to improve the way parents and learners find classes on Outschool. About half of the signed-in users who visit Outschool on a given day search for classes, and there were weeks when we saw millions of class searches. A high number of searches per session is not necessarily a positive indicator of user interest; it can hint that users are struggling to get the results they are looking for. The shortcomings of our search functionality were well known: our first version was implemented directly on top of Postgres, so we were limited to what SQL could do. For example, searching for ‘art’ might return a number of classes with the word ‘art’ in their descriptions, but, contrary to what you would expect, a very specific search like ‘art drawing anime pokemon’ might return either few results or results that did not include drawing, the main focus of the query. Additionally, misspellings meant that the SQL search might return no results at all: we could not do fuzzy matching, so we could not go from ‘pokimon’ to ‘pokemon’ even though those two words are only an edit distance of 1 apart.
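To make the misspelling problem concrete, a plain SQL pattern match (illustrative only, not our actual query; the classes table and its columns here are hypothetical) has no notion of edit distance, so a single typo returns zero rows:

SELECT title
FROM classes
WHERE description ILIKE '%pokimon%';  -- matches nothing, even though ‘pokemon’ classes exist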
Search is not a new problem and, as you would expect, there are many technologies and companies that offer their services in this space. We evaluated three main options: “Search as a Service” through a provider like Algolia, a self-hosted open source solution like ElasticSearch, or a completely custom solution. Outschool operates a product that, while purchased by parents, is used extensively by kids. According to Algolia’s privacy policy, their service cannot be used by those under the age of 13, which eliminated that option as an off-the-shelf solution. The more reasonable path forward was to build an in-house search pipeline, and since most of our services are hosted on AWS, it made sense for the team to move ahead with ElasticSearch hosted on AWS.
ElasticSearch is a well-documented, open source product. It supports fuzzy matching and partial matching out of the box. For example, a search for "drawing anime pokimon" [sic] will return results that match:
- drawing and pokemon
- drawing (alone)
- pokemon (alone)
It can also rank results based on how well they match the search terms, and it supports English-language stemming, so it can match "drawing" with the word "draw".
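As a rough illustration of what this buys us, here is a minimal query sketch using the @elastic/elasticsearch Node client (v8-style API); the index name and fields are placeholders, not our production mapping:

import { Client } from '@elastic/elasticsearch'

const client = new Client({ node: 'http://localhost:9200' })

async function searchClasses(text: string) {
  // multi_match across several fields; fuzziness lets 'pokimon' still match 'pokemon',
  // and ElasticSearch ranks hits by how well they match the search terms.
  const result = await client.search({
    index: 'classes', // placeholder index name
    query: {
      multi_match: {
        query: text,
        fields: ['title', 'description'], // placeholder fields
        fuzziness: 'AUTO',
      },
    },
  })
  return result.hits.hits.map((hit) => hit._source)
}

searchClasses('drawing anime pokimon').then(console.log)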
In addition to a more full-featured search, we also wanted to be able to re-index a class if its description or availability changed, and to have those changes reflected in the search results as soon as possible. By introducing ElasticSearch, we would have two sources of truth for our data, which could potentially drift out of sync. We needed a data pipeline to move updates happening in Postgres to ElasticSearch so we could serve up-to-date data in the search results. We looked at quite a few options to fulfill this requirement; there are very specific solutions like PGSync that are open source (thanks!) and purpose-built for moving data from Postgres to ElasticSearch. However, we also wanted the flexibility to sync data from Postgres into Redshift, our data warehouse of choice, or into future data stores like Cassandra. These requirements implied that the means by which we move data needed to be destination agnostic. So, ultimately, we decided to build a more generic solution that would allow us to move data from Postgres to a variety of destinations, not just ElasticSearch (which is the only destination PGSync supports).
(What follows is heavily influenced by SoundCloud’s experience and ideas when developing their own search.)
Kafka is the standard when it comes to moving data across systems; it is scalable, performant, and supports a rich ecosystem of programs that can connect to it to consume or produce data. While programs that sync data from Postgres to Kafka have existed for quite a while, the original solutions in this space were quite taxing on the database: they essentially ran repeated SQL queries against it and transformed the results into JSON that was posted to a Kafka topic. We decided to use Debezium (an amazing open source project from Red Hat), which uses Postgres streaming replication. This reduces the load on the database when capturing change data compared to solutions that poll the database periodically. When streaming replication is enabled, every transaction in Postgres is written to a log called the write-ahead log (WAL), and a replica uses these WAL segments to continuously replicate data from the primary.
Debezium, a distributed Java program, can consume the row-level change data written to the WAL and publish it to a set of Kafka topics. These row-level changes are sent by Debezium to Kafka as JSON objects. This pattern is called Change Data Capture (CDC). Intuitively, Debezium turns your Kafka cluster into a database replica where tables (as many, or as few, as you want) become Kafka topics. Once data is in Kafka, it can be moved to ElasticSearch or any other store like Redshift using what Confluent (our Kafka provider of choice) calls “connectors.” Connectors are Kafka consumers/producers targeted at a particular storage system.
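For a sense of what this looks like in practice, a Debezium Postgres connector is driven by a small configuration like the sketch below (property names follow Debezium 1.x; all of the values here are placeholders):

{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres.internal.example",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "REPLACE_ME",
    "database.dbname": "app_db",
    "database.server.name": "app",
    "table.include.list": "public.classes,public.sections",
    "slot.name": "debezium_search"
  }
}

With a configuration like this, row-level changes to public.classes would show up on a topic named along the lines of app.public.classes, following Debezium’s server.schema.table naming convention.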
Our data pipeline to ElasticSearch (big picture):
Now, things are never quite that clean and simple. If you search a little you will find many tutorials on how easy it is to move data from your database to ElasticSearch using a Kafka connector and CDC. That only works if you don’t need to transform the data. However, if you want to offer search results based on, for example, class descriptions and class availability (classes with at least one free seat), you probably need to massage the data a bit, because descriptions and availability are kept in different tables. As a result, our data pipeline is a bit more complex.
While we can know that a row in a given table has changed by subscribing to the CDC topics, it is difficult to build a system that combines these streams and performs the arbitrary transformation from row-level changes to the required JSON document. Rather than requiring engineers to learn a new and possibly complex stream-processing system, we chose to treat row-level changes as a “trigger” to regenerate ElasticSearch documents, and to use SQL (which every engineer at Outschool uses on a regular basis) to express the transformation from an id to the corresponding ElasticSearch document. When a given row changes, we find the corresponding ids and then run a SQL query that transforms those ids into ElasticSearch documents. This allows any engineer at Outschool to easily add or modify the ElasticSearch documents in an index.
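As a sketch of what such a query can look like (the schema here is hypothetical, not our actual one), a single statement can join whatever tables feed the document, for example a class row plus its seat availability:

SELECT
  c.uid          AS class_uid,
  c.title        AS title,
  c.description  AS description,
  EXISTS (
    SELECT 1
    FROM sections s
    WHERE s.class_uid = c.uid
      AND s.enrolled_count < s.size_max
  )              AS has_open_seats
FROM classes c
WHERE c.uid = ANY($1);  -- $1 is the array of ids extracted from the row-level changes

Each result row can then be serialized as the JSON document for that class, so changing what goes into the index is mostly a matter of editing a query.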
Thus, the pipeline looks more like this:
- Every row-level change in Postgres is published to the Kafka cluster using Debezium.
- A KafkaJS consumer subscribes to the incoming stream of row-level changes on the CDC topics, uses SQL queries to enrich and transform the data, and publishes the resulting JSON documents to a topic that contains all the documents for a given index (see the sketch after this list).
- A different KafkaJS consumer reads from the document topics and posts each document to ElasticSearch in the corresponding index.
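Here is a condensed sketch of that middle step, using the KafkaJS 1.x API and node-postgres; the topic names, the Debezium message shape (default JSON envelope), and the query are illustrative, and delete and error handling are left out:

import { Kafka } from 'kafkajs'
import { Pool } from 'pg'

const kafka = new Kafka({ clientId: 'class-document-builder', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'class-document-builder' })
const producer = kafka.producer()
const pool = new Pool() // connection settings come from the standard PG* env vars

async function run() {
  await Promise.all([consumer.connect(), producer.connect()])
  // CDC topic named after Debezium's server.schema.table convention (placeholder).
  await consumer.subscribe({ topic: 'app.public.classes' })

  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return
      const change = JSON.parse(message.value.toString())
      // Debezium's default envelope carries the new row state under payload.after.
      const classUid = change.payload?.after?.uid
      if (!classUid) return

      // Rebuild the full document with SQL (hypothetical schema), then publish it,
      // keyed by class id, to the topic that holds all documents for the index.
      const { rows } = await pool.query(
        'SELECT uid, title, description FROM classes WHERE uid = $1',
        [classUid]
      )
      if (rows.length === 0) return
      await producer.send({
        topic: 'search.documents.classes',
        messages: [{ key: String(classUid), value: JSON.stringify(rows[0]) }],
      })
    },
  })
}

run().catch((err) => {
  console.error(err)
  process.exit(1)
})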
We have been running this pipeline in AWS for a few months. While we use Confluent Cloud, we manage our own Kafka connectors; they are deployed as AWS Fargate tasks managed by Terraform.
There are many details regarding this pipeline that we would like to share, and we will do so in future blog posts. In the meantime, here are a few things that we feel are important to mention:
- We chose to host our Kafka cluster with Confluent instead of AWS because the AWS Kafka offering is very bare-bones and includes neither Kafka Connect connectors (which are not open source but open core) nor a Schema Registry.
- Using Kafka allows us to reindex from an arbitrary point in time, for both the triggers and the documents themselves.
- The avid reader might have noticed that this post does not mention a Schema Registry. However, a registry is required to persist data from Kafka to Redshift, and it is very helpful for CDC. It is possible to roll your own schema registry using open source components; the Debezium team has a helpful post about that.
- We ran into a bit of trouble on RDS when a connector constantly consumes from a database that has few updates (say, our staging database). We found that changing idle_in_transaction_session_timeout helped. Command:
ALTER USER debezium SET idle_in_transaction_session_timeout TO 1000000;
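-- the timeout is specified in milliseconds, so 1000000 is roughly 17 minutes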
- Credit to Whimsical for the graphs.