Handling Amazon SNS notifications with a Tastypie Resource
Using Django and Tastypie, we automagically respond to SNS subscription requests. After that part is handled, the notification messages start coming in and those are used to trigger an SQS polling cycle (trying to do a thorough job there which may seem like an overkill but it's not). A received SQS message is parsed and contents are passed to an external program that forks and exits which keeps the request from blocking.
from django.conf import settings from tastypie import fields, http from tastypie.resources import Resource from tastypie.bundle import Bundle from tastypie.authentication import Authentication from tastypie.authorization import Authorization from tastypie.throttle import BaseThrottle import boto.sq from boto.sqs.message import Message from urlparse import urlparse import base64, httplib, tempfile, subprocess, time, json, os, sys, syslog # Http://django-tastypie.readthedocs.org/en/latest/non_orm_data_sources.html class NotificationObject(object): def __init__(self, initial=None): self.__dict__['_data'] = {} if hasattr(initial, 'items'): self.__dict__['_data'] = initial def __getattr__(self, name): return self._data.get(name, None) def __setattr__(self, name, value): self.__dict__['_data'][name] = value class NotificationResource(Resource): sns_messageid = fields.CharField(attribute='MessageId') sns_timestamp = fields.CharField(attribute='Timestamp') sns_topicarn = fields.CharField(attribute='TopicArn') sns_type = fields.CharField(attribute='Type') sns_unsubscribeurl = fields.CharField(attribute='UnsubscribeURL') sns_subscribeurl = fields.CharField(attribute='SubscribeURL') sns_token = fields.CharField(attribute='Token') sns_message = fields.CharField(attribute='Message') sns_subject = fields.CharField(attribute='Subject') sns_signature = fields.CharField(attribute='Signature') sns_signatureversion = fields.CharField(attribute='SignatureVersion') sns_signingcerturl = fields.CharField(attribute='SigningCertURL') class Meta: resource_name = 'notification' object_class = NotificationObject fields = ['sns_messageid'] list_allowed_methods = ['post'] authentication = Authentication() authorization = Authorization() def get_resource_uri(self, bundle_or_obj): return '' def obj_create(self, bundle, request=None, **kwargs): bundle.obj = NotificationObject(initial={ 'MessageId': '', 'Timestamp': '', 'TopicArn': '', 'Type': '', 'UnsubscribeURL': '', 'SubscribeURL': '', 'Token': '', 'Message': '', 'Subject': '', 'Signature': '', 'SignatureVersion': '', 'SigningCertURL': '' }) bundle = self.full_hydrate(bundle) o = urlparse(bundle.data['SigningCertURL']) if not o.hostname.endswith('.amazonaws.com'): return bundle topicarn = bundle.data['TopicArn'] if topicarn != settings.SNS_TOPIC: return bundle if not self.verify_message(bundle): return bundle if bundle.data['Type'] == 'SubscriptionConfirmation': self.process_subscription(bundle) elif bundle.data['Type'] == 'Notification': self.process_notification(bundle) return bundle def process_subscription(self, bundle): syslog.syslog('SNS Subscription ' + bundle.data['SubscribeURL']) o = urlparse(bundle.data['SubscribeURL']) conn = httplib.HTTPSConnection(o.hostname) conn.putrequest('GET', o.path + '?' + o.query) conn.endheaders() response = conn.getresponse() subscription = response.read() def process_notification(self, bundle): sqs = boto.sqs.connect_to_region(settings.SQS_REGION) queue = sqs.lookup(settings.SQS_QUEUE) retries = 5 done = False while True: if retries < 1: break retries -= 1 time.sleep(5) messages = queue.get_messages(10, visibility_timeout=60) if len(messages) < 1: continue for message in messages: try: m = json.loads(message.get_body()) m['return_sns_region'] = settings.SNS_REGION m['return_sns_topic'] = settings.SNS_TOPIC m['return_sqs_region'] = settings.SQS_REGION m['return_sqs_queue'] = settings.SQS_QUEUE process = subprocess.Popen(['/usr/bin/nice', '-n', '15', os.path.dirname(os.path.normpath(os.sys.modules[settings.SETTINGS_MODULE].__file__)) + '/process.py', base64.b64encode(json.dumps(m))], shell=False) process.wait() except: e = sys.exc_info()[1] syslog.syslog(str(e)) queue.delete_message(message) def verify_message(self, bundle): message = u'' if bundle.data['Type'] == 'SubscriptionConfirmation': message += 'Message\n' message += bundle.data['Message'] + '\n' message += 'MessageId\n' message += bundle.data['MessageId'] + '\n' message += 'SubscribeURL\n' message += bundle.data['SubscribeURL'] + '\n' message += 'Timestamp\n' message += bundle.data['Timestamp'] + '\n' message += 'Token\n' message += bundle.data['Token'] + '\n' message += 'TopicArn\n' message += bundle.data['TopicArn'] + '\n' message += 'Type\n' message += bundle.data['Type'] + '\n' elif bundle.data['Type'] == 'Notification': message += 'Message\n' message += bundle.data['Message'] + '\n' message += 'MessageId\n' message += bundle.data['MessageId'] + '\n' if bundle.data['Subject'] != '': message += 'Subject\n' message += bundle.data['Subject'] + '\n' message += 'Timestamp\n' message += bundle.data['Timestamp'] + '\n' message += 'TopicArn\n' message += bundle.data['TopicArn'] + '\n' message += 'Type\n' message += bundle.data['Type'] + '\n' else: return False o = urlparse(bundle.data['SigningCertURL']) conn = httplib.HTTPSConnection(o.hostname) conn.putrequest('GET', o.path) conn.endheaders() response = conn.getresponse() cert = response.read() # ok; attempt to use m2crypto failed, using openssl command line tool instead file_cert = tempfile.NamedTemporaryFile(mode='w', delete=False) file_sig = tempfile.NamedTemporaryFile(mode='w', delete=False) file_mess = tempfile.NamedTemporaryFile(mode='w', delete=False) file_cert.write(cert) file_sig.write(bundle.data['Signature']) file_mess.write(message) file_cert.close() file_sig.close() file_mess.close() # see: https://async.fi/2011/10/sns-verify-sh/ verify_process = subprocess.Popen(['/usr/local/bin/sns-verify.sh', file_cert.name, file_sig.name, file_mess.name], shell=False) verify_process.wait() if verify_process.returncode == 0: return True return False
That process.py
would be something like:
#!/usr/bin/env python import boto.sqs from boto.sqs.message import Message import base64, json, os, sys, syslog if len(sys.argv) != 2: sys.exit('usage: %s <base64 encoded json object>' % (sys.argv[0], )) m = json.loads(base64.b64decode(sys.argv[1])) # http://code.activestate.com/recipes/66012-fork-a-daemon-process-on-unix/ try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror) sys.exit(1) os.chdir("/") os.setsid() os.umask(0) try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.exit(1) syslog.syslog(sys.argv[0] + ': ' + str(m)) # ...
That is, process.py
gets the received (and doped) SQS message, Base64 encoded, as it's only command line argument, forks, exits and does what it's supposed to do after that on its own. Control returns to NotificationResource
so the request doesn't block unnecessarily.
Categorised as: snippet