Grind’s Queue provider is built on Kue internally, however Grind provides a much different interace so it feels right at home within the Grind ecosystem.

First, add the grind-queue package via your preferred package manager:

yarn add grind-queue

Next, you’ll need to add QueueProvider to your app providers in app/Boostrap.js:

import Grind from 'grind-framework'
import { QueueProvider } from 'grind-queue'
 
const app = new Grind()
app.providers.push(QueueProvider)

To configure your queue, create config/queue.json:

{
  "default": "redis",
  "connections": {
    "redis": {
      "prefix": "q", // Default prefix for the Queue
      "connection": {
        "host": "127.0.0.1",
        "port": 6379,
        "password": null
      }
    }
  }
}

Queue also supports referencing connections in the global Redis config file:

{
  "default": "redis",
  "connections": {
    "redis": {
      "prefix": "q",
      "connection": "prod" // This will reference the “prod” connection defined in config/redis.json
    }
  }
}

In fact, you can omit the connection object entirely to reference the default connection in config/redis.json:

{
  "default": "redis",
  "connections": {
    "redis": {
      "prefix": "q"
    }
  }
}

The most important of any Queue system is the ability to actually create and dispatch jobs. Grind’s Queue uses Job classes to provide a singular source for working with a job. When dispatching, you’ll create a new instance of your Job class and then when it’s time for the job to be processed, the $handle method will be called.

The fastest way to create a new job is by using the job generator via bin/cli make:job. To quick generate a new job, run the following in your project directory:

bin/cli make:job EmailJob

This will generate an EmailJob class and place it at app/Jobs/EmailJob.js.

Now that your Job has been created, it’ll look like this:

import { Job } from 'grind-queue'
 
export class EmailJob extends Job {
  static jobName = 'email-job'
 
  $handle(app, queue) {
    // 
  }
 
}

This is the name of job, it’s used within the queue to determine the type of job during querying and dispatching so the job goes to the correct class.


The $handle method is what will be called when your job is invoked by the processor. For our EmailJob job, this is where we’d actually send the email.

$handle is invoked with two different parameters:

  • app: The Grind app instance
  • queue: The queue instance this job was dispatched on

For jobs that have already been created, you can call job.id to get the id of the job.

There are a number of additional methods in the Job class to let you fine tune your job:

$priority allows you to set the priority of the job.

$priority(level)
  • level — An integer or priority name to set the job to.

Priority names map as follows:

  • low => 10
  • normal => 0
  • medium => -5
  • high => -10
  • critical => -15

$delay provides a way to delay a job before it’s processed. By default there will not be any delays, so if the queue is empty immediately upon dispatching a job, it will be available for processing.

$delay(milliseconds)
  • milliseconds — Number of milliseconds to delay the job

$attempts controls how many times a failed job will be retried before permanently failing.

$attempts(attempts)
  • attempts — number of times a job should be retried before failing

$backoff works with $attempts and allows control over how long of a gap there should be between retrying a job.

$backoff(value)

The value for $backoff supports a few different configurations:

// Honor job’s original delay (if set) at each attempt, defaults to fixed backoff 
job.$attempts(3).$backoff(true)
 
// Override delay value, fixed backoff 
job.$attempts(3).$backoff({
  delay: 60 * 1000,
  type: 'fixed'
})
 
// Enable exponential backoff using original delay (if set) 
job.$attempts(3).$backoff({
  type:'exponential'
})
 
// Use a function to get a customized next attempt delay value 
job.$attempts(3).$backoff((attempts, delay)  => {
  return 600
})

When you pass in a function it will be eval’d in another process and not called directly — do not attempt to use any variables/application context outside of the function.


Using $ttl you can set how a long job remains in the queue before it expires.

$ttl(milliseconds)
  • milliseconds — Number of milliseconds a job can remain in the queue before being expired.

$save is used to update an existing job before it’s been processed.

$save(queue)
  • queue — The queue argument is only necessary for new jobs. For jobs that have already been dispatched, no value is needed.

$save will return a promise that will resolve/fail once the job has been updated or placed in the queue


$tojson() is an override point to convert the current job class to JSON before it’s stored in the queue. If your job overrides this, be sure to call super first.


Many of the above methods have a corresponding static property you can set on the job class to provide a default value:

export class EmailJob extends Job {
  static jobName = 'email-job'
 
  static priority = 'normal'
  static removeOnComplete = true
  static attempts = 1
  static backoff = null
  static concurrency = 1
}

Before you can use a job, you’ll first need to register it. Jobs should be registered in app/Providers/JobsProvider.js:

import 'App/Jobs/EmailJob'
 
export function JobsProvider(app) {
  app.queue.register(EmailJob)
}

If you haven’t already created the JobsProvider be sure to register it in Bootstrap, for more information on this see the Providers guide.

To dispatch a job, you’ll create a job instance and call app.queue.dispatch(job):

sendEmail(req, res) {
  return this.app.queue.dispatch(new EmailJob({
    email: req.body.email,
    subject: req.body.subject,
    body: req.body.body,
  })).then(() => res.send({ success: true }))
}

app.queue.dispatch(new EmailJob({ ... }).$delay(1000)) // Delays EmailJob by 1s 

To process jobs you’ve added to the queue, Grind provides a simple queue:work command:

bin/cli queue:work

Once invoked, the command will stay running and process jobs as they arrive in the queue. For large queues, you can run this on multiple servers to maximize performance.

You can also limit the command to only process a single job, allowing for discrete workers:

bin/cli queue:work --job=email-job

Once this is ran, it will only process the EmailJob jobs.

You can look up existing jobs by id or using the query builder.

To retrieve a single job from the queue, use queue.fetchJob:

app.queue.fetchJob(687).then(job => {
  job.body = 'Updated email body…'
  return job.$save()
})

This will fetch job 687 from the queue, update the body and save it.

To retrieve multiple jobs at once, use the query builder via queue.query(). The query builder class has the following methods:

All methods in the query builder are chainable.


Restrict jobs by their current state.

state(state)
  • state — The state for the Jobs you want to restrict to

Valid states are:

  • active
  • inactive
  • failed
  • complete

Using for you can restrict which types of jobs you want to query for by passing in a Job class.

for(jobClass)
  • jobClass — The class for the Jobs you want to restrict to

Heads Up! At this time, you can’t use for() without also using state().


Limit the number of jobs returned

limit(limit)
  • limit — Number of jobs to return

Skip a certain number of jobs before returning

offset(offset)
  • offset — Number of jobs to skip

Change how the returned jobs are ordered.

orderBy(orderBy)
  • orderBy — Direction to sort jobs, acceptable values are asc or desc

Calling first() at the end of a query will return a promise that will resolve to the first job in a query.

first()

// Get a list of jobs being processed: 
this.app.queue.query().for(EmailJob).state('active').then(jobs => {
  for(const job of jobs) {
    Log.comment('Currently sending', job.toJSON())
  }
})
 
// Fetch and update the first inactive job: 
this.app.queue.query().for(EmailJob).state('inactive').first(job => {
  job.body = 'Updated email body…'
  return job.$save()
})
Edit