Handling Amazon SNS notifications with a Tastypie Resource

Using Django and Tastypie, we automagically respond to SNS subscription requests. Once the subscription has been confirmed, notification messages start coming in, and each of them triggers an SQS polling cycle (the polling is deliberately thorough, which may seem like overkill, but it's not). Each received SQS message is parsed and its contents are passed to an external program that forks and exits, which keeps the request from blocking.

import base64
import httplib
import json
import os
import subprocess
import sys
import syslog
import tempfile
import time
from urlparse import urlparse

import boto.sqs
from boto.sqs.message import Message
from django.conf import settings
from tastypie import fields, http
from tastypie.authentication import Authentication
from tastypie.authorization import Authorization
from tastypie.bundle import Bundle
from tastypie.resources import Resource
from tastypie.throttle import BaseThrottle

# See: http://django-tastypie.readthedocs.org/en/latest/non_orm_data_sources.html
class NotificationObject(object):
    """Attribute-style wrapper around a plain mapping.

    Every attribute read and write is redirected to an internal ``_data``
    mapping.  Reading a name that has not been set yields ``None`` instead of
    raising ``AttributeError``.
    """

    def __init__(self, initial=None):
        # Go through __dict__ directly: a normal assignment would be captured
        # by __setattr__ below and end up inside the mapping itself.
        # Anything that does not look like a mapping is replaced by a fresh,
        # empty dict; a mapping is used as-is (not copied).
        backing = initial if hasattr(initial, 'items') else {}
        self.__dict__['_data'] = backing

    def __getattr__(self, name):
        # Only called for names not found the regular way, i.e. for
        # everything except '_data'.
        return self._data.get(name)

    def __setattr__(self, name, value):
        store = self.__dict__['_data']
        store[name] = value

class NotificationResource(Resource):
    """Tastypie resource that accepts Amazon SNS deliveries via POST.

    A posted message is acted upon only when its signing certificate is hosted
    under amazonaws.com, its topic equals ``settings.SNS_TOPIC`` and its
    signature verifies.  Subscription confirmations are then confirmed by
    fetching ``SubscribeURL``; notifications trigger a polling cycle on the
    SQS queue named by ``settings.SQS_QUEUE``.  Messages failing any of the
    checks are ignored without an error being reported to the sender.
    """

    sns_messageid = fields.CharField(attribute='MessageId')
    sns_timestamp = fields.CharField(attribute='Timestamp')
    sns_topicarn = fields.CharField(attribute='TopicArn')
    sns_type = fields.CharField(attribute='Type')
    sns_unsubscribeurl = fields.CharField(attribute='UnsubscribeURL')
    sns_subscribeurl = fields.CharField(attribute='SubscribeURL')
    sns_token = fields.CharField(attribute='Token')
    sns_message = fields.CharField(attribute='Message')
    sns_subject = fields.CharField(attribute='Subject')
    sns_signature = fields.CharField(attribute='Signature')
    sns_signatureversion = fields.CharField(attribute='SignatureVersion')
    sns_signingcerturl = fields.CharField(attribute='SigningCertURL')

    # Keys that make up the signed string, per message type, in signing order.
    # 'Subject' is optional and only takes part when present and non-empty.
    _SIGNED_KEYS = {
        'SubscriptionConfirmation': (
            'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token',
            'TopicArn', 'Type',
        ),
        'Notification': (
            'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn',
            'Type',
        ),
    }

    class Meta:
        resource_name = 'notification'
        object_class = NotificationObject
        fields = ['sns_messageid']
        list_allowed_methods = ['post']
        authentication = Authentication()
        authorization = Authorization()

    def get_resource_uri(self, bundle_or_obj):
        """Return an empty URI; received notifications are not addressable."""
        return ''

    def obj_create(self, bundle, request=None, **kwargs):
        """Validate a posted SNS message and dispatch it by its ``Type``.

        The bundle is returned in every case; a message that fails a check is
        simply not processed.
        """
        bundle.obj = NotificationObject(initial={
            'MessageId': '', 'Timestamp': '', 'TopicArn': '', 'Type': '',
            'UnsubscribeURL': '', 'SubscribeURL': '', 'Token': '',
            'Message': '', 'Subject': '', 'Signature': '',
            'SignatureVersion': '', 'SigningCertURL': '',
        })
        bundle = self.full_hydrate(bundle)

        # urlparse() yields hostname None for a missing or malformed URL, so
        # test for that before looking at the domain.
        cert_host = urlparse(bundle.data.get('SigningCertURL') or '').hostname
        if not cert_host or not cert_host.endswith('.amazonaws.com'):
            return bundle

        if bundle.data.get('TopicArn') != settings.SNS_TOPIC:
            return bundle

        if not self.verify_message(bundle):
            return bundle

        message_type = bundle.data['Type']
        if message_type == 'SubscriptionConfirmation':
            self.process_subscription(bundle)
        elif message_type == 'Notification':
            self.process_notification(bundle)

        return bundle

    def process_subscription(self, bundle):
        """Confirm the subscription by issuing a GET on ``SubscribeURL``."""
        syslog.syslog('SNS Subscription ' + bundle.data['SubscribeURL'])
        o = urlparse(bundle.data['SubscribeURL'])
        conn = httplib.HTTPSConnection(o.hostname)
        try:
            conn.putrequest('GET', o.path + '?' + o.query)
            conn.endheaders()
            # Read the response so the request is known to have completed;
            # its content is not used.
            conn.getresponse().read()
        finally:
            conn.close()

    def process_notification(self, bundle, retries=5, poll_interval=5):
        """Poll the SQS queue and hand each message to ``process.py``.

        The queue is polled ``retries`` times with ``poll_interval`` seconds
        of sleep before each poll.  Every received message is extended with
        the SNS/SQS settings, Base64-encoded as JSON and passed as the only
        argument to ``process.py``, which is looked up in the directory of the
        Django settings module.  A message is deleted from the queue after the
        attempt whether or not the hand-over succeeded.
        """
        sqs = boto.sqs.connect_to_region(settings.SQS_REGION)
        queue = sqs.lookup(settings.SQS_QUEUE)
        if queue is None:
            syslog.syslog('SQS queue not found: ' + str(settings.SQS_QUEUE))
            return

        settings_file = sys.modules[settings.SETTINGS_MODULE].__file__
        script = os.path.join(
            os.path.dirname(os.path.normpath(settings_file)), 'process.py')

        for _ in range(retries):
            time.sleep(poll_interval)
            for message in queue.get_messages(10, visibility_timeout=60):
                try:
                    m = json.loads(message.get_body())
                    m['return_sns_region'] = settings.SNS_REGION
                    m['return_sns_topic'] = settings.SNS_TOPIC
                    m['return_sqs_region'] = settings.SQS_REGION
                    m['return_sqs_queue'] = settings.SQS_QUEUE
                    process = subprocess.Popen(
                        ['/usr/bin/nice', '-n', '15', script,
                         base64.b64encode(json.dumps(m))],
                        shell=False)
                    process.wait()
                except Exception as e:
                    # Best effort: log and carry on with the next message.
                    syslog.syslog(str(e))
                queue.delete_message(message)

    def _string_to_sign(self, data):
        """Return the string SNS signed for ``data``, or None if it cannot be built."""
        keys = self._SIGNED_KEYS.get(data.get('Type'))
        if keys is None:
            return None
        message = u''
        for key in keys:
            value = data.get(key)
            if key == 'Subject' and not value:
                # No subject was given: the pair is left out of the string.
                continue
            if value is None:
                return None
            message += u'%s\n%s\n' % (key, value)
        return message

    def verify_message(self, bundle):
        """Check the message signature against the certificate it names.

        Returns True only when ``/usr/local/bin/sns-verify.sh`` exits with
        status 0; an unknown ``Type``, a missing signed field, a missing
        signature or a certificate that cannot be downloaded yields False.
        """
        data = bundle.data
        message = self._string_to_sign(data)
        signature = data.get('Signature')
        if message is None or not signature:
            return False

        o = urlparse(data.get('SigningCertURL') or '')
        if not o.hostname:
            return False
        # NOTE(review): whether the TLS certificate of this host is validated
        # depends on the Python version's httplib defaults -- confirm.
        conn = httplib.HTTPSConnection(o.hostname)
        try:
            conn.putrequest('GET', o.path)
            conn.endheaders()
            response = conn.getresponse()
            cert = response.read()
            if response.status != 200:
                return False
        finally:
            conn.close()

        # ok; attempt to use m2crypto failed, using openssl command line tool instead
        # see: https://async.fi/2011/10/sns-verify-sh/
        paths = []
        try:
            # Argument order expected by the script: CERT SIG MESS.
            for content in (cert, signature, message):
                if not isinstance(content, bytes):
                    # Encode explicitly so non-ASCII text cannot make the
                    # write fail.
                    content = content.encode('utf-8')
                handle = tempfile.NamedTemporaryFile(mode='wb', delete=False)
                paths.append(handle.name)
                try:
                    handle.write(content)
                finally:
                    handle.close()

            verify_process = subprocess.Popen(
                ['/usr/local/bin/sns-verify.sh'] + paths, shell=False)
            verify_process.wait()
            return verify_process.returncode == 0
        finally:
            # The files are created with delete=False so the script can open
            # them by name; remove them here whatever the outcome.
            for path in paths:
                try:
                    os.unlink(path)
                except OSError:
                    pass

That process.py would be something like:

#!/usr/bin/env python

import boto.sqs
from boto.sqs.message import Message
import base64, json, os, sys, syslog

# The script takes exactly one argument: a Base64-encoded JSON object.
if len(sys.argv) != 2:
    sys.exit('usage: %s <base64 encoded json object>' % (sys.argv[0], ))

m = json.loads(base64.b64decode(sys.argv[1]))

# Detach from the caller with the classic double fork, see
# http://code.activestate.com/recipes/66012-fork-a-daemon-process-on-unix/
try:
    pid = os.fork()
    if pid > 0:
        # First parent: exit right away so whoever waits on us is released.
        sys.exit(0)
except OSError as e:
    sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
    sys.exit(1)

# Decouple from the parent environment.
os.chdir("/")
os.setsid()
os.umask(0)

try:
    pid = os.fork()
    if pid > 0:
        # Second parent exits; only the grandchild carries on below.
        sys.exit(0)
except OSError as e:
    # Report the failure the same way as for the first fork.
    sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
    sys.exit(1)

syslog.syslog(sys.argv[0] + ': ' + str(m))

# ...

That is, process.py gets the received (and doped) SQS message, Base64 encoded, as its only command line argument, forks, exits and then does what it's supposed to do on its own. Control returns to NotificationResource so the request doesn't block unnecessarily.

Tagged with:

Categorised as:


sns-verify.sh

#!/bin/sh
# Verify a signature with the openssl command line tool.
#
# usage: sns-verify.sh CERT SIG MESS
#   CERT  file holding the X.509 signing certificate
#   SIG   file holding the Base64-encoded signature
#   MESS  file holding the exact text that was signed
#
# Exit status is 0 when openssl reports "Verified OK" and 1 otherwise.

if [ $# -lt 3 ]; then
  echo "usage: sns-verify.sh CERT SIG MESS"
  exit 1
fi

CERT=$1
SIG=$2
MESS=$3

# Scratch files for the extracted public key and the decoded signature;
# the trap removes them again however the script ends.
PUB=`mktemp` || exit 1
SIGRAW=`mktemp` || { rm -f "$PUB"; exit 1; }
trap 'rm -f "$PUB" "$SIGRAW"' EXIT

# http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
/usr/bin/openssl x509 -in "$CERT" -pubkey -noout > "$PUB" || exit 1
/usr/bin/base64 -i -d "$SIG" > "$SIGRAW" || exit 1
RET=`/usr/bin/openssl dgst -sha1 -verify "$PUB" -signature "$SIGRAW" "$MESS"`

if [ X"$RET" = X"Verified OK" ]; then
  exit 0
fi

exit 1

Tagged with:

Categorised as:


S3 client-upload parameter generation with Python

# http://aws.amazon.com/articles/1434   
def S3UploadParams(bucket_name, object_name, expiry, maxsize, redirect):
    """Build the form fields for a client-side (browser POST) upload to S3.

    Args:
        bucket_name: bucket the upload policy is restricted to.
        object_name: exact key the uploaded object must have.
        expiry: lifetime of the policy in seconds, counted from now.
        maxsize: upper bound for the upload size in bytes.
        redirect: URL used as ``success_action_redirect``.

    Returns:
        A dict with the keys ``key``, ``AWSAccessKeyId``, ``acl``,
        ``success_action_redirect``, ``policy`` (Base64-encoded policy
        document) and ``signature`` (Base64-encoded HMAC-SHA1 of the encoded
        policy, keyed with the secret access key from the boto configuration).
    """
    import base64, hashlib, hmac, json
    from time import gmtime, strftime, time

    import boto

    def GenerateS3PolicyString(bucket_name, object_name, expiry, maxsize, redirect):
        # Serialise with json.dumps so that quotes, backslashes and non-ASCII
        # characters in the key or redirect URL are escaped correctly.
        return json.dumps({
            'expiration': strftime("%Y-%m-%dT%H:%M:%SZ", gmtime(time() + expiry)),
            'conditions': [
                {'bucket': bucket_name},
                ['eq', '$key', object_name],
                {'acl': 'private'},
                {'success_action_redirect': redirect},
                # int() keeps the limit a bare JSON number even when the
                # caller passes it as a string.
                ['content-length-range', 0, int(maxsize)],
            ],
        })

    def SignS3Upload(encoded_policy):
        # The signature is computed over the Base64-encoded policy.
        return base64.b64encode(hmac.new(
            boto.config.get('Credentials', 'aws_secret_access_key'),
            encoded_policy,
            hashlib.sha1,
        ).digest())

    policy = GenerateS3PolicyString(bucket_name, object_name, expiry, maxsize, redirect)
    encoded_policy = base64.b64encode(policy)

    return {
        'key': object_name,
        'AWSAccessKeyId': boto.config.get('Credentials', 'aws_access_key_id'),
        'acl': 'private',
        'success_action_redirect': redirect,
        'policy': encoded_policy,
        'signature': SignS3Upload(encoded_policy),
    }

Tagged with:

Categorised as:


Reading EC2 tags with Boto

(Ouch! Looks like WordPress update to 3.1.3 wiped all the modifications I made to the default theme. Admittedly I should've seen that coming.) What I want to do is basically attach a key-value pair to an EC2 instance when launching it in AWS Management Console and read the value inside the instance when it's running. To be more specific, I use this to set a key called environment that can have values like dev, stage and prod so that the Django config can decide which database to connect to etc. while starting up. I suspect that in Boto the current instance can somehow be referenced in a more direct fashion but this works as well. First, append the following to /etc/profile:
# See: http://stackoverflow.com/questions/625644/find-out-the-instance-id-from-within-an-ec2-machine
# Ask the instance metadata service for the instance ID and availability zone.
# Failures only print a warning: this file is sourced by login shells, so
# exiting here would end the login.
EC2_INSTANCE_ID="$(wget -q -O - http://169.254.169.254/latest/meta-data/instance-id)"
test -n "$EC2_INSTANCE_ID" || echo 'cannot obtain instance-id' >&2
EC2_AVAIL_ZONE="$(wget -q -O - http://169.254.169.254/latest/meta-data/placement/availability-zone)"
test -n "$EC2_AVAIL_ZONE" || echo 'cannot obtain availability-zone' >&2
# The region is the availability zone minus its trailing letter(s),
# e.g. eu-west-1a -> eu-west-1.
EC2_REGION="$(echo "$EC2_AVAIL_ZONE" | sed -e 's:\([0-9][0-9]*\)[a-z]*$:\1:')"
export EC2_INSTANCE_ID EC2_AVAIL_ZONE EC2_REGION
Now we know the region and instance ID. Next, install Boto by running the following commands:
# Fetch the Boto 2.0b4 source archive, unpack it and run its installer.
wget "http://boto.googlecode.com/files/boto-2.0b4.tar.gz"
tar xzvf boto-2.0b4.tar.gz
cd boto-2.0b4
python ./setup.py install
Then, add these lines to ~/.profile:
export AWS_ACCESS_KEY_ID=<ACCESS_KEY>
export AWS_SECRET_ACCESS_KEY=<SECRET_KEY>
Or the equivalent in ~/.boto:
[Credentials]
aws_access_key_id = <ACCESS_KEY>
aws_secret_access_key = <SECRET_KEY>
Now, to read the tag we want in Python:
#!/usr/bin/env python
"""Print the value of the ``environment`` tag of the current EC2 instance.

The instance ID and region are taken from the EC2_INSTANCE_ID and EC2_REGION
environment variables.
"""

import os
import sys

from boto import ec2

ec2_instance_id = os.environ.get('EC2_INSTANCE_ID')
ec2_region = os.environ.get('EC2_REGION')

if not ec2_instance_id or not ec2_region:
    sys.exit('EC2_INSTANCE_ID and EC2_REGION must be set in the environment')

conn = ec2.connect_to_region(ec2_region)
if conn is None:
    sys.exit('cannot connect to EC2 region: %s' % ec2_region)

# Request only this instance rather than every instance in the region.
reservations = conn.get_all_instances(instance_ids=[ec2_instance_id])
instances = [i for r in reservations for i in r.instances]

for instance in instances:
    if instance.id == ec2_instance_id:
        environment = instance.tags.get('environment')
        if environment is None:
            sys.exit('instance %s has no "environment" tag' % ec2_instance_id)
        print(environment)

Tagged with:

Categorised as:


Verifying Amazon SNS Messages with PHP

Messages sent by Amazon Simple Notification Service are signed, and checking that any received message is indeed from AWS and not from some douche trying to outsmart you is not very hard (nor should it be optional, for that matter): sns-verify.php The verify_sns() function expects the message in JSON format, plus region (e.g. "eu-west-1"), numerical account ID without dashes and an array containing the topics you're interested in. The code will verify both SubscriptionConfirmation and Notification messages. It loads the certificate from the address in SigningCertURL field to check against for each message separately because the certificate changes over time, as described here. It is also checked that the host where the certificate is loaded from is in the amazonaws.com domain. Example usage where subscriptions are automatically confirmed:
require_once('sns-verify.php');

// SNS delivers its messages with POST; ignore everything else.
if($_SERVER['REQUEST_METHOD'] != 'POST') {
    logger('Not POST request, quitting');
    exit();
}

// The raw request body is the JSON document that verify_sns() expects.
$post = file_get_contents('php://input');

// Arguments: message JSON, region, account ID, topics of interest.
if(!verify_sns($post, 'REGION', 'ACCOUNT', array('TOPIC 1', 'TOPIC 2'))) {
    exit();
}

$msg = json_decode($post);

if($msg->Type == 'SubscriptionConfirmation') {
    logger('SNS SubscriptionConfirmation received');
    // Fetching SubscribeURL confirms the subscription.
    file_get_contents($msg->SubscribeURL);
} elseif($msg->Type == 'Notification') {
    logger('SNS Notification received');
    process_message($msg);
}

Tagged with:

Categorised as: