Parallel correlation calculation with Spark
Computing the correlation between every pair of variables is an embarrassingly parallel task, as explored in a previous post.
import numpy as np
import pandas as pd
import time
from scipy.stats import pearsonr
from pyspark import SparkContext, SparkConf
from scipy.sparse import coo_matrix
## Input data: a random measurement matrix laid out as
## samples (rows) x variables (columns)
m, n = 150, 1000
measurements = np.random.rand(m, n)
## Thread counts to benchmark, with one elapsed-time slot (in seconds) per count
nThreads = [1, 2, 4, 6, 8, 10, 12, 14, 16]
dt = np.zeros(len(nThreads), dtype=float)
## Benchmark: for every thread count, start a fresh local Spark context,
## compute the Pearson correlation of every variable pair, and store the
## elapsed wall-clock time in dt[i].
for i, NMACHINES in enumerate(nThreads):
    ## Parameters
    NPARTITIONS = NMACHINES * 4
    conf = (SparkConf()
            .setMaster("local[%s]" % NMACHINES)
            .setAppName("pcc-spark-demo")
            .set("spark.driver.maxResultSize", "2g")
            .set("spark.executor.memory", "2g")
            .set("spark.driver.memory", "2g"))
    sc = SparkContext(conf=conf)
    try:
        ## Preparing for computation
        ### Sharing the input matrix across workers
        m_local = sc.broadcast(measurements[:, :])
        n_vars = m_local.value.shape[1]
        indices = sc.parallelize(range(n_vars))
        cart = indices.cartesian(indices)
        ### Keep each unordered pair once (first index <= second index,
        ### diagonal included), then spread the pairs over the partitions
        unique_pairs = cart.filter(lambda p: p[0] <= p[1]).repartition(NPARTITIONS)

        ### Defining the core task
        def corr(pair):
            """Correlate two columns of the broadcast matrix.

            pair -- (column index, column index)

            Returns (pair[0], pair[1], r, p) where r and p are the two
            values returned by scipy.stats.pearsonr for those columns.
            """
            x = m_local.value[:, pair[0]]
            y = m_local.value[:, pair[1]]
            pcc = pearsonr(x, y)
            return (pair[0], pair[1], pcc[0], pcc[1])

        ### Running and collecting results
        ### (map only declares the work; collect() below is what runs it,
        ### so the timed region covers the whole Spark job)
        results = unique_pairs.map(corr)
        start = time.time()
        tmp = np.array(results.collect())
        ### np.array turns the mixed int/float tuples into one float array,
        ### so the index columns are cast back to integers before they are
        ### handed to coo_matrix, and the shape is stated explicitly rather
        ### than inferred from float indices.
        rows = tmp[:, 0].astype(int)
        cols = tmp[:, 1].astype(int)
        ### Only entries with row <= col are filled (upper triangle)
        pcc_mat = coo_matrix((tmp[:, 2], (rows, cols)),
                             shape=(n_vars, n_vars)).todense()
        pcc_evalue = coo_matrix((tmp[:, 3], (rows, cols)),
                                shape=(n_vars, n_vars)).todense()
        dt[i] = time.time() - start
        print("Calculated pairwise PCC in %ss\n, using %s threads." % (dt[i], NMACHINES))
    finally:
        ### Always release the context: the next iteration creates a new
        ### one, which cannot happen while this one is still alive.
        sc.stop()
print(dt)
## Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
## Setting default log level to "WARN".
## To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
## 17/11/15 16:20:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
## 17/11/15 16:20:54 WARN Utils: Your hostname, neurocan resolves to a loopback address: 127.0.1.1; using 155.98.229.155 instead (on interface enp5s0)
## 17/11/15 16:20:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
## 17/11/15 16:20:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 1:> (0 + 1) / 4]
[Stage 1:> (0 + 2) / 4]
[Stage 1:==============> (1 + 1) / 4]
[Stage 1:==============> (1 + 2) / 4]
[Stage 1:=============================> (2 + 1) / 4]
[Stage 1:=============================> (2 + 2) / 4]
[Stage 1:============================================> (3 + 1) / 4]
17/11/15 16:21:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:============================================> (3 + 1) / 4]
[Stage 1:> (0 + 2) / 8]
[Stage 1:=======> (1 + 2) / 8]
[Stage 1:==============> (2 + 2) / 8]
[Stage 1:======================> (3 + 2) / 8]
[Stage 1:=============================> (4 + 2) / 8]
[Stage 1:====================================> (5 + 2) / 8]
[Stage 1:============================================> (6 + 2) / 8]
[Stage 1:===================================================> (7 + 1) / 8]
17/11/15 16:21:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:==========================================> (12 + 4) / 16]
[Stage 1:> (0 + 4) / 16]
[Stage 1:==========> (3 + 4) / 16]
[Stage 1:==============> (4 + 4) / 16]
[Stage 1:=====================> (6 + 4) / 16]
[Stage 1:=========================> (7 + 4) / 16]
[Stage 1:=============================> (8 + 4) / 16]
[Stage 1:===================================> (10 + 4) / 16]
[Stage 1:==========================================> (12 + 4) / 16]
[Stage 1:=================================================> (14 + 2) / 16]
17/11/15 16:21:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:=================================================> (31 + 5) / 36]
[Stage 1:> (0 + 6) / 24]
[Stage 1:=======> (3 + 6) / 24]
[Stage 1:============> (5 + 6) / 24]
[Stage 1:==============> (6 + 6) / 24]
[Stage 1:=====================> (9 + 7) / 24]
[Stage 1:=======================> (10 + 6) / 24]
[Stage 1:==========================> (11 + 6) / 24]
[Stage 1:============================> (12 + 6) / 24]
[Stage 1:=================================> (14 + 6) / 24]
[Stage 1:===================================> (15 + 6) / 24]
[Stage 1:======================================> (16 + 6) / 24]
[Stage 1:========================================> (17 + 6) / 24]
[Stage 1:==========================================> (18 + 6) / 24]
[Stage 1:=============================================> (19 + 5) / 24]
[Stage 1:=================================================> (21 + 3) / 24]
[Stage 1:====================================================> (22 + 2) / 24]
[Stage 1:======================================================> (23 + 1) / 24]
17/11/15 16:22:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:=============================================> (51 + 8) / 64]
[Stage 1:> (0 + 8) / 32]
[Stage 1:=> (1 + 8) / 32]
[Stage 1:=========> (5 + 8) / 32]
[Stage 1:============> (7 + 8) / 32]
[Stage 1:==============> (8 + 8) / 32]
[Stage 1:================> (9 + 8) / 32]
[Stage 1:=====================> (12 + 8) / 32]
[Stage 1:==========================> (15 + 8) / 32]
[Stage 1:============================> (16 + 8) / 32]
[Stage 1:================================> (18 + 8) / 32]
[Stage 1:=======================================> (22 + 8) / 32]
[Stage 1:========================================> (23 + 8) / 32]
[Stage 1:==========================================> (24 + 8) / 32]
[Stage 1:==============================================> (26 + 6) / 32]
[Stage 1:=====================================================> (30 + 2) / 32]
17/11/15 16:22:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:=============================> (54 + 10) / 100]
[Stage 0:================================================> (88 + 10) / 100]
[Stage 1:> (0 + 10) / 40]
[Stage 1:====> (3 + 10) / 40]
[Stage 1:============> (9 + 10) / 40]
[Stage 1:==============> (10 + 10) / 40]
[Stage 1:===================> (14 + 10) / 40]
[Stage 1:=========================> (18 + 10) / 40]
[Stage 1:============================> (20 + 10) / 40]
[Stage 1:==============================> (22 + 10) / 40]
[Stage 1:====================================> (26 + 10) / 40]
[Stage 1:=======================================> (28 + 10) / 40]
[Stage 1:==========================================> (30 + 10) / 40]
[Stage 1:=============================================> (32 + 8) / 40]
[Stage 1:====================================================> (37 + 3) / 40]
17/11/15 16:22:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:===================> (52 + 12) / 144]
[Stage 0:=============================> (76 + 12) / 144]
[Stage 0:===========================================> (115 + 12) / 144]
[Stage 1:> (0 + 12) / 48]
[Stage 1:==> (2 + 12) / 48]
[Stage 1:=========> (8 + 12) / 48]
[Stage 1:==============> (12 + 12) / 48]
[Stage 1:================> (14 + 12) / 48]
[Stage 1:========================> (21 + 12) / 48]
[Stage 1:============================> (24 + 12) / 48]
[Stage 1:==============================> (26 + 12) / 48]
[Stage 1:===================================> (30 + 12) / 48]
[Stage 1:========================================> (35 + 12) / 48]
[Stage 1:===========================================> (37 + 11) / 48]
[Stage 1:==============================================> (39 + 9) / 48]
[Stage 1:=====================================================> (45 + 3) / 48]
17/11/15 16:22:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:=================> (63 + 14) / 196]
[Stage 0:===========================> (101 + 14) / 196]
[Stage 0:=======================================> (144 + 14) / 196]
[Stage 0:======================================================>(195 + 1) / 196]
[Stage 1:> (0 + 14) / 56]
[Stage 1:===> (3 + 14) / 56]
[Stage 1:============> (12 + 14) / 56]
[Stage 1:=============> (13 + 14) / 56]
[Stage 1:===============> (15 + 14) / 56]
[Stage 1:=======================> (23 + 14) / 56]
[Stage 1:==========================> (26 + 14) / 56]
[Stage 1:=============================> (29 + 14) / 56]
[Stage 1:=================================> (33 + 14) / 56]
[Stage 1:=======================================> (39 + 14) / 56]
[Stage 1:============================================> (44 + 12) / 56]
[Stage 1:=============================================> (45 + 11) / 56]
[Stage 1:=================================================> (49 + 7) / 56]
17/11/15 16:22:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
##
[Stage 0:=========> (45 + 16) / 256]
[Stage 0:==================> (85 + 16) / 256]
[Stage 0:==========================> (125 + 16) / 256]
[Stage 0:====================================> (174 + 16) / 256]
[Stage 0:===============================================> (226 + 16) / 256]
[Stage 1:> (0 + 16) / 64]
[Stage 1:====> (5 + 16) / 64]
[Stage 1:===========> (13 + 16) / 64]
[Stage 1:==============> (16 + 16) / 64]
[Stage 1:===================> (22 + 16) / 64]
[Stage 1:======================> (26 + 16) / 64]
[Stage 1:===========================> (31 + 16) / 64]
[Stage 1:===============================> (36 + 16) / 64]
[Stage 1:===================================> (40 + 16) / 64]
[Stage 1:=========================================> (47 + 16) / 64]
[Stage 1:=============================================> (52 + 12) / 64]
[Stage 1:================================================> (55 + 9) / 64]
## Calculated pairwise PCC in 28.9915158749s
## , using 1 threads.
## Calculated pairwise PCC in 15.4083008766s
## , using 2 threads.
## Calculated pairwise PCC in 9.2317237854s
## , using 4 threads.
## Calculated pairwise PCC in 7.85476899147s
## , using 6 threads.
## Calculated pairwise PCC in 6.78391695023s
## , using 8 threads.
## Calculated pairwise PCC in 5.52394413948s
## , using 10 threads.
## Calculated pairwise PCC in 5.55993890762s
## , using 12 threads.
## Calculated pairwise PCC in 5.4174580574s
## , using 14 threads.
## Calculated pairwise PCC in 5.60311603546s
## , using 16 threads.
## [ 28.99151587 15.40830088 9.23172379 7.85476899 6.78391695
## 5.52394414 5.55993891 5.41745806 5.60311604]
## Elapsed times (seconds) recorded for each thread count
timing <- data.frame(
  nThreads = c(1, 2, 4, 6, 8, 10, 12, 14, 16),
  dt = c(29.79623985, 15.88540912, 10.55040097, 7.22993422, 6.71909809,
         5.69363284, 5.91960597, 5.47642398, 5.65171599)
)
## Speed-up is measured against the first row (the single-thread run)
ref <- timing$dt[1]
timing$SpeedUp <- ref / timing$dt
## Plot speed-up against thread count as connected points
library(ggplot2)
ggplot(timing, aes(x = nThreads, y = SpeedUp)) +
  geom_point() +
  geom_line() +
  theme_bw()