Executing background jobs is a common feature of many web applications. Switching between background processing frameworks used to be quite painful, as most of them had a different API for enqueuing jobs, enqueuing mailers and scheduling jobs. One of the great additions in Rails 4.2 was a solution to this problem: ActiveJob, which provides an extra layer on top of background job frameworks and unifies the API regardless of the queue adapter you use. But how exactly does it work? What are the requirements for adding new queue adapters? What kind of API does ActiveJob provide? Let’s dive deep into the codebase and answer these and some other questions.
Anatomy of the job
Let’s start with a simple job class, MyAwesomeJob:
To enqueue a job we could simply write MyAwesomeJob.perform_later(some_user); to schedule it for some time in the future, MyAwesomeJob.set(wait: 12.hours).perform_later(some_user); and to execute it immediately without enqueuing, MyAwesomeJob.perform_now(some_user). But we never defined any of these methods, so what extra work does ActiveJob perform to make this happen?
Exploring internals of ActiveJob
To answer this question, let’s take a look at the ActiveJob::Base class:
There are some interesting modules included in this class, which we will get to know in more detail later, but let’s focus on the core API for now. This kind of logic would most likely be defined in, well, the Core module. Indeed, the set method is there:
It returns an instance of ConfiguredJob, passing the job class itself and the given options to the constructor. Let’s check what the ConfiguredJob class is responsible for:
We have two methods available here: perform_now and perform_later. Both of them create a new job instance with the arguments passed to the method, and then either call perform_now on that instance or call enqueue, passing the options that were given to the set method.
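To make this flow concrete, here is a minimal plain-Ruby model of set and ConfiguredJob. It is a simplified sketch rather than ActiveJob’s actual source: ToyJob is a hypothetical stand-in for ActiveJob::Base whose enqueue merely records the options it receives, so the example runs without Rails, and wait is given in plain seconds because ActiveSupport durations such as 12.hours are not available.

```ruby
# Simplified model of ActiveJob's `set` / ConfiguredJob pair (not the real source).
class ConfiguredJob
  def initialize(job_class, options = {})
    @job_class = job_class # the job class, e.g. MyAwesomeJob
    @options = options     # whatever was passed to `set`
  end

  # Build a job instance and run it immediately.
  def perform_now(*args)
    @job_class.new(*args).perform_now
  end

  # Build a job instance and enqueue it with the options captured by `set`.
  def perform_later(*args)
    @job_class.new(*args).enqueue(@options)
  end
end

# Hypothetical minimal base class standing in for ActiveJob::Base.
class ToyJob
  attr_reader :arguments, :enqueue_options

  def self.set(options = {})
    ConfiguredJob.new(self, options)
  end

  def initialize(*arguments)
    @arguments = arguments
  end

  def perform_now
    perform(*arguments)
  end

  # Instead of talking to a queue adapter, just remember the options.
  def enqueue(options = {})
    @enqueue_options = options
    self
  end
end

class MyAwesomeJob < ToyJob
  def perform(user)
    "processed #{user}"
  end
end

job = MyAwesomeJob.set(wait: 12 * 60 * 60).perform_later("some_user")
puts job.enqueue_options.inspect
puts MyAwesomeJob.set.perform_now("some_user")
```

The key design point is that set does no work by itself: it only captures options and defers the decision between running and enqueuing to the object it returns.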
Let’s go deeper and start with the perform_now method. It’s defined inside the Execution module and basically comes down to deserializing the arguments if needed (there is nothing to deserialize when calling perform_now directly) and calling the perform method we defined in our job class. This logic is wrapped in a run_callbacks block, which lets you define callbacks before, around and after the execution of the perform method.
These callbacks are defined inside the Callbacks module, whose only responsibility is defining callbacks for the perform and enqueue methods. They help extend the behaviour of jobs in a pretty unobtrusive manner. For example, if we wanted to log when a job has finished, we could add the following after_perform callback:
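The perform_now flow and an after_perform hook can be modelled in plain Ruby as below. This is a hedged sketch: ActiveJob builds on ActiveSupport::Callbacks, whereas here a hypothetical run_callbacks walks hand-rolled before/around/after lists with simplified ordering rules, and "logging" just appends to an array so the result is easy to inspect.

```ruby
# Toy model of perform_now: run `perform` wrapped in before/around/after callbacks.
class ToyJob
  attr_reader :arguments, :log

  class << self
    def callbacks
      @callbacks ||= { before: [], around: [], after: [] }
    end

    def before_perform(&block)
      callbacks[:before] << block
    end

    def around_perform(&block)
      callbacks[:around] << block
    end

    def after_perform(&block)
      callbacks[:after] << block
    end
  end

  def initialize(*arguments)
    @arguments = arguments
    @log = []
  end

  def perform_now
    run_callbacks { perform(*arguments) }
  end

  private

  # Callbacks are evaluated in the context of the job instance.
  def run_callbacks(&block)
    cbs = self.class.callbacks
    cbs[:before].each { |cb| instance_exec(&cb) }
    result = nil
    innermost = -> { result = block.call }
    # Nest the around callbacks so the first one registered is the outermost.
    chain = cbs[:around].reverse.inject(innermost) { |inner, cb| -> { instance_exec(inner, &cb) } }
    chain.call
    cbs[:after].each { |cb| instance_exec(&cb) }
    result
  end
end

class MyAwesomeJob < ToyJob
  before_perform { log << "about to perform" }

  around_perform do |inner|
    log << "started"
    inner.call
    log << "finished"
  end

  # Log once the job is done, in the spirit of an after_perform logging hook.
  after_perform { log << "performed #{arguments.first}" }

  def perform(user)
    log << "performing for #{user}"
    "done"
  end
end
```

Because the callbacks are registered per class and executed with instance_exec, they can read the job’s own state (here arguments and log) without the perform method knowing anything about them.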
Let’s get back to the perform_later method from ConfiguredJob. We would expect the enqueue method to be defined in the Enqueuing module, and that is the case here as well:
We can pass several options here. The scheduled_at attribute can be configured with wait (which schedules the job the given amount of time from now) or wait_until (which schedules the job at an exact point in time). We can also enforce the queue used for the job execution and set the priority. Finally, the call is delegated to the queue_adapter: enqueue_at when scheduled_at is set, enqueue otherwise. This hand-off is wrapped in a run_callbacks block, which lets you define callbacks before, around and after enqueuing.
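The option handling can be sketched in plain Ruby as follows. This is a simplified model, not the real Enqueuing module: RecordingAdapter is a hypothetical adapter that only records calls, wait is plain seconds instead of an ActiveSupport duration, the queue name is used as given, and the enqueue callbacks are left out.

```ruby
# Hypothetical adapter that only records what it was asked to do.
class RecordingAdapter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def enqueue(job)
    @calls << [:enqueue, job]
  end

  def enqueue_at(job, timestamp)
    @calls << [:enqueue_at, job, timestamp]
  end
end

# Toy model of Enqueuing#enqueue: translate options, then hand off to the adapter.
class ToyJob
  attr_accessor :scheduled_at, :queue_name, :priority

  class << self
    attr_accessor :queue_adapter
  end

  def initialize
    @queue_name = "default"
  end

  def enqueue(options = {})
    # `wait` is a relative delay in seconds, `wait_until` an absolute Time.
    self.scheduled_at = Time.now.to_f + options[:wait] if options[:wait]
    self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
    self.queue_name = options[:queue].to_s if options[:queue]
    self.priority = options[:priority].to_i if options[:priority]

    # ActiveJob wraps this hand-off in the enqueue callbacks; omitted here.
    if scheduled_at
      self.class.queue_adapter.enqueue_at(self, scheduled_at)
    else
      self.class.queue_adapter.enqueue(self)
    end
    self
  end
end
```

Note how both wait and wait_until collapse into a single float timestamp, so an adapter only ever has to understand "now" or "at this time".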
In the Enqueuing module we can also find the perform_later class method, which is part of the most basic ActiveJob API; it basically comes down to calling enqueue without any extra options.
Queue Adapters
What is this queue_adapter to which we delegate the enqueuing? Let’s take a look at the QueueAdapter module. Its responsibility is exposing a reader and a writer for the queue_adapter accessor, which defaults to the async adapter. Assigning an adapter is quite flexible: we can pass a string or a symbol (which will be used to look up the proper adapter), an instance of the adapter itself, or the adapter class (which is deprecated).
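The different ways of assigning an adapter can be illustrated with a small plain-Ruby resolver. It is a sketch under assumptions: QueueAdapters, lookup and interpret_adapter below are simplified, hypothetical versions with empty stub adapters, but they show the idea of turning :async into AsyncAdapter by naming convention and accepting any object that responds to enqueue and enqueue_at.

```ruby
# Toy model of resolving a queue adapter from a symbol, a string, an instance or a class.
module QueueAdapters
  class AsyncAdapter
    def enqueue(job); end
    def enqueue_at(job, timestamp); end
  end

  class DelayedJobAdapter
    def enqueue(job); end
    def enqueue_at(job, timestamp); end
  end

  # :async -> AsyncAdapter, "delayed_job" -> DelayedJobAdapter
  def self.lookup(name)
    const_get(name.to_s.split("_").map(&:capitalize).join + "Adapter")
  end
end

ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

def interpret_adapter(value)
  case value
  when Symbol, String
    QueueAdapters.lookup(value).new
  else
    if ADAPTER_METHODS.all? { |m| value.respond_to?(m) }
      value # already an adapter instance: use it as-is, with its own configuration
    elsif value.is_a?(Class) && ADAPTER_METHODS.all? { |m| value.public_method_defined?(m) }
      warn "Passing an adapter class is deprecated; pass an instance instead."
      value.new
    else
      raise ArgumentError, "#{value.inspect} is not a queue adapter"
    end
  end
end
```

Checking for the two methods rather than for a particular superclass is what makes any object with the right interface acceptable as an adapter.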
All supported queue adapters are defined in the queue_adapters directory. There are quite a lot of them, so let’s pick a few.
Async Adapter
Let’s start with AsyncAdapter, which is the default one. What is really interesting about this queue adapter is that it doesn’t use any extra services but runs jobs with an in-process thread pool. Under the hood it uses Concurrent Ruby, a collection of modern tools for writing concurrent code, which I highly recommend checking out. We can pass executor_options to the constructor, which are then used to create a new instance of Scheduler.
Remember how we could assign a queue adapter for ActiveJob in multiple ways? This is exactly the use case for assigning a specific instance of the queue adapter instead of just passing a string or symbol (or a class, which is deprecated). The Scheduler instance in fact acts as the queue backend; the specifics of how it works are beyond the scope of this article. The thing to keep in mind is that it exposes two important methods: enqueue and enqueue_at:
The main difference between these two methods is the timestamp (or lack of one) used for executing the job later.
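To get a feel for what such a backend does, here is a toy in-process scheduler with the same two entry points. This is an assumption-laden sketch, not Rails’ Scheduler: it uses plain threads from Ruby’s core library instead of Concurrent Ruby’s thread pools, ignores queue_name, and adds a hypothetical shutdown method so callers can wait for the jobs to finish.

```ruby
# Toy in-process scheduler: every job runs on its own background thread.
class ToyScheduler
  def initialize
    @threads = []
    @lock = Mutex.new
  end

  # Run as soon as possible. queue_name is accepted but ignored in this sketch.
  def enqueue(job, queue_name:)
    start_thread(0, job)
  end

  # Run once `timestamp` (seconds since the epoch, as a Float) has passed.
  def enqueue_at(job, timestamp, queue_name:)
    delay = [timestamp - Time.now.to_f, 0].max
    start_thread(delay, job)
  end

  # Wait for every started thread to finish (handy in tests).
  def shutdown
    @lock.synchronize { @threads.dup }.each(&:join)
  end

  private

  def start_thread(delay, job)
    thread = Thread.new do
      sleep(delay) if delay > 0
      job.perform
    end
    @lock.synchronize { @threads << thread }
    thread
  end
end
```

The only thing the scheduler needs from a job is a perform method, which is exactly the gap the adapter’s wrapper object fills.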
Let’s get back to the top-level AsyncAdapter class. The primary interface that all queue adapters are required to implement consists of two methods: enqueue and enqueue_at. In the Async adapter, these methods simply pass an instance of JobWrapper to the scheduler, along with the queue_name and, for enqueue_at only, the timestamp:
And what is this JobWrapper? It’s a simple abstraction that serializes the job when it is created and knows how to execute it later:
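Putting the adapter and its wrapper together, a hedged plain-Ruby sketch might look like this. ToyJob, its execute class method (standing in for ActiveJob::Base.execute) and ImmediateScheduler are hypothetical; the scheduler is injected and runs jobs synchronously so the example is deterministic, whereas the real adapter builds its own Scheduler from executor_options.

```ruby
require "securerandom"

# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_accessor :provider_job_id, :queue_name
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
    @queue_name = "default"
  end

  def serialize
    { "job_class" => self.class.name, "queue_name" => queue_name, "arguments" => arguments }
  end

  # Stand-in for ActiveJob::Base.execute: rebuild the job from data and run it.
  def self.execute(job_data)
    Object.const_get(job_data["job_class"]).new(*job_data["arguments"]).perform_now
  end

  def perform_now
    perform(*arguments)
  end
end

# Serializes the job up front and knows how to run it later.
class JobWrapper
  def initialize(job)
    job.provider_job_id = SecureRandom.uuid # this toy backend has no ids of its own
    @job_data = job.serialize
  end

  def perform
    ToyJob.execute(@job_data)
  end
end

class AsyncAdapter
  def initialize(scheduler)
    @scheduler = scheduler
  end

  def enqueue(job)
    @scheduler.enqueue(JobWrapper.new(job), queue_name: job.queue_name)
  end

  def enqueue_at(job, timestamp)
    @scheduler.enqueue_at(JobWrapper.new(job), timestamp, queue_name: job.queue_name)
  end
end

# Runs wrappers synchronously; a real scheduler would use a thread pool.
class ImmediateScheduler
  def enqueue(wrapper, queue_name:)
    wrapper.perform
  end

  def enqueue_at(wrapper, _timestamp, queue_name:)
    wrapper.perform
  end
end

class GreetJob < ToyJob
  def perform(name)
    "hello #{name}"
  end
end
```

Serializing in the wrapper’s constructor means later changes to the job object cannot leak into what eventually runs, which mirrors what an out-of-process backend would see.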
Serialization and deserialization
Let’s take a closer look at how it works. The execute class method is defined in the Execution module and basically comes down to deserializing the job data (which was serialized in JobWrapper so that it could be enqueued) and calling perform_now. This logic is wrapped in a run_callbacks block, so we can extend it by performing some action before, around or after the execution:
The deserialize class method is defined inside the Core module; it creates a new instance of the job class, deserializes the data into it and returns the job:
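The two steps can be modelled together in plain Ruby. This is a sketch: ToyJob and ReportJob are hypothetical, the execution_log array stands in for the execute callbacks, and arguments are assumed to already be plain values (argument deserialization is left out).

```ruby
# Toy model of Base.execute and the deserialize class method.
class ToyJob
  attr_accessor :job_id, :queue_name, :arguments

  class << self
    # Records what happens around execution, standing in for the execute callbacks.
    def execution_log
      @execution_log ||= []
    end

    def execute(job_data)
      execution_log << :before_execute
      job = deserialize(job_data)
      result = job.perform_now
      execution_log << :after_execute
      result
    end

    # Build an instance of the right class, then let it restore its own state.
    def deserialize(job_data)
      job = Object.const_get(job_data["job_class"]).new
      job.deserialize(job_data)
      job
    end
  end

  def deserialize(job_data)
    self.job_id = job_data["job_id"]
    self.queue_name = job_data["queue_name"]
    self.arguments = job_data["arguments"] # assumed to be plain values here
  end

  def perform_now
    perform(*arguments)
  end
end

class ReportJob < ToyJob
  def perform(month)
    "report for #{month} on #{queue_name}"
  end
end
```

Splitting the work between a class method (which picks the class) and an instance method (which restores state) lets each job class customize its own restoration if needed.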
Before explaining what happens during deserialization, we should know what the serialized data looks like. It’s a hash containing the name of the job class, the job id, the queue name, the priority, the locale and the serialized arguments:
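A plain-Ruby sketch of such a serialize method follows. It is simplified: ToyJob is hypothetical, the locale is hard-coded where ActiveJob reads I18n.locale, and the arguments are assumed to be plain values that need no conversion. The JSON round trip shows why this shape is convenient: the hash contains only strings, numbers, nil and arrays, so any backend can store it.

```ruby
require "json"
require "securerandom"

# Toy model of the hash produced by a job's serialize method.
class ToyJob
  attr_reader :job_id, :arguments
  attr_accessor :queue_name, :priority, :locale

  def initialize(*arguments)
    @arguments = arguments
    @job_id = SecureRandom.uuid
    @queue_name = "default"
    @priority = nil
    @locale = "en" # ActiveJob reads I18n.locale; hard-coded here
  end

  def serialize
    {
      "job_class" => self.class.name,
      "job_id" => job_id,
      "queue_name" => queue_name,
      "priority" => priority,
      "arguments" => arguments, # assumed to be plain values already
      "locale" => locale
    }
  end
end

class MyAwesomeJob < ToyJob
end

puts JSON.generate(MyAwesomeJob.new(3, "weekly").serialize)
```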
The serialize_arguments method delegates the serialization process to ActiveJob::Arguments.serialize, which is mainly responsible for mapping ActiveRecord models among the arguments to global ids (more precisely, any object that includes GlobalID::Identification):
Here’s an example of what serialized arguments may look like:
This format can easily be used for enqueuing jobs with different queue backends.
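The argument mapping can be sketched with a toy module. This is a hedged model rather than ActiveJob::Arguments itself: User is a hypothetical Struct standing in for an ActiveRecord model, to_global_id returns a plain string, locate is a tiny stand-in for GlobalID’s locator, and hashes and symbols are not handled. The "_aj_globalid" key is, to the best of our knowledge, the one ActiveJob uses to mark global ids.

```ruby
# Toy model of ActiveJob::Arguments: records become GlobalID-like strings.
GLOBALID_KEY = "_aj_globalid"

# Hypothetical model standing in for an ActiveRecord class.
User = Struct.new(:id) do
  def to_global_id
    "gid://app-name/User/#{id}"
  end

  def self.find(id)
    new(id) # a real model would query the database
  end
end

module ToyArguments
  module_function

  def serialize(arguments)
    arguments.map { |argument| serialize_argument(argument) }
  end

  def deserialize(arguments)
    arguments.map { |argument| deserialize_argument(argument) }
  end

  def serialize_argument(argument)
    case argument
    when nil, true, false, String, Integer, Float
      argument
    when Array
      serialize(argument)
    else
      raise ArgumentError, "Unsupported argument: #{argument.class}" unless argument.respond_to?(:to_global_id)

      { GLOBALID_KEY => argument.to_global_id }
    end
  end

  def deserialize_argument(argument)
    case argument
    when Array
      deserialize(argument)
    when Hash
      argument.key?(GLOBALID_KEY) ? locate(argument[GLOBALID_KEY]) : argument
    else
      argument
    end
  end

  # Very small stand-in for a GlobalID locator: "gid://app/User/3" -> User.find(3).
  def locate(gid)
    model_name, id = gid.split("/").last(2)
    Object.const_get(model_name).find(Integer(id))
  end
end
```

Only a reference travels through the queue; the record itself is loaded again when the job runs, so the job sees fresh data rather than a stale copy.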
Just before the job is executed, the data needs to be deserialized. Like the serialize method, deserialize is defined in the Core module; it assigns the job id, queue name, priority, locale and serialized arguments to the job using its accessors. But the arguments are not deserialized just yet, so how does execution with perform_now work?
Remember how I mentioned that there is nothing to deserialize when calling perform_now directly? This case is different, as we operate on serialized arguments. Deserialization happens in deserialize_arguments_if_needed, just before the perform method is executed.
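The lazy behaviour can be sketched in plain Ruby like this. Everything here is hypothetical and simplified: decode stands in for ActiveJob::Arguments.deserialize and just turns a global id string into a placeholder label, and only a few fields are restored.

```ruby
# Toy model of lazy argument deserialization in perform_now.
class ToyJob
  attr_accessor :job_id, :queue_name, :serialized_arguments
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  # Instance-level deserialize: copy fields, but keep arguments serialized for now.
  def deserialize(job_data)
    self.job_id = job_data["job_id"]
    self.queue_name = job_data["queue_name"]
    self.serialized_arguments = job_data["arguments"]
  end

  def perform_now
    deserialize_arguments_if_needed
    perform(*arguments)
  end

  private

  def deserialize_arguments_if_needed
    return if serialized_arguments.nil? # perform_now called directly: nothing to do
    @arguments = serialized_arguments.map { |arg| decode(arg) }
    self.serialized_arguments = nil     # deserialize only once
  end

  # Hypothetical decoder standing in for ActiveJob::Arguments.deserialize.
  def decode(arg)
    arg.is_a?(String) && arg.start_with?("gid://") ? "record(#{arg.split('/').last})" : arg
  end
end

class MyAwesomeJob < ToyJob
  def perform(user)
    "working on #{user}"
  end
end
```

Deferring the conversion until perform_now means records are loaded in the worker, at execution time, rather than when the job data is read from the queue.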
Again, the deserialization is delegated to the Arguments module, whose primary responsibility here is turning global ids back into real models, so gid://app-name/User/3 becomes the User record with id 3.
Exploring more queue adapters
Inline Adapter
Let’s explore some more adapters. You have most likely used InlineAdapter in integration tests to check the side effects of executing a job. Its logic is very limited: since it’s meant for inline execution, it doesn’t support enqueuing jobs for future execution (its enqueue_at raises NotImplementedError), and its enqueue method merely calls execute with the serialized job:
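A toy version of this behaviour fits in a few lines of plain Ruby. ToyJob and its execute class method are hypothetical stand-ins for ActiveJob::Base, and the error message is shortened.

```ruby
# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "job_class" => self.class.name, "arguments" => arguments }
  end

  # Stand-in for ActiveJob::Base.execute.
  def self.execute(job_data)
    Object.const_get(job_data["job_class"]).new(*job_data["arguments"]).perform_now
  end

  def perform_now
    perform(*arguments)
  end
end

# "Enqueuing" means running the job right away, in the current process.
class InlineAdapter
  def enqueue(job)
    ToyJob.execute(job.serialize)
  end

  def enqueue_at(*)
    raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future."
  end
end

class SumJob < ToyJob
  def perform(a, b)
    a + b
  end
end
```

Going through serialize and execute, instead of calling perform directly, keeps the inline path close to what a real backend does, so serialization problems surface in tests too.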
Sidekiq Adapter
Let’s check the queue adapter for one of the most commonly used background processing frameworks: Sidekiq. To be pushed to a Sidekiq queue, a class must include the Sidekiq::Worker module and implement a perform instance method containing the job logic. Just like AsyncAdapter, SidekiqAdapter uses an internal JobWrapper class, which includes Sidekiq::Worker and implements a perform method taking job_data as an argument; its logic is limited to delegating execution to the ActiveJob::Base.execute method:
Like every other adapter, SidekiqAdapter implements the enqueue and enqueue_at methods, and both of them push jobs to Sidekiq’s queue, passing some metadata that is later used to identify the proper job class and the queue to execute in, plus, of course, the serialized job. The enqueue_at method additionally passes the timestamp at which the job should be executed. Pushing a job to a Sidekiq queue returns Sidekiq’s internal job id, which is then assigned to the provider_job_id attribute.
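The shape of this adapter can be sketched without Sidekiq installed. FakeSidekiqClient below is a hypothetical in-memory stand-in, not Sidekiq’s API; it only mimics the idea that pushing a string-keyed payload returns a job id. ToyJob is likewise a hypothetical stand-in for ActiveJob::Base.

```ruby
require "securerandom"

# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_accessor :provider_job_id, :queue_name
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
    @queue_name = "default"
  end

  def serialize
    { "job_class" => self.class.name, "queue_name" => queue_name, "arguments" => arguments }
  end

  # Stand-in for ActiveJob::Base.execute.
  def self.execute(job_data)
    Object.const_get(job_data["job_class"]).new(*job_data["arguments"]).perform_now
  end

  def perform_now
    perform(*arguments)
  end
end

# Hypothetical in-memory stand-in for a Sidekiq client.
class FakeSidekiqClient
  attr_reader :pushed

  def initialize
    @pushed = []
  end

  # Stores the payload and returns a job id, as a real push would.
  def push(payload)
    @pushed << payload
    SecureRandom.hex(12)
  end
end

class SidekiqStyleAdapter
  # The worker class the backend instantiates; it only delegates to execute.
  class JobWrapper
    def perform(job_data)
      ToyJob.execute(job_data)
    end
  end

  def initialize(client)
    @client = client
  end

  def enqueue(job)
    job.provider_job_id = @client.push(payload_for(job))
  end

  def enqueue_at(job, timestamp)
    job.provider_job_id = @client.push(payload_for(job).merge("at" => timestamp))
  end

  private

  def payload_for(job)
    {
      "class" => JobWrapper,        # worker class the backend will run
      "wrapped" => job.class.to_s,  # the ActiveJob-level class, kept as metadata
      "queue" => job.queue_name,
      "args" => [job.serialize]     # a single argument: the serialized job
    }
  end
end

class WelcomeJob < ToyJob
  def perform(email)
    "welcome #{email}"
  end
end
```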
DelayedJob Adapter
Let’s take a look at the adapter for arguably the most common database-backed choice: DelayedJob. The pattern is exactly the same as for the Sidekiq adapter. We have enqueue and enqueue_at methods, and both of them push the job to the queue with extra info about the queue name, the priority and, for enqueue_at, the time to run the job at. Just like SidekiqAdapter, it wraps the serialized job in an internal JobWrapper instance, which delegates execution to ActiveJob::Base.execute. Finally, the internal job id from DelayedJob’s queue is assigned to the provider_job_id attribute:
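The same pattern can be sketched for a database-backed queue. FakeDelayedJobQueue below is a hypothetical in-memory stand-in for DelayedJob’s jobs table, not its real API: enqueue stores a payload object with queue, priority and run_at and returns a row with an id, and work_off plays the role of a worker by calling perform on every due payload.

```ruby
# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_accessor :provider_job_id, :queue_name, :priority
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
    @queue_name = "default"
  end

  def serialize
    { "job_class" => self.class.name, "queue_name" => queue_name, "arguments" => arguments }
  end

  # Stand-in for ActiveJob::Base.execute.
  def self.execute(job_data)
    Object.const_get(job_data["job_class"]).new(*job_data["arguments"]).perform_now
  end

  def perform_now
    perform(*arguments)
  end
end

# Hypothetical in-memory stand-in for a table of queued jobs.
class FakeDelayedJobQueue
  Row = Struct.new(:id, :payload_object, :queue, :priority, :run_at)
  attr_reader :rows

  def initialize
    @rows = []
    @next_id = 0
  end

  def enqueue(payload_object, queue:, priority: nil, run_at: nil)
    @next_id += 1
    row = Row.new(@next_id, payload_object, queue, priority, run_at)
    @rows << row
    row
  end

  # Plays the worker: run every row whose run_at has passed.
  def work_off(now = Time.now)
    due, @rows = @rows.partition { |r| r.run_at.nil? || r.run_at <= now }
    due.map { |r| r.payload_object.perform }
  end
end

class DelayedJobStyleAdapter
  # The stored payload object; the backend later calls #perform on it.
  class JobWrapper
    def initialize(job_data)
      @job_data = job_data
    end

    def perform
      ToyJob.execute(@job_data)
    end
  end

  def initialize(queue)
    @queue = queue
  end

  def enqueue(job)
    push(job)
  end

  def enqueue_at(job, timestamp)
    push(job, run_at: Time.at(timestamp))
  end

  private

  def push(job, run_at: nil)
    row = @queue.enqueue(JobWrapper.new(job.serialize),
                         queue: job.queue_name, priority: job.priority, run_at: run_at)
    job.provider_job_id = row.id
    row
  end
end

class ReminderJob < ToyJob
  def perform(label)
    "reminder: #{label}"
  end
end
```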
TestAdapter
Have you ever needed to test which jobs were enqueued or performed when running some specs? There’s a good chance you were using the test helpers provided by ActiveJob, or rspec-activejob, for that. All these assertions are quite easy to handle thanks to TestAdapter, which exposes some extra API for keeping track of enqueued and performed jobs: the enqueued_jobs and performed_jobs attributes, which are populated when calling enqueue and enqueue_at. You can also configure whether the jobs should actually be executed by changing the perform_enqueued_jobs and perform_enqueued_at_jobs flags, and restrict which job classes are handled with the filter attribute.
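The bookkeeping can be modelled with a small plain-Ruby class. This sketch mirrors the attributes named above (enqueued_jobs, performed_jobs, perform_enqueued_jobs, perform_enqueued_at_jobs, filter), but the shape of the recorded hashes and the ToyJob, EmailJob and OtherJob classes are assumptions made for illustration.

```ruby
# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  def queue_name
    "default"
  end

  def perform_now
    perform(*arguments)
  end
end

# Toy TestAdapter: records jobs instead of running them, unless told otherwise.
class ToyTestAdapter
  attr_accessor :perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter
  attr_writer :enqueued_jobs, :performed_jobs

  def enqueued_jobs
    @enqueued_jobs ||= []
  end

  def performed_jobs
    @performed_jobs ||= []
  end

  def enqueue(job)
    return if filtered?(job)
    enqueue_or_perform(perform_enqueued_jobs, job, { job: job.class, args: job.arguments, queue: job.queue_name })
  end

  def enqueue_at(job, timestamp)
    return if filtered?(job)
    enqueue_or_perform(perform_enqueued_at_jobs, job,
                       { job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp })
  end

  private

  def enqueue_or_perform(perform, job, job_data)
    if perform
      performed_jobs << job_data
      job.perform_now
    else
      enqueued_jobs << job_data
    end
  end

  # With a filter set, only the listed job classes are handled.
  def filtered?(job)
    filter && !Array(filter).include?(job.class)
  end
end

class EmailJob < ToyJob
  def self.sent
    @sent ||= []
  end

  def perform(address)
    self.class.sent << address
  end
end

class OtherJob < ToyJob
  def perform; end
end
```

Because the adapter only records plain hashes, test assertions can compare against them directly without any real queue running.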
Wrapping up
We’ve learned quite a lot about how ActiveJob works under the hood: what kind of public API is available and how to extend it with custom queue adapters. Even though understanding the internals of Rails may require some effort and time, it’s worth going deeper and exploring the architecture of the framework we use for everyday development. Here are some key takeaways:
- You can provide the exact instance of a queue adapter for ActiveJob, not only a string or symbol, which lets you pass some extra configuration options.
- The adapter pattern is a great choice when we have several services with different interfaces but want one unified interface for using all of them.
- Most of ActiveJob’s logic is divided into modules (which seems to be a common pattern in other layers of Rails), but the benefits of doing so are unclear: why is Execution a separate module from Core? What benefits does splitting queue-related logic into QueuePriority, QueueName and QueueAdapter bring? I don’t really see it as a way to decouple code, as e.g. the Enqueuing module depends on logic from QueueName, yet doesn’t require it explicitly; it just relies on the existence of the queue_adapter attribute. It would be clearer if Base or Core acted as a facade and delegated responsibilities to other classes. If anyone knows the reason behind this kind of design, please write it in a comment; I’m really curious about it.
- To support another background job execution framework, you just need to add a queue adapter class implementing the enqueue and enqueue_at methods, which under the hood push the job to the queue and delegate execution of the logic to the ActiveJob::Base.execute method, passing the serialized job as an argument.
- Rails internals are not that scary :)
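As an illustration of the recipe for supporting another framework, here is a sketch of an adapter for a hypothetical in-process backend built on Ruby’s Thread::Queue. ThreadQueueAdapter, its work_off method and ToyJob.execute (standing in for ActiveJob::Base.execute) are all assumptions; a real backend would persist the payloads and process them in separate worker processes.

```ruby
# Hypothetical job base class with just enough API for the adapter.
class ToyJob
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "job_class" => self.class.name, "arguments" => arguments }
  end

  # Stand-in for ActiveJob::Base.execute.
  def self.execute(job_data)
    Object.const_get(job_data["job_class"]).new(*job_data["arguments"]).perform_now
  end

  def perform_now
    perform(*arguments)
  end
end

# Custom adapter: the whole contract is enqueue + enqueue_at.
class ThreadQueueAdapter
  def initialize
    @queue = Thread::Queue.new # payloads ready to run
    @scheduled = []            # [timestamp, payload] pairs not yet due
    @lock = Mutex.new
  end

  def enqueue(job)
    @queue << job.serialize
  end

  def enqueue_at(job, timestamp)
    @lock.synchronize { @scheduled << [timestamp, job.serialize] }
  end

  # Stand-in for a worker loop: promote due jobs, then execute everything queued.
  def work_off(now = Time.now.to_f)
    @lock.synchronize do
      due, @scheduled = @scheduled.partition { |timestamp, _| timestamp <= now }
      due.each { |_, payload| @queue << payload }
    end
    results = []
    results << ToyJob.execute(@queue.pop) until @queue.empty?
    results
  end
end

class PingJob < ToyJob
  def perform(n)
    "pong #{n}"
  end
end
```

Everything backend-specific stays inside the adapter; the job classes never learn which queue they run on.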
If there’s any particular part of Rails that seems “magical” and you would like to see it decoded, let me know in the comments. I want to make sure I cover the needs of my readers.