Почему задача Spark занимает много времени, чтобы найти блок локально?
RDD имеет 512 одинаково распределенных разделов и 100% кэшируется в памяти 512 исполнителей.
У меня есть сборка фильтров-карт с 512 задачами. Иногда эта работа завершается второй. В других случаях 50% заданий завершают вторую часть, 45% заданий занимают 10 секунд, а 5% задач занимают 20 секунд.
Вот журнал от исполнителя, где задача заняла 20 секунд:
15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312)
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB)
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB)
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver
Итак, кажется, что 20 секунд потрачено на поиск локального блока. Глядя на журналы для других медленных задач, они указывают, что все они задерживаются по той же причине. Я понимаю, что локальный блок означает в одном экземпляре JVM, и поэтому я не понимаю, почему он так долго находит его.
Поскольку отставание всегда либо ровно 10 секунд, либо ровно 20 секунд, я подозреваю это из-за 10-секундного тайм-аута на каком-то слушателе или что-то в этом роде. Если это правда, то я предполагаю, что мои варианты либо выясняют, почему он тайм-аут и исправить, либо сделать тайм-аут короче, поэтому он пытается чаще.
Почему задача занимает так много времени, чтобы найти локальный блок и как я могу это решить?
Обновление: Добавление журнала DEBUG для org.apache.spark.storage
.
16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029)
16/02/01 12:14:07 DEBUG Executor: Task 3029 epoch is 1
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162)
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took 2 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took 2 ms
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took 1 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took 1 ms
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668)
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
Ответы
Ответ 1
Единственное, что мне кажется, это то, что у вас есть репликация с помощью уровня хранения StorageLevel(false, true, false, true, 1)
Поскольку у вас есть 512 разделов в 512 исполнителей, это может быть репликация блоков между каждым исполнителем, что может привести к замедлению в конце. Я бы попытался отключить репликацию и посмотреть, что это делает для производительности.
Ответ 2
Сколько общих ядер вы используете для своего приложения Spark? Это может произойти, если вы выделяете 256 ядер, и если значение spark.locality.wait равно 10.
Я не знаю вашу среду, но, похоже, у вас слишком много исполнителей. У вас есть только несколько исполнителей (в зависимости от того, насколько мощны ваши вычислительные узлы) и имеют несколько ядер, доступных каждому исполнителю. Короче, вместо того, чтобы иметь много процессов с 1 потоком каждый, иметь несколько процессов со многими потоками каждый.