multiprocessing – Comunicación entre procesos



Versión: 2.6+, 3.x.

El módulo estándar multiprocessing nos provee varios métodos para compartir información entre dos o más procesos, por lo que es necesario conocerlos a todos para decidir cuál se ajusta mejor a vuestras necesidades.

Punto de partida

Recordemos la forma en la que creamos un nuevo proceso.

from multiprocessing import Process

def f():
    print("Hola, mundo!")

def main():
    p = Process(target=f)
    p.start()

if __name__ == "__main__":
    main()

Cabe aclarar que es particularmente importante la penúltima línea para el correcto funcionamiento del código.

Ahora bien, suponiendo que queremos indicarle a nuestro proceso hijo algunas personas a las que tiene que saludar, utilizamos el parámetro args para pasar una lista. Por lo tanto, la función pasa a ser:

def f(names):
    for name in names:
        print("Hello, {0}!".format(name))

Y luego, la llamada:

    names = ["Pedro", "Juan", "Jorge"]
    p = Process(target=f, args=(names,))

Como todos los objetos en Python son pasados «por referencia», convencionalmente podríamos modificar el valor de names desde f() para agregar el nombre Luis.

def f(names):
    for name in names:
        print("Hello, {0}!".format(name))
    names.append("Luis")

names = ["Pedro", "Juan", "Jorge"]
f(names)
print(names)

El resultado en pantalla sería el siguiente.

Hello, Pedro!
Hello, Juan!
Hello, Jorge!
['Pedro', 'Juan', 'Jorge', 'Luis']

Sin embargo, recordemos que nuestra función f en el ejemplo inicial se ejecuta en otro proceso, es decir, en un espacio de memoria diferente. Por lo tanto, desde el proceso hijo no es posible añadir un nombre a la lista y observar los cambios en el padre, pues lo que obtenemos no es más que una mera copia del objeto original que está alojada en una región diferente. Observamos este comportamiento en el siguiente código.

def f(names):
    for name in names:
        print("Hello, {0}!".format(name))
    names.append("Luis")

def main():
    names = ["Pedro", "Juan", "Jorge"]
    p = Process(target=f, args=(names,))
    p.start()
    p.join()  # Esperar a que finalice la ejecución del proceso.
    print(names)

Imprime:

['Pedro', 'Juan', 'Jorge']

La lista original se mantiene intacta.

Objetos compartidos

Todos los objetos pasados vía args al utilizar la clase Process son serializados utilizando pickle y enviados a través de un pipe al proceso hijo. De la misma forma pueden utilizarse diversas herramientas para realizar este procedimiento durante la ejecución del programa. Por ejemplo, multiprocessing provee un sistema de colas similar al módulo queue con la seguridad necesaria para ser utilizado por múltiples procesos.

from multiprocessing import Process, Queue

def f(q):
    q.put([10.5, False, "Recursos Python"])

def main():
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())

if __name__ == "__main__":
    main()

Los objetos añadidos a la cola son transferidos entre procesos de la misma forma comentada anteriormente. Este sistema resulta ideal cuando se quiere compartir información entre más de dos procesos. Debido a las limitaciones de pickle, puede que rara vez te encuentres con algún PickleError por la incapacidad de serializar un objeto.

Internamente las colas utilizan una vía de comunicación llamada Pipe, similar a la arquitectura cliente-servidor del módulo socket, para transferir los datos serializados. Por lo tanto, si la comunicación debe ser entre únicamente dos procesos, puede ser conveniente utilizar pipes.

from multiprocessing import Process, Pipe

def f(pipe):
    pipe.send([10.5, False, "Recursos Python"])

def main():
    server_pipe, client_pipe = Pipe()
    p = Process(target=f, args=(client_pipe,))
    p.start()
    print(server_pipe.recv())

if __name__ == "__main__":
    main()

De esta manera, el proceso padre actúa como servidor y el hijo como cliente. La clase Pipe retorna ambos «lados» de la conexión, y la parte del cliente es enviada al proceso hijo. Ambas partes pueden escribir (send) y leer (recv) datos, a menos que el parámetro opcional duplex en Pipe sea False. En este caso, el servidor solo puede recibir y el cliente solo puede enviar.

Aún más internamente se utiliza un sistema de sincronización denominado Lock, justamente un «bloqueo» para evitar que varios procesos accedan a una misma información al mismo tiempo. El lock es adquirido por un proceso al realizar una operación de lectura o escritura y, al finalizar, es liberado. El objeto lock es compartido por todos los procesos y de esta manera un proceso no puede acceder a datos que estén siendo utilizados por otro. Raramente se presentan ocasiones para utilizar este tipo de sincronización directamente, pero la documentación oficial lo ha dotado de una buena utilidad: acceder a la consola de forma ordenada, evitando que los mensajes de un proceso se superpongan con los de otro.

# Ejemplo de la documentación oficial.
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Memoria compartida

Por último, multiprocessing también nos permite utilizar realmente memoria compartida. El proceso padre es dueño de un objeto al cual el resto puede acceder para leer o escribir. A diferencia de los métodos comentados anteriormente, los procesos hijos no obtienen una copia del objeto sino que acceden al original, por lo que solo pueden utilizarse los tipos que provee ctypes.

El objeto puede ser uno de lo siguientes o bien un vector de cualquiera de ellos.

'c': ctypes.c_char
'u': ctypes.c_wchar
'b': ctypes.c_byte
'B': ctypes.c_ubyte
'h': ctypes.c_short
'H': ctypes.c_ushort
'i': ctypes.c_int
'I': ctypes.c_uint
'l': ctypes.c_long
'L': ctypes.c_ulong
'f': ctypes.c_float
'd': ctypes.c_double

La letra corresponde al identificador de cada tipo, pasado como argumento a Value o Array.

El siguiente ejemplo simula el funcionamiento de Process.join() pero utilizando memoria compartida, un objeto del tipo ctypes.c_ubyte que indica si el proceso hijo a finalizado con sus tareas.

from multiprocessing import Process, Value
from time import sleep

def f(finished):
    sleep(5)  # Simular un procedimiento.
    finished.value = 1

def main():
    finished = Value("B")
    finished.value = 0
    p = Process(target=f, args=(finished,))
    p.start()

    while not finished.value:
        pass

if __name__ == "__main__":
    main()

Un vector de caracteres puede ser utilizado para representar una cadena al estilo C.

from multiprocessing import Process, Array
from time import sleep

def f(string):
    sleep(5)
    string.value = "Finalizado"

def main():
    # 10 indica el tamaño del vector.
    string = Array("c", 10)
    string.value = "Iniciado"
    p = Process(target=f, args=(string,))
    p.start()

    while not string.value == "Finalizado":
        pass

if __name__ == "__main__":
    main()

El módulo se encarga de utilizar un lock internamente para evitar la superposición al acceder al objeto compartido.



Deja una respuesta