Steve Davis
Using SQS to Create a Connected Workflow from Disconnected Systems

We recently worked with a client that has a variety of hardware instruments generating loads of data that are consumed down the road by an assortment of software systems and applications. These software systems function independently (they don't need to communicate to do their job), but in the big picture one system can rely on the results of another, so the whole thing is a workflow with a lot of manual intervention. The dirty work of generating the data users care about was a distraction that we saw an opportunity to remove.

The Existing Workflow

Our main job was to modernize and implement new features on a DNA analysis system. Our client starts with a large collection of DNA samples and runs them through Next-Generation Sequencing machines to generate raw genomic sequence data for their samples. This raw sequence output becomes the input to our system: after we churn the raw sequence data through our pipeline, the output is what our users care about.

It sounds like a simple input/output system, so what's the problem? The next-gen sequencers can run for 48+ hours and produce data on the order of gigabytes. The data lives on the sequencer until the run is done and the data can be moved. Much of the time, the user is waiting around until the data (1) is complete and (2) has been moved to an offsite storage location. It's only then that our user can manually observe that the data is available and move it to a location where the analysis system can access it. The user must then know which data they are looking for and tell the system to analyze that data. But that still isn't the end of the process: the user has to come back and check whether the analysis is complete. This "workflow" is quite simple in the steps it takes, but it relies completely on the availability of the user and their attention to manually initiate each step.

Using modern cloud-computing technologies, we aimed to make this a completely automated workflow that allowed the user to focus on what they care about: the results of the analysis system.

The Solution

During this project, our client was going through a technology stack update and had started to make heavy use of Amazon Web Services (AWS) to host applications and store data. Our analysis system was hosted on an Amazon EC2 instance, and all of the raw data from the sequencers was stored in an S3 bucket. This presented an opportunity to use other AWS services to streamline our disconnected workflow.

Amazon SQS

Amazon SQS (Simple Queue Service) is an AWS service that lets producers place messages on a queue for consumers (client applications) to read and act upon. Knowing that the sequencer output was being pushed up to S3, we decided to have a message sent over SQS automatically when the data became available, which our analysis system could then consume and act on. This essentially allowed us to create channels of communication without having to build expensive APIs for these independent systems to communicate over.
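
To make the producing side concrete, here is a minimal sketch of what announcing new sequencer output might look like from Ruby. This is not the client's actual producer, which is not described in this post; the helper name announce_sequencer_output and the example queue URL are illustrative assumptions. The only AWS call used is send_message with queue_url: and message_body:, as provided by Aws::SQS::Client in the AWS SDK for Ruby, and the client is passed in so the sketch can run without AWS credentials.

```ruby
require 'json'

# Hypothetical helper: announce that sequencer output has landed in S3.
# `sqs_client` is expected to respond to #send_message the way
# Aws::SQS::Client does; it is injected so the sketch can run without AWS.
def announce_sequencer_output(sqs_client, queue_url, data_location)
  # Serialize the message body as JSON so more keys can be added later.
  body = JSON.generate('data_location' => data_location)
  sqs_client.send_message(queue_url: queue_url, message_body: body)
  body
end
```

In a real application, sqs_client would be something like Aws::SQS::Client.new, with the region and credentials coming from the environment.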

The Implementation

The DNA analysis system is a Ruby on Rails (RoR) project that performs all the pipeline processing and data analysis. Our automated implementation needed the SQS processing to (1) run in the background and (2) run on a schedule to check for newly available sequencing output. With this in mind, we decided to use a long-polling technique with the AWS SDK Ruby gem.

We wanted to provide future flexibility with our SQS message, so we decided that the body of the message should be JSON and we would parse that in our job:

{ "data_location": "<path to sequencer output in S3>" }
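
Because the message body arrives as a plain string, it helps to treat parsing as something that can fail. The following is a small sketch of defensive parsing, not code from the project; the helper name extract_data_location and the example path are hypothetical.

```ruby
require 'json'

# Hypothetical helper: return the S3 location named in a message body,
# or nil when the body is not valid JSON or has no usable 'data_location'.
def extract_data_location(raw_body)
  parsed = JSON.parse(raw_body)
  return nil unless parsed.is_a?(Hash)

  location = parsed['data_location']
  location.is_a?(String) && !location.empty? ? location : nil
rescue JSON::ParserError, TypeError
  # ParserError covers malformed JSON; TypeError covers a nil body.
  nil
end

extract_data_location('{"data_location": "runs/example-run/"}') # => "runs/example-run/"
extract_data_location('not json')                               # => nil
extract_data_location('{"other_key": 1}')                       # => nil
```

Returning nil for anything unexpected lets the caller decide whether to skip, log, or delete a malformed message instead of letting the exception end the job.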

We started by creating a Rails ApplicationJob that could be started and run in the background. The basic skeleton of the job performs our necessary work:

class CheckSqsForRunsJob < ApplicationJob
  queue_as :default

  def perform(*args)
    # Do nothing unless the application is configured with an SQS queue
    queue_url = ENV['AWS_SQS_QUEUE']
    return if queue_url.blank?

    # Check our configured SQS queue for any messages
    sqs_client = Aws::SQS::Client.new
    resp = sqs_client.receive_message(queue_url: queue_url, visibility_timeout: 120)

    if resp.messages.any?
      msg = resp.messages.first
      msg_body = JSON.parse(msg.body)

      if msg_body.key?('data_location')
        # Do our work of pulling down the data and starting the processing
        # pipeline, all of which used to be done manually
      end

      # Now that we have read the message, delete it from the queue
      sqs_client.delete_message(queue_url: queue_url, receipt_handle: msg.receipt_handle)
    end

    # Lastly, schedule our next SQS check job
    CheckSqsForRunsJob.set(wait: 1.hour).perform_later
  end
end

The job is actually quite simple but can be extremely powerful:

  1. We first make sure that our application is configured to talk to AWS
  2. Read from our configured SQS queue
  3. Ensure that our required attribute is present in our message
  4. If so, go off and do our processing
  5. Remove our message from the queue so it is not read again
  6. Set up the polling so that we check for new output every hour
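
The receive call in the job does not pass a wait time, so it uses whatever wait time is configured on the queue itself, and it handles only the first message returned. As a hedged sketch of one possible variation (not what the project shipped), the receive step could request long polling explicitly and work through the whole batch. The method name process_available_runs is hypothetical, and the client is injected so the sketch runs without AWS; receive_message, delete_message, wait_time_seconds, and max_number_of_messages are real parts of the Aws::SQS::Client API.

```ruby
require 'json'

# Hypothetical variant of the receive step: ask SQS to wait up to 20 seconds
# for messages (long polling) and handle every message in the batch.
# `sqs_client` must respond to #receive_message and #delete_message the way
# Aws::SQS::Client does. The block is called with each data location.
def process_available_runs(sqs_client, queue_url)
  resp = sqs_client.receive_message(
    queue_url: queue_url,
    max_number_of_messages: 10, # SQS returns at most 10 messages per call
    wait_time_seconds: 20,      # the longest long-poll wait SQS allows
    visibility_timeout: 120
  )

  handled = 0
  resp.messages.each do |msg|
    body = begin
      JSON.parse(msg.body)
    rescue JSON::ParserError
      {} # treat malformed bodies as messages with nothing to do
    end

    if body.is_a?(Hash) && body.key?('data_location')
      yield body['data_location']
      handled += 1
    end

    # Delete after handling so the message is not delivered again.
    sqs_client.delete_message(queue_url: queue_url, receipt_handle: msg.receipt_handle)
  end
  handled
end
```

Like the job above, this sketch deletes messages that lack the required key. Whether malformed messages should instead be left for a dead-letter queue is a design choice that depends on how the queue is set up.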

This is a powerful technique that allows us to automatically check for available input, start processing it, and create output valuable to our user. We were missing only one final piece: starting this SQS check when the application starts, so that the process is fully automated. For that, we made use of Rails initializers, which allow you to configure a Rails application (or perform operations) as the system comes up.

Rails.application.config.after_initialize do
  # Start polling SQS only if the queue and input bucket are configured
  if ENV['AWS_SQS_QUEUE'].present? && ENV['AWS_INPUT_BUCKET'].present?
    CheckSqsForRunsJob.perform_later
  end
end

This is a very simple but effective way to initiate our SQS polling, which then runs every hour for as long as the application is up.

The Result

Usually when you set out to connect different applications and systems, you focus on defining protocols of communication and creating APIs that each software system should implement so each piece can talk to the other. Using a much simpler method of communication, we were able to help our client achieve a completely automated analysis pipeline that relies on output derived from physical hardware to produce meaningful analysis results, which in the end is all our users care about.

The process is roughly:

(1) Next-gen sequencer -> 
      (2) upload to S3 -> 
            (3) message to analysis system -> 
                  (4) data download -> 
                        (5) analysis system processing -> 
                              (6) user analysis

Before we created this process, each one of these steps required manual user intervention to keep the workflow moving, and delays could easily creep in when users were out of the office or simply too busy to initiate the next step. With all of this automated, our clients are able to simply focus on analyzing their results and performing the science they care about.