Kapost Engineering

Analytics w/ ElasticSearch

Burke Webster

We are currently working on a full back-end rewrite of the Kapost analytics stack.  In this post we will look at the history of analytics at Kapost, including the issues with our current implementation, and then dive into how we are rewriting the system from the ground up.  The Kapost analytics stack is responsible for the various analytics displayed within the application, from details about a specific post or campaign to aggregate statistics that inform customers about their production process, as well as engagement and reach once content is published.  To do this, we track various statistics about the production process and integrate with numerous third-party APIs to learn how published content is performing.

Where We’ve Been

Our current analytics stack has been molded and shaped over the years to provide the data we need.  The application was written in Rails (as is most of the Kapost stack) and relied heavily on Delayed::Job for background processing.  Once collected, the data was stored in aggregated form in a MongoDB instance to power many of our views in Kapost.  This initial design was heavily inspired by a talk given by John Nunemaker, “MongoDB for Analytics”.  A typical document for a post might look like this:


{   
  "stats": {   
    "2015": {   
      "12": {   
        "1": {   
          "facebook_likes": 112   
        },   
        "2": {   
          "facebook_likes": 156   
        }   
      },   
      "Q4": {   
        "facebook_likes": 156   
      }   
    }   
  },   
  "created_at": {   
    "facebook_likes": "2014-11-15"   
  },   
  "updated_at": {   
    "facebook_likes": "2015-12-02"   
  }  
}

As you can see, we store each statistic under a nested structure that represents the current day, week, month, quarter, and year.  We also store a cumulative all-time total, and we keep track of the first time we record a given statistic and the last time it was updated.  If you’re familiar with Kapost you’ll know that posts are organized under a larger organizational construct called an instance.  So, we also stored cumulative instance statistics in another Mongo collection representing the cumulative data for all posts in an instance.  This all sounds good and has worked well over the years, except for a few key issues that eventually drove us to rewrite the system.

The Big Issues

Data Storage

The largest issue we ran into was the way we stored the data.  As shown above, we keep a running sum for each statistic.  So, let’s say we collect Facebook likes for post ABC123 on Dec 2nd, 2015.  Because we are keeping a running total, we need to look up the previous value for this post statistic, add the number of Facebook likes we just collected from the Facebook API, and then store the new cumulative value in all the various date-specific fields.
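
To make that bookkeeping concrete, here is a rough sketch of the read-modify-write the old system performed for each reading.  The method and variable names are hypothetical, and the real code also updated week and all-time buckets plus the instance-level document:

# Illustrative sketch of the old read-modify-write bookkeeping (hypothetical
# names); the real code also updated week and all-time buckets and ran
# inside Delayed::Job.
require "date"

def record_facebook_likes(doc, date, cumulative_total)
  year    = date.year.to_s
  month   = date.month.to_s
  day     = date.day.to_s
  quarter = "Q#{((date.month - 1) / 3) + 1}"

  stats = (doc["stats"] ||= {})

  # The same cumulative total gets written into every date bucket...
  day_bucket = ((stats[year] ||= {})[month] ||= {})[day] ||= {}
  day_bucket["facebook_likes"] = cumulative_total
  (stats[year][quarter] ||= {})["facebook_likes"] = cumulative_total

  # ...and the per-statistic timestamps are kept in sync.
  (doc["created_at"] ||= {})["facebook_likes"] ||= date.iso8601
  (doc["updated_at"] ||= {})["facebook_likes"] = date.iso8601

  doc # the updated document is then written back to MongoDB
end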

We would need to do the same thing in the collection that tracks instance-level aggregate statistics.  This storage design is complex enough to be easy to get wrong, and hard to repair if we ever need to go back and recollect data for some point in time.

Handling Failures

When working with various third-party APIs, it’s almost certain you will encounter errors of several different types: invalid credentials, expired OAuth tokens, transient network and SSL failures, and so on.

The existing system dealt with these types of errors in various ways, from letting the error bubble up and stop the job, to swallowing the error and proceeding as if nothing had happened.  This meant the system could carry on, potentially not collecting data for a given day, without anybody knowing.  The existing system was also not consistent in how it handled different errors.  An authentication error due to invalid credentials is a case we can’t recover from until the customer updates their authentication tokens in the system.  An authentication error due to an expired OAuth token is a case we can recover from by refreshing the token and retrying the operation.  A transient SSL connection error is something that can be retried and will typically go away on the next attempt.

Scaling Jobs

As Kapost continued to grow, we took on larger and larger customers.  Some of these customers generated so much content that our Delayed::Job system had a hard time keeping up.  We would have jobs that were repeatedly killed for exceeding the maximum job runtime.  While this is not directly tied to Delayed::Job, it was an issue we wanted to solve in the new system.

Non-Recoverable Data

Many of the APIs we leverage provide a point-in-time count of a statistic.  For these types of services, you hit the API and it gives you a value.  There is no way to specify a time range in the query to get data for a specific day.  This means that if our jobs failed to run on a particular day we would have a flat spot in our data with no way to recover.   This also becomes an issue if you find an error in your code and need to go back and recalculate a bunch of stats.  You can’t query the API for historical data to rebuild the stats.  

Where We’re Going

We have been working on rewriting the analytics system for a good part of 2015.  At the time of this post we are almost ready to roll into beta with a system that is well designed, addresses a lot of the issues mentioned above, and helps future-proof us against changing requirements.

Technologies

As with most things at Kapost, we’ve chosen to continue using Rails.  We’ve switched our job processing provider to Amazon SQS paired with shoryuken, and we’ve ditched Mongo as our storage engine in favor of ElasticSearch.

As an interesting aside, we had initially chosen to use Apache CouchDB for data storage.  A few months into the project we started working on a particular use case we needed in the product and ran into a brick wall.  It turned out that we could not specify a total ordering over the document keys that would allow us to get the data we needed; we would have had to write server-side Ruby code to process the CouchDB results and transform them into the view we needed.  This was a no-go, as performance dropped off a cliff.  At that point we started looking at alternatives and narrowed the field to Postgres or ElasticSearch.  After some research spikes and performance testing we settled on ElasticSearch, although either technology would have provided what we needed.

System Design

We were fortunate that one of the engineers who began working on this project came up with a very well-thought-out design.  It addressed all of the issues above, provided a clean code abstraction that was easy to understand, and produced a system that is scalable and resilient.  The basic design is as follows:


# Each AnalyticsJob wraps one third-party integration: run the collector,
# then flush everything the emitter has buffered.
class AnalyticsJob
  def perform
    collector.run
    emitter.flush!
  end
end

# The Collector asks its adapter for raw API responses, transforms each one
# into our stat format, and hands the result to the emitter.
class Collector
  def run
    adapter.get_stats.each do |response|
      stat = transform_response(response)
      emitter.emit(stat, response)
    end
  end
end

# The Adapter knows which stats to request from a given API via its client,
# which wraps the third-party HTTP library or SDK.
class Adapter
  def get_stats
    client.get(:page_views)
  end

  def client
    @client ||= Client.new
  end
end

As you can see, there are five main objects: the AnalyticsJob, which orchestrates a run; the Collector, which drives the fetch-and-transform loop; the Adapter, which knows which stats to request from a given API; the Client, which wraps the third-party API itself; and the Emitter, which receives each stat and flushes the results to storage.

I’ve simplified each of these objects substantially, but this is the basic design and it’s worked well thus far.  We currently have 12 different AnalyticsJobs, each one interacting with a different API and fetching various stats.  
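
To give a feel for the other end of the pipeline, here is a sketch of what an Emitter could look like under this design.  The class body and the elasticsearch-ruby usage are assumptions for illustration, not the production implementation:

# Hypothetical Emitter sketch: buffer stats during a job run, then write them
# to ElasticSearch in one bulk request on flush!. The real system also
# archives the raw API responses, which is omitted here.
require "elasticsearch"
require "securerandom"

class Emitter
  def initialize(index: "analytics", client: Elasticsearch::Client.new)
    @index  = index
    @client = client
    @job_id = SecureRandom.uuid   # ties every document back to one job run
    @buffer = []
  end

  # Called by the Collector once per transformed stat.
  def emit(stat, _response)
    @buffer << stat.merge("job_id" => @job_id)
  end

  # Called once by AnalyticsJob after the collector finishes.
  def flush!
    return if @buffer.empty?
    actions = @buffer.map { |doc| { index: { _index: @index, _type: "stat", data: doc } } }
    @client.bulk(body: actions)
    @buffer.clear
  end
end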

Data Storage

Using ElasticSearch, we are able to store the raw stats data without doing any sort of precomputing.  So for a given post we might store the same Facebook likes stat this way:


{
  "job_id": "a8782b45-a667-44da-acf7-638c886715ec",
  "timestamp": "2015-12-02T00:00:00.000+00:00",
  "instance_id": "DEFG1234",
  "post_id": "ABC123",
  "stats": {
    "facebook_likes": 42
  }
}

We can then use the power of the ElasticSearch aggregation framework to aggregate this data at runtime, applying various aggregations to get each specific view of the data.  For instance, let’s say we wanted to know the sum of all Facebook likes for post ABC123 over the last month.  We could run an ElasticSearch query that looks like this:


{
  "query": {
    "filtered": {
      "filter": {
        "and": [
          {
            "term": {
              "newsroom_id": "DEFG1234"
            }
          },
          {
            "term": {
              "post_id": "ABC123"
            }
          },
          {
            "range": {
              "timestamp": {
                "gte": "2015-11-03T00:00:00.000Z",
                "lte": "2015-12-02T00:00:00.000Z"
              }
            }
          }
        ]
      }
    }
  },
  "aggs": {
    "facebook_likes": {
      "stats": {
        "field": "stats.facebook_likes"
      }
    }
  }
}

ElasticSearch makes it easy to get data for any time range, with various filter conditions and different aggregations.  Summing data is our primary use case, but we also do filtered aggregations for various fields, use date histogram aggregations to bucket data by day, week, or month, and even combine stats to form new stats.  
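
For example, a date histogram aggregation along the lines of the following (combined with the same filters as the query above) buckets a post’s Facebook likes by day.  This is an illustrative query, not one lifted from our codebase:

{
  "aggs": {
    "likes_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "interval": "day"
      },
      "aggs": {
        "facebook_likes": {
          "sum": {
            "field": "stats.facebook_likes"
          }
        }
      }
    }
  }
}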

For instance, let’s say we had a view that showed the sum of all Facebook activity: Facebook likes, shares, and comments.  We can easily create an aggregation named “facebook_engagement” and have it sum all the individual stats together.  Using ElasticSearch in this way helped us avoid the complexity of precomputing data aggregations at storage time, and it future-proofed the system against changing requirements by making just about any view of the data possible at query time.
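
One way to express that kind of combined stat is a scripted sum aggregation along these lines, assuming dynamic scripting is enabled on the cluster (illustrative, not necessarily the exact aggregation we run):

{
  "aggs": {
    "facebook_engagement": {
      "sum": {
        "script": "doc['stats.facebook_likes'].value + doc['stats.facebook_shares'].value + doc['stats.facebook_comments'].value"
      }
    }
  }
}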

Handling Failures

Given the architecture above, it was easy to add rescue and rescue_from blocks to the code to handle errors.  We have three classes of errors: errors we can’t recover from until the customer takes action (for example, invalid credentials), errors we can recover from ourselves (for example, an expired OAuth token that we can refresh), and transient errors (for example, an SSL connection failure) that typically succeed on retry.
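
A minimal sketch of how those three classes can be separated inside a job; the error class names and helper methods here are illustrative rather than our actual code:

# Hypothetical variation of the AnalyticsJob sketch above, showing how the
# three error classes might be handled; class and helper names are made up.
class AnalyticsJob
  class NonRecoverableError < StandardError; end   # e.g. invalid credentials
  class ExpiredTokenError   < StandardError; end   # recoverable after a token refresh
  class TransientError      < StandardError; end   # e.g. an SSL hiccup

  def perform
    collector.run
    emitter.flush!
  rescue ExpiredTokenError
    refresh_oauth_token!  # fix the problem ourselves, then run the job again
    retry
  rescue TransientError
    raise                 # let the message go back to SQS and be retried
  rescue NonRecoverableError => e
    notify_honeybadger(e) # needs a human; retrying won't help
  end
end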

Using shoryuken and SQS makes retrying jobs trivial.  When creating your SQS queue you specify a dead-letter queue and a maxReceiveCount, the number of times a message can be delivered before it is moved into the dead-letter queue.  The dead-letter queue is essentially a holding queue for jobs that failed; inspecting the messages in it is very helpful when debugging why a job failed, and it holds all the information necessary to re-enqueue the job once the issue is fixed.  We’ve also added code to our error handlers and job hooks to send any error that needs attention to Honeybadger, which gives us great visibility into failing jobs.
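
For reference, a shoryuken worker is wired up roughly like this.  The queue name and options are examples, and the dead-letter queue and maxReceiveCount live in the SQS queue’s redrive policy, not in the worker:

# Example shoryuken worker; the queue name and options are illustrative.
require "shoryuken"

class FacebookStatsWorker
  include Shoryuken::Worker

  # auto_delete removes the message from SQS only after perform succeeds, so a
  # raised error leaves it on the queue to be retried (and eventually moved to
  # the dead-letter queue once maxReceiveCount is exceeded).
  shoryuken_options queue: "analytics-facebook", auto_delete: true, body_parser: :json

  def perform(sqs_msg, body)
    # body carries the job parameters; kick off the collector/emitter pipeline
    AnalyticsJob.new.perform
  end
end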

Non-Recoverable Data

We still need to ensure our jobs run at least once a day to capture the daily value from these types of APIs, but we have started storing every API response we receive as we collect data.  This gives us a history of all our API calls in case we find an issue in the code and need to go back and recompute data for the past.  Right now we are using ElasticSearch to store these API responses simply out of convenience; I expect we may transition this data into offline storage over time, since we rarely need to access it.
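
Archiving a raw response is just another ElasticSearch write.  A minimal sketch, with index and field names that are assumptions rather than our actual schema:

# Hypothetical sketch of archiving a raw API response for later recomputation.
require "elasticsearch"
require "time"
require "json"

client        = Elasticsearch::Client.new
response_body = { "likes" => 42 }   # stand-in for the untouched third-party payload

client.index(
  index: "api-responses",
  type:  "response",
  body: {
    job_id:     "a8782b45-a667-44da-acf7-638c886715ec",
    provider:   "facebook",
    endpoint:   "page_views",
    fetched_at: Time.now.utc.iso8601,
    raw:        response_body.to_json
  }
)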

Scaling Jobs

Using SQS provides an almost infinitely scalable queueing system; your biggest limitation will be the monetary cost of the worker threads processing messages from the queue.  Since we are deployed on Heroku, we have a separate worker dyno type that processes messages, which lets us easily scale the number of active workers up or down (see Hirefire if you want to do dynamic scaling).  The majority of scaling issues arise from how the code is written.

Wrapping It All Up

We are in the final stages of wrapping up the rewrite of the Kapost analytics stack, and so far this design has worked out well.  We’d love to hear from you if you’re working on something similar or have found other solutions to some of the problems presented here.  And if you’re looking to join a fast-paced and exciting company, we’re hiring!
