multiprocessing – Tareas concurrentes con procesos



Indroducción

multiprocessing es un paquete que permite crear nuevos procesos utilizando un API similar a la del módulo threading. Debido a que utiliza subprocesos en lugar de hilos (threads), permite llevar a cabo varias operaciones concurrentes sin las limitaciones del Global Interpreter Lock. Corre en sistemas Unix y Windows.

¿Threads o procesos?

El GIL de CPython evita que múltiples threads ejecuten bytecode simultáneamente. Este bloqueo es necesario porque la gestión de memoria en CPython no es segura para múltiples hilos. Véase el artículo en ingles GlobalInterpreterLock. El usuario puede suponer que las diversas tareas se ejecutan al mismo tiempo, gracias a la ilusión que se produce cuando el procesador alterna la ejecución de cada una de ellas rápidamente. Por ende, la simultaneidad en procesadores de un único núcleo no es real, tanto para hilos como para procesos.

Para procesadores de múltiples núcleos, hago las siguientes recomendaciones. Los hilos son una buena alternativa para lograr concurrencia en tareas de entrada y salida (por ejemplo, lectura y escritura de archivos en disco), caso único en donde la simultaneidad es real (véase Buscador multiplataforma de archivos iguales). Sin embargo, pueden ser útiles también para evitar bloqueos simples (por más que no haya simultaneidad real), por ejemplo, para separar operaciones pesadas del bucle principal de una aplicación gráfica.

Otras implementaciones de Python como Jython o IronPython carecen de GIL, por lo que pueden explotar al máximo los beneficios de la programación multi-hilo. Sackless Python permite ejecutar grandes cantidades de “microthreads”, “tasklets” o “hilos livianos” (véase StacklessPython).

La programación multi-proceso resulta una óptima alternativa, limpia y eficiente. El paquete incluye módulos para intercambiar objetos entre los procesos (incluyendo dos métodos de comunicación), compartir memoria (poco recomendado) y sincronizarlos.

Aplicación

Al tratarse de un API similar a la del módulo threading, primero veamos cómo se lanzan nuevos hilos con dicho paquete:

from threading import Thread

def say_hello(name):
    print("Hello, %s!" % name)

if __name__ == "__main__":
    t = Thread(target=say_hello, args=("world",))
    t.start()
    t.join()

En la clase Thread se pasa una función para ser ejecutada en otro hilo (target) junto con sus argumentos (args). Luego, se llama al método start() para iniciar la ejecución y join() para esperar a que finalice.

Ahora veamos cómo se logra esto con procesos:

from multiprocessing import Process

def say_hello(name):
    print("Hello, %s!" % name)

if __name__ == '__main__':
    p = Process(target=say_hello, args=("world",))
    p.start()
    p.join()

Process sigue el API de Thread.

También es posible lanzar un nuevo proceso heredando desde la clase Process:

from multiprocessing import Process

class ConcurrentProcess(Process):
    
    def say_hello(self, name):
        print("Hello, %s!" % name)
    
    def run(self):
        self.say_hello("world")

if __name__ == "__main__":
    p = ConcurrentProcess()
    p.start()
    p.join()

O:

from multiprocessing import Process

class ConcurrentProcess(Process):
    
    def run(self):
        print("Hello, world!")

if __name__ == "__main__":
    p = ConcurrentProcess()
    p.start()
    p.join()

El método run() es llamada una vez iniciado el proceso.

Ejemplo

Descargar 9 archivos HTML, dividiendo la tarea en tres subprocesos. Desde un principio, se insertan los URLs en una cola y cada uno de ellos es removido por alguno de los subprocesos para llevar a cabo la tarea.

Python 2

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
from urllib2 import urlopen, URLError
from Queue import Empty

# Carácteres alfanuméricos.
NAME_VALID_CHARS = [chr(i) for i in range(48, 58) + range(97, 123)]

def chars_filter(s):
    """Remover carácteres inválidos."""
    return "".join(
        [c if c in NAME_VALID_CHARS else "" for c in s.lower()]
    )

def download_page_content(url):
    print "Descargando %s..." % url
    try:
        r = urlopen(url)
    except URLError as e:
        print "Error al acceder a %s." % url
        print e
    else:
        filename = chars_filter(url.lower()) + ".html"
        try:
            f = open(filename, "w")
        except IOError as e:
            print "Error al abrir %s." % filename
            print e
        else:
            f.write(r.read())
            f.close()
            r.close()

def worker(queue):
    """
    Toma un ítem de la cola y descarga su contenido,
    hasta que la misma se encuentre vacía.
    """
    while True:
        try:
            url = queue.get_nowait()
        except Empty:
            break
        else:
            download_page_content(url)

def main():
    urls = (
        "http://python.org/",
        "http://perl.org/",
        "http://ruby-lang.org/",
        "http://rust-lang.org/",
        "http://php.net/",
        "http://stackless.com/",
        "http://pypy.org/",
        "http://jython.org/",
        "http://ironpython.net/"
    )
    
    queue = Queue(9)
    for url in urls:
        queue.put(url)
    
    processes = []
    for i in range(3):
        processes.append(Process(target=worker, args=(queue,)))
        processes[i].start()
        print "Proceso %d lanzado." % (i + 1)
    
    for process in processes:
        process.join()
    
    print u"La ejecución a concluído."

if __name__ == "__main__":
    main()

Python 3

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
from urllib.request import urlopen
from urllib.error import URLError
from time import time
from queue import Empty

# Carácteres alfanuméricos.
NAME_VALID_CHARS = [chr(i) for i in list(range(48, 58)) + list(range(97, 123))]

def chars_filter(s):
    """Remover carácteres inválidos."""
    return "".join(
        [c if c in NAME_VALID_CHARS else "" for c in s.lower()]
    )

def download_page_content(url):
    print("Downloading %s..." % url)
    try:
        r = urlopen(url)
    except URLError as e:
        print("Error al acceder a %s." % url)
        print(e)
    else:
        filename = chars_filter(url.lower()) + ".html"
        try:
            f = open(filename, "w")
        except IOError as e:
            print("Error al abrir %s." % filename)
            print(e)
        else:
            f.write(r.read())
            f.close()
            r.close()

def worker(queue):
    """
    Toma un ítem de la cola y descarga su contenido,
    hasta que la misma se encuentre vacía.
    """
    while True:
        try:
            url = queue.get_nowait()
        except Empty:
            break
        else:
            download_page_content(url)

def main():
    urls = (
        "http://python.org/",
        "http://perl.org/",
        "http://ruby-lang.org/",
        "http://rust-lang.org/",
        "http://php.net/",
        "http://stackless.com/",
        "http://pypy.org/",
        "http://jython.org/",
        "http://ironpython.net/"
    )
    
    queue = Queue(9)
    for url in urls:
        queue.put(url)
    
    processes = []
    for i in range(3):
        processes.append(Process(target=worker, args=(queue,)))
        processes[i].start()
        print("Proceso %d lanzado." % (i + 1))
    
    for process in processes:
        process.join()
    
    print("La ejecución a concluído.")

if __name__ == "__main__":
    main()

Versión

Python 2, Python 3



5 comentarios.

  1. El ejemplo de Descargar 9 archivos HTML de python 3 me daba un problema y lo he solucionado poniendo la escritura a archivo en binario.
    f = open(filename, “wb”)

  2. hola.
    Existe una solución usando python para ejecutar n temporizaciones en paralelo ?
    Por ejemplo arranco un motor y quiero que este encendido solo 40 minutos, en seguida
    arranco otro motor que quiero que este encendido otros 40 minutos, y así sucesivamente hasta 20 motores .
    o sea que este temporizando cada motor.
    pd.. En python
    De antemano gracias.

Deja un comentario