Dart: обрабатывать входящие HTTP-запросы параллельно
Я пытаюсь написать HTTP-сервер в Dart, который может обрабатывать несколько запросов параллельно. До сих пор я не смог достичь "параллельной" части.
Вот что я пробовал сначала:
import 'dart:io';
main() {
HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
server.listen((HttpRequest request) {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
request.response.statusCode = HttpStatus.OK;
request.response.write(stopwatch.elapsedMilliseconds.toString());
request.response.close().catchError(print);
});
});
}
По каждому запросу он занят работой в течение одной секунды, а затем завершает работу. Я сделал так, чтобы он обрабатывал запросы таким образом, чтобы его время было предсказуемым, и поэтому я мог легко увидеть эффект запроса в диспетчере задач Windows (ядро процессора перепрыгивает до 100% использования).
Я могу сказать, что это не обрабатывает запросы параллельно, потому что:
-
Если я загружаю несколько вкладок браузера в http://example:8080/
, а затем обновляю их все, вкладки загружаются один за другим последовательно, примерно 1 секунда между ними.
-
Если я использую инструмент load-testing wrk с этими настройками... wrk -d 10 -c 8 -t 8 http://example:8080/
... он завершает От 5 до 8 запросов за 10 секунд, которые я дал. Если сервер использовал все мои 8 ядер, я ожидал бы, что число будет ближе к 80 запросам.
-
Когда я открываю диспетчер задач Windows во время теста wrk, я наблюдаю, что только один из моих ядер почти 100% используется, а остальные довольно простаивают.
Итак, я попытался использовать изоляты, надеясь вручную создать новую изоляцию/поток для каждого запроса:
import 'dart:io';
import 'dart:isolate';
main() {
HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
server.listen((HttpRequest request) {
spawnFunction(handleRequest).send(request);
});
});
}
handleRequest() {
port.receive((HttpRequest request, SendPort sender) {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
request.response.statusCode = HttpStatus.OK;
request.response.write(stopwatch.elapsedMilliseconds.toString());
request.response.close().catchError(print);
});
}
Это не работает вообще. Мне не нравится, что я пытаюсь отправить HttpRequest в качестве сообщения изолированному. Вот ошибка:
#0 _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1 _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2 _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3 main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9 StreamController.add (dart:async/stream_controller.dart:10:35)
#10 _HttpServer._handleRequest (http_impl.dart:1261:20)
#11 _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17 StreamController.add (dart:async/stream_controller.dart:10:35)
#18 _HttpParser._doParse (http_parser.dart:415:26)
#19 _HttpParser._parse (http_parser.dart:161:15)
#20 _HttpParser._onData._onData (http_parser.dart:509:11)
#21 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26 StreamController.add (dart:async/stream_controller.dart:10:35)
#27 _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33 StreamController.add (dart:async/stream_controller.dart:10:35)
#34 _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35 _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36 _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0 _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1 _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2 _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3 Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4 Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5 Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Используемые версии:
- Редактор Dart версии 0.5.9_r22879
- Dart SDK версия 0.5.9.0_r22879
Можно ли обрабатывать эти запросы параллельно, используя все мои доступные ядра ядра, используя Dart?
Ответы
Ответ 1
Я написал библиотеку под названием dart-isoserver, чтобы сделать это некоторое время назад. Теперь он сильно сгнил, но вы можете видеть этот подход.
https://code.google.com/p/dart-isoserver/
То, что я сделал, это прокси-сервер HttpRequest и HttpResponse через изолированные каналы, так как вы не можете отправлять их напрямую. Он работал, хотя было несколько предостережений:
- Ввод/вывод по запросу и ответу прошел через главный изолятор, так что часть не была параллельной. Другая работа, выполненная в изолированном рабочем месте, не блокировала главный изолят. Что действительно должно произойти, так это то, что соединение сокета должно быть передано между изолятами.
- Исключения в изоляторе будут уничтожать весь сервер.
spawnFunction()
теперь имеет необязательный параметр обработчика исключений, так что это несколько исправление, но spawnUri() этого не делает. dart-isoserver использовал spawnUri() для реализации "горячей загрузки", поэтому его нужно было бы удалить.
- Изолиты немного медленнее запускать, и вы, вероятно, не хотите, чтобы одно соединение для тысяч одновременных случаев использования соединений, которые nginx и node.js target. Изолирующий пул с рабочими очередями, вероятно, будет работать лучше, хотя это устранит приятную функцию, которую вы могли бы использовать блокирующий ввод-вывод для рабочего.
Заметка о вашем первом примере кода. Это определенно не будет работать параллельно, как вы заметили, потому что Дарт однопоточный. Ни один код Dart в том же изоляте никогда не запускается одновременно.
Ответ 2
Даже при существующих ограничениях HttpServer можно использовать несколько ядер, запустив несколько серверных процессов за обратным прокси-сервером, таким как Apache или Nginx. Внутри Dart вы также можете разветвлять дочерние процессы, чтобы разделить интенсивные задачи с вычислением.
Хорошим местом для начала было бы прочитать о масштабировании node.js, так как это также использует единую архитектуру для каждого процесса.
Изменить: ответ теперь устарел, теперь можно обмениваться запросами между изолятами, позволяя процессу Дарт использовать несколько ядер.
Смотрите документы для ServerSocket.bind(общий).
"Необязательный аргумент shared указывает, возможно ли добавление дополнительных привязок к одному и тому же адресу, комбинации портов и v6Only из одного и того же процесса Dart. Если shared равно true и выполняются дополнительные привязки, то входящие соединения будут распределены между этим набором ServerSockets. Один из способов использования этого - иметь количество изолятов, между которыми распределяются входящие соединения.