Deduplicating SQS messages using DynamoDB in Perl

Amazon SQS is a massively scalable, highly available message queuing system.  It can handle virtually unlimited amounts of traffic, but it has one particularly awkward issue: duplicate messages.  They are relatively infrequent, but they can have serious consequences, e.g. debiting a bank account a second time.

SQS does provide a solution to this in the form of FIFO queues, but they have some drawbacks.  First, they are limited to roughly 300 messages per second (which may not be an issue for everyone), but more importantly they simply aren't available in many regions.  At the time of this writing they are only operational in US West and US East.
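For comparison, if you are in a region that supports them, a FIFO queue can deduplicate for you.  Here's a minimal sketch using Paws (the queue name, region, and group id are hypothetical):

use Data::UUID;
use Paws;

my $job_id = Data::UUID->new->create_str;
my $sqs = Paws->service('SQS', region => 'us-east-1');
# FIFO queue names must end in .fifo
my $queue_url = $sqs->GetQueueUrl(QueueName => 'my-queue.fifo')->QueueUrl;
$sqs->SendMessage(
   QueueUrl => $queue_url,
   MessageBody => qq({"job_id":"$job_id"}),
   MessageGroupId => 'jobs', # required on FIFO queues; orders messages within a group
   MessageDeduplicationId => $job_id, # SQS drops duplicates of this id within a 5-minute window
);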

If you have higher performance requirements, or you aren't lucky enough to be in a region that supports them, you still have options.  One solution is to architect your code so that the operations are idempotent, meaning they can execute multiple times without causing problems.  If the task is something like refreshing a cached web page, you may not need to think about deduplication at all, since an occasional duplicate refresh is inconsequential.
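To make the distinction concrete, here's a toy sketch (both subroutines are hypothetical):

# Not idempotent: each duplicate delivery debits the account again.
debit_account($account_id, $amount);

# Idempotent: a duplicate delivery just overwrites the cache with the same content.
set_cached_page($url, render_page($url));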

On the other hand, some operations absolutely must never happen twice, or are so computationally expensive that avoiding repeats is critical.  One of the easiest ways to achieve this is to assign a GUID to each message and deduplicate on it.  The solution I have used combines DynamoDB and SQS.  DynamoDB is also highly scalable, with default capacity limits of 40,000 reads and 40,000 writes per second per table.

To do this in Perl, I'm using Paws, the Perl AWS SDK.  For clarity, I'm skipping error handling and retry logic.

First, we have to create a table in DynamoDB.  My structure is simple: a string partition key named job_id and a numeric attribute called taken.
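If you'd rather create the table from code than the console, a minimal sketch with Paws looks like this (the throughput values are illustrative; size them to your workload):

my $dynamodb = Paws->service('DynamoDB', region => 'us-west-2');
$dynamodb->CreateTable(
   TableName => 'sqs_job',
   AttributeDefinitions => [
      # only key attributes need definitions; taken and job_result don't
      { AttributeName => 'job_id', AttributeType => 'S' },
   ],
   KeySchema => [
      { AttributeName => 'job_id', KeyType => 'HASH' },
   ],
   ProvisionedThroughput => {
      ReadCapacityUnits => 5,
      WriteCapacityUnits => 5,
   },
);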

Next, we store a record in DynamoDB containing the GUID and an attribute indicating that the job hasn't been taken yet, then send the SQS message that references it.

use strict;
use warnings;
use Data::UUID;
use Paws;
use JSON::XS;

# store the job id in DynamoDB before the message is sent
my $job_id = Data::UUID->new->create_str;
my $dynamodb = Paws->service('DynamoDB', region => 'us-west-2');
$dynamodb->PutItem(
   TableName => "sqs_job",
   Item => {
      job_id => {
         S => $job_id,
      },
      taken => {
         N => "0", # DynamoDB numbers are sent as strings over the wire
      },
      job_result => {
         S => " ", # placeholder; DynamoDB doesn't allow empty strings
      },
   },
);

# send the message
my $sqs = Paws->service('SQS', region => 'us-west-2');
my $queue_name = "my-queue-name";
my $queue_url = $sqs->GetQueueUrl(QueueName => $queue_name)->QueueUrl;
$sqs->SendMessage(
   QueueUrl => $queue_url,
   MessageBody => encode_json({ job_id => $job_id }),
);

In the receiver we use a conditional update expression to set taken=1 if and only if taken=0.  If the condition fails, another worker has already claimed the job, so we skip the message and avoid duplicate processing.

# (assumes $sqs, $queue_url, and $dynamodb are set up as above)
my $timeout = 20; # long-poll for up to 20 seconds
my $recv = $sqs->ReceiveMessage(
   QueueUrl => $queue_url,
   MaxNumberOfMessages => 1,
   WaitTimeSeconds => $timeout,
);
foreach my $raw_message (@{ $recv->Messages || [] }) {
   my $message = decode_json($raw_message->Body);
   # ensure that SQS doesn't resend it by deleting it
   # (note this is at-most-once: if processing dies, the job is lost)
   $sqs->DeleteMessage(
      QueueUrl => $queue_url,
      ReceiptHandle => $raw_message->ReceiptHandle,
   );
   # get the job_id from the payload (this could be done with a message attribute instead)
   my $job_id = $message->{job_id};
   eval {
      $dynamodb->UpdateItem(
         TableName => "sqs_job",
         Key => {
            job_id => {
               S => $job_id,
            },
         },
         ConditionExpression => "taken = :zero", # only update if taken=0
         UpdateExpression => "SET taken = :one", # set taken=1
         ExpressionAttributeValues => {
            ":zero" => {
               N => "0",
            },
            ":one" => {
               N => "1",
            },
         },
      );
   };
   if (my $err = $@) {
      # Paws throws a Paws::Exception; ConditionalCheckFailedException
      # means another worker already took this job, so we skip it
      next if ref $err && $err->code eq 'ConditionalCheckFailedException';
      die $err; # anything else is a real error
   }
   # handle the message
}
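The job_result attribute we wrote at enqueue time is a natural place to record the outcome once processing succeeds.  A sketch, where $result stands in for whatever string your handler produced (hypothetical):

$dynamodb->UpdateItem(
   TableName => "sqs_job",
   Key => {
      job_id => {
         S => $job_id,
      },
   },
   UpdateExpression => "SET job_result = :result",
   ExpressionAttributeValues => {
      ":result" => {
         S => $result, # hypothetical: the output of your message handler
      },
   },
);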

And that's it.  To make this production-ready you'll still want retry logic and more thorough error handling, but the conditional update gives you the core guarantee: each job is processed at most once.