Kapost Engineering

Instrumenting ActiveJob with Subscribers

Paul Sadauskas

Here at Kapost, we’ve been working on a project to modernize our Analytics Collectors. These are a set of jobs that contact the APIs of various third-party sites where content has been published or shared, and pull down statistics. One of the major reasons for this project is to get better instrumentation around job duration and error rates. We’re also using this opportunity to update the app from Rails 3.2 to 4.2, so we can take advantage of the ActiveJob tooling rather than the one we made ourselves.

Every ActiveJob produces a standard set of notifications. They’re not documented, but they’re easy enough to find in the source. As of Rails 4.2, the list is: enqueue.active_job, enqueue_at.active_job, perform_start.active_job, and perform.active_job.

Instrument #perform timing

First off, we’ll make a subscriber to listen for these events. You can use any stats collector you like. In this example, we’re just prettying up the event name with some more detail and reporting the duration to Statsd.

# app/subscribers/active_job_subscriber.rb
class ActiveJobSubscriber < ActiveSupport::Subscriber
  attach_to :active_job

  # Called for every "perform.active_job" event.
  def perform(event)
    job = event.payload[:job]
    name = "active_job.perform.#{job.class}.#{job.queue_name}"
    # event.duration is in milliseconds, which is what Statsd timers expect.
    Statsd.timing name, event.duration
  end
end
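If it isn’t obvious why a method named perform ends up receiving perform.active_job events, here is a toy model of the wiring in plain Ruby. This is not how Rails implements it; ToyNotifications, ToySubscriber, and RECORDED_TIMINGS are made-up names for illustration, and the only point is that each public method on the subscriber is matched to an event called "method_name.namespace".

```ruby
# Toy stand-in for ActiveSupport::Notifications (illustrative, not the Rails code).
module ToyNotifications
  @listeners = Hash.new { |hash, key| hash[key] = [] }

  def self.subscribe(name, &block)
    @listeners[name] << block
  end

  # Times the block in milliseconds and hands the result to every listener.
  def self.instrument(name, payload = {})
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = yield
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000.0
    @listeners[name].each { |listener| listener.call(name, elapsed_ms, payload) }
    result
  end
end

# Toy stand-in for ActiveSupport::Subscriber.
class ToySubscriber
  # Subscribes every public method defined on the subclass to "<method>.<namespace>".
  def self.attach_to(namespace)
    subscriber = new
    public_instance_methods(false).each do |method_name|
      ToyNotifications.subscribe("#{method_name}.#{namespace}") do |name, duration_ms, payload|
        subscriber.public_send(method_name, name, duration_ms, payload)
      end
    end
  end
end

# Stands in for Statsd so the example has something to inspect.
RECORDED_TIMINGS = []

class ToyJobSubscriber < ToySubscriber
  def perform(_name, duration_ms, payload)
    RECORDED_TIMINGS << ["active_job.perform.#{payload[:job]}.#{payload[:queue]}", duration_ms]
  end
end

# Unlike Rails, the toy version has to attach after the methods are defined.
ToyJobSubscriber.attach_to(:active_job)

ToyNotifications.instrument("perform.active_job", job: "MyJob", queue: "default") { :done }
```

After the last line runs, RECORDED_TIMINGS holds a single entry with the metric name and the elapsed time in milliseconds.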

Note that because we put this in app/subscribers, Rails won’t automatically load it in development mode until that constant is referred to somewhere else. This is probably fine for reporting stats, but is worth being aware of.

Next up, we’ll want a test to verify that the subscriber is working like we expect. There’s some boilerplate you need to get ActiveJob working within the spec. It’s probably better off in a spec/job_helper.rb file, but we’re including it here for brevity.

# spec/subscribers/active_job_subscriber_spec.rb
require 'rails_helper'

RSpec.describe ActiveJobSubscriber do
  include ActiveJob::TestHelper
  ActiveJob::Base.queue_adapter = :test

  MyJob = Class.new(ActiveJob::Base) do
    def perform(*args); end
  end

  it "should report the duration to statsd" do
    # stub_const swaps in the double for this example only.
    stub_const("Statsd", double("Statsd"))
    expect(Statsd).to receive(:timing).with("active_job.perform.MyJob.default",
                                            kind_of(Numeric))

    perform_enqueued_jobs do
      MyJob.perform_later
    end
  end
end

We make a fake MyJob class that does no work, mock the Statsd singleton with a test double, and then verify it got called with the arguments and values we expect. In a real job, I’d probably wrap the Statsd singleton in a “metrics collector” class, then dependency-inject it into the job directly, but that’s outside the scope of this post.
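For the curious, here is a rough sketch of what that “metrics collector” wrapper might look like. MetricsCollector, RecordingBackend, and job_timing are hypothetical names, not something from our codebase, and how the collector gets handed to a job or subscriber is left open. The point is only that the Statsd-shaped dependency becomes a constructor argument, so a spec can pass in a recorder instead of stubbing a constant.

```ruby
# Hypothetical wrapper around any backend that responds to #timing,
# such as a Statsd client.
class MetricsCollector
  def initialize(backend)
    @backend = backend
  end

  # Builds the same metric name the subscriber uses and reports the duration.
  def job_timing(job_class, queue_name, duration_ms)
    @backend.timing("active_job.perform.#{job_class}.#{queue_name}", duration_ms)
  end
end

# In-memory backend for specs: records calls instead of sending them anywhere.
class RecordingBackend
  attr_reader :timings

  def initialize
    @timings = []
  end

  def timing(name, value)
    @timings << [name, value]
  end
end

backend = RecordingBackend.new
collector = MetricsCollector.new(backend)
collector.job_timing("MyJob", "default", 12.5)
```

In production the backend would be the real Statsd client; in a spec you would assert against backend.timings.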

Instrument and Report Errors

Next up, we’ll want to collect and instrument any upstream server errors, and send them off to an error aggregator like Honeybadger.

# app/jobs/analytics_job.rb
require 'net/http'

class AnalyticsJob < ActiveJob::Base
  # AuthenticationError is assumed to be an application-defined exception class.
  RETRYABLE_ERRORS = [Timeout::Error, Net::HTTPError, Errno::ECONNREFUSED]
  UNRETRYABLE_ERRORS = [AuthenticationError]

  # Count the retry, then re-enqueue the job to run again in a minute.
  rescue_from(*RETRYABLE_ERRORS) do |e|
    Statsd.increment "active_job.perform.#{self.class}.#{e.class}.retries"
    retry_job(wait: 1.minute)
  end

  # Count the failure and report it; the job is not retried.
  rescue_from(*UNRETRYABLE_ERRORS) do |e|
    Statsd.increment "active_job.perform.#{self.class}.#{e.class}.failures"
    Honeybadger.notify(e)
  end
end

I put this in a superclass that all my real jobs inherit from instead of ActiveJob::Base (the spec below calls it AnalyticsJob). Be aware that this retries all retryable errors forever. Later Rails releases have better built-in support for retries (retry_on arrived in Rails 5.1), or it can be monkey-patched into 4.2 with the activejob-retry gem.
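If you want to cap the retries yourself, the decision logic is small. Here is a minimal sketch, assuming you track the attempt number somewhere (how you do that in a 4.2 job is left open). RetryPolicy and its method names are hypothetical, and the exponential backoff is our own choice for the example, not something ActiveJob 4.2 provides.

```ruby
# Hypothetical helper: decides whether a failed attempt should be retried
# and how long to wait before the next one.
RetryPolicy = Struct.new(:max_attempts, :base_wait_seconds) do
  # attempt is 1-based and refers to the attempt that just failed.
  def retry?(attempt)
    attempt < max_attempts
  end

  # Exponential backoff: base, then 2x base, then 4x base, and so on.
  def wait_for(attempt)
    base_wait_seconds * (2**(attempt - 1))
  end
end

policy = RetryPolicy.new(3, 60)

first_failure_retries = policy.retry?(1)
third_failure_retries = policy.retry?(3)
wait_after_second_failure = policy.wait_for(2)
```

With three attempts allowed, the first and second failures are retried and the third is not. A rescue_from block could then call retry_job(wait: ...) only while retry? returns true, and report the failure otherwise.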

Here’s a single spec to get you going. You can also verify the unretryable errors and delivery to Honeybadger in the same way.

# spec/jobs/job_error_handling_spec.rb
require 'rails_helper'

RSpec.describe "AnalyticsJob error handling" do
  include ActiveJob::TestHelper
  ActiveJob::Base.queue_adapter = :test

  JobWithRetryableError = Class.new(AnalyticsJob) do
    def perform; raise Timeout::Error; end
  end

  before do
    # stub_const swaps in a fresh double for each example.
    stub_const("Statsd", double("Statsd"))
  end

  context "on retryable errors" do
    it "should instrument the retry count" do
      name = "active_job.perform.JobWithRetryableError.Timeout::Error.retries"
      expect(Statsd).to receive(:increment).with(name)

      perform_enqueued_jobs do
        JobWithRetryableError.perform_later
      end
    end
  end
end
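One thing the metric name in that spec makes visible: namespaced classes like Timeout::Error put colons into the name, and the Statsd wire format uses a colon to separate the name from the value. Some clients clean this up for you, but not all of them do, so it’s worth checking yours. If you need to do it yourself, a small helper along these lines would work. metric_name is a hypothetical helper, and the choice of underscore as the replacement character is just for illustration.

```ruby
require "timeout"

# Hypothetical helper: joins the parts with dots after replacing characters
# that are reserved in the Statsd line format (":", "|" and "@").
def metric_name(*parts)
  parts.map { |part| part.to_s.gsub("::", "_").tr(":|@", "_") }.join(".")
end

retry_metric = metric_name("active_job", "perform", "JobWithRetryableError",
                           Timeout::Error, "retries")
```

Here retry_metric comes out as active_job.perform.JobWithRetryableError.Timeout_Error.retries, which keeps the class name readable without any reserved characters.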

Hopefully this is enough to get you started on instrumenting and reporting on your ActiveJobs. If you’ve got any other cool tips & tricks, please share them with us!
