How Spark and Redis help derive geographical insights about customers
Knowledge of where customers are located and the places they are looking to travel and stay is critical to the Data and Strategy team at HotelTonight. It helps us segment customers better, provide a personalized app experience, plan market growth strategies and a lot more.
Clickstream data is tracked on our apps and mobile web using Segment.io’s SDK and arrives as hourly compressed JSON dumps on Amazon S3. The files are ingested, flattened, and transformed using Spark, a powerful open source distributed data processing platform. The result is then loaded into our Snowflake data warehouse, a popular cloud-based columnar database.
One significant challenge we recently faced in Data Engineering was figuring out how to attach geographical metadata (in the form of postal codes) to every event our app fires. If GPS is enabled on the customer’s device (roughly 70% of customers have it turned on), their lat/long coordinates are captured and sent with the event, which lets us derive a postal code. If the coordinates are unavailable, the postal code can be determined, less accurately, from the user’s IP address.
Original Solution — SQL
The first solution was to compute this in the Snowflake data warehouse with good old SQL after the events had been loaded into the staging area. A comprehensive list of postal codes with the latitude/longitude of their centroids is available online from GeoNames, free to download under a Creative Commons Attribution license. We loaded all 1.2+ million postal codes into a table, stripping out the zip codes associated with US military postings.
IP address to zip code mappings are provided by proprietary databases such as MaxMind or IP2Location. IP addresses can only be mapped to a zip code with limited accuracy, so this lookup serves as a fallback for when the lat/long is unavailable.
Once events were staged in the data warehouse, a SQL statement updated the table by range-joining the events against the postal codes with the haversine function, searching up to 1,500 kilometers away (to account for sparsely coded regions and codes that cover a large area).
The problem with this SQL approach, especially in a columnar database, is that it requires a cross join between the tables: for every event, the distance to every postal code must be computed, then sorted to pick the minimum. The operation proved very expensive, running for over 10 hours despite being given large compute resources.
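For reference, the haversine formula gives the great-circle distance d between two points with latitudes φ₁, φ₂ and longitudes λ₁, λ₂ (in radians) on a sphere of radius r. For the Earth, r is commonly taken as the mean radius of about 6,371 km. The following is the standard form of the formula, not necessarily the exact variant used by the warehouse function:

```latex
d = 2r \arcsin\left( \sqrt{ \sin^2\left(\frac{\varphi_2 - \varphi_1}{2}\right)
    + \cos\varphi_1 \cos\varphi_2 \sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right) } \right)
```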
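To see why this gets expensive, here is a minimal Python sketch of the same logic the SQL performs. For each event, it computes the haversine distance to every postal code, keeps those within 1,500 km, and takes the closest. The function names and the tiny sample data are hypothetical, and the coordinates are approximate. What matters is the nested loop, whose cost is (number of events) × (number of postal codes).

```python
import math
from typing import Iterable, Optional, Tuple

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometers between two lat/long points (degrees)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def nearest_postal_code(
    lat: float,
    lon: float,
    postal_codes: Iterable[Tuple[str, float, float]],
    max_km: float = 1500.0,
) -> Optional[str]:
    """Brute force: measure the distance to every postal code, keep the closest within max_km."""
    best_code, best_km = None, max_km
    for code, plat, plon in postal_codes:  # one haversine per postal code, per event
        d = haversine_km(lat, lon, plat, plon)
        if d <= best_km:
            best_code, best_km = code, d
    return best_code


# Hypothetical sample data: (postal_code, latitude, longitude), coordinates approximate.
POSTAL_CODES = [("94103", 37.7725, -122.4147), ("10001", 40.7506, -73.9972)]
EVENTS = [(37.7749, -122.4194), (40.7484, -73.9857), (-33.8688, 151.2093)]

# The "cross join": len(EVENTS) * len(POSTAL_CODES) distance computations.
matches = [nearest_postal_code(lat, lon, POSTAL_CODES) for lat, lon in EVENTS]
print(matches)  # ['94103', '10001', None]
```

With 1.2 million postal codes, every single event costs 1.2 million distance computations before the minimum can be picked.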
New Solution — Spredis
Starting with version 3.2, Redis added geospatial capabilities with GeoSets, which let us store latitudes and longitudes as geospatial indexes. A Redis GeoSet is simply a Redis SortedSet whose score is the latitude and longitude encoded as a 52-bit integer Geohash. Geohashing is a technique for encoding a lat/long pair into a single value, commonly a short ASCII string. Nearby points tend to share a common prefix, so proximity searches can be answered with a few range scans over the sorted set. The Wikipedia article on Geohash describes the technique, its implementation, and its efficiency in answering geospatial queries.
Python scripts scheduled through Airflow, a workflow orchestrator developed at Airbnb, load the GeoNames postal code data into Redis once a week (postal codes don’t change often). They use the GEOADD command, with the postal code as the member for each lat/long coordinate.
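The interleaving idea behind a 52-bit geohash can be sketched in a few lines of Python. Each coordinate is repeatedly bisected into 26 bits, and the longitude and latitude bits are interleaved into one integer. This is a simplified sketch, not Redis's exact encoding. It assumes the standard ±90° latitude range, whereas Redis limits latitude to about ±85.05°, so these integers will not match the scores Redis stores. The function name is hypothetical.

```python
def geohash_bits(lat: float, lon: float, bits_per_coord: int = 26) -> int:
    """Encode lat/long as an integer geohash with 2 * bits_per_coord bits (lon bit first)."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    code = 0
    for _ in range(bits_per_coord):
        # Longitude bit: 1 if the point lies in the upper half of the current interval.
        mid = (lon_lo + lon_hi) / 2
        if lon >= mid:
            code, lon_lo = (code << 1) | 1, mid
        else:
            code, lon_hi = code << 1, mid
        # Latitude bit, same rule, interleaved right after the longitude bit.
        mid = (lat_lo + lat_hi) / 2
        if lat >= mid:
            code, lat_lo = (code << 1) | 1, mid
        else:
            code, lat_hi = code << 1, mid
    return code


a = geohash_bits(10.0001, 20.0001)
b = geohash_bits(10.0002, 20.0002)
print(a >> 32 == b >> 32)  # True: the top 20 bits (10 per coordinate) name the same cell
```

The leading bits identify a coarse cell and later bits refine it. A sorted set ordered by this integer therefore tends to keep nearby points close together in sort order. Points on either side of a cell boundary are the exception, which is why Redis also checks the neighboring cells around the search area.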
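GEOADD takes longitude before latitude (`GEOADD key longitude latitude member`), which is easy to get backwards. Below is a minimal sketch of turning GeoNames postal-code rows into GEOADD arguments. It assumes the tab-separated GeoNames postal code dump layout: country code, postal code, place name, and admin fields, with latitude and longitude in the 10th and 11th columns. The key name `postal:all` and the sample rows are hypothetical.

```python
from typing import Iterable, List


def geoadd_args(key: str, geonames_lines: Iterable[str]) -> List[str]:
    """Build `GEOADD key lon lat member [lon lat member ...]` from GeoNames postal rows."""
    args = ["GEOADD", key]
    for line in geonames_lines:
        cols = line.rstrip("\n").split("\t")
        # Assumed GeoNames layout: [0] country, [1] postal code, ..., [9] latitude, [10] longitude
        if len(cols) < 11 or not cols[9] or not cols[10]:
            continue  # skip malformed rows and rows without coordinates
        postal_code, lat, lon = cols[1], cols[9], cols[10]
        args.extend([lon, lat, postal_code])  # GEOADD wants longitude first
    return args


SAMPLE_ROWS = [
    "US\t94103\tSan Francisco\tCalifornia\tCA\tCity and County of San Francisco\t075\t\t\t37.7725\t-122.4147\t4",
    "XX\tmalformed row",
]
print(geoadd_args("postal:all", SAMPLE_ROWS))
```

The resulting list could be sent with a generic command call such as redis-py's `execute_command(*args)`. For 1.2 million rows it would be sent in batches rather than as one command.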
The clickstream events are ingested into a Spark DataFrame, a distributed in-memory collection of data organized like a table with named columns.
A DataFrame UDF was then built to obtain the postal code from Redis for every event using the GEORADIUS command. A Spark UDF is invoked as a function call once per DataFrame record. Any state that must persist across calls within an executor, such as a database connection pool, therefore has to live in a Scala singleton implemented through a companion object.
We use the Redis Java client Jedis rather than the Spark connector for Redis or the scala-redis library, because neither currently supports the commands needed to query GeoSets.
The UDF uses Jedis to connect to Redis and run a GEORADIUS query. GEORADIUS has a time complexity of O(N+log(M)). N is the number of postal codes inside the bounding box of the circular area (here, a 1,500 km radius), and M is the number of items in the index, about 1.2 million entries.
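To make the O(N+log(M)) shape concrete, here is a toy in-memory stand-in for a radius query, written in Python. It is not how Redis implements GEORADIUS, since Redis scans the geohash cells covering the search area. Instead, this sketch sorts members by latitude and uses binary search to find the latitude band that could contain matches (the log(M) part). It then checks the exact haversine distance of each candidate in that band (the N part). Like `GEORADIUS ... ASC COUNT 1`, it returns only the closest member. Class and method names are hypothetical.

```python
import bisect
import math
from typing import List, Optional, Tuple

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometers between two lat/long points (degrees)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin((phi2 - phi1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


class InMemoryGeoIndex:
    """Toy stand-in for a GeoSet: members kept sorted by latitude."""

    def __init__(self, members: List[Tuple[str, float, float]]) -> None:
        self._rows = sorted((lat, lon, name) for name, lat, lon in members)
        self._lats = [row[0] for row in self._rows]

    def nearest_within(self, lat: float, lon: float, radius_km: float) -> Optional[str]:
        """Closest member within radius_km (like GEORADIUS ... ASC COUNT 1), else None."""
        # Any point within radius_km differs in latitude by at most radius_km / R radians.
        dlat = math.degrees(radius_km / EARTH_RADIUS_KM)
        lo = bisect.bisect_left(self._lats, lat - dlat)   # O(log M) to locate the band
        hi = bisect.bisect_right(self._lats, lat + dlat)
        best, best_km = None, radius_km
        for plat, plon, name in self._rows[lo:hi]:         # O(N) exact checks in the band
            d = haversine_km(lat, lon, plat, plon)
            if d <= best_km:
                best, best_km = name, d
        return best


INDEX = InMemoryGeoIndex([("94103", 37.7725, -122.4147), ("10001", 40.7506, -73.9972)])
print(INDEX.nearest_within(37.7749, -122.4194, radius_km=8.0))
```

The larger the radius, the wider the band and the more candidates must be checked exactly. This is why a 1,500 km search around a dense region is slow.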
The test setup was a small Amazon EMR Spark cluster of three m4.xlarge instances (1 master and 2 workers) and a single non-replicated ElastiCache (Redis) node of type cache.r3.xlarge. On it, processing 100,000 events took 2 hours: uncompressing and ingesting the data, looking up postal codes in Redis, and loading the results into the data warehouse. A couple of hours on a small EMR cluster was far better than 10 hours on a large warehouse, but the goal was to finish within an hour. Our analysis showed that most of the time was spent fetching data from Redis.
Since GEORADIUS costs O(N+log(M)), the search time could only be brought down by reducing the number of postal codes in the index (M) and the number of postal codes inside each search circle (N). To get there, we tried a few different strategies.
The one large GeoSet (representing 1.2 million postal codes) is split into two GeoSets covering contiguous regions: UCM (U.S., Canada and Mexico) and ROW (Rest of the World). More than 90% of our events are generated by customers within UCM, and UCM accounts for less than 1% of all postal codes in the world. The assumption is that searching the UCM GeoSet first finds matches for most events, so only the small share of searches that fall through to ROW is slow.
Searching for postal codes over a large radius can be slow, especially in densely coded regions. Our data showed that 92% of searches find a postal code within the first 5 miles and 98% within the first 10 miles. Searching over an incrementally growing radius until a postal code is found would therefore return results faster.
GeoSet searches start with a radius of 5 miles, querying the UCM and ROW GeoSets in turn. The radius doubles until a postal code is found, up to a limit of 1,500 km.
We also noticed that only half of the user lat/longs in a batch of events were unique. We implemented a local cache in front of Redis that stores coordinates and their postal codes, avoiding a network call to Redis when the same coordinates had already been processed.
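At load time, the split amounts to routing each postal code to one of two GeoSet keys by its ISO country code. A minimal sketch follows. The key names `postal:ucm` and `postal:row` and the sample rows (with approximate coordinates) are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

UCM_COUNTRIES = {"US", "CA", "MX"}  # U.S., Canada and Mexico (ISO country codes)

Row = Tuple[str, str, float, float]  # (country, postal_code, lat, lon)


def split_by_region(rows: Iterable[Row]) -> Dict[str, List[Tuple[str, float, float]]]:
    """Route each postal code to the UCM or ROW GeoSet key based on its country."""
    sets: Dict[str, List[Tuple[str, float, float]]] = defaultdict(list)
    for country, postal_code, lat, lon in rows:
        key = "postal:ucm" if country in UCM_COUNTRIES else "postal:row"
        sets[key].append((postal_code, lat, lon))
    return dict(sets)


REGION_SETS = split_by_region([
    ("US", "94103", 37.7725, -122.4147),
    ("MX", "06600", 19.4270, -99.1677),
    ("FR", "75001", 48.8625, 2.3364),
])
```

Each list could then be loaded into its own GeoSet with GEOADD, so a UCM search never touches the ROW entries.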
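The incremental search can be sketched as follows. The `search` callable stands in for a GEORADIUS call (`GEORADIUS key lon lat radius km ASC COUNT 1`). Here it is a hypothetical in-memory function, so the example runs without Redis. Several details are assumptions:
- we read "alternating" as querying UCM and then ROW at each radius before doubling;
- the key names and sample points (with approximate coordinates) are made up;
- the last step is capped at exactly 1,500 km.

```python
import math
from typing import Callable, Dict, List, Optional, Sequence, Tuple

MILE_KM = 1.609344
START_RADIUS_KM = 5 * MILE_KM  # first search radius: 5 miles
MAX_RADIUS_KM = 1500.0         # boundary limit

# search(key, lat, lon, radius_km) -> closest member within the radius, or None
SearchFn = Callable[[str, float, float, float], Optional[str]]


def find_postal_code(
    lat: float,
    lon: float,
    search: SearchFn,
    keys: Sequence[str] = ("postal:ucm", "postal:row"),
) -> Optional[str]:
    """Query each GeoSet at the current radius, doubling the radius until a hit or the cap."""
    radius = START_RADIUS_KM
    while True:
        for key in keys:  # UCM first: most events are matched there
            hit = search(key, lat, lon, radius)
            if hit is not None:
                return hit
        if radius >= MAX_RADIUS_KM:
            return None
        radius = min(radius * 2, MAX_RADIUS_KM)


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin((phi2 - phi1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


# Hypothetical in-memory GeoSets (coordinates approximate) and a log of issued queries.
GEOSETS: Dict[str, List[Tuple[str, float, float]]] = {
    "postal:ucm": [("94103", 37.7725, -122.4147)],
    "postal:row": [("75001", 48.8625, 2.3364)],
}
QUERY_LOG: List[Tuple[str, float]] = []


def fake_search(key: str, lat: float, lon: float, radius_km: float) -> Optional[str]:
    """Stand-in for GEORADIUS key lon lat radius km ASC COUNT 1."""
    QUERY_LOG.append((key, radius_km))
    in_range = [(haversine_km(lat, lon, plat, plon), name) for name, plat, plon in GEOSETS[key]]
    in_range = [pair for pair in in_range if pair[0] <= radius_km]
    return min(in_range)[1] if in_range else None


print(find_postal_code(48.8566, 2.3522, fake_search))  # Paris: UCM miss, then ROW hit at 5 miles
```

With a 5-mile start, the radius goes through nine steps: about 8 km, 16 km, and so on up to roughly 1,030 km, then the 1,500 km cap. A complete miss therefore costs at most 18 queries. The common case, a UCM match within 5 miles, costs one small query.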
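Here is a minimal sketch of such a local cache: a dictionary keyed by the exact (lat, lon) pair, consulted before the Redis lookup. In this sketch the lookup is a hypothetical fake. The cache also stores "not found" results so they are not retried. Each Spark executor would hold its own copy. Because the cache is unbounded, a long-running job might prefer a bounded alternative such as `functools.lru_cache(maxsize=...)`.

```python
from typing import Callable, Dict, List, Optional, Tuple


class CachedLookup:
    """Memoize postal-code lookups per exact (lat, lon) so repeated coordinates skip Redis."""

    def __init__(self, lookup: Callable[[float, float], Optional[str]]) -> None:
        self._lookup = lookup
        self._cache: Dict[Tuple[float, float], Optional[str]] = {}
        self.misses = 0

    def __call__(self, lat: float, lon: float) -> Optional[str]:
        key = (lat, lon)
        if key not in self._cache:
            self.misses += 1
            self._cache[key] = self._lookup(lat, lon)  # network call only on a cache miss
        return self._cache[key]


BACKEND_CALLS: List[Tuple[float, float]] = []


def fake_redis_lookup(lat: float, lon: float) -> Optional[str]:
    """Hypothetical stand-in for the Redis GEORADIUS round trip."""
    BACKEND_CALLS.append((lat, lon))
    return "94103" if lat < 39 else "10001"


cached = CachedLookup(fake_redis_lookup)
EVENTS = [(37.7749, -122.4194), (37.7749, -122.4194), (40.7484, -73.9857), (37.7749, -122.4194)]
results = [cached(lat, lon) for lat, lon in EVENTS]
```

When half of the coordinates in a batch are repeats, roughly half of the Redis round trips disappear.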
IP Address → Zip Code
IP address ranges are stored in a Redis SortedSet (internally implemented as a skip list), with each range scored by its starting IP address. This means the ranges do not need to be expanded as they were in the original solution. Retrieving the zip code is a ZREVRANGEBYSCORE query, which has a time complexity of O(log(N)+M). N is the number of elements in the sorted set and M is the number of elements returned. In our case N = 3.5 million and M = 1, since we only need the one range whose start is closest to the user's IP address without exceeding it.
After these optimizations, the Spark job that unzips files containing 100,000 events, determines their postal codes, and loads them into the warehouse now takes 12 minutes on average.
Redis is still single-threaded and can become a bottleneck when multiple Spark executors hit the same Redis instance, especially during large backfills. Replication can help speed up reads. We are currently building a service that will load-balance the Spark executors across several small replicas holding the same geo indexes.
When cooking a meal, doing yard work or fixing a kitchen faucet, it is essential to have the right tool for the job. Over the past few years, the explosion in the volume and variety of data has led to increasingly complex data integration challenges that no single tool or technology can solve. Spark and Redis solved our problem of efficiently adding geographical context, in the form of postal codes, to our customer events. That said, this approach does not replace SQL, which remains our de facto language for many data warehousing and analytical problems; rather, it complements it.
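The lookup can be sketched in Python, with `bisect` over a sorted list standing in for the sorted set. Ranges are keyed by their start address as an integer. Finding the greatest start that is less than or equal to the user's IP mirrors `ZREVRANGEBYSCORE key <ip_as_int> -inf LIMIT 0 1`. The ranges below use reserved documentation address blocks with made-up zip code mappings. The end-of-range check, which catches IPs that fall in a gap between ranges, is an added assumption, since the description above only fetches the range start.

```python
import bisect
import ipaddress
from typing import List, Optional, Tuple


class IpRangeIndex:
    """Toy stand-in for a sorted set of IPv4 ranges scored by their start address."""

    def __init__(self, ranges: List[Tuple[str, str, str]]) -> None:
        # (start_ip, end_ip, zip_code) -> integer tuples sorted by start address
        self._rows = sorted(
            (int(ipaddress.IPv4Address(start)), int(ipaddress.IPv4Address(end)), zip_code)
            for start, end, zip_code in ranges
        )
        self._starts = [row[0] for row in self._rows]

    def zip_for(self, ip: str) -> Optional[str]:
        n = int(ipaddress.IPv4Address(ip))
        # Greatest range start <= n, like ZREVRANGEBYSCORE key n -inf LIMIT 0 1: O(log N).
        i = bisect.bisect_right(self._starts, n) - 1
        if i < 0:
            return None  # IP is below the first range
        _, end, zip_code = self._rows[i]
        return zip_code if n <= end else None  # IP falls in a gap between ranges


IP_INDEX = IpRangeIndex([
    ("192.0.2.0", "192.0.2.255", "94103"),
    ("198.51.100.0", "198.51.100.255", "60601"),
    ("203.0.113.0", "203.0.113.255", "10001"),
])
print(IP_INDEX.zip_for("192.0.2.77"))
```

Storing one entry per range, rather than one per address, keeps the index at the size of the range table and makes each lookup a single logarithmic search.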