Decoding Rails Magic: How Does ActiveJob Work?
Executing background jobs is a common feature of many web applications. Switching between background processing frameworks used to be painful, as most of them had a different API for enqueuing jobs, enqueuing mailers and scheduling jobs. One of the great additions in Rails 4.2 was a solution to this problem: ActiveJob, which provides an extra layer on top of background job frameworks and unifies the API regardless of the queue adapter you use. But how exactly does it work? What are the requirements for adding new queue adapters? What kind of API does ActiveJob provide? Let’s dive deep into the codebase and answer these and some other questions.
Anatomy of the job
Let’s start with a simple job class, MyAwesomeJob:
# app/jobs/my_awesome_job.rb
class MyAwesomeJob < ActiveJob::Base
  def perform(user)
    User::DoSomethingAwesome.call(user)
  end
end
To enqueue a job we could simply write MyAwesomeJob.perform_later(some_user), or, if we wanted to schedule the job for some time in the future, MyAwesomeJob.set(wait: 12.hours).perform_later(some_user). To execute the job immediately without enqueuing it, we could call MyAwesomeJob.perform_now(some_user). But we never defined any of these methods, so what extra work does ActiveJob perform to make this happen?
Exploring internals of ActiveJob
To answer this question, let’s take a look at the ActiveJob::Base class:
# active_job/base.rb
module ActiveJob
  class Base
    include Core
    include QueueAdapter
    include QueueName
    include QueuePriority
    include Enqueuing
    include Execution
    include Callbacks
    include Logging
    include Translation

    ActiveSupport.run_load_hooks(:active_job, self)
  end
end
There are some interesting modules included in this class, which we will get to know in more detail later, but let’s focus on the core API for now. Most likely this kind of logic would be defined in, well, the Core module. Indeed, the set method is there:
# active_job/core.rb
module ActiveJob
  module Core
    module ClassMethods
      def set(options={})
        ConfiguredJob.new(self, options)
      end
    end
  end
end
It returns an instance of ConfiguredJob, passing the job class itself (self inside a class method is the class) and the options to the constructor. Let’s check what the ConfiguredJob class is responsible for:
# active_job/configured_job.rb
module ActiveJob
  class ConfiguredJob #:nodoc:
    def initialize(job_class, options={})
      @options = options
      @job_class = job_class
    end

    def perform_now(*args)
      @job_class.new(*args).perform_now
    end

    def perform_later(*args)
      @job_class.new(*args).enqueue @options
    end
  end
end
We have two methods available here: perform_now and perform_later. Both of them create a new job instance with the arguments passed to the method, and then either call perform_now on that instance or call enqueue, passing the options that were given to the set method.
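The proxy trick behind set can be reproduced without Rails. Below is a minimal plain-Ruby sketch, assuming hypothetical MiniJob and MiniConfiguredJob classes and an enqueue method that only records its options instead of talking to a real queue:

```ruby
# Hypothetical stand-in for ConfiguredJob: it remembers the job class and the
# options given to `set`, then builds the job when perform_* is called.
class MiniConfiguredJob
  def initialize(job_class, options = {})
    @job_class = job_class
    @options = options
  end

  # Build a job with the given arguments and run it right away.
  def perform_now(*args)
    @job_class.new(*args).perform_now
  end

  # Build a job and pass along the options captured by `set`.
  def perform_later(*args)
    @job_class.new(*args).enqueue(@options)
  end
end

# Hypothetical job base class; not ActiveJob::Base.
class MiniJob
  attr_reader :arguments, :enqueue_options

  def self.set(options = {})
    MiniConfiguredJob.new(self, options)
  end

  def self.perform_later(*args)
    new(*args).enqueue
  end

  def initialize(*arguments)
    @arguments = arguments
  end

  def perform_now
    perform(*arguments)
  end

  # A real job would hand itself to a queue adapter; this one just records.
  def enqueue(options = {})
    @enqueue_options = options
    self
  end

  def perform(a, b)
    a + b
  end
end

job = MiniJob.set(wait: 60, queue: "low").perform_later(1, 2)
job.enqueue_options[:queue] # => "low"
MiniJob.set.perform_now(1, 2) # => 3
```

The proxy keeps set free of side effects: nothing is built or enqueued until perform_now or perform_later is called on it.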
Let’s go deeper and start with the perform_now method. It’s defined inside the Execution module and basically comes down to deserializing the arguments if needed (there is nothing to deserialize when calling perform_now directly) and calling our perform method, which we defined in the job class. This logic is wrapped in a run_callbacks block, which lets you define callbacks before, around and after the execution of the perform method.
# active_job/execution.rb
module ActiveJob
  module Execution
    module ClassMethods
      def perform_now(*args)
        job_or_instantiate(*args).perform_now
      end
    end

    # rest of the code which was removed for brevity

    def perform_now
      deserialize_arguments_if_needed
      run_callbacks :perform do
        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end
  end
end
These callbacks are defined inside the Callbacks module, whose only responsibility is defining callbacks for the perform and enqueue methods. They help extend the behaviour of jobs in a pretty unobtrusive manner. For example, if we wanted to log when the job is finished, we could add the following after_perform callback:
# app/jobs/my_awesome_job.rb
class MyAwesomeJob < ActiveJob::Base
  after_perform do |job|
    Rails.logger.info "#{Time.current}: finished execution of the job: #{job.inspect}"
  end

  def perform(user)
    User::DoSomethingAwesome.call(user)
  end
end
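ActiveJob builds these callbacks on ActiveSupport::Callbacks. To show the idea without Rails, here is a hand-rolled plain-Ruby sketch that wraps perform with callbacks registered at class level. CallbackJob and GreetJob are hypothetical names, and only before and after hooks are modelled:

```ruby
# Hypothetical base class with hand-rolled callbacks (no ActiveSupport).
class CallbackJob
  # Each subclass keeps its own lists, because class-level instance
  # variables are not shared between a class and its subclasses.
  def self.before_perform_callbacks
    @before_perform_callbacks ||= []
  end

  def self.after_perform_callbacks
    @after_perform_callbacks ||= []
  end

  def self.before_perform(&block)
    before_perform_callbacks << block
  end

  def self.after_perform(&block)
    after_perform_callbacks << block
  end

  attr_reader :log

  def initialize(*arguments)
    @arguments = arguments
    @log = []
  end

  # Run the before hooks, the user-defined #perform, then the after hooks.
  # If #perform raises, the after hooks are skipped.
  def perform_now
    self.class.before_perform_callbacks.each { |callback| callback.call(self) }
    result = perform(*@arguments)
    self.class.after_perform_callbacks.each { |callback| callback.call(self) }
    result
  end
end

class GreetJob < CallbackJob
  before_perform { |job| job.log << :before }
  after_perform { |job| job.log << :after }

  def perform(name)
    log << :perform
    "Hello, #{name}"
  end
end

job = GreetJob.new("Ada")
job.perform_now # => "Hello, Ada"
job.log         # => [:before, :perform, :after]
```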
Let’s get back to the perform_later method from ConfiguredJob. We could expect the enqueue method to be defined in the Enqueuing module, and that is indeed the case:
# active_job/enqueuing.rb
module ActiveJob
  module Enqueuing
    def enqueue(options={})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority = options[:priority].to_i if options[:priority]
      run_callbacks :enqueue do
        if self.scheduled_at
          self.class.queue_adapter.enqueue_at self, self.scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end
  end
end
We can pass several options here. The scheduled_at attribute can be configured with wait (which schedules the job the specified amount of time from now) or wait_until (which schedules the job at an exact point in time). We can also enforce the queue used for the job execution and set its priority. Finally, the call is delegated to queue_adapter: enqueue_at when the job has a scheduled time, enqueue otherwise. This logic is wrapped in a run_callbacks block, which lets you define callbacks before, around and after the execution of this code.
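The option handling above can be sketched in plain Ruby. This is a simplified, hypothetical version: without ActiveSupport, wait is a plain number of seconds rather than a Duration, and RecordingAdapter is a throwaway stand-in for a real queue adapter. As in the original, wait_until wins when both options are given, because it is assigned last:

```ruby
# Turn `wait` (seconds from now) or `wait_until` (a Time) into a float
# timestamp, or nil when the job should run as soon as possible.
def resolve_scheduled_at(options, now: Time.now)
  scheduled_at = nil
  scheduled_at = (now + options[:wait]).to_f if options[:wait]
  scheduled_at = options[:wait_until].to_f if options[:wait_until]
  scheduled_at
end

# Mirrors the branch in Enqueuing#enqueue: a timestamp means enqueue_at.
def dispatch(adapter, job, options, now: Time.now)
  scheduled_at = resolve_scheduled_at(options, now: now)
  if scheduled_at
    adapter.enqueue_at(job, scheduled_at)
  else
    adapter.enqueue(job)
  end
end

# Hypothetical adapter that only records which method was called.
class RecordingAdapter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def enqueue(job)
    @calls << [:enqueue, job]
  end

  def enqueue_at(job, timestamp)
    @calls << [:enqueue_at, job, timestamp]
  end
end

adapter = RecordingAdapter.new
dispatch(adapter, :report, {}, now: Time.at(1_000))
dispatch(adapter, :cleanup, { wait: 10 }, now: Time.at(1_000))
adapter.calls # => [[:enqueue, :report], [:enqueue_at, :cleanup, 1010.0]]
```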
In the Enqueuing module we can also find the perform_later class method, which is part of ActiveJob’s most basic API. It basically comes down to calling enqueue without any extra options.
# active_job/enqueuing.rb
module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern

    module ClassMethods
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

      protected
        def job_or_instantiate(*args)
          args.first.is_a?(self) ? args.first : new(*args)
        end
    end
  end
end
Queue Adapters
What is this queue_adapter to which we delegate the enqueuing? Let’s take a look at the QueueAdapter module. Its responsibility is exposing a reader and a writer for the queue_adapter accessor, which defaults to the async adapter. Assigning an adapter is quite flexible: we can pass a string or a symbol (which will be used to look up the proper adapter), an instance of the adapter itself, or the class of the adapter (which is deprecated).
# active_job/queue_adapter.rb
module ActiveJob
  module QueueAdapter #:nodoc:
    extend ActiveSupport::Concern

    included do
      class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
      self.queue_adapter = :async
    end

    module ClassMethods
      def queue_adapter
        _queue_adapter
      end

      def queue_adapter=(name_or_adapter_or_class)
        self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
      end

      private
        def interpret_adapter(name_or_adapter_or_class)
          case name_or_adapter_or_class
          when Symbol, String
            ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
          else
            if queue_adapter?(name_or_adapter_or_class)
              name_or_adapter_or_class
            elsif queue_adapter_class?(name_or_adapter_or_class)
              ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \
                "and will be removed in Rails 5.1. Please pass an adapter name " \
                "(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \
                "or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead."
              name_or_adapter_or_class.new
            else
              raise ArgumentError
            end
          end
        end

        QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

        def queue_adapter?(object)
          QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
        end

        def queue_adapter_class?(object)
          object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
        end
    end
  end
end
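The essence of interpret_adapter is duck typing: anything that responds to both enqueue and enqueue_at is accepted as an adapter. Here is a minimal plain-Ruby sketch of that rule, assuming a hypothetical ADAPTER_REGISTRY hash in place of ActiveJob::QueueAdapters.lookup and leaving out the deprecated class branch:

```ruby
ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

# Hypothetical adapter that accepts jobs and drops them.
class NullAdapter
  def enqueue(job); end
  def enqueue_at(job, timestamp); end
end

# Hypothetical registry standing in for ActiveJob::QueueAdapters.lookup.
ADAPTER_REGISTRY = { null: NullAdapter }.freeze

# Duck typing: an adapter is anything that responds to both methods.
def queue_adapter?(object)
  ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
end

def interpret_adapter(name_or_adapter)
  case name_or_adapter
  when Symbol, String
    # Names are looked up and instantiated; unknown names raise KeyError here.
    ADAPTER_REGISTRY.fetch(name_or_adapter.to_sym).new
  else
    # Instances are used as-is, as long as they quack like an adapter.
    unless queue_adapter?(name_or_adapter)
      raise ArgumentError, "not a queue adapter: #{name_or_adapter.inspect}"
    end
    name_or_adapter
  end
end

interpret_adapter(:null)           # a fresh NullAdapter
interpret_adapter(NullAdapter.new) # the very same instance that was passed in
```

Accepting instances is what makes it possible to pass a preconfigured adapter, which is exactly what the AsyncAdapter section relies on.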
All supported queue adapters are defined in the queue_adapters directory. There are quite a lot of adapters there, so let’s pick a few of them.
Async Adapter
Let’s start with AsyncAdapter, which is the default one. What is really interesting about this queue adapter is that it doesn’t use any extra services but runs jobs with an in-process thread pool. Under the hood it uses Concurrent Ruby, a collection of modern tools for writing concurrent code, which I highly recommend checking out. We can pass executor_options to the constructor, which are then used to create a new instance of Scheduler.
# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

      def enqueue(job) #:nodoc:
        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
      end

      def enqueue_at(job, timestamp) #:nodoc:
        @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
      end

      def shutdown(wait: true) #:nodoc:
        @scheduler.shutdown wait: wait
      end

      def immediate=(immediate) #:nodoc:
        @scheduler.immediate = immediate
      end

      class JobWrapper #:nodoc:
        def initialize(job)
          job.provider_job_id = SecureRandom.uuid
          @job_data = job.serialize
        end

        def perform
          Base.execute @job_data
        end
      end

      class Scheduler #:nodoc:
        # code removed for brevity
      end
    end
  end
end
Remember how we could assign the queue adapter for ActiveJob in multiple ways? This is exactly the use case for assigning a specific instance of the queue adapter instead of just passing a string or symbol (or a class, but that way is deprecated): it lets us pass executor options. The Scheduler instance in fact acts like a queue backend, but the specifics of how it works are beyond the scope of this article. Nevertheless, the thing to keep in mind is that it exposes two important methods, enqueue and enqueue_at:
# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      class Scheduler #:nodoc:
        DEFAULT_EXECUTOR_OPTIONS = {
          min_threads:     0,
          max_threads:     Concurrent.processor_count,
          auto_terminate:  true,
          idletime:        60, # 1 minute
          max_queue:       0, # unlimited
          fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
        }.freeze

        attr_accessor :immediate

        def initialize(**options)
          self.immediate = false
          @immediate_executor = Concurrent::ImmediateExecutor.new
          @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
        end

        def enqueue(job, queue_name:)
          executor.post(job, &:perform)
        end

        def enqueue_at(job, timestamp, queue_name:)
          delay = timestamp - Time.current.to_f
          if delay > 0
            Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
          else
            enqueue(job, queue_name: queue_name)
          end
        end

        def shutdown(wait: true)
          @async_executor.shutdown
          @async_executor.wait_for_termination if wait
        end

        def executor
          immediate ? @immediate_executor : @async_executor
        end
      end
    end
  end
end
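The main difference between these two methods is the timestamp (or lack of it) used for executing the job later: enqueue posts the job to the executor right away, while enqueue_at schedules it after the remaining delay and falls back to enqueue if that time has already passed.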
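To make the enqueue / enqueue_at split concrete without Concurrent Ruby, here is a stdlib-only sketch of an in-process adapter. TinyAsyncAdapter is hypothetical: it uses a Thread::Queue drained by a fixed set of worker threads plus a sleeping helper thread per delayed job, and it takes plain callables instead of JobWrapper instances:

```ruby
# Hypothetical in-process adapter built only on Thread and Queue.
class TinyAsyncAdapter
  def initialize(threads: 2)
    @queue = Queue.new
    @timers = []
    @workers = Array.new(threads) do
      Thread.new do
        loop do
          job = @queue.pop # blocks until a job or the :stop sentinel arrives
          break if job == :stop
          job.call # an exception here would kill this worker; a real pool rescues it
        end
      end
    end
  end

  # Run as soon as a worker is free.
  def enqueue(job)
    @queue << job
  end

  # Sleep in a helper thread until the timestamp, then hand off to the pool;
  # timestamps in the past fall back to enqueue, like the Scheduler above.
  def enqueue_at(job, timestamp)
    delay = timestamp - Time.now.to_f
    return enqueue(job) unless delay > 0

    @timers << Thread.new do
      sleep(delay)
      enqueue(job)
    end
  end

  # Wait for pending delayed jobs, then stop every worker.
  def shutdown
    @timers.each(&:join)
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

adapter = TinyAsyncAdapter.new
adapter.enqueue(-> { puts "now" })
adapter.enqueue_at(-> { puts "a bit later" }, Time.now.to_f + 0.1)
adapter.shutdown
```

Spawning a thread per delayed job keeps the sketch short but would not scale; the real adapter leaves timing to Concurrent::ScheduledTask instead.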
Let’s get back to the top-level AsyncAdapter class. The primary interface that all queue adapters are required to implement consists of two methods: enqueue and enqueue_at. For the Async adapter, these methods simply pass an instance of JobWrapper to the scheduler, together with queue_name and the timestamp (the latter only for enqueue_at):
# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      def initialize(**executor_options)
        @scheduler = Scheduler.new(**executor_options)
      end

      def enqueue(job) #:nodoc:
        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
      end

      def enqueue_at(job, timestamp) #:nodoc:
        @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
      end
    end
  end
end
And what is this JobWrapper? It’s a simple abstraction that serializes the job when it’s created and knows how to execute it later:
# active_job/queue_adapters/async_adapter.rb
module ActiveJob
  module QueueAdapters
    class AsyncAdapter
      class JobWrapper #:nodoc:
        def initialize(job)
          job.provider_job_id = SecureRandom.uuid
          @job_data = job.serialize
        end

        def perform
          Base.execute @job_data
        end
      end
    end
  end
end
Serialization and deserialization
Let’s take a closer look at how it works. The execute class method is defined in the Execution module, and it basically comes down to deserializing the job data (which was serialized in JobWrapper so that it could be enqueued) and calling perform_now. This logic is wrapped in a run_callbacks block, so we can extend it by performing some action before, around or after the execution:
# active_job/execution.rb
module ActiveJob
  module Execution
    module ClassMethods
      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end
    end
  end
end
The deserialize class method is defined inside the Core module. It creates a new instance of the job, deserializes the data onto it and returns the job:
# active_job/core.rb
module ActiveJob
  module Core
    module ClassMethods
      # Creates a new job instance from a hash created with +serialize+
      def deserialize(job_data)
        job = job_data['job_class'].constantize.new
        job.deserialize(job_data)
        job
      end
    end
  end
end
Before explaining what happens during deserialization, we should know what the serialized data looks like. It’s a hash containing the name of the job class, the job id, queue name, priority, locale and the serialized arguments:
# active_job/core.rb
module ActiveJob
  module Core
    def serialize
      {
        'job_class'  => self.class.name,
        'job_id'     => job_id,
        'queue_name' => queue_name,
        'priority'   => priority,
        'arguments'  => serialize_arguments(arguments),
        'locale'     => I18n.locale.to_s
      }
    end
  end
end
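The round trip between serialize and deserialize can be sketched in plain Ruby. ReportJob is hypothetical and keeps only a few of the keys shown above (no priority or locale); Object.const_get plays the role of constantize, and JSON stands in for whatever storage a queue backend uses:

```ruby
require "json"
require "securerandom"

# Hypothetical job class mirroring the serialize / deserialize pair.
class ReportJob
  attr_accessor :job_id, :queue_name, :arguments

  def initialize(*arguments)
    @arguments = arguments
    @job_id = SecureRandom.uuid
    @queue_name = "default"
  end

  # Plain hash with string keys, so any backend can store it.
  def serialize
    {
      "job_class"  => self.class.name,
      "job_id"     => job_id,
      "queue_name" => queue_name,
      "arguments"  => arguments
    }
  end

  # Instance side: copy the stored state onto an already-built job.
  def deserialize(job_data)
    self.job_id     = job_data["job_id"]
    self.queue_name = job_data["queue_name"]
    self.arguments  = job_data["arguments"]
  end

  # Class side: resolve the class by name, build it, then fill it in.
  def self.deserialize(job_data)
    job = Object.const_get(job_data["job_class"]).new
    job.deserialize(job_data)
    job
  end
end

payload = JSON.generate(ReportJob.new(42, "monthly").serialize)
restored = ReportJob.deserialize(JSON.parse(payload))
restored.arguments # => [42, "monthly"]
```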
The serialize_arguments method delegates the serialization process to the ActiveJob::Arguments.serialize method, which is mainly responsible for converting ActiveRecord models (and other objects that support GlobalID) found in the arguments into global ids:
# active_job/core.rb
module ActiveJob
  module Core
    private
      def serialize_arguments(serialized_args)
        Arguments.serialize(serialized_args)
      end
  end
end
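Here’s an example of what serialized arguments may look like (each global id is wrapped in a hash under the _aj_globalid key, and hashes keep their keys):
> ActiveJob::Arguments.serialize([User.find(1), [123, User.find(2)], { "current_user" => User.find(3) }])
=> [{"_aj_globalid"=>"gid://app-name/User/1"}, [123, {"_aj_globalid"=>"gid://app-name/User/2"}], {"current_user"=>{"_aj_globalid"=>"gid://app-name/User/3"}, "_aj_symbol_keys"=>[]}]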
This format consists only of plain values (strings, numbers, arrays and hashes), so any queue backend can easily store it.
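Argument serialization can be sketched without Rails as well. ActiveJob stores a global id as a single-key hash under "_aj_globalid"; everything else here is a simplification under stated assumptions: User is a Struct, USERS is a hypothetical in-memory table playing the role of the database, only User records get a global id, hash keys are simply stringified, and unsupported types pass through instead of raising as ActiveJob would:

```ruby
# Hypothetical stand-ins for an ActiveRecord model and its table.
User = Struct.new(:id, :name)
USERS = { 1 => User.new(1, "Ada"), 2 => User.new(2, "Linus") }.freeze

GID_KEY = "_aj_globalid"

# Records become {"_aj_globalid" => "gid://..."}; arrays and hashes recurse.
def serialize_argument(arg)
  case arg
  when User
    { GID_KEY => "gid://app-name/User/#{arg.id}" }
  when Array
    arg.map { |item| serialize_argument(item) }
  when Hash
    arg.to_h { |key, value| [key.to_s, serialize_argument(value)] }
  else
    arg # everything else passes through unchanged in this sketch
  end
end

# The reverse: a single-key global id hash is looked up in the "table".
def deserialize_argument(arg)
  case arg
  when Hash
    if arg.size == 1 && arg.key?(GID_KEY)
      USERS.fetch(Integer(arg[GID_KEY].split("/").last))
    else
      arg.to_h { |key, value| [key, deserialize_argument(value)] }
    end
  when Array
    arg.map { |item| deserialize_argument(item) }
  else
    arg
  end
end

serialize_argument([USERS[1], [123, USERS[2]]])
# => [{"_aj_globalid"=>"gid://app-name/User/1"}, [123, {"_aj_globalid"=>"gid://app-name/User/2"}]]
```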
Just before the job is executed, the data needs to be deserialized. Like serialize, the deserialize instance method is defined in the Core module: it assigns the job id, queue name, priority, locale and serialized arguments to the job using its accessors. But the arguments are not deserialized just yet, so how does the execution with perform_now work?
Remember how I mentioned that there is nothing to deserialize when calling perform_now directly? Here it’s different, because we operate on serialized arguments. Deserialization happens just before the perform method is executed, in deserialize_arguments_if_needed:
# active_job/core.rb
module ActiveJob
  module Core
    private
      def deserialize_arguments_if_needed
        if defined?(@serialized_arguments) && @serialized_arguments.present?
          @arguments = deserialize_arguments(@serialized_arguments)
          @serialized_arguments = nil
        end
      end
  end
end
Again, the deserialization is delegated to the Arguments module, whose primary responsibility here is turning global ids back into real models, so gid://app-name/User/3 becomes the User record with id 3.
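The lazy step can be sketched in plain Ruby too. LazyJob is hypothetical, and Integer() stands in for turning global ids back into records. A job built directly has nothing to deserialize, while a job rebuilt from queue data deserializes its arguments once, right before perform:

```ruby
# Hypothetical job showing the "deserialize only when needed" pattern.
class LazyJob
  attr_reader :arguments, :deserialize_calls

  def initialize(*arguments)
    @arguments = arguments
    @serialized_arguments = nil
    @deserialize_calls = 0
  end

  # Called when rebuilding a job from queue data: keep the raw payload only.
  def deserialize(job_data)
    @serialized_arguments = job_data["arguments"]
  end

  def perform_now
    deserialize_arguments_if_needed
    perform(*arguments)
  end

  def perform(a, b)
    a * b
  end

  private

  def deserialize_arguments_if_needed
    return if @serialized_arguments.nil? || @serialized_arguments.empty?

    @deserialize_calls += 1
    # Stand-in for looking records up by global id.
    @arguments = @serialized_arguments.map { |value| Integer(value) }
    @serialized_arguments = nil # so a second perform_now skips this step
  end
end

LazyJob.new(3, 4).perform_now # => 12, nothing to deserialize
job = LazyJob.new
job.deserialize({ "arguments" => ["3", "4"] })
job.perform_now # => 12, arguments deserialized just before perform
```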
Exploring more queue adapters
Inline Adapter
Let’s explore some more adapters. Most likely you have used InlineAdapter in integration tests to check the side effects of executing a job. Its logic is very limited: since it’s meant for inline execution, it doesn’t support enqueuing jobs for future execution, and its enqueue method merely calls execute with the serialized job:
# active_job/queue_adapters/inline_adapter.rb
class InlineAdapter
  def enqueue(job) #:nodoc:
    Base.execute(job.serialize)
  end

  def enqueue_at(*) #:nodoc:
    raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html"
  end
end
Sidekiq Adapter
Let’s check the queue adapter for one of the most commonly used background processing frameworks: Sidekiq. To be enqueued, a Sidekiq job has to be a class that includes the Sidekiq::Worker module and implements a perform instance method containing the job’s logic. Just like AsyncAdapter, SidekiqAdapter uses an internal JobWrapper class, which includes Sidekiq::Worker and implements a perform method taking job_data as an argument; its logic is limited to delegating the execution to the ActiveJob::Base.execute method:
# active_job/queue_adapters/sidekiq_adapter.rb
class SidekiqAdapter
  def enqueue(job) #:nodoc:
    # Sidekiq::Client does not support symbols as keys
    job.provider_job_id = Sidekiq::Client.push \
      'class'   => JobWrapper,
      'wrapped' => job.class.to_s,
      'queue'   => job.queue_name,
      'args'    => [ job.serialize ]
  end

  def enqueue_at(job, timestamp) #:nodoc:
    job.provider_job_id = Sidekiq::Client.push \
      'class'   => JobWrapper,
      'wrapped' => job.class.to_s,
      'queue'   => job.queue_name,
      'args'    => [ job.serialize ],
      'at'      => timestamp
  end

  class JobWrapper #:nodoc:
    include Sidekiq::Worker

    def perform(job_data)
      Base.execute job_data
    end
  end
end
Again, like every other adapter, SidekiqAdapter implements enqueue and enqueue_at, and both of them push jobs to Sidekiq’s queue along with some meta information that is later used to identify the proper job class and the queue to run in, plus of course the serialized job itself. In addition, enqueue_at passes the timestamp for executing the job at a specific time. Pushing a job to the Sidekiq queue returns Sidekiq’s internal job id, which is then assigned to the provider_job_id attribute.
DelayedJob Adapter
Let’s take a look at the adapter for DelayedJob, arguably the most common choice backed by the application’s database. The pattern is exactly the same as for SidekiqAdapter: we have enqueue and enqueue_at methods, and both of them push the job to the queue with extra information about the queue name, the priority and, for enqueue_at, the time to run the job at. Just like SidekiqAdapter, it wraps the serialized job in an internal JobWrapper instance, which delegates execution to ActiveJob::Base.execute. Finally, the id of the job record in DelayedJob’s queue is assigned to the provider_job_id attribute:
# active_job/queue_adapters/delayed_job_adapter.rb
class DelayedJobAdapter
  def enqueue(job) #:nodoc:
    delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  def enqueue_at(job, timestamp) #:nodoc:
    delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
    job.provider_job_id = delayed_job.id
    delayed_job
  end

  class JobWrapper #:nodoc:
    attr_accessor :job_data

    def initialize(job_data)
      @job_data = job_data
    end

    def perform
      Base.execute(job_data)
    end
  end
end
TestAdapter
Have you ever needed to test which jobs were enqueued or performed when running some specs? There’s a good chance you used the test helpers provided by ActiveJob or rspec-activejob for that. All these assertions are easy to implement thanks to TestAdapter, which exposes some extra API for keeping track of enqueued and performed jobs: the enqueued_jobs and performed_jobs attributes, which are populated when calling enqueue and enqueue_at. You can also configure whether the jobs should actually be executed by changing the perform_enqueued_jobs and perform_enqueued_at_jobs flags, and whitelist which jobs can be enqueued with the filter attribute.
# active_job/queue_adapters/test_adapter.rb
class TestAdapter
  attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
  attr_writer(:enqueued_jobs, :performed_jobs)

  # Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
  def enqueued_jobs
    @enqueued_jobs ||= []
  end

  # Provides a store of all the performed jobs with the TestAdapter so you can check them.
  def performed_jobs
    @performed_jobs ||= []
  end

  def enqueue(job) #:nodoc:
    return if filtered?(job)

    job_data = job_to_hash(job)
    enqueue_or_perform(perform_enqueued_jobs, job, job_data)
  end

  def enqueue_at(job, timestamp) #:nodoc:
    return if filtered?(job)

    job_data = job_to_hash(job, at: timestamp)
    enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
  end

  private

    def job_to_hash(job, extras = {})
      { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
    end

    def enqueue_or_perform(perform, job, job_data)
      if perform
        performed_jobs << job_data
        Base.execute job.serialize
      else
        enqueued_jobs << job_data
      end
    end

    def filtered?(job)
      filter && !Array(filter).include?(job.class)
    end
end
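A stripped-down recording adapter in the same spirit can be written in a few lines of plain Ruby. RecordingTestAdapter is hypothetical: jobs are plain hashes, a single perform_enqueued_jobs flag covers both enqueue and enqueue_at, and an injected executor callable stands in for Base.execute:

```ruby
# Hypothetical recording adapter modelled on TestAdapter.
class RecordingTestAdapter
  attr_accessor :perform_enqueued_jobs, :filter
  attr_reader :enqueued_jobs, :performed_jobs

  def initialize(executor:)
    @executor = executor
    @enqueued_jobs = []
    @performed_jobs = []
    @perform_enqueued_jobs = false
    @filter = nil
  end

  def enqueue(job)
    record(job)
  end

  def enqueue_at(job, timestamp)
    record(job.merge(at: timestamp))
  end

  private

  # Either remember the job or run it right away, depending on the flag.
  def record(job)
    return if filtered?(job)

    if perform_enqueued_jobs
      performed_jobs << job
      @executor.call(job)
    else
      enqueued_jobs << job
    end
  end

  # With a filter set, jobs of other classes are silently dropped.
  def filtered?(job)
    filter && !Array(filter).include?(job[:job])
  end
end

adapter = RecordingTestAdapter.new(executor: ->(job) { puts "running #{job[:job]}" })
adapter.enqueue({ job: "ReportJob", args: [1] })
adapter.enqueued_jobs.size # => 1
```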
Wrapping up
We’ve learned quite a lot about how ActiveJob works under the hood: what kind of public API is available and how to extend it with custom queue adapters. Even though understanding the internals of Rails may require some effort and time, it’s worth going deeper and exploring the architecture of the framework we use for everyday development. Here are some key takeaways:
- You can provide an exact instance of a queue adapter for ActiveJob, not only a string or symbol, which lets you pass extra configuration options.
- The Adapter pattern is a great choice when we have several services with different interfaces but want one unified interface for using all of them.
- Most of ActiveJob’s logic is divided into modules (which seems to be a common pattern in other layers of Rails), but the benefits of doing so are unclear: why is Execution a separate module from Core? What benefits does splitting queue-related logic into QueuePriority, QueueName and QueueAdapter bring? I don’t really see it as a way to decouple code, as e.g. the Enqueuing module depends on logic from QueueName and QueueAdapter (queue_name_from_part and the queue_adapter attribute), yet it doesn’t require either of them explicitly; it simply assumes they exist. It would be clearer if Base or Core acted like a facade and delegated responsibilities to other classes. If anyone knows the reason behind this kind of design, please write it in a comment, I’m really curious about it.
- To support another background job execution framework, you just need to add a queue adapter class implementing the enqueue and enqueue_at methods, which under the hood push the job to the queue and delegate execution of the logic to the ActiveJob::Base.execute method, passing the serialized job as an argument.
- Rails internals are not that scary :)
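Following that recipe, here is a sketch of an adapter for a hypothetical in-memory backend. MemoryQueueAdapter, its drain method and the injected executor (standing in for ActiveJob::Base.execute) are assumptions for illustration; the job only needs queue_name, serialize and a provider_job_id writer, which is how the adapters above use it:

```ruby
require "json"
require "securerandom"

# Hypothetical adapter for an in-memory backend, following the
# enqueue / enqueue_at contract.
class MemoryQueueAdapter
  Entry = Struct.new(:id, :queue, :payload, :run_at)

  def initialize(executor:)
    @executor = executor
    @entries = []
  end

  def enqueue(job)
    push(job, nil)
  end

  def enqueue_at(job, timestamp)
    push(job, timestamp)
  end

  # Execute every entry whose time has come, in insertion order;
  # returns how many entries were run.
  def drain(now: Time.now.to_f)
    due, @entries = @entries.partition { |entry| entry.run_at.nil? || entry.run_at <= now }
    due.each { |entry| @executor.call(JSON.parse(entry.payload)) }
    due.size
  end

  private

  # Store the serialized job and report the backend's id back to the job.
  def push(job, run_at)
    entry = Entry.new(SecureRandom.uuid, job.queue_name, JSON.generate(job.serialize), run_at)
    @entries << entry
    job.provider_job_id = entry.id
    entry
  end
end
```

Storing the payload as JSON mimics what a real backend would persist, and assigning provider_job_id mirrors what the Sidekiq and DelayedJob adapters do.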
If there’s any particular part of Rails that seems "magical" and you would like to see it decoded, let me know in the comments; I want to make sure I cover the needs of my readers.