Creating a Custom Queue Driver For Laravel

This article from Christopher Pitt was a great help in putting this tutorial together. Thank you.

Custom Queue

Firstly, you'll need a new queue class. This will be responsible for putting jobs on, taking jobs off and deleting jobs from the queue as well as gathering data such as the current queue size.

Create a class that implements Laravel's Illuminate\Contracts\Queue\Queue interface. In the Laravel version this tutorial was written against, the interface declares seven methods (check the contract in your installed version, as later releases add more):

  • size - return the current number of jobs on the queue
  • push - push a new job on to the queue
  • pushRaw - push a raw payload on to the queue
  • later - push a new job on the queue to be processed later
  • pushOn - push a new job on to a specific queue
  • laterOn - push a new job on a given queue to be processed later
  • pop - take a job from the queue

Define these methods for your given queue service.
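As a rough sketch of the shape such a class can take, the skeleton below implements the seven methods against a plain in-memory array. The class name InMemoryQueue, the $payloads property, the JSON payload layout and the abstract makeJob() hook are all assumptions made for illustration, and the signatures follow the contract as it stood in Laravel 5.3. A real driver would call its queue service in each method instead.

```php
<?php

use Illuminate\Contracts\Queue\Queue as QueueContract;

// Hypothetical in-memory driver; replace the array with calls to your queue service.
abstract class InMemoryQueue implements QueueContract
{
    /** Raw JSON payloads, keyed by queue name. */
    protected $payloads = [];

    public function size($queue = null)
    {
        $queue = $queue ?: 'default';

        return isset($this->payloads[$queue]) ? count($this->payloads[$queue]) : 0;
    }

    public function push($job, $data = '', $queue = null)
    {
        // Assumed payload layout; this sketch expects $job to be a class name string.
        return $this->pushRaw(json_encode(['job' => $job, 'data' => $data]), $queue);
    }

    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->payloads[$queue ?: 'default'][] = $payload;
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        // A real driver would ask the service to delay delivery; $delay is ignored here.
        return $this->push($job, $data, $queue);
    }

    public function pushOn($queue, $job, $data = '')
    {
        return $this->push($job, $data, $queue);
    }

    public function laterOn($queue, $delay, $job, $data = '')
    {
        return $this->later($delay, $job, $data, $queue);
    }

    public function pop($queue = null)
    {
        $queue = $queue ?: 'default';

        if (empty($this->payloads[$queue])) {
            return null; // nothing waiting on this queue
        }

        return $this->makeJob(array_shift($this->payloads[$queue]), $queue);
    }

    // Hypothetical hook: wrap the raw payload in an object implementing
    // Illuminate\Contracts\Queue\Job so that the worker can run it.
    abstract protected function makeJob($payload, $queue);
}
```

The worker expects pop to hand back a job object (or null when the queue is empty), which is why wrapping the payload is left to a subclass in this sketch.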

Custom Connector

Next, you'll need to build your own queue connector class. This will be responsible for connecting to your chosen queue service. Define a class that implements Laravel's Illuminate\Queue\Connectors\ConnectorInterface.

Again, looking at this interface will tell you that you need only one method in your implementation: connect(array $config).

This method accepts an array of configuration options which are defined in the config/queue.php configuration file. More on that later.

From this method, you need to return an implementation of the custom queue you defined above.
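A connector can be very small. The sketch below assumes a queue class named YourCustomQueue (a placeholder for the class from the previous section) whose constructor accepts the configuration array; both the name and the constructor are assumptions, not something Laravel prescribes.

```php
<?php

use Illuminate\Queue\Connectors\ConnectorInterface;

class YourCustomConnector implements ConnectorInterface
{
    /**
     * Build the queue from this connection's settings in config/queue.php.
     */
    public function connect(array $config)
    {
        // YourCustomQueue and its constructor argument are hypothetical;
        // create any service client your queue needs here and pass it in.
        return new YourCustomQueue($config);
    }
}
```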

Queue Service Provider

Now, how do we get Laravel to know about your new queue driver? Simple: add it to a service provider. You can add this to an existing service provider or generate a new one on the command line with php artisan make:provider QueueServiceProvider.

In the boot method of the service provider, add the following:

$manager = $this->app['queue'];

$manager->addConnector('your-custom-driver-name', function () {
    return new YourCustomConnector;
});

Let's break this down:

$manager = $this->app['queue']; grabs the queue manager from the application container.

Then we use the addConnector method on the queue manager instance to bind your new custom connector.

Supply the addConnector method with the name for your driver. In my case, this was custom-sqs-worker. You will use this name when defining the queue configuration, so make sure it is clear to you and, more importantly, clear to those who may maintain your code in the future.

The second parameter of the addConnector method is a closure. From this closure, you need to return an instance of your newly defined custom connector. For me, this was CustomSqsConnector.
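Put together, a dedicated provider might look something like the sketch below. The App\Queue namespace for the connector is an assumption; use wherever your class actually lives.

```php
<?php

namespace App\Providers;

use App\Queue\YourCustomConnector; // hypothetical location of the connector
use Illuminate\Support\ServiceProvider;

class QueueServiceProvider extends ServiceProvider
{
    /**
     * Register the custom connector with the queue manager.
     */
    public function boot()
    {
        $this->app['queue']->addConnector('your-custom-driver-name', function () {
            return new YourCustomConnector;
        });
    }

    /**
     * Nothing needs binding into the container for the connector itself.
     */
    public function register()
    {
        //
    }
}
```

If you generate a new provider, remember that Laravel only loads it once it is registered; in the Laravel 5 series that means adding App\Providers\QueueServiceProvider::class to the providers array in config/app.php.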

Configuration

Now you're pretty much ready to go. All that is left is to add the config to config/queue.php. Open up this file and add an entry to the connections array. The key is the connection name, and using the name of your driver keeps things simple. The value is an array of configuration options required by your driver, and its driver option must match the name supplied to addConnector in the queue service provider above, since this is how the queue manager picks the connector. You may recall from earlier in the article that this array is passed to the connect method of the connector class.
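For illustration, an entry could look roughly like this. Only the driver key is required by Laravel's queue manager; the queue and region options are hypothetical examples of settings a connector might read, and QUEUE_DRIVER is the environment variable name used by the Laravel 5.3-era config file (it may differ in your version).

```php
<?php

// config/queue.php (excerpt)
return [

    // Make the custom connection the default
    'default' => env('QUEUE_DRIVER', 'custom-sqs-worker'),

    'connections' => [

        'custom-sqs-worker' => [
            'driver' => 'custom-sqs-worker',   // must match the addConnector name
            'queue'  => 'your-queue-url',      // hypothetical option
            'region' => 'your-region',         // hypothetical option
        ],

    ],

];
```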

That's it! You can now update your default queue driver to utilise your newly created implementation. Nice work!

Further Reading

Out of the box, Laravel is geared up to grab only a single job from the queue at any one time. Amazon SQS provides the option to grab up to 10 jobs per fetch. This is something I wanted to utilise.
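On the SQS side, the batch size is set with the MaxNumberOfMessages parameter of the ReceiveMessage call. A minimal sketch using the AWS SDK for PHP follows; the receiveBatch function name is made up for this example, and a custom queue's pop method would wrap each returned message in a job object.

```php
<?php

use Aws\Sqs\SqsClient;

/**
 * Fetch up to ten messages from SQS in a single request.
 * (Hypothetical helper; not part of Laravel or the SDK.)
 */
function receiveBatch(SqsClient $sqs, $queueUrl)
{
    $response = $sqs->receiveMessage([
        'QueueUrl'            => $queueUrl,
        'MaxNumberOfMessages' => 10, // SQS accepts values from 1 to 10
        'AttributeNames'      => ['ApproximateReceiveCount'],
    ]);

    // The Messages key is absent when nothing was received
    return isset($response['Messages']) ? $response['Messages'] : [];
}
```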

In order to show this process, we need to take a step back and look at some of the default Laravel worker code.

When running php artisan queue:work, under the hood, Laravel will eventually call runNextJob on the Illuminate\Queue\Worker class.

As usual with Laravel, the clue is in the name: this method grabs the next job from the queue and runs it.

Here we are interested in this conditional:

if ($job) {
    return $this->process(
        $connectionName, $job, $options
    );
}

This method is only ever expecting a single job, but my custom queue will be returning an array of up to 10 jobs. As such, the native Laravel worker implementation will not suit my requirements.

To get around this, I only needed to update the runNextJob method. No matter how the queue jobs are run, Laravel will always go to this method to run the job.

With this in mind, I created a new class that extends the native worker class and overrides this method. My method looks a bit like this:

if ($jobs) {
    // If a single job came back, wrap it in an array
    if (! is_array($jobs)) {
        $jobs = [$jobs];
    }

    // Run the process method on each job in turn
    foreach ($jobs as $job) {
        $this->process(
            $connectionName, $job, $options
        );
    }

    return true;
}
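To show where that fragment sits, here is a sketch of a complete worker class. It assumes the Laravel 5.3 signature of runNextJob and the protected getNextJob, manager and exceptions members of that release; later versions changed the worker internals, so check your installed Illuminate\Queue\Worker before copying. The App\Queue namespace is hypothetical.

```php
<?php

namespace App\Queue; // hypothetical namespace

use Exception;
use Illuminate\Queue\Worker;
use Illuminate\Queue\WorkerOptions;

class SqsCustomWorker extends Worker
{
    /**
     * Process every job returned by a single fetch from the queue.
     */
    public function runNextJob($connectionName, $queue, WorkerOptions $options)
    {
        try {
            // With the custom queue this may be one job or an array of jobs
            $jobs = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($jobs) {
                foreach (is_array($jobs) ? $jobs : [$jobs] as $job) {
                    $this->process($connectionName, $job, $options);
                }

                return true;
            }
        } catch (Exception $e) {
            // Report the failure, as the parent worker does. Note that an
            // exception also skips the remaining jobs in the current batch.
            $this->exceptions->report($e);
        }

        // Nothing was fetched, or a job failed: pause before polling again
        $this->sleep($options->sleep);
    }
}
```

One design point to be aware of in this sketch: if a job throws, the rest of that batch is not processed in the same pass. With SQS, messages that were received but not deleted become visible again after the visibility timeout, so they should be picked up on a later fetch.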

Great, this should do the trick, but how do we make Laravel use it?

Head back over to the queue service provider created earlier in the article and add the following:

$this->app->singleton('queue.worker', function ($app) {
    return new SqsCustomWorker(
        $app['queue'], $app['events'],
        $app['Illuminate\Contracts\Debug\ExceptionHandler']
    );
});

Simply put, this replaces the queue.worker binding in the application container with your custom implementation. The worker constructor accepts the queue manager, the event dispatcher and an exception handler, all of which we can grab from the container.

That's it! Laravel will now utilise your new worker.
