Descarga: emailservice.zip.
Klein es un pequeñísimo web framework montado sobre Twisted, la plataforma de red asincrónica de la que ya hemos hablado bastante, y Werkzeug, una librería para el desarrollo de aplicaciones WSGI. Puesto que sobre esta última se ha desarrollado el microframework Flask, desarrollar aplicaciones en Klein resultará bastante familiar para aquellos que tengan alguna experiencia con él.
Por tratarse de un framework asincrónico, resulta ideal para desarrollar servicios web por cuanto puede atender a múltiples peticionas concurrentemente, a diferencia de los frameworks WSGI ─como Django, web2py, Pyramid─.
En este artículo estaremos desarrollando una pequeña API REST que permitirá enviar correos electrónicos vía SMTP y luego consultar su estado. Esto mejoraría, por ejemplo, la experiencia del usuario en una aplicación web que requiera el envío de correos, ya que no se tendrá que interactuar directamente con el servidor SMTP sino vía HTTP con nuestra API, que responserá considerablemente más rápido.
Instalación
Antes de empezar, instalemos Klein vía pip:
pip install klein
Este comando instalará todas las dependencias, incluyendo Twisted y Werkzeug.
Primeros pasos
Bien, comenzaremos por definir la estructura básica de una aplicación de Klein, que correrá en la dirección local y en el puerto 7001.
#!/usr/bin/env python # -*- coding: utf-8 -*- from klein import Klein class EmailService: app = Klein() if __name__ == '__main__': emailservice = EmailService() emailservice.app.run('localhost', 7001)
A continuación expondremos la ruta /email
definiendo el método EmailService.email()
, que aceptará dos operaciones: POST
, para enviar un correo, y GET
, que retornará una lista con los mensajes enviados.
@app.route('/email', methods=['GET', 'POST']) def email(self, request): if request.method == b'POST': return 'Enviar email.' elif request.method == b'GET': return 'Retornar lista de emails.'
Como observamos, el segundo argumento de todo método es siempre request
, que contiene información sobre la petición HTTP. El valor de retorno de la función será enviado al usuario como respuesta.
Ejecutemos nuestro pequeño código y hagamos algunas pruebas usando la librería Requests (si no la tienes instalada, simplemente ejecuta pip install requests
).
>>> import requests >>> url = 'http://localhost:7001/email' >>> r = requests.get(url) >>> r.text 'Retornar lista de emails.' >>> r = requests.post(url) >>> r.text 'Enviar email.'
Perfecto, nuestra aplicación responde correctamente a las peticiones. Primero desarrollaremos la operación POST
. Haremos que se requieran los parámetros to
, subject
y message
, que indicarán el recipiente del correo electrónico, el asunto y el mensaje.
Podemos acceder a los parámetros de la petición vía el diccionario request.args
.
if request.method == b'POST': print(request.args[b'to']) print(request.args[b'subject']) print(request.args[b'message'])
Si hacemos la siguiente prueba:
>>> params = {'to': 'nombre@ejemplo.com', 'subject': 'Test', ... 'message': '¡Hola, mundo!'} >>> requests.post(url, data=params)
Veremos en la consola cómo se imprimen los valores de los parámetros.
2018-07-22 15:02:48-0300 [-] [b'nombre@ejemplo.com']
2018-07-22 15:02:48-0300 [-] [b'Test']
2018-07-22 15:02:48-0300 [-] [b'\xc2\xa1Hola, mundo!']
Tendremos en cuenta que los parámetros son siempre listas, por lo que habremos de acceder a su primer elemento, que es una instancia del tipo bytes
(nótese la b
antes de las cadenas).
Pero antes me interesa chequear que todos los parámetros estén presentes.
if request.method == b'POST': # Look for missing params. for param in (b'to', b'message', b'subject'): if param not in request.args: return "Missing parameter: '{}'.".format( param.decode('utf-8'))
>>> r = requests.post(url) >>> r.text "Missing parameter: 'to'."
Ahora bien, por lo general las API REST trabajan con los formatos JSON y XML en sus respuestas, no con texto plano ni código HTML. Optaremos por JSON, que es la más utilizada. Entonces una respuesta de nuestra aplicación se verá más o menos así:
{'succeed': true, 'status': 200, ...}
Es decir, siempre tendrá los valores succeed
y status
, que indicarán si la operación se ejecutó correctamente y el código HTTP de retorno (200
indica una consulta exitosa). El resto de los valores retornados dependerá de la operación.
Pues bien, creemos un método que se ocupe de construir este valor de retorno como JSON y establezca la cabecera HTTP apropiada.
def response(self, request, succeed=True, status=200, **kwargs): """ Build the response body as JSON and set the proper content-type header. """ request.setHeader('Content-Type', 'application/json') request.setResponseCode(status) return json.dumps( {'succeed': succeed, 'status': status, **kwargs})
Recordemos importar el módulo json
al comienzo del archivo.
import json
Y por último modifiquemos el valor de retorno en caso que esté faltando algún parametro a la operación POST
.
if param not in request.args: return self.response( request, succeed=False, status=400, reason="Missing parameter: '{}'.".format( param.decode('utf-8')) )
>>> r = requests.post(url) >>> r.json() {'succeed': False, 'status': 400, 'reason': "Missing parameter: 'to'."}
En este caso estamos retornando un código de error 400
, que indica una petición mal formada por el cliente. Puedes ver una lista de los códigos HTTP en este enlace.
Manejando errores
Hay dos casos en los que Klein retornará dos errores HTTP automáticamente. Uno es cuando se intenta acceder a una ruta que no existe (404
); otro, cuando se ejecuta una operación no definida sobre una ruta existente (405
), por ejemplo, PUT
o DELETE
en /email
. Veremos que la respuesta por defecto es un código HTML.
>>> r = requests.put(url) >>> r <Response [405]> >>> r.text '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>405 Method Not Allowed</title>\n<h1>Method Not Allowed</h1>\n<p>The method is not allowed for t he requested URL.</p>\n'
Nuestra API debería ser capaz de manejar estos errores y retornar, en su lugar, una respuesta JSON acorde al formato que especificamos. Para ello vamos a importar las excepciones correspondientes.
from werkzeug.exceptions import NotFound, MethodNotAllowed
Luego definimos los métodos que manejarán esas excepciones.
@app.handle_errors(NotFound) def notFoundHandler(self, request, failure): """ Called when a 404 not found is raised. """ return self.response( request, succeed=False, status=404, reason='Not found.') @app.handle_errors(MethodNotAllowed) def methodNotAllowedHandler(self, request, failure): """ Called when a 405 is raised. """ return self.response( request, succeed=False, status=405, reason="Method not allowed.")
Y confirmemos que esto sucede como esperamos:
>>> r = requests.put(url) >>> r.json() {'succeed': False, 'status': 405, 'reason': 'Method not allowed.'} >>> r = requests.get('http://localhost:7001/no-existe') >>> r.json() {'succeed': False, 'status': 404, 'reason': 'Not found.'}
Base de datos
Veamos, ahora, cómo almacenar en una base de datos los correos que debe enviar nuestro servicio. Estaremos usando el motor de base de datos SQLite vía el módulo estándar sqlite3
, junto con el módulo adbapi
de Twisted. Este último nos permite acceder a una API sincrónica ─la de SQLite─ a través de una envoltura asincrónica; hemos hablado sobre ello en el artículo Twisted: web y base de datos.
Antes de eso, crearemos el script initdb.py
que se encargará de inicializar la base de datos, esto es, crear el archivo y la tabla correspondiente.
#!/usr/bin/env python # -*- coding: utf-8 -*- import sqlite3 conn = sqlite3.connect('emailservice.db') cursor = conn.cursor() cursor.execute( 'CREATE TABLE emails (id INTEGER PRIMARY KEY, recipient TEXT, sent INTEGER);' ) conn.commit() conn.close()
Y ejecutémoslo una vez para que genere el archivo emailservice.db
.
Ahora, volviendo a nuestra API web, el primer paso es importar la clase adbapi.ConnectionPool
.
from twisted.enterprise.adbapi import ConnectionPool
Luego definiremos la conexión como un atributo.
class EmailService: app = Klein() conn = ConnectionPool('sqlite3', 'emailservice.db')
Queremos insertar una fila en nuestra tabla cada vez que se haga una petición POST
a /email
, para que funcione como un registro, y luego se pueda consultar su estado. Creemos el siguiente método para hacer lo primero.
def insertEmail(self, transaction, to): """ This function gets called by Twisted in a thread so we can safely use blocking functions such as execute(). The return value is then retrieved to the main thread where runInteraction() was called. """ transaction.execute( 'INSERT INTO emails (recipient, sent) VALUES (?, ?);', (to, False) ) return transaction.lastrowid
El primer argumento transaction
equivale a un cursor convencional (según el estándar DB-API) para ejecutar consultas (provisto por Twisted), el segundo indica el destinatario del mensaje. Una vez ejecutada la consulta, retornamos el ID único asignado por SQLite.
Dentro de nuestro método email()
, luego de la comprobación de los parámetros añadiremos lo siguiente.
to = request.args[b'to'][0].decode('utf-8') # Run the insert query and return the inserted ID. lastID = yield self.conn.runInteraction(self.insertEmail, to) return self.response(request, emailID=lastID)
Esto hará que Twisted ejecute nuestra consulta en un hilo aparte para que no bloquee a nuestra API. La utilización de yield
es similar a la del módulo estándar asyncio
: Twisted atenderá otros eventos mientras aguarda el restulado de la consulta. Nos resta, sin embargo, aplicar el decorador inlineCallbacks
. Primero importémoslo:
from twisted.internet.defer import inlineCallbacks
Y después lo aplicamos, quedando nuestra función así:
@app.route('/email', methods=['GET', 'POST']) @inlineCallbacks def email(self, request): if request.method == b'POST': # Look for missing params. for param in (b'to', b'message', b'subject'): if param not in request.args: return self.response( request, succeed=False, status=400, reason="Missing parameter: '{}'.".format( param.decode('utf-8')) ) to = request.args[b'to'][0].decode('utf-8') # Run the insert query and return the inserted ID. lastID = yield self.conn.runInteraction(self.insertEmail, to) return self.response(request, emailID=lastID) elif request.method == b'GET': return 'Retornar lista de emails.'
Por último, chequeamos que esto funcione según lo planificado.
>>> params = { ... 'to': 'nombre@ejemplo.com', ... 'subject': 'Test', ... 'message': '¡Hola, mundo!' ... } >>> r = requests.post(url, data=params) >>> r.json() {'succeed': True, 'status': 200, 'emailID': 1}
Esta es solo la mitad del trabajo. Resta la acción contraria: retornar la lista de todos los correos o bien alguno en particular, vía la petición GET
. Para ello debemos redefinir la estructura de nuestro método para que acepte opcionalmente el ID de un mensaje.
@app.route('/email', methods=['GET', 'POST'], defaults={'emailID': None}) @app.route('/email/<int:emailID>', methods=['GET']) @inlineCallbacks def email(self, request, emailID):
Ahora definamos la lógica que obtendrá la información de la base de datos y la retornará al usuario como JSON.
elif request.method == b'GET': if emailID is None: # Get the full emails list. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Get the specified email. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) return response
Aquí hemos hecho referencia a dos objetos que aún no definimos, a saber, el método query()
, que ejecuta una consulta:
def query(self, *args): """Run a query using the current connection.""" return self.conn.runQuery(*args)
Y la función emails_to_json()
(por fuera de la clase), que convierte el resultado que proviene de SQLite a JSON:
def emails_to_json(rows): def row_to_json(row): return json.dumps( {'id': row[0], 'to': row[1], 'sent': bool(row[2])}) return tuple(map(row_to_json, rows))
Ahora las pruebas pertinentes:
>>> r = requests.get(url) >>> r.json() {'succeed': True, 'status': 200, 'emails': ['{"id": 1, "to": "nombre@ejemplo.com", "sent": false}']} >>> r = requests.get(url + '/1') >>> r.json() {'succeed': True, 'status': 200, 'emails': '{"id": 1, "to": "nombre@ejemplo.com", "sent": false}'}
Enviando correos electrónicos
Comencemos por importar lo necesario: la función sendmail()
de Twisted y la clase estándar MIMEText
para construir el cuerpo del correo electrónico.
from email.mime.text import MIMEText # [...] from twisted.mail.smtp import sendmail
Luego modifiquemos el método email()
, para que al recibir una petición POST envíe el mensaje una vez que lo almacenó en la base de datos.
if request.method == b'POST': # [...] # Run the insert query and return the inserted ID. lastID = yield self.conn.runInteraction(self.insertEmail, to) message = MIMEText( request.args[b'message'][0].decode("utf-8")) message['Subject'] = \ request.args[b'subject'][0].decode("utf-8") message['From'] = USER message['To'] = to d = sendmail( EMAIL_SERVER, USER, [to], message, username=USER, password=PASSWORD ) d.addCallback(self.emailSent, lastID) return self.response(request, emailID=lastID)
Deberemos crear las constantes EMAIL_SERVER
, USER
y PASSWORD
con los datos del servidor SMTP correspondiente.
Y definamos el evento emailSent()
, que será invocado cuando Twisted haya enviado el mensaje correctamente.
def emailSent(self, result, emailID): """Called once the email was sent.""" emails_sent = result[0] if emails_sent == 0: return self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,))
Con estos arreglos nuestra API ya estará enviando correos electrónicos efectivamente.
# Enviar email. >>> r = requests.post(url, data=params) >>> r.json() {'succeed': True, 'status': 200, 'emailID': 3} # Consultar estado. >>> r = requests.get(url + '/3') >>> r.json() {'succeed': True, 'status': 200, 'emails': '{"id": 3, "to": "nombre@ejemplo.com", "sent": true}'}
Caché
Incorporaremos un pequeño sistema de caché en memoria, usando un diccionario.
class EmailService: app = Klein() cache = {} conn = ConnectionPool('sqlite3', 'emailservice.db')
Esto evitará que se ejecute la misma consulta múltiples veces al obtener la lista de mensajes o alguno en particular.
elif request.method == b'GET': # Look for a cached result before hitting the database. key = 'emails' if emailID is None else 'email-{}'.format(emailID) try: return self.cache[key] except KeyError: pass if emailID is None: # Get the full emails list. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Get the specified email. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) self.cache[key] = response return response
Código completo
#!/usr/bin/env python # -*- coding: utf-8 -*- """ REST API example using Twisted Klein. Copyright (C) 2018 Recursos Python - reucrsospython.com. """ from email.mime.text import MIMEText import json from klein import Klein from twisted.enterprise.adbapi import ConnectionPool from twisted.internet.defer import inlineCallbacks from twisted.mail.smtp import sendmail from werkzeug.exceptions import NotFound, MethodNotAllowed EMAIL_SERVER = '' USER = '' PASSWORD = '' def emails_to_json(rows): def row_to_json(row): return json.dumps( {'id': row[0], 'to': row[1], 'sent': bool(row[2])}) return tuple(map(row_to_json, rows)) class EmailService: app = Klein() cache = {} conn = ConnectionPool('sqlite3', 'emailservice.db') def query(self, *args): """Run a query using the current connection.""" return self.conn.runQuery(*args) def response(self, request, succeed=True, status=200, **kwargs): """ Build the response body as JSON and set the proper content-type header. """ request.setHeader('Content-Type', 'application/json') request.setResponseCode(status) return json.dumps( {'succeed': succeed, 'status': status, **kwargs}) def emailSent(self, result, emailID): """Called once the email was sent.""" emails_sent = result[0] if emails_sent == 0: return self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,)) def insertEmail(self, transaction, to): """ This function gets called by Twisted in a thread so we can safely use blocking functions such as execute(). The return value is then retrieved to the main thread where runInteraction() was called. """ transaction.execute( 'INSERT INTO emails (recipient, sent) VALUES (?, ?);', (to, False) ) return transaction.lastrowid @app.handle_errors(NotFound) def notFoundHandler(self, request, failure): """ Called when a 404 not found is raised. """ return self.response( request, succeed=False, status=404, reason='Not found.') @app.handle_errors(MethodNotAllowed) def methodNotAllowedHandler(self, request, failure): """ Called when a 405 is raised. """ return self.response( request, succeed=False, status=405, reason="Method not allowed.") @app.route('/email', methods=['GET', 'POST'], defaults={'emailID': None}) @app.route('/email/<int:emailID>', methods=['GET']) @inlineCallbacks def email(self, request, emailID): if request.method == b'POST': # Look for missing params. for param in (b'to', b'message', b'subject'): if param not in request.args: return self.response( request, succeed=False, status=400, reason="Missing parameter: '{}'.".format( param.decode('utf-8')) ) to = request.args[b'to'][0].decode('utf-8') # Run the insert query and return the inserted ID. lastID = yield self.conn.runInteraction(self.insertEmail, to) message = MIMEText( request.args[b'message'][0].decode("utf-8")) message['Subject'] = \ request.args[b'subject'][0].decode("utf-8") message['From'] = USER message['To'] = to d = sendmail( EMAIL_SERVER, USER, [to], message, username=USER, password=PASSWORD ) d.addCallback(self.emailSent, lastID) return self.response(request, emailID=lastID) elif request.method == b'GET': # Look for a cached result before hitting the database. key = 'emails' if emailID is None else 'email-{}'.format(emailID) try: return self.cache[key] except KeyError: pass if emailID is None: # Get the full emails list. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Get the specified email. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) self.cache[key] = response return response if __name__ == '__main__': emailservice = EmailService() emailservice.app.run('localhost', 7001)
Curso online 👨💻
¡Ya lanzamos el curso oficial de Recursos Python en Udemy!
Un curso moderno para aprender Python desde cero con programación orientada a objetos, SQL y tkinter
en 2024.
Consultoría 💡
Ofrecemos servicios profesionales de desarrollo y capacitación en Python a personas y empresas. Consultanos por tu proyecto.