Executing background jobs is a common feature in many web applications. Switching between different background processing frameworks used to be quite painful, as most of them had different APIs for enqueuing jobs, enqueuing mailers and scheduling jobs. One of the great additions in Rails 4.2 was a solution to this problem: ActiveJob, which provides an extra layer on top of background job frameworks and unifies the API regardless of the queue adapter you use. But how exactly does it work? What are the requirements for adding new queue adapters? What kind of API does ActiveJob provide? Let’s dive deep into the codebase and answer these and some other questions.

Anatomy of the job

Let’s start with a simple job class; let it be MyAwesomeJob:

# app/jobs/my_awesome_job.rb
class MyAwesomeJob < ActiveJob::Base
  def perform(user)
    User::DoSomethingAwesome.call(user)
  end
end

To enqueue a job we could simply write MyAwesomeJob.perform_later(some_user). If we wanted to schedule a job for some time in the future, we could write MyAwesomeJob.set(wait: 12.hours).perform_later(some_user), and to execute the job immediately without enqueuing it, MyAwesomeJob.perform_now(some_user). But we never defined these methods, so what kind of extra work does ActiveJob perform to make this happen?

Exploring internals of ActiveJob

To answer this question, let’s take a look at the ActiveJob::Base class:

# active_job/base.rb
module ActiveJob
  class Base
    include Core
    include QueueAdapter
    include QueueName
    include QueuePriority
    include Enqueuing
    include Execution
    include Callbacks
    include Logging
    include Translation

    ActiveSupport.run_load_hooks(:active_job, self)
  end
end

There are some interesting modules included in this class, which we will get to know in more detail later, but let’s focus on the core API for now. Most likely this kind of logic would be defined in, well, the Core module. Indeed, the set method is there:

# active_job/core.rb
module ActiveJob
  module Core
    module ClassMethods
      def set(options={})
        ConfiguredJob.new(self, options)
      end
    end
  end
end

It returns an instance of ConfiguredJob, passing the job class itself and the options to the constructor. Let’s check what the ConfiguredJob class is responsible for:

# active_job/configured_job.rb
module ActiveJob
  class ConfiguredJob #:nodoc:
    def initialize(job_class, options={})
      @options = options
      @job_class = job_class
    end

    def perform_now(*args)
      @job_class.new(*args).perform_now
    end

    def perform_later(*args)
      @job_class.new(*args).enqueue @options
    end
  end
end

We have two methods available here: perform_now and perform_later. Both of them create a new job instance with the arguments passed to the method; then they either call perform_now on the job instance or call enqueue, passing the options that were given to the set method.
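
The hand-off from set to ConfiguredJob is easier to see in isolation. Below is a minimal plain-Ruby sketch of the same pattern, not ActiveJob's actual code: TinyJob, TinyConfiguredJob and the enqueued_with reader are hypothetical names introduced only for illustration, and enqueue merely records the options instead of talking to an adapter.

```ruby
# Plain-Ruby sketch of the `set` -> ConfiguredJob hand-off.
# TinyJob and TinyConfiguredJob are hypothetical stand-ins, not Rails classes.
class TinyConfiguredJob
  def initialize(job_class, options = {})
    @job_class = job_class
    @options = options
  end

  # Build the job from the arguments, then enqueue it with the stored options.
  def perform_later(*args)
    @job_class.new(*args).enqueue(@options)
  end
end

class TinyJob
  attr_reader :arguments, :enqueued_with

  # Class-level `set` only remembers the options; nothing is enqueued yet.
  def self.set(options = {})
    TinyConfiguredJob.new(self, options)
  end

  def initialize(*arguments)
    @arguments = arguments
  end

  # A real adapter would be called here; the sketch just records the options.
  def enqueue(options = {})
    @enqueued_with = options
    self
  end
end

job = TinyJob.set(wait: 60, queue: :low).perform_later("some_user")
p job.arguments      # ["some_user"]
p job.enqueued_with  # the options given to `set`
```

The point of the intermediate object is that set does not need to know the job arguments, and perform_later does not need to know the options; they only meet inside enqueue.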

Let’s go deeper and start with the perform_now method. It’s defined inside the Execution module and basically comes down to deserializing arguments if needed (there is nothing to deserialize when calling perform_now directly) and calling the perform method we defined in the job class. This logic is wrapped in a run_callbacks block, which lets you define callbacks before, around and after the execution of the perform method. Any exception raised along the way is passed to rescue_with_handler and re-raised if no handler takes care of it.

# active_job/execution.rb
module ActiveJob
  module Execution
    module ClassMethods
      def perform_now(*args)
        job_or_instantiate(*args).perform_now
      end
    end

    # rest of the code which was removed for brevity

    def perform_now
      deserialize_arguments_if_needed
      run_callbacks :perform do
        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end
  end
end

These callbacks are defined inside the Callbacks module, whose only responsibility is defining callbacks for the perform and enqueue methods. They help extend the behaviour of jobs in a pretty unobtrusive manner. For example, if we wanted to log when the job is finished, we could add the following after_perform callback:

# app/jobs/my_awesome_job.rb
class MyAwesomeJob < ActiveJob::Base
  after_perform do |job|
    Rails.logger.info "#{Time.current}: finished execution of the job: #{job.inspect}"
  end

  def perform(user)
    User::DoSomethingAwesome.call(user)
  end
end

Let’s get back to the perform_later method from ConfiguredJob. We could expect the enqueue method to be defined in the Enqueuing module, which is indeed the case:

# active_job/enqueuing.rb
module ActiveJob
  module Enqueuing
    def enqueue(options={})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      run_callbacks :enqueue do
        if self.scheduled_at
          self.class.queue_adapter.enqueue_at self, self.scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end
  end
end

We can pass several options here: the scheduled_at attribute can be configured with wait (which schedules the job the specified number of seconds from the current time) or wait_until (which schedules the job at the exact specified time). We can also enforce the queue used for the job execution and set the priority. At the end, the method call is delegated to queue_adapter: enqueue_at when scheduled_at is set, enqueue otherwise. This logic is wrapped in a run_callbacks block, which lets you define callbacks before, around and after the execution of this code.
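
To make the wait / wait_until handling concrete, here is a small plain-Ruby sketch. It assumes wait is given as a plain number of seconds (ActiveJob itself relies on ActiveSupport durations such as 12.hours), and scheduled_at_for with its injectable now: parameter is a hypothetical helper, not part of ActiveJob.

```ruby
# Sketch of how `wait` and `wait_until` both collapse into one float timestamp.
# `scheduled_at_for` is a hypothetical helper; `now` is injectable for clarity.
def scheduled_at_for(options, now: Time.now)
  scheduled_at = nil
  scheduled_at = (now + options[:wait]).to_f if options[:wait]
  # As in ActiveJob's enqueue, wait_until is applied second, so it wins if both are given.
  scheduled_at = options[:wait_until].to_f if options[:wait_until]
  scheduled_at
end

now = Time.at(1_000)
p scheduled_at_for({ wait: 30 }, now: now)          # 1030.0
p scheduled_at_for({ wait_until: Time.at(5_000) })  # 5000.0
p scheduled_at_for({})                              # nil
```

A nil result corresponds to the branch where the adapter's enqueue is called; any other value leads to enqueue_at.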

In the Enqueuing module we can also find the perform_later method, which is part of the most basic API of ActiveJob. It basically comes down to calling the enqueue method without any extra options:

# active_job/enqueuing.rb
module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern

    module ClassMethods
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

      protected
        def job_or_instantiate(*args)
          args.first.is_a?(self) ? args.first : new(*args)
        end
    end
  end
end

Queue Adapters

What is this queue_adapter to which we delegate the enqueueing? Let’s take a look at the QueueAdapter module. Its responsibility is exposing a reader and a writer for queue_adapter, which by default is the async adapter. Assigning the adapter is quite flexible: we can pass a string or a symbol (which will be used to look up the proper adapter), an instance of the adapter itself, or the class of the adapter (which is deprecated).

# active_job/queue_adapter.rb
module ActiveJob
  module QueueAdapter #:nodoc:
    extend ActiveSupport::Concern

    included do
      class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
      self.queue_adapter = :async
    end

    module ClassMethods
      def queue_adapter
        _queue_adapter
      end

      def queue_adapter=(name_or_adapter_or_class)
        self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
      end

      private

      def interpret_adapter(name_or_adapter_or_class)
        case name_or_adapter_or_class
        when Symbol, String
          ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
        else
          if queue_adapter?(name_or_adapter_or_class)
            name_or_adapter_or_class
          elsif queue_adapter_class?(name_or_adapter_or_class)
            ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \
              "and will be removed in Rails 5.1. Please pass an adapter name " \
              "(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \
              "or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead."
            name_or_adapter_or_class.new
          else
            raise ArgumentError
          end
        end
      end

      QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

      def queue_adapter?(object)
        QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
      end

      def queue_adapter_class?(object)
        object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
      end
    end
  end
end
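
The queue_adapter? and queue_adapter_class? checks are plain duck typing, which can be tried outside of Rails. The sketch below mirrors them under different, hypothetical names (adapter_instance?, adapter_class?, LoggingAdapter) so that it runs on its own.

```ruby
# Sketch of the duck-typing check: anything responding to both methods qualifies.
# LoggingAdapter is a hypothetical adapter used only for illustration.
ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

def adapter_instance?(object)
  ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
end

def adapter_class?(object)
  object.is_a?(Class) && ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
end

class LoggingAdapter
  def enqueue(job); end
  def enqueue_at(job, timestamp); end
end

p adapter_instance?(LoggingAdapter.new) # true
p adapter_class?(LoggingAdapter)        # true
p adapter_instance?(LoggingAdapter)     # false: the class itself does not respond to enqueue
p adapter_instance?(Object.new)         # false
```

Note that no base class or module is involved: an object counts as an adapter purely because it responds to enqueue and enqueue_at.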

All supported queue adapters are defined in the queue_adapters directory. There are quite a lot of adapters there, so let’s pick some of them.

Async Adapter

Let’s start with AsyncAdapter, which is the default one. What is really interesting about this queue adapter is that it doesn’t use any extra services but runs jobs with an in-process thread pool. Under the hood it uses Concurrent Ruby, which is a collection of modern tools for writing concurrent code; I highly recommend checking it out. We can pass executor_options to the constructor, which are then used to create a new instance of Scheduler.

# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

      def enqueue(job) #:nodoc:
        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
      end

      def enqueue_at(job, timestamp) #:nodoc:
        @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
      end

      def shutdown(wait: true) #:nodoc:
        @scheduler.shutdown wait: wait
      end

      def immediate=(immediate) #:nodoc:
        @scheduler.immediate = immediate
      end

      class JobWrapper #:nodoc:
        def initialize(job)
          job.provider_job_id = SecureRandom.uuid
          @job_data = job.serialize
        end

        def perform
          Base.execute @job_data
        end
      end

      class Scheduler #:nodoc:
        # code removed for brevity
      end
    end
  end
end

Remember how we could assign the queue adapter for ActiveJob in multiple ways? This is exactly the use case for assigning a specific instance of the queue adapter instead of just passing a string or symbol (or a class, but that way is deprecated): the instance can be built with custom executor_options. The Scheduler instance acts in fact like a queue backend, but the specifics of how it works are beyond the scope of this article. Nevertheless, the thing to keep in mind is that it exposes two important methods, enqueue and enqueue_at:

# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      class Scheduler #:nodoc:
        DEFAULT_EXECUTOR_OPTIONS = {
          min_threads:     0,
          max_threads:     Concurrent.processor_count,
          auto_terminate:  true,
          idletime:        60, # 1 minute
          max_queue:       0, # unlimited
          fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
        }.freeze

        attr_accessor :immediate

        def initialize(**options)
          self.immediate = false
          @immediate_executor = Concurrent::ImmediateExecutor.new
          @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
        end

        def enqueue(job, queue_name:)
          executor.post(job, &:perform)
        end

        def enqueue_at(job, timestamp, queue_name:)
          delay = timestamp - Time.current.to_f
          if delay > 0
            Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
          else
            enqueue(job, queue_name: queue_name)
          end
        end

        def shutdown(wait: true)
          @async_executor.shutdown
          @async_executor.wait_for_termination if wait
        end

        def executor
          immediate ? @immediate_executor : @async_executor
        end
      end
    end
  end
end

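The main difference between these two methods is the timestamp (or lack of it) used for executing the job later: enqueue posts the job to the executor right away, while enqueue_at schedules it after the computed delay, falling back to enqueue when the timestamp is not in the future.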
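
The decision made inside enqueue_at can be reduced to a few lines. The following is only a sketch of that branching, with a hypothetical dispatch_plan helper that reports what a scheduler would do instead of actually posting work to a thread pool.

```ruby
# Sketch of the enqueue_at decision: run later only if the timestamp is in the future.
# `dispatch_plan` is a hypothetical helper that reports what a scheduler would do.
def dispatch_plan(timestamp, now: Time.now.to_f)
  delay = timestamp - now
  delay > 0 ? [:schedule, delay] : [:run_now, 0]
end

now = 1_000.0
p dispatch_plan(1_060.0, now: now) # [:schedule, 60.0]
p dispatch_plan(990.0, now: now)   # [:run_now, 0]
```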

Let’s get back to the top-level AsyncAdapter class. The primary interface that all queue adapters are required to implement consists of two methods: enqueue and enqueue_at. For the async adapter, these methods simply pass an instance of JobWrapper to the scheduler, along with queue_name and, only for enqueue_at, the timestamp:

# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

      def enqueue(job) #:nodoc:
        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
      end

      def enqueue_at(job, timestamp) #:nodoc:
        @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
      end
    end
  end
end

And what is this JobWrapper? It’s a simple abstraction for passing something that can serialize jobs and knows how to execute them:

# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      class JobWrapper #:nodoc:
        def initialize(job)
          job.provider_job_id = SecureRandom.uuid
          @job_data = job.serialize
        end

        def perform
          Base.execute @job_data
        end
      end
    end
  end
end

Serialization and deserialization

Let’s take a closer look at how it works: the execute method is defined in the Execution module and it basically comes down to deserializing the job data (which was serialized in JobWrapper so that it could be enqueued) and calling perform_now. This logic is wrapped in a run_callbacks block, so we can extend it by performing some action before, around or after the execution:

# active_job/execution.rb
module ActiveJob
  module Execution
    module ClassMethods
      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end
    end
  end
end

The deserialize class method is defined inside the Core module. It creates a new instance of the job, deserializes the data into it and returns the job:

# active_job/core.rb
module ActiveJob
  module Core
    module ClassMethods
      # Creates a new job instance from a hash created with +serialize+
      def deserialize(job_data)
        job = job_data['job_class'].constantize.new
        job.deserialize(job_data)
        job
      end
    end
  end
end

Before explaining what happens during deserialization, we should know what the serialized data looks like: it’s a hash containing the name of the job class, job id, queue name, priority, locale and serialized arguments:

# active_job/core.rb
module ActiveJob
  module Core
    def serialize
      {
        'job_class'  => self.class.name,
        'job_id'     => job_id,
        'queue_name' => queue_name,
        'priority'   => priority,
        'arguments'  => serialize_arguments(arguments),
        'locale'     => I18n.locale.to_s
      }
    end
  end
end
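
The serialize / deserialize pair forms a round trip through a plain hash. The sketch below reproduces that round trip without Rails, under a few assumptions: RoundTripJob is a hypothetical class, Object.const_get stands in for ActiveSupport's constantize, and priority and locale are left out for brevity.

```ruby
require "securerandom"

# Plain-Ruby sketch of the serialize -> hash -> deserialize round trip.
# RoundTripJob is hypothetical; it is not how ActiveJob::Base is implemented.
class RoundTripJob
  attr_accessor :job_id, :queue_name, :arguments

  def initialize(*arguments)
    @arguments  = arguments
    @job_id     = SecureRandom.uuid
    @queue_name = "default"
  end

  # Everything a backend needs to rebuild the job later, as a plain hash.
  def serialize
    {
      "job_class"  => self.class.name,
      "job_id"     => job_id,
      "queue_name" => queue_name,
      "arguments"  => arguments
    }
  end

  # Rebuild a job from the hash: look up the class by name, then copy the data in.
  def self.deserialize(job_data)
    job = Object.const_get(job_data["job_class"]).new
    job.job_id     = job_data["job_id"]
    job.queue_name = job_data["queue_name"]
    job.arguments  = job_data["arguments"]
    job
  end
end

original = RoundTripJob.new(1, "two")
restored = RoundTripJob.deserialize(original.serialize)
p restored.job_id == original.job_id # true
p restored.arguments                 # [1, "two"]
```

Because the hash contains only strings, numbers and arrays, it can be handed to practically any queue backend and turned back into a job on the other side.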

The serialize_arguments method delegates the serialization process to the ActiveJob::Arguments.serialize method, which is mainly responsible for mapping ActiveRecord models from the arguments to global ids:

# active_job/core.rb
module ActiveJob
  module Core
    private

    def serialize_arguments(serialized_args)
      Arguments.serialize(serialized_args)
    end
  end
end

Here’s a simplified example of what serialized arguments may look like (in practice, each global id string is additionally wrapped in a small hash under the "_aj_globalid" key):

> ActiveJob::Arguments.serialize([User.find(1), [123, User.find(2)], { "current_user" => User.find(3)}])
=> ["gid://app-name/User/1", [123, "gid://app-name/User/2"], {"current_user"=>"gid://app-name/User/3"}]

This format can easily be used for enqueuing jobs in different queues.

Just before the execution of the job, the data needs to be deserialized. Like the serialize method, deserialize is defined in the Core module, and it assigns the job id, queue name, priority, locale and serialized arguments to the job using its accessors. But the arguments are not deserialized just yet, so how does the execution with perform_now work?

Remember how I mentioned before that there is nothing to be deserialized when using perform_now directly? In this case it will be a bit different as we operate on serialized arguments. Deserialization happens just before executing perform method in deserialize_arguments_if_needed.

# activejob/lib/active_job/core.rb
module ActiveJob
  module Core
    private

    def deserialize_arguments_if_needed
      if defined?(@serialized_arguments) && @serialized_arguments.present?
        @arguments = deserialize_arguments(@serialized_arguments)
        @serialized_arguments = nil
      end
    end
  end
end

Again, the deserialization is delegated to the Arguments module, whose primary responsibility here is turning global ids back into real models, so gid://app-name/User/3 becomes a User record with id equal to 3.
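
The idea of swapping records for global id strings and back can be imitated in plain Ruby. ActiveJob relies on the globalid gem for the real thing; the sketch below only mimics the concept with a hypothetical FakeUser class, an in-memory record store and hand-written serialize_args / deserialize_args helpers.

```ruby
# Sketch of the idea behind argument (de)serialization: records travel as
# "gid://..." strings and are looked up again before perform runs.
# FakeUser, its in-memory store and the "app-name" segment are hypothetical.
class FakeUser
  attr_reader :id

  @records = {}

  class << self
    def create(id)
      @records[id] = new(id)
    end

    def find(id)
      @records.fetch(id)
    end
  end

  def initialize(id)
    @id = id
  end

  def to_gid
    "gid://app-name/#{self.class.name}/#{id}"
  end
end

GID_PATTERN = %r{\Agid://app-name/(\w+)/(\d+)\z}

# Recursively replace records with gid strings.
def serialize_args(value)
  case value
  when FakeUser then value.to_gid
  when Array    then value.map { |item| serialize_args(item) }
  when Hash     then value.transform_values { |item| serialize_args(item) }
  else value
  end
end

# Recursively turn gid strings back into records.
def deserialize_args(value)
  case value
  when String
    match = GID_PATTERN.match(value)
    match ? Object.const_get(match[1]).find(Integer(match[2])) : value
  when Array then value.map { |item| deserialize_args(item) }
  when Hash  then value.transform_values { |item| deserialize_args(item) }
  else value
  end
end

user = FakeUser.create(3)
payload = serialize_args([user, [123, user], { "current_user" => user }])
p payload.first                              # "gid://app-name/FakeUser/3"
p deserialize_args(payload)[0].equal?(user)  # true
```

One consequence worth remembering: the record is looked up again at execution time, so the job sees the state of the record at that moment, not at the moment of enqueuing.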

Exploring more queue adapters

Inline Adapter

Let’s explore some more adapters. Most likely you have used InlineAdapter in integration tests for testing the side effects of executing some job. Its logic is very limited: since it’s meant for inline execution, it doesn’t support enqueueing jobs for future execution, and its enqueue method merely calls the execute method with the serialized job:

# activejob/queue_adapters/inline_adapter.rb
class InlineAdapter
  def enqueue(job) #:nodoc:
    Base.execute(job.serialize)
  end

  def enqueue_at(*) #:nodoc:
    raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html"
  end
end

Sidekiq Adapter

Let’s check the queue adapter for one of the most commonly used background processing frameworks: Sidekiq. To be enqueued, Sidekiq requires a class that includes the Sidekiq::Worker module and implements a perform instance method executing the logic of the job. Just like AsyncAdapter, SidekiqAdapter uses an internal JobWrapper class. It includes Sidekiq::Worker and implements a perform method that takes job_data as an argument and simply delegates execution to the ActiveJob::Base.execute method:

# activejob/queue_adapters/sidekiq_adapter.rb
class SidekiqAdapter
  def enqueue(job) #:nodoc:
    #Sidekiq::Client does not support symbols as keys
    job.provider_job_id = Sidekiq::Client.push \
      'class'   => JobWrapper,
      'wrapped' => job.class.to_s,
      'queue'   => job.queue_name,
      'args'    => [ job.serialize ]
  end

  def enqueue_at(job, timestamp) #:nodoc:
    job.provider_job_id = Sidekiq::Client.push \
      'class'   => JobWrapper,
      'wrapped' => job.class.to_s,
      'queue'   => job.queue_name,
      'args'    => [ job.serialize ],
      'at'      => timestamp
  end

  class JobWrapper #:nodoc:
    include Sidekiq::Worker

    def perform(job_data)
      Base.execute job_data
    end
  end
end

Again, like every other adapter, SidekiqAdapter implements the enqueue and enqueue_at methods. Both of them push the job to Sidekiq’s queue together with some meta info: the wrapper class, the name of the wrapped job class, the queue to use and, of course, the serialized job. As an extra argument, enqueue_at passes the timestamp for executing the job at a specific time. Pushing a job to Sidekiq’s queue returns an internal job id, which is then assigned to the provider_job_id attribute.

DelayedJob Adapter

Let’s take a look at the adapter for arguably the most common choice backed by the application’s database: DelayedJob. The pattern is exactly the same as for the Sidekiq adapter: we have enqueue and enqueue_at methods, and both of them push the job to the queue with extra info about the queue name, the priority and, for enqueue_at, the time to run the job at. Just like SidekiqAdapter, it wraps the serialized job in an internal JobWrapper instance, which delegates execution of the logic to ActiveJob::Base.execute. At the end, the internal job id from DelayedJob’s queue is assigned to the provider_job_id attribute:

# activejob/queue_adapters/delayed_job_adapter.rb
class DelayedJobAdapter
  def enqueue(job) #:nodoc:
    delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  def enqueue_at(job, timestamp) #:nodoc:
    delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  class JobWrapper #:nodoc:
    attr_accessor :job_data

    def initialize(job_data)
      @job_data = job_data
    end

    def perform
      Base.execute(job_data)
    end
  end
end

TestAdapter

Have you ever needed to test which jobs were enqueued or performed when executing some specs? There’s a good chance you were using the test helpers provided by ActiveJob or rspec-activejob for that. All these assertions are quite easy to handle thanks to TestAdapter, which exposes some extra API for keeping track of enqueued and performed jobs: the enqueued_jobs and performed_jobs attributes, which are populated when calling the enqueue and enqueue_at methods. You can also configure whether the jobs should actually be executed by changing the perform_enqueued_jobs and perform_enqueued_at_jobs flags, and you can whitelist which job classes may be enqueued with the filter attribute.

# activejob/queue_adapters/test_adapter.rb
class TestAdapter
  attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
  attr_writer(:enqueued_jobs, :performed_jobs)

  # Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
  def enqueued_jobs
    @enqueued_jobs ||= []
  end

  # Provides a store of all the performed jobs with the TestAdapter so you can check them.
  def performed_jobs
    @performed_jobs ||= []
  end

  def enqueue(job) #:nodoc:
    return if filtered?(job)

    job_data = job_to_hash(job)
    enqueue_or_perform(perform_enqueued_jobs, job, job_data)
  end

  def enqueue_at(job, timestamp) #:nodoc:
    return if filtered?(job)

    job_data = job_to_hash(job, at: timestamp)
    enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
  end

  private

  def job_to_hash(job, extras = {})
    { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
  end

  def enqueue_or_perform(perform, job, job_data)
    if perform
      performed_jobs << job_data
      Base.execute job.serialize
    else
      enqueued_jobs << job_data
    end
  end

  def filtered?(job)
    filter && !Array(filter).include?(job.class)
  end
end
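
The filter rule is compact enough to be easy to misread, so here it is as a standalone sketch. WelcomeJob and CleanupJob are hypothetical placeholder classes, and filtered_out? is a renamed copy of the rule that additionally coerces the result to a strict boolean.

```ruby
# Sketch of the TestAdapter filter rule: with no filter everything passes,
# otherwise only the listed job classes are let through.
# WelcomeJob and CleanupJob are hypothetical placeholder classes.
WelcomeJob = Class.new
CleanupJob = Class.new

def filtered_out?(filter, job_class)
  !!(filter && !Array(filter).include?(job_class))
end

p filtered_out?(nil, CleanupJob)                      # false: no filter set
p filtered_out?(WelcomeJob, CleanupJob)               # true: not on the list
p filtered_out?([WelcomeJob, CleanupJob], CleanupJob) # false: explicitly allowed
```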

Wrapping up

We’ve learned quite a lot about how ActiveJob works under the hood: what kind of public API is available and how to extend it with custom queue adapters. Even though understanding the internals of Rails may require some effort and time, it’s worth going deeper and exploring the architecture of the framework we use for everyday development. Here are some key takeaways:

  • You can provide the exact instance of queue adapter for ActiveJob, not only a string or symbol, which lets you pass some extra configuration options
  • Adapter pattern is a great choice when we have several services with different interfaces but we want to have one unified interface for using all of them
  • Most of ActiveJob's logic is divided into modules (which seems to be a common pattern in other layers of Rails), but the benefits of doing so are unclear: why is Execution a separate module from Core? What kind of benefits does splitting queue-related logic into QueuePriority, QueueName and QueueAdapter give? I don’t really see it as a way to decouple code, as e.g. the Enqueuing module depends on logic from QueueAdapter and QueueName, yet neither is required explicitly; it just assumes that queue_adapter and queue_name_from_part exist. It would be clearer if Base or the Core module acted like a facade and delegated responsibilities to some other classes. If anyone knows the reason behind this kind of design, please write it in a comment, I’m really curious about it.
  • To support another background job execution framework, you just need to add a queue adapter class implementing enqueue and enqueue_at methods, which under the hood push the job to the queue and delegate execution of the logic to the ActiveJob::Base.execute method, passing the serialised job as an argument.
  • Rails internals are not that scary :)
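
As an illustration of the custom adapter takeaway, here is a minimal sketch of an adapter backed by an in-memory array. It is an assumption-laden example rather than a production design: InMemoryQueueAdapter, its executor: option, the drain method and FakeJob are all hypothetical, and the executor is injected so that the sketch runs without Rails. The only part ActiveJob actually requires is the enqueue / enqueue_at pair.

```ruby
# Sketch of a custom queue adapter backed by an in-memory array.
# InMemoryQueueAdapter, its `executor:` option and `drain` are hypothetical;
# only the enqueue / enqueue_at pair is what ActiveJob expects from an adapter.
class InMemoryQueueAdapter
  attr_reader :queue

  # executor: callable that runs a serialized job. In a Rails app this would
  # typically be ->(job_data) { ActiveJob::Base.execute(job_data) }.
  def initialize(executor:)
    @executor = executor
    @queue = []
  end

  def enqueue(job)
    @queue << { payload: job.serialize, run_at: nil }
  end

  def enqueue_at(job, timestamp)
    @queue << { payload: job.serialize, run_at: timestamp }
  end

  # Run every job that is due at `now`; keep the rest queued.
  def drain(now: Time.now.to_f)
    due, @queue = @queue.partition { |entry| entry[:run_at].nil? || entry[:run_at] <= now }
    due.each { |entry| @executor.call(entry[:payload]) }
    due.size
  end
end

# Stand-in for a job: anything that responds to `serialize`.
FakeJob = Struct.new(:name) do
  def serialize
    { "job_class" => "FakeJob", "arguments" => [name] }
  end
end

executed = []
adapter = InMemoryQueueAdapter.new(executor: ->(job_data) { executed << job_data["arguments"] })
adapter.enqueue(FakeJob.new("now"))
adapter.enqueue_at(FakeJob.new("later"), 2_000.0)

adapter.drain(now: 1_000.0)
p executed           # [["now"]]
p adapter.queue.size # 1
```

Since ActiveJob accepts adapter instances, an object like this could in principle be assigned through queue_adapter= on a job class; a real adapter would also need to think about persistence, retries and concurrency, which this sketch ignores.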

If there’s any particular part of Rails that seems "magical" and you would like to see it decoded, let me know in the comments, I want to make sure I cover the needs of my readers.
