Paralelizar con Python

Paralelizar bucle for con Python

Joaquín Amat Rodrigo
Noviembre, 2020

Más sobre ciencia de datos en cienciadedatos.net

Introducción


Este documento contiene ejemplos de cómo paralelizar bucles for con python. En concreto, se muestra cómo utilizar las funcionalidades de paralelizado que ofrecen las librerías multiprocessing y joblib.

Para las dos librerías, se muestra cómo paralelizar una función sencilla y cómo paralelizar el entrenamiento de modelos de scikit-learn.

Ejemplo 1: suma acumulada


Secuencial

In [1]:
# secuencial (no paralelizado)
# ==============================================================================
import pandas as pd
import numpy as np

# Se define la función
def suma_acumulada(number):
    return sum(range(1, number + 1))

# Lista de elementos sobre los que se quiere aplicar la función
valores = [10**8, 10**8, 10**8, 10**8, 10**8]
In [2]:
%%time

# Aplicar la función sobre cada elemento de forma secuencial
resultados = []

for valor in valores:
    resultado = suma_acumulada(valor)
    resultados.append(resultado)
    
resultados
CPU times: user 9.28 s, sys: 32.2 ms, total: 9.31 s
Wall time: 9.26 s
Out[2]:
[5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000]

multiprocessing Pool.map

In [3]:
# multiprocessing Pool.map
# ==============================================================================
import pandas as pd
import numpy as np
import multiprocessing

# Se define la función
def suma_acumulada(number):
    return sum(range(1, number + 1))

# Lista de elementos sobre los que se quiere aplicar la función
valores = [10**8, 10**8, 10**8, 10**8, 10**8]
In [4]:
%%time

# Aplicar la función sobre cada elemento en paralelo
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
resultados = pool.map(suma_acumulada, valores)
resultados
CPU times: user 5.92 ms, sys: 28.1 ms, total: 34 ms
Wall time: 3 s
Out[4]:
[5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000]

joblib

In [5]:
# joblib
# ==============================================================================
import pandas as pd
import numpy as np
import multiprocessing
from joblib import Parallel, delayed

# Se define la función
def suma_acumulada(number):
    return sum(range(1, number + 1))

# Lista de elementos sobre los que se quiere aplicar la función
valores = [10**8, 10**8, 10**8, 10**8, 10**8]
In [6]:
%%time

# Aplicar la función sobre cada elemento en paralelo
n_jobs = multiprocessing.cpu_count()
Parallel(n_jobs=n_jobs)(delayed(suma_acumulada)(i) for i in valores)
CPU times: user 29.4 ms, sys: 43.3 ms, total: 72.7 ms
Wall time: 3.13 s
Out[6]:
[5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000,
 5000000050000000]

Ejemplo 2: entrenamiento de modelos


Secuencial

In [7]:
# Secuencial (no paralelizado)
# ==============================================================================
import pandas as pd
import numpy as np
from sklearn.datasets import load_boston
from sklearn.ensemble import RandomForestRegressor

# Datos de entrenamiento
X, y = load_boston(return_X_y=True)

# Valores sobre los que iterar en paralelo
list_n_estimators = [1000, 2000, 5000, 10000]

# Función de entrenamiento
def train_model(X, y, n_estimators):
    
    model = RandomForestRegressor(
                n_estimators = n_estimators,
                n_jobs       = 1,
                random_state = 123
            )
    
    model.fit(X, y)
    
    return model
In [8]:
%%time
modelos = []

for n_estimators in list_n_estimators:
    modelo = train_model(X, y, n_estimators)
    modelos.append(modelo)
CPU times: user 55.8 s, sys: 488 ms, total: 56.3 s
Wall time: 56.3 s

multiprocessing Pool.map

In [9]:
# Entrenamiento paralelo de múltiples modelos multiprocessing Pool.map()
# ==============================================================================

import pandas as pd
import numpy as np
import multiprocessing
from sklearn.datasets import load_boston
from sklearn.ensemble import RandomForestRegressor

# Datos de entrenamiento
X, y = load_boston(return_X_y=True)

# Valores sobre los que iterar en paralelo
list_n_estimators = [1000, 2000, 5000, 10000]

# Función de entrenamiento
def train_model(X, y, n_estimators):
    
    model = RandomForestRegressor(
                n_estimators = n_estimators,
                n_jobs       = 1,
                random_state = 123
            )
    
    model.fit(X, y)
    
    return model
In [10]:
%%time

n_jobs  = multiprocessing.cpu_count()
pool    = multiprocessing.Pool(processes=multiprocessing.cpu_count())
modelos = pool.starmap(train_model, [(X, y, n_estimators) for n_estimators in list_n_estimators])
CPU times: user 1.15 s, sys: 1.55 s, total: 2.7 s
Wall time: 33.7 s

joblib

In [11]:
# Entrenamiento paralelo de múltiples modelos joblib
# ==============================================================================

import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from sklearn.datasets import load_boston
from sklearn.ensemble import RandomForestRegressor

# Datos de entrenamiento
X, y = load_boston(return_X_y=True)

# Valores sobre los que iterar en paralelo
list_n_estimators = [1000, 2000, 5000, 10000]

# Función de entrenamiento
def train_model(X, y, n_estimators):
    
    model = RandomForestRegressor(
                n_estimators = n_estimators,
                n_jobs       = 1,
                random_state = 123
            )
    
    model.fit(X, y)
    
    return model
In [12]:
%%time

n_jobs  = multiprocessing.cpu_count()
modelos = Parallel(n_jobs=n_jobs)(delayed(train_model)(X, y, n_estimators) for n_estimators in list_n_estimators)
CPU times: user 1.65 s, sys: 1.64 s, total: 3.3 s
Wall time: 35.9 s

Información de sesión

In [13]:
from sinfo import sinfo
sinfo()
-----
joblib      0.15.1
numpy       1.19.2
pandas      1.1.3
sinfo       0.3.1
sklearn     0.23.1
-----
IPython             7.18.1
jupyter_client      6.1.7
jupyter_core        4.6.3
jupyterlab          2.2.9
notebook            6.1.4
-----
Python 3.7.9 (default, Aug 31 2020, 12:42:55) [GCC 7.3.0]
Linux-5.4.0-1029-aws-x86_64-with-debian-buster-sid
8 logical CPU cores, x86_64
-----
Session information updated at 2020-10-31 13:07

¿Cómo citar este documento?

Paralelizar con Python por Joaquín Amat Rodrigo, disponible con licencia CC BY-NC-SA 4.0 en https://www.cienciadedatos.net/documentos/py12-paralelizar-con-python.html DOI

Creative Commons Licence
Este contenido, creado por Joaquín Amat Rodrigo, tiene licencia Attribution-NonCommercial-ShareAlike 4.0 International.

Se permite:

  • Compartir: copiar y redistribuir el material en cualquier medio o formato.

  • Adaptar: remezclar, transformar y crear a partir del material.

Bajo los siguientes términos:

  • Atribución: Debes otorgar el crédito adecuado, proporcionar un enlace a la licencia e indicar si se realizaron cambios. Puedes hacerlo de cualquier manera razonable, pero no de una forma que sugiera que el licenciante te respalda o respalda tu uso.

  • NoComercial: No puedes utilizar el material para fines comerciales.

  • CompartirIgual: Si remezclas, transformas o creas a partir del material, debes distribuir tus contribuciones bajo la misma licencia que el original.