Amazon SQS FIFO

Amazon SQS is a distributed queue system that enables web service applications to quickly and reliably queue messages that one component in the application generates to be consumed by another component. Since the end of 2016 you can also create SQS FIFO queues.
FIFO queues are designed to ensure that the order in which messages are sent and received is strictly preserved and that each message is processed exactly once.
You can read more about the release of SQS FIFO in the official AWS announcement.

These pictures found online clearly explain the differences between normal SQS and FIFO SQS:

[Image: sqs_standard]
[Image: sqs_fifo]

In this post we are going to see how to create an SQS FIFO queue, how to send messages to it, and how to consume them using Python. I am using Python 3.4 and the official AWS SDK, Boto3.

Connect to the sqs resource and create a new FIFO queue:
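A minimal sketch (the queue name is just an example; Boto3 is assumed to be configured with valid credentials):

    import boto3

    # FIFO queues were only available in us-west-2 and us-east-2 at the time.
    sqs = boto3.resource('sqs', region_name='us-west-2')

    # The queue name must end with the ".fifo" suffix.
    queue = sqs.create_queue(
        QueueName='demo-queue.fifo',
        Attributes={
            'FifoQueue': 'true',
            'ContentBasedDeduplication': 'true'
        }
    )
    print(queue.url)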

Please be aware that the name of a FIFO queue must end with the suffix .fifo and that FIFO queues are currently available only in the Oregon (us-west-2) and Ohio (us-east-2) regions. This feature will be available in more regions in the coming months.

The 'ContentBasedDeduplication': 'true' attribute can be used when the messages are unique (usually a single producer and consumer). Here you can read more about content-based deduplication in FIFO queues: Recommendations for FIFO Queues

To send a message, use the send_message method:
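For example (the group ID is an arbitrary choice; with ContentBasedDeduplication enabled no explicit deduplication ID is needed):

    # Ordering is guaranteed per MessageGroupId.
    for i in range(3):
        queue.send_message(
            MessageBody='message #{}'.format(i),
            MessageGroupId='demo-group'
        )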

In FIFO queues, messages are ordered based on message group ID. FIFO queue logic applies only per message group ID. Each message group ID represents a distinct ordered message group within an Amazon SQS queue.

To consume the messages in the queue, use the receive_messages method:
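A sketch of a simple consumer loop (the wait time is an arbitrary choice):

    # Poll the queue; delete each message after processing so it is not redelivered.
    while True:
        messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=5)
        if not messages:
            break
        for message in messages:
            print(message.body)
            message.delete()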

Use the following lines to test that the messages are processed First-In First-Out:
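Something along these lines, assuming the queue starts empty:

    # Send numbered messages, read them back, and check that order is preserved.
    sent = ['message #{}'.format(i) for i in range(10)]
    for body in sent:
        queue.send_message(MessageBody=body, MessageGroupId='demo-group')

    received = []
    while len(received) < len(sent):
        for message in queue.receive_messages(MaxNumberOfMessages=10):
            received.append(message.body)
            message.delete()

    # Holds for a FIFO queue; a standard queue may deliver out of order.
    assert received == sent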

You can run the same code against a standard SQS queue and you will see that the messages are not necessarily consumed in order.
If your application requires messages to be processed in a strict sequence and exactly once, you can use SQS FIFO.
Consider using SQS (either standard or FIFO) when:

[Image: when_use_sqs]

Here you can find the code shown in this post: mz1991/AWS SQS FIFO


Machine learning with Tensorflow and Elasticsearch

In this post we are going to see how to build a machine learning system to perform an image recognition task. Image recognition is the process of identifying and detecting an object or a feature in a digital image or video. We will use the following tools:

  • Amazon S3 bucket
  • Amazon Simple Queue Service
  • Google TensorFlow machine learning library
  • Elasticsearch

The idea is to build a system that will process the image recognition task against some images stored in a S3 bucket and will index the results to Elasticsearch.
The library used for the image recognition task is TensorFlow.
TensorFlow is an open source software library for numerical computation using data flow graphs. Nodes in the graph represent mathematical operations, while the graph edges represent the multidimensional data arrays (tensors) communicated between them. The flexible architecture allows you to deploy computation to one or more CPUs or GPUs in a desktop, server, or mobile device with a single API. TensorFlow was originally developed by researchers and engineers working on the Google Brain Team within Google’s Machine Intelligence research organization for the purposes of conducting machine learning and deep neural networks research, but the system is general enough to be applicable in a wide variety of other domains as well. You can read more about it here.

These are the main steps performed in the process:

  • Upload image to S3 bucket
  • Event notification from S3 to a SQS queue
  • Event consumed by a consumer
  • Image recognition on the image by TensorFlow
  • The result of the classification is indexed in Elasticsearch
  • Search in Elasticsearch by tags

This image shows the main steps of the process:

[Image: flowimg — the pipeline from S3 upload to Elasticsearch indexing]

Event notifications

When an image is uploaded to the S3 bucket, a message will be stored in an Amazon SQS queue. To configure the S3 bucket and read the queue programmatically, you can read my previous post:
Amazon S3 event notifications to SQS

Consume messages from Amazon SQS queue

Now that the S3 bucket is configured, when an image is uploaded to the bucket an event notification will be stored in the SQS queue. We are going to build a consumer that reads this notification, downloads the image from the S3 bucket, and performs the image classification using TensorFlow.

With code like the following you can read the messages from the SQS queue, download the image from the S3 bucket, and store it locally (ready for the image classification task):
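A sketch of such a consumer (queue name, region, and local path are illustrative assumptions):

    import json
    import boto3

    sqs = boto3.resource('sqs', region_name='us-west-2')
    s3 = boto3.resource('s3', region_name='us-west-2')
    queue = sqs.get_queue_by_name(QueueName='image-events')

    for message in queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=10):
        # Each S3 notification can contain several records.
        for record in json.loads(message.body).get('Records', []):
            bucket_name = record['s3']['bucket']['name']
            key = record['s3']['object']['key']
            local_path = '/tmp/' + key.split('/')[-1]
            # Download the uploaded image, ready for classification.
            s3.Bucket(bucket_name).download_file(key, local_path)
        message.delete()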

Image recognition task

Now that the image (originally uploaded to S3) has been downloaded we can use Tensorflow to run the image recognition task.
The model used by Tensorflow for the image recognition task is the Inception-V3. It achieved a 3.46% error rate in the ImageNet competition. You can read more about it here: Inception-V3 and here: Tensorflow image recognition.

I used the TensorFlow Python API; you can install it using pip:
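For example (the right package or wheel depends on your platform and Python version; see the setup guide linked below):

    pip3 install tensorflow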

You can find all the information about setup and installation here: Download and Setup TensorFlow. Here you can find an official code lab by Google: TensorFlow for Poets.

So, starting from the classify_image.py code (you can find it on GitHub: classify_image.py), I created a Python module that, given the local path of an image (the one previously downloaded from S3), returns a dictionary with the result of the classification.
The result of the classification consists of a set of tags (the objects recognized in the image) and scores (each score represents the probability of a correct classification; the scores sum to one).

So, calling the run_image_recognition function with the image path as argument will return a dictionary with the result of the classification.
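A sketch of that module; create_graph and NodeLookup are the helper functions from classify_image.py and are not repeated here:

    import tensorflow as tf

    def run_image_recognition(image_path, top_k=5):
        """Classify an image and return a {tag: score} dictionary."""
        image_data = tf.gfile.FastGFile(image_path, 'rb').read()

        create_graph()  # builds the Inception-v3 graph (from classify_image.py)
        results = {}
        with tf.Session() as sess:
            softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
            predictions = sess.run(softmax_tensor,
                                   {'DecodeJpeg/contents:0': image_data}).squeeze()

            node_lookup = NodeLookup()  # id-to-label mapping (from classify_image.py)
            for node_id in predictions.argsort()[-top_k:][::-1]:
                results[node_lookup.id_to_string(node_id)] = float(predictions[node_id])
        return results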

In the code shown above, the definitions of the TensorFlow helper functions are not reported (you can find them in the GitHub repository I linked).
The first time you run the image classification task, the model (Inception-v3) will be downloaded and stored on your file system (it is around 300 MB).

Index to Elasticsearch

Given an image, we now have a set of tags that classify it. We want to index these tags in Elasticsearch. To do that I created a new index called imagerepository and a new type called image.

The image type we are going to create will have the following properties:

  • title: the title of the image
  • s3_location: the link to the S3 resource
  • tags: field that will contain the result of the classification task

For the tags property I used the Nested datatype. It allows arrays of objects to be indexed and queried independently of each other.
You can read more about it here:
Nested datatype
Nested query

We will not store the image itself in Elasticsearch, just the URL of the image within the S3 bucket.

New Index:
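In Sense, creating the index can be as simple as (settings omitted for brevity):

    PUT imagerepository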

New Type:
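A possible mapping, with tags as a nested field of tag/score pairs (the inner field names are my assumptions):

    PUT imagerepository/_mapping/image
    {
      "properties": {
        "title":       { "type": "text" },
        "s3_location": { "type": "keyword" },
        "tags": {
          "type": "nested",
          "properties": {
            "tag":   { "type": "keyword" },
            "score": { "type": "float" }
          }
        }
      }
    }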

You can now try to post a test document:
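For example (all values are illustrative):

    POST imagerepository/image
    {
      "title": "waterfall.jpg",
      "s3_location": "https://s3-us-west-2.amazonaws.com/my-bucket/waterfall.jpg",
      "tags": [
        { "tag": "waterfall", "score": 0.87 },
        { "tag": "cliff",     "score": 0.05 }
      ]
    }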

We can index a new document using the Elasticsearch Python SDK:
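A sketch with the elasticsearch-py client (host and document values are assumptions):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['http://localhost:9200'])

    # Build the document from the classification result of the downloaded image.
    tags = run_image_recognition('/tmp/waterfall.jpg')
    doc = {
        'title': 'waterfall.jpg',
        's3_location': 'https://s3-us-west-2.amazonaws.com/my-bucket/waterfall.jpg',
        'tags': [{'tag': t, 'score': s} for t, s in tags.items()]
    }
    es.index(index='imagerepository', doc_type='image', body=doc)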

Search

Now that we indexed our documents in Elasticsearch we can search for them.
These are examples of queries we can run:

  • Give me all the images that represent this object (searching by tag = object_name)
  • What does this image (given its title) represent?
  • Give me all the images that represent this object with at least 90% probability (search by tag = object_name and score >= 0.9)

I wrote some Sense queries.

Images that represent a waterfall:
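Using a nested query on the tags field:

    GET imagerepository/image/_search
    {
      "query": {
        "nested": {
          "path": "tags",
          "query": {
            "term": { "tags.tag": "waterfall" }
          }
        }
      }
    }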

Images that represent a pizza with at least 90% probability:
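The same nested query combined with a range filter on the score:

    GET imagerepository/image/_search
    {
      "query": {
        "nested": {
          "path": "tags",
          "query": {
            "bool": {
              "must": [
                { "term":  { "tags.tag": "pizza" } },
                { "range": { "tags.score": { "gte": 0.9 } } }
              ]
            }
          }
        }
      }
    }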

In this post we have seen how to combine the powerful machine learning library TensorFlow, used to perform an image recognition task, with the search power of Elasticsearch, used to index the image classification results. The pipeline also includes an S3 bucket (where the images are stored) and an SQS queue used to receive event notifications when a new image is stored in S3 (and is ready for the image classification task).

I ran this demo using the following environment configuration:

  • Elasticsearch 5.0.0
  • Python 3.4
  • tensorflow-0.11.0rc2
  • Ubuntu 14.04

Amazon S3 event notifications and Simple Queue Service

The Amazon S3 notification feature enables you to receive notifications when certain events happen in your bucket. You can be notified when a new object is added to the bucket or when an existing object is overwritten or deleted.
Right now the event notifications can be published to the following destinations:

  • Amazon Simple Notification Service (Amazon SNS) topic: a flexible, fully managed push messaging service. Using this service, you can push messages to mobile devices or distributed services. You can find more information here: Amazon SNS.
  • Amazon Simple Queue Service (Amazon SQS) queue: a scalable and fully managed message queuing service. You can find more information here: Amazon SQS.
  • AWS Lambda: compute service that makes it easy for you to build applications that respond quickly to new information. AWS Lambda runs your code in response to events such as image uploads, in-app activity, website clicks, or outputs from connected devices. You can find more information here: AWS Lambda.

In this post, we are going to publish S3 notifications to an Amazon SQS queue.
Amazon SQS is a distributed queue system that enables web service applications to quickly and reliably queue messages that one component in the application generates to be consumed by another component.

You can receive notifications for any or all of the following S3 events:

  • s3:ObjectCreated:Put – An object was created by an HTTP PUT operation.
  • s3:ObjectCreated:Post – An object was created by an HTTP POST operation.
  • s3:ObjectCreated:Copy – An object was created by an S3 copy operation.
  • s3:ObjectCreated:CompleteMultipartUpload – An object was created by the completion of an S3 multi-part upload.
  • s3:ObjectCreated:* – An object was created by one of the event types listed above or by a similar object creation event added in the future.
  • s3:ReducedRedundancyObjectLost – An S3 object stored with Reduced Redundancy has been lost.

To create a new Amazon SQS queue select the SQS service from the AWS services dashboard.
[Image: new-sqs]

You have to provide the queue name and, if you wish, edit the default settings (like the maximum message size). Once you have created the queue, you can see all its details, such as the URL and the ARN (we will use the ARN later). Amazon Resource Names (ARNs) uniquely identify AWS resources.

[Image: queue-details]

Now that the queue has been created, we need to configure our S3 bucket. From the S3 bucket properties, under the events section, you can select the events for which you would like to be notified. In the example I selected all the creation events (PUT/POST HTTP methods, copy, and multi-part upload).
[Image: select-events]
You now have to select the notification destination (among the ones listed before) and type in the ARN of the destination queue.
[Image: sqs_arn]
With these simple steps when an event happens in our S3 bucket a notification will be sent to the SQS queue.

We can now upload a document to the S3 bucket and see that a message is stored in the queue.
The stored notification has the following format (you can notice the key and size of the uploaded file and the bucket details):
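An abbreviated example (bucket name, key, and size are illustrative):

    {
      "Records": [
        {
          "eventSource": "aws:s3",
          "awsRegion": "us-west-2",
          "eventName": "ObjectCreated:Put",
          "s3": {
            "bucket": {
              "name": "my-bucket",
              "arn": "arn:aws:s3:::my-bucket"
            },
            "object": {
              "key": "waterfall.jpg",
              "size": 512000
            }
          }
        }
      ]
    }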

The official AWS SDK for Python, Boto3, allows you to interact with the SQS queue programmatically.
I ran this example on Ubuntu 14.04 with Python 3.4.
First of all, we need to create a new service object that will represent our queue service, by providing our AWS keys and region (and specifying the resource we are going to use, 'sqs').
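For example (hard-coded keys are shown only for illustration; prefer environment variables or the shared credentials file):

    import boto3

    sqs = boto3.resource(
        'sqs',
        region_name='us-west-2',
        aws_access_key_id='YOUR_ACCESS_KEY',
        aws_secret_access_key='YOUR_SECRET_KEY'
    )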

Once the service has been initialized, we can connect to a specific queue by specifying its name.
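Assuming the queue created above is named 's3-notifications':

    queue = sqs.get_queue_by_name(QueueName='s3-notifications')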

We can now receive/read a number of messages (up to 10 per call).
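For example:

    # Read up to 10 messages and delete each one after processing.
    for message in queue.receive_messages(MaxNumberOfMessages=10):
        print(message.body)
        message.delete()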

Be aware that "Amazon SQS provides a loose-FIFO capability that attempts to preserve the order of messages. However, Amazon SQS has been designed to be massively scalable using a distributed architecture; thus, it does not guarantee that the messages will be received in the exact same order as they have been sent (FIFO). If your system requires the order of messages to be preserved, place sequencing information in each message so that messages can be ordered when they are received."

When you use the receive_messages method, you should consider that “If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per ReceiveMessage call. If the number of messages in the queue is extremely small, you might not receive any messages in a particular ReceiveMessage response; in which case you should repeat the request” (this is what’s written in the official SDK documentation: SQS.Queue.receive_messages).

The SDK also allows you to write to the queue, simply by calling the send_message method and specifying the body of your message.
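For example:

    queue.send_message(MessageBody='Hello from Boto3')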

Here you can find the Boto3 SQS official documentation.

The S3 bucket event notification is a useful feature that allows you to perform a specific task as soon as an object has been created, modified, or deleted within the bucket. The SQS queue, where the messages are stored, is fast, reliable, scalable, secure, and simple to use (you can get started with SQS using only three APIs: SendMessage, ReceiveMessage, and DeleteMessage).