Я мог бы найти несколько примеров кода в Scala и Java.
Может ли кто-нибудь привести пример того, как это можно реализовать с помощью PySpark?
Ответ 1
Я столкнулся с той же проблемой, поэтому я создал крошечный самодостаточный пример. Я создаю несколько потоков, используя модуль потоковой передачи python и одновременно отправляю несколько искровых заданий.
Обратите внимание, что по умолчанию искра запускает задания в First-In First-Out (FIFO): http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. В приведенном ниже примере я меняю его на расписание FAIR
# Prereqs:
# set
# spark.dynamicAllocation.enabled true
# spark.shuffle.service.enabled true
spark.scheduler.mode FAIR
# in spark-defaults.conf
import threading
from pyspark import SparkContext, SparkConf
def task(sc, i):
print sc.parallelize(range(i*10000)).count()
def run_multiple_jobs():
conf = SparkConf().setMaster('local[*]').setAppName('appname')
# Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext(conf=conf)
for i in range(4):
t = threading.Thread(target=task, args=(sc, i))
t.start()
print 'spark task', i, 'has started'
run_multiple_jobs()
Вывод:
spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0
10000
20000
Ответ 2
Сегодня я спрашивал об этом. Модуль многопроцессорности предлагает ThreadPool
, который создает для вас несколько потоков и, следовательно, параллельно выполняет задания. Сначала создайте экземпляр функций, затем создайте пул, а затем map
его в диапазоне, который хотите повторить.
В моем случае я вычислял эти номера WSSSE для разных номеров центров (настройка гиперпараметра), чтобы получить "хорошую" кластеризацию k-классов... точно так же, как это описано в Документация MLSpark. Без дальнейших объяснений, вот некоторые ячейки из моей таблицы IPython:
from pyspark.mllib.clustering import KMeans
import numpy as np
c_points - массивы 12dim:
>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
В следующем, для каждого i
я вычисляю это значение WSSSE и возвращаю его как кортеж:
def error(point, clusters):
center = clusters.centers[clusters.predict(point)]
return np.linalg.norm(point - center)
def calc_wssse(i):
clusters = KMeans.train(c_points, i, maxIterations=20,
runs=20, initializationMode="random")
WSSSE = c_points\
.map(lambda point: error(point, clusters))\
.reduce(lambda x, y: x + y)
return (i, WSSSE)
Здесь начинается интересная часть:
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)
Запустите его:
wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points
дает:
[(1, 195318509740785.66),
(2, 77539612257334.33),
(3, 78254073754531.1),
...
]