Desarrollando una API REST con Twisted Klein

Desarrollando una API REST con Twisted Klein



Descarga: emailservice.zip.

Klein es un pequeñísimo web framework montado sobre Twisted, la plataforma de red asincrónica de la que ya hemos hablado bastante, y Werkzeug, una librería para el desarrollo de aplicaciones WSGI. Puesto que sobre esta última se ha desarrollado el microframework Flask, desarrollar aplicaciones en Klein resultará bastante familiar para aquellos que tengan alguna experiencia con él.

Por tratarse de un framework asincrónico, resulta ideal para desarrollar servicios web por cuanto puede atender a múltiples peticionas concurrentemente, a diferencia de los frameworks WSGI ─como Django, web2py, Pyramid─.

En este artículo estaremos desarrollando una pequeña API REST que permitirá enviar correos electrónicos vía SMTP y luego consultar su estado. Esto mejoraría, por ejemplo, la experiencia del usuario en una aplicación web que requiera el envío de correos, ya que no se tendrá que interactuar directamente con el servidor SMTP sino vía HTTP con nuestra API, que responserá considerablemente más rápido.

Instalación

Antes de empezar, instalemos Klein vía pip:

pip install klein

Este comando instalará todas las dependencias, incluyendo Twisted y Werkzeug.

Primeros pasos

Bien, comenzaremos por definir la estructura básica de una aplicación de Klein, que correrá en la dirección local y en el puerto 7001.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from klein import Klein


class EmailService:
    app = Klein()


if __name__ == '__main__':
    emailservice = EmailService()
    emailservice.app.run('localhost', 7001)

A continuación expondremos la ruta /email definiendo el método EmailService.email(), que aceptará dos operaciones: POST, para enviar un correo, y GET, que retornará una lista con los mensajes enviados.

    @app.route('/email', methods=['GET', 'POST'])
    def email(self, request):
        if request.method == b'POST':
            return 'Enviar email.'
        elif request.method == b'GET':
            return 'Retornar lista de emails.'

Como observamos, el segundo argumento de todo método es siempre request, que contiene información sobre la petición HTTP. El valor de retorno de la función será enviado al usuario como respuesta.

Ejecutemos nuestro pequeño código y hagamos algunas pruebas usando la librería Requests (si no la tienes instalada, simplemente ejecuta pip install requests).

>>> import requests
>>> url = 'http://localhost:7001/email'
>>> r = requests.get(url)
>>> r.text
'Retornar lista de emails.'
>>> r = requests.post(url)
>>> r.text
'Enviar email.'

Perfecto, nuestra aplicación responde correctamente a las peticiones. Primero desarrollaremos la operación POST. Haremos que se requieran los parámetros to, subject y message, que indicarán el recipiente del correo electrónico, el asunto y el mensaje.

Podemos acceder a los parámetros de la petición vía el diccionario request.args.

        if request.method == b'POST':
            print(request.args[b'to'])
            print(request.args[b'subject'])
            print(request.args[b'message'])

Si hacemos la siguiente prueba:

>>> params = {'to': 'nombre@ejemplo.com', 'subject': 'Test',
...           'message': '¡Hola, mundo!'}
>>> requests.post(url, data=params)

Veremos en la consola cómo se imprimen los valores de los parámetros.

2018-07-22 15:02:48-0300 [-] [b'nombre@ejemplo.com']
2018-07-22 15:02:48-0300 [-] [b'Test']
2018-07-22 15:02:48-0300 [-] [b'\xc2\xa1Hola, mundo!']

Tendremos en cuenta que los parámetros son siempre listas, por lo que habremos de acceder a su primer elemento, que es una instancia del tipo bytes (nótese la b antes de las cadenas).

Pero antes me interesa chequear que todos los parámetros estén presentes.

        if request.method == b'POST':
            # Look for missing params.
            for param in (b'to', b'message', b'subject'):
                if param not in request.args:
                    return "Missing parameter: '{}'.".format(
                        param.decode('utf-8'))

>>> r = requests.post(url)
>>> r.text
"Missing parameter: 'to'."

Ahora bien, por lo general las API REST trabajan con los formatos JSON y XML en sus respuestas, no con texto plano ni código HTML. Optaremos por JSON, que es la más utilizada. Entonces una respuesta de nuestra aplicación se verá más o menos así:

{'succeed': true, 'status': 200, ...}

Es decir, siempre tendrá los valores succeed y status, que indicarán si la operación se ejecutó correctamente y el código HTTP de retorno (200 indica una consulta exitosa). El resto de los valores retornados dependerá de la operación.

Pues bien, creemos un método que se ocupe de construir este valor de retorno como JSON y establezca la cabecera HTTP apropiada.

    def response(self, request, succeed=True, status=200, **kwargs):
        """
        Build the response body as JSON and set the proper content-type
        header.
        """
        request.setHeader('Content-Type', 'application/json')
        request.setResponseCode(status)
        return json.dumps(
            {'succeed': succeed, 'status': status, **kwargs})

Recordemos importar el módulo json al comienzo del archivo.

import json

Y por último modifiquemos el valor de retorno en caso que esté faltando algún parametro a la operación POST.

                if param not in request.args:
                    return self.response(
                        request,
                        succeed=False,
                        status=400,
                        reason="Missing parameter: '{}'.".format(
                            param.decode('utf-8'))
                    )

>>> r = requests.post(url)
>>> r.json()
{'succeed': False, 'status': 400, 'reason': "Missing parameter: 'to'."}

En este caso estamos retornando un código de error 400, que indica una petición mal formada por el cliente. Puedes ver una lista de los códigos HTTP en este enlace.

Manejando errores

Hay dos casos en los que Klein retornará dos errores HTTP automáticamente. Uno es cuando se intenta acceder a una ruta que no existe (404); otro, cuando se ejecuta una operación no definida sobre una ruta existente (405), por ejemplo, PUT o DELETE en /email. Veremos que la respuesta por defecto es un código HTML.

>>> r = requests.put(url)
>>> r
<Response [405]>
>>> r.text
'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>405 Method Not
Allowed</title>\n<h1>Method Not Allowed</h1>\n<p>The method is not allowed for t
he requested URL.</p>\n'

Nuestra API debería ser capaz de manejar estos errores y retornar, en su lugar, una respuesta JSON acorde al formato que especificamos. Para ello vamos a importar las excepciones correspondientes.

from werkzeug.exceptions import NotFound, MethodNotAllowed

Luego definimos los métodos que manejarán esas excepciones.

    @app.handle_errors(NotFound)
    def notFoundHandler(self, request, failure):
        """
        Called when a 404 not found is raised.
        """
        return self.response(
            request, succeed=False, status=404, reason='Not found.')
    
    @app.handle_errors(MethodNotAllowed)
    def methodNotAllowedHandler(self, request, failure):
        """
        Called when a 405 is raised.
        """
        return self.response(
            request, succeed=False, status=405, reason="Method not allowed.")

Y confirmemos que esto sucede como esperamos:

>>> r = requests.put(url)
>>> r.json()
{'succeed': False, 'status': 405, 'reason': 'Method not allowed.'}
>>> r = requests.get('http://localhost:7001/no-existe')
>>> r.json()
{'succeed': False, 'status': 404, 'reason': 'Not found.'}

Base de datos

Veamos, ahora, cómo almacenar en una base de datos los correos que debe enviar nuestro servicio. Estaremos usando el motor de base de datos SQLite vía el módulo estándar sqlite3, junto con el módulo adbapi de Twisted. Este último nos permite acceder a una API sincrónica ─la de SQLite─ a través de una envoltura asincrónica; hemos hablado sobre ello en el artículo Twisted: web y base de datos.

Antes de eso, crearemos el script initdb.py que se encargará de inicializar la base de datos, esto es, crear el archivo y la tabla correspondiente.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sqlite3

conn = sqlite3.connect('emailservice.db')
cursor = conn.cursor()
cursor.execute(
    'CREATE TABLE emails (id INTEGER PRIMARY KEY, recipient TEXT, sent INTEGER);'
)
conn.commit()
conn.close()

Y ejecutémoslo una vez para que genere el archivo emailservice.db.

Ahora, volviendo a nuestra API web, el primer paso es importar la clase adbapi.ConnectionPool.

from twisted.enterprise.adbapi import ConnectionPool

Luego definiremos la conexión como un atributo.

class EmailService:
    app = Klein()
    conn = ConnectionPool('sqlite3', 'emailservice.db')

Queremos insertar una fila en nuestra tabla cada vez que se haga una petición POST a /email, para que funcione como un registro, y luego se pueda consultar su estado. Creemos el siguiente método para hacer lo primero.

    def insertEmail(self, transaction, to):
        """
        This function gets called by Twisted in a thread so we can
        safely use blocking functions such as execute(). The return
        value is then retrieved to the main thread where
        runInteraction() was called.
        """
        transaction.execute(
            'INSERT INTO emails (recipient, sent) VALUES (?, ?);',
            (to, False)
        )
        return transaction.lastrowid

El primer argumento transaction equivale a un cursor convencional (según el estándar DB-API) para ejecutar consultas (provisto por Twisted), el segundo indica el destinatario del mensaje. Una vez ejecutada la consulta, retornamos el ID único asignado por SQLite.

Dentro de nuestro método email(), luego de la comprobación de los parámetros añadiremos lo siguiente.

            to = request.args[b'to'][0].decode('utf-8')
            # Run the insert query and return the inserted ID.
            lastID = yield self.conn.runInteraction(self.insertEmail, to)
            return self.response(request, emailID=lastID)

Esto hará que Twisted ejecute nuestra consulta en un hilo aparte para que no bloquee a nuestra API. La utilización de yield es similar a la del módulo estándar asyncio: Twisted atenderá otros eventos mientras aguarda el restulado de la consulta. Nos resta, sin embargo, aplicar el decorador inlineCallbacks. Primero importémoslo:

from twisted.internet.defer import inlineCallbacks

Y después lo aplicamos, quedando nuestra función así:

    @app.route('/email', methods=['GET', 'POST'])
    @inlineCallbacks
    def email(self, request):
        if request.method == b'POST':
            # Look for missing params.
            for param in (b'to', b'message', b'subject'):
                if param not in request.args:
                    return self.response(
                        request,
                        succeed=False,
                        status=400,
                        reason="Missing parameter: '{}'.".format(
                            param.decode('utf-8'))
                    )
            to = request.args[b'to'][0].decode('utf-8')
            # Run the insert query and return the inserted ID.
            lastID = yield self.conn.runInteraction(self.insertEmail, to)
            return self.response(request, emailID=lastID)
        
        elif request.method == b'GET':
            return 'Retornar lista de emails.'

Por último, chequeamos que esto funcione según lo planificado.

>>> params = {
...     'to': 'nombre@ejemplo.com',
...     'subject': 'Test',
...     'message': '¡Hola, mundo!'
... }
>>> r = requests.post(url, data=params)
>>> r.json()
{'succeed': True, 'status': 200, 'emailID': 1}

Esta es solo la mitad del trabajo. Resta la acción contraria: retornar la lista de todos los correos o bien alguno en particular, vía la petición GET. Para ello debemos redefinir la estructura de nuestro método para que acepte opcionalmente el ID de un mensaje.

    @app.route('/email', methods=['GET', 'POST'],
               defaults={'emailID': None})
    @app.route('/email/<int:emailID>', methods=['GET'])
    @inlineCallbacks
    def email(self, request, emailID):

Ahora definamos la lógica que obtendrá la información de la base de datos y la retornará al usuario como JSON.

        elif request.method == b'GET':
            if emailID is None:
                # Get the full emails list.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails;')
            else:
                # Get the specified email.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails WHERE id=?',
                    (emailID,)
                )
                if not emails:
                    raise NotFound
            emails = emails_to_json(emails)
            if emailID is not None:
                emails = emails[0]
            response = self.response(request, emails=emails)
            return response

Aquí hemos hecho referencia a dos objetos que aún no definimos, a saber, el método query(), que ejecuta una consulta:

    def query(self, *args):
        """Run a query using the current connection."""
        return self.conn.runQuery(*args)

Y la función emails_to_json() (por fuera de la clase), que convierte el resultado que proviene de SQLite a JSON:

def emails_to_json(rows):
    def row_to_json(row):
        return json.dumps(
            {'id': row[0], 'to': row[1], 'sent': bool(row[2])})
    return tuple(map(row_to_json, rows))

Ahora las pruebas pertinentes:

>>> r = requests.get(url)
>>> r.json()
{'succeed': True, 'status': 200, 'emails': ['{"id": 1, "to": "nombre@ejemplo.com", "sent": false}']}
>>> r = requests.get(url + '/1')
>>> r.json()
{'succeed': True, 'status': 200, 'emails': '{"id": 1, "to": "nombre@ejemplo.com", "sent": false}'}

Enviando correos electrónicos

Comencemos por importar lo necesario: la función sendmail() de Twisted y la clase estándar MIMEText para construir el cuerpo del correo electrónico.

from email.mime.text import MIMEText
# [...]
from twisted.mail.smtp import sendmail

Luego modifiquemos el método email(), para que al recibir una petición POST envíe el mensaje una vez que lo almacenó en la base de datos.

        if request.method == b'POST':
            # [...]
            # Run the insert query and return the inserted ID.
            lastID = yield self.conn.runInteraction(self.insertEmail, to)
            message = MIMEText(
                request.args[b'message'][0].decode("utf-8"))
            message['Subject'] = \
                request.args[b'subject'][0].decode("utf-8")
            message['From'] = USER
            message['To'] = to
            d = sendmail(
                EMAIL_SERVER, USER, [to], message,
                username=USER, password=PASSWORD
            )
            d.addCallback(self.emailSent, lastID)
            return self.response(request, emailID=lastID)

Deberemos crear las constantes EMAIL_SERVER, USER y PASSWORD con los datos del servidor SMTP correspondiente.

Y definamos el evento emailSent(), que será invocado cuando Twisted haya enviado el mensaje correctamente.

    def emailSent(self, result, emailID):
        """Called once the email was sent."""
        emails_sent = result[0]
        if emails_sent == 0:
            return
        self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,))

Con estos arreglos nuestra API ya estará enviando correos electrónicos efectivamente.

# Enviar email.
>>> r = requests.post(url, data=params)
>>> r.json()
{'succeed': True, 'status': 200, 'emailID': 3}
# Consultar estado.
>>> r = requests.get(url + '/3')
>>> r.json()
{'succeed': True, 'status': 200, 'emails': '{"id": 3, "to": "nombre@ejemplo.com", "sent": true}'}

Caché

Incorporaremos un pequeño sistema de caché en memoria, usando un diccionario.

class EmailService:
    app = Klein()
    cache = {}
    conn = ConnectionPool('sqlite3', 'emailservice.db')

Esto evitará que se ejecute la misma consulta múltiples veces al obtener la lista de mensajes o alguno en particular.

        elif request.method == b'GET':
            # Look for a cached result before hitting the database.
            key = 'emails' if emailID is None else 'email-{}'.format(emailID)
            try:
                return self.cache[key]
            except KeyError:
                pass
            if emailID is None:
                # Get the full emails list.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails;')
            else:
                # Get the specified email.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails WHERE id=?',
                    (emailID,)
                )
                if not emails:
                    raise NotFound
            emails = emails_to_json(emails)
            if emailID is not None:
                emails = emails[0]
            response = self.response(request, emails=emails)
            self.cache[key] = response
            return response

Código completo

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
    REST API example using Twisted Klein.
    Copyright (C) 2018 Recursos Python - reucrsospython.com.
"""

from email.mime.text import MIMEText
import json

from klein import Klein
from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import inlineCallbacks
from twisted.mail.smtp import sendmail
from werkzeug.exceptions import NotFound, MethodNotAllowed


EMAIL_SERVER = ''
USER = ''
PASSWORD = ''


def emails_to_json(rows):
    def row_to_json(row):
        return json.dumps(
            {'id': row[0], 'to': row[1], 'sent': bool(row[2])})
    return tuple(map(row_to_json, rows))


class EmailService:
    app = Klein()
    cache = {}
    conn = ConnectionPool('sqlite3', 'emailservice.db')
    
    def query(self, *args):
        """Run a query using the current connection."""
        return self.conn.runQuery(*args)
    
    def response(self, request, succeed=True, status=200, **kwargs):
        """
        Build the response body as JSON and set the proper content-type
        header.
        """
        request.setHeader('Content-Type', 'application/json')
        request.setResponseCode(status)
        return json.dumps(
            {'succeed': succeed, 'status': status, **kwargs})
    
    def emailSent(self, result, emailID):
        """Called once the email was sent."""
        emails_sent = result[0]
        if emails_sent == 0:
            return
        self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,))
    
    def insertEmail(self, transaction, to):
        """
        This function gets called by Twisted in a thread so we can
        safely use blocking functions such as execute(). The return
        value is then retrieved to the main thread where
        runInteraction() was called.
        """
        transaction.execute(
            'INSERT INTO emails (recipient, sent) VALUES (?, ?);',
            (to, False)
        )
        return transaction.lastrowid
    
    @app.handle_errors(NotFound)
    def notFoundHandler(self, request, failure):
        """
        Called when a 404 not found is raised.
        """
        return self.response(
            request, succeed=False, status=404, reason='Not found.')
    
    @app.handle_errors(MethodNotAllowed)
    def methodNotAllowedHandler(self, request, failure):
        """
        Called when a 405 is raised.
        """
        return self.response(
            request, succeed=False, status=405, reason="Method not allowed.")
    
    @app.route('/email', methods=['GET', 'POST'],
               defaults={'emailID': None})
    @app.route('/email/<int:emailID>', methods=['GET'])
    @inlineCallbacks
    def email(self, request, emailID):
        if request.method == b'POST':
            # Look for missing params.
            for param in (b'to', b'message', b'subject'):
                if param not in request.args:
                    return self.response(
                        request,
                        succeed=False,
                        status=400,
                        reason="Missing parameter: '{}'.".format(
                            param.decode('utf-8'))
                    )
            to = request.args[b'to'][0].decode('utf-8')
            # Run the insert query and return the inserted ID.
            lastID = yield self.conn.runInteraction(self.insertEmail, to)
            message = MIMEText(
                request.args[b'message'][0].decode("utf-8"))
            message['Subject'] = \
                request.args[b'subject'][0].decode("utf-8")
            message['From'] = USER
            message['To'] = to
            d = sendmail(
                EMAIL_SERVER, USER, [to], message,
                username=USER, password=PASSWORD
            )
            d.addCallback(self.emailSent, lastID)
            return self.response(request, emailID=lastID)
        
        elif request.method == b'GET':
            # Look for a cached result before hitting the database.
            key = 'emails' if emailID is None else 'email-{}'.format(emailID)
            try:
                return self.cache[key]
            except KeyError:
                pass
            if emailID is None:
                # Get the full emails list.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails;')
            else:
                # Get the specified email.
                emails = yield self.query(
                    'SELECT id, recipient, sent FROM emails WHERE id=?',
                    (emailID,)
                )
                if not emails:
                    raise NotFound
            emails = emails_to_json(emails)
            if emailID is not None:
                emails = emails[0]
            response = self.response(request, emails=emails)
            self.cache[key] = response
            return response


if __name__ == '__main__':
    emailservice = EmailService()
    emailservice.app.run('localhost', 7001)



Deja un comentario