Как обрабатывать HTTP-запросы в архитектуре Microservice/Event Driven?
Фон:
Я создаю приложение, и предлагаемая архитектура - Event/Message Driven на архитектуре микросервиса.
Монолитный способ делать то, что у меня есть User/HTTP request
и который выполняет некоторые команды, которые имеют прямой synchronous response
. Таким образом, для ответа на один и тот же запрос User/HTTP "бесполезно".
Эта проблема:
Пользователь отправляет HTTP request
в службу пользовательского интерфейса (имеется несколько служб пользовательского интерфейса), который запускает некоторые события в очередь (Kafka/RabbitMQ/any). N служб выбирает, что событие/сообщение совершают какое-то волшебство на этом пути, а затем в какой-то момент тот же UI-сервис должен выбрать ответ и вернуть его пользователю, который инициировал HTTP-запрос. Обработка запроса - это ASYNC
но User/HTTP REQUEST->RESPONSE
- SYNC
в соответствии с вашим типичным взаимодействием HTTP.
Вопрос. Как отправить ответ на ту же службу пользовательского интерфейса, которая инициировала действие (служба, взаимодействующая с пользователем через HTTP) в этом мире, управляемом агностиками?
Мои исследования до сих пор я оглядывался, и кажется, что некоторые люди решают эту проблему с помощью WebSockets.
Но уровень сложности заключается в том, что должна быть какая-то таблица, которая отображает (RequestId->Websocket(Client-Server))
который используется для "обнаружения того, какой узел на шлюзе имеет соединение с веб-сайтами для определенного ответа. Но даже если я понимаю проблему и сложность, я застрял, что не могу найти статей, которые бы дали мне информацию о том, как решить эту проблему на уровне реализации. И это все еще не является жизнеспособным вариантом из-за сторонних интеграций, таких как поставщики платежей (WorldPay), которые ожидают REQUEST->RESPONSE
- специально для проверки 3DS.
Поэтому я не хочу думать, что WebSockets - это вариант. Но даже если WebSockets подходят для приложений Webfacing, API, который подключается к внешним системам, не является отличной архитектурой.
** ** ** Обновление: ** ** **
Даже если длительный опрос является возможным решением для API WebService с Location header
202 Accepted
a Location header
retry-after header
он не будет работать для веб-сайта с высокой совместимостью и высокой способностью. Представьте себе огромное количество людей, пытающихся получить обновление состояния транзакции на каждом запросе, которое они делают, и вы должны аннулировать кеш CDN (идите и играйте с этой проблемой сейчас! Ha).
Но самое важное и относимое к моему делу. У меня есть сторонние API, такие как платежные системы, в которых системы 3DS имеют автоматические переадресации, которые обрабатываются системой поставщиков платежей, и они ожидают типичного REQUEST/RESPONSE flow
, поэтому эта модель не будет работать для меня или модель сокетов будут работать.
Из-за этого варианта использования HTTP REQUEST/RESPONSE
следует обрабатывать в типичном режиме, когда у меня есть немой клиент, который ожидает, что сложность прецессии будет обработана в фоновом режиме.
Поэтому я ищу решение, где извне у меня есть типичный REQUEST->Response
(SYNC) и сложность состояния (ASYNCrony системы) обрабатывается внутренне
Пример длинного опроса, но эта модель не будет работать для стороннего API, такого как поставщик платежей по 3DS Redirects
, которые не входят в мой контроль.
POST /user
Payload {userdata}
RETURNs:
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Mon, 27 Nov 2018 17:25:55 GMT
Location: https://mydomain/user/transaction/status/:transaction_id
Retry-After: 10
GET
https://mydomain/user/transaction/status/:transaction_id
Ответы
Ответ 1
С более общей точки зрения - при получении запроса вы можете зарегистрировать подписчика в очереди в текущем контексте запроса (что означает, когда объект запроса находится в области), который получает подтверждение от ответственных служб по мере завершения своих заданий (например, конечный автомат который поддерживает ход общего числа операций). Когда достигнуто конечное состояние, он возвращает ответ и удаляет слушателя. Я думаю, что это будет работать в любой очереди сообщений pub/sub style. Вот чрезмерно упрощенная демонстрация того, что я предлагаю.
// a stub for any message queue using the pub sub pattern
let Q = {
pub: (event, data) => {},
sub: (event, handler) => {}
}
// typical express request handler
let controller = async (req, res) => {
// initiate saga
let sagaId = uuid()
Q.pub("saga:register-user", {
username: req.body.username,
password: req.body.password,
promoCode: req.body.promoCode,
sagaId: sagaId
})
// wait for user to be added
let p1 = new Promise((resolve, reject) => {
Q.sub("user-added", ack => {
resolve(ack)
})
})
// wait for promo code to be applied
let p2 = new Promise((resolve, reject) => {
Q.sub("promo-applied", ack => {
resolve(ack)
})
})
// wait for both promises to finish successfully
try {
var sagaComplete = await Promise.all([p1, p2])
// respond with some transformation of data
res.json({success: true, data: sagaComplete})
} catch (e) {
logger.error('saga failed due to reasons')
// rollback asynchronously
Q.pub('rollback:user-added', {sagaId: sagaId})
Q.pub('rollback:promo-applied', {sagaId: sagaId})
// respond with appropriate status
res.status(500).json({message: 'could not complete saga. Rolling back side effects'})
}
}
Как вы, вероятно, можете сказать, это выглядит как общий шаблон, который можно отвлечь в рамки, чтобы уменьшить дублирование кода и управлять сквозными проблемами. Это то, о чем по сути относится сага. Клиент будет ждать только столько, сколько потребуется для завершения необходимых операций (что и произойдет, даже если все было синхронно), плюс добавленная латентность из-за межсервисной связи. Убедитесь, что вы не блокируете поток, если используете систему на основе цикла событий, например NodeJS или Python Tornado.
Простое использование push-механизма на основе веб-сокетов не обязательно повышает эффективность или производительность вашей системы. Тем не менее рекомендуется отправлять сообщения клиенту с использованием подключения к сокету, поскольку это делает вашу архитектуру более общей (даже ваши клиенты ведут себя так, как это делают ваши службы), и позволяет лучше разделять проблемы. Это также позволит вам самостоятельно масштабировать push-сервис, не беспокоясь о бизнес-логике. Шаблон саги можно расширить, чтобы включить откаты в случае частичных сбоев или тайм-аутов и сделать вашу систему более управляемой.
Ответ 2
К сожалению, я считаю, что вам, вероятно, придется использовать длинный опрос или веб-сокеты, чтобы выполнить что-то подобное. Вам нужно "подтолкнуть" что-то к пользователю или оставить запрос http открытым, пока что-то не вернется.
Для того, чтобы вернуть данные реальному пользователю, вы можете использовать что-то вроде socket.io. Когда пользователь подключается, socket.io создает идентификатор. Каждый раз, когда пользователь подключается, вы сопоставляете идентификатор пользователя с идентификатором socket.io. После того как каждый запрос имеет прикрепленный к нему идентификатор пользователя, вы можете вернуть результат правильному клиенту. Поток будет примерно таким:
запрос веб-запросов (POST с данными и userId)
Сервис ui размещает заказ в очереди (этот заказ должен иметь userId)
x количество сервисов работает на заказ (каждый раз пропуская userId)
Сервис ui потребляет из темы. В какой-то момент на этой теме появляются данные. Данные, которые он потребляет, имеют userId, сервис ui просматривает карту, чтобы выяснить, какой сокет должен испускать.
Независимо от того, какой код работает в вашем пользовательском интерфейсе, он также должен управляться событиями, поэтому он будет обрабатывать push-данные без контекста исходного запроса. Для этого вы можете использовать что-то вроде redux. По сути, у вас будет сервер, создающий на сервере действия reducex, он работает очень хорошо!
Надеюсь это поможет.
Ответ 3
Как насчет использования обещаний? Socket.io также может быть решением, если вы хотите в реальном времени.
Посмотрите также на CQRS. Этот архитектурный шаблон соответствует модели, управляемой событиями и микросервисной архитектурой.
Даже лучше. Прочтите это.
Ответ 4
Ниже приведен пример очень простой, как вы можете реализовать службу пользовательского интерфейса, чтобы он работал с обычным потоком HTTP-запроса/ответа. Он использует класс node.js events.EventEmitter
для "маршрутизации" ответов на правильный обработчик HTTP.
Краткое описание реализации:
-
Подключить производителя/потребительского клиента к Kafka
- Производитель используется для отправки данных запроса во внутренние микросервисы
- Потребитель используется для прослушивания данных из микросервисов, что означает, что запрос обработан, и я предполагаю, что те элементы Kafka также содержат данные, которые должны быть возвращены клиенту HTTP.
-
Создание глобального диспетчера EventEmitter
класса EventEmitter
- Зарегистрируйте обработчик HTTP-запроса, который
- Создает UUID для запроса и включает его в полезную нагрузку, поданную в Kafka
- Регистрирует прослушиватель событий с нашим диспетчером событий, где UUID используется как имя события, которое он прослушивает для
- Начните употреблять тему Kafka и извлекайте UUID, который ожидает обработчик HTTP-запроса, и выпустите для него событие. В примере кода я не включаю какую-либо полезную нагрузку в испущенное событие, но обычно вы хотите включить некоторые данные из данных Kafka в качестве аргумента, чтобы обработчик HTTP мог вернуть его клиенту HTTP.
Обратите внимание, что я пытался сохранить код как можно меньше, не учитывая ошибки и тайм-аут и т.д.!
Также обратите внимание, что kafkaProduceTopic
и kafkaConsumTopic
- это те же темы, которые упрощают тестирование, и не нужно использовать другую услугу/функцию для использования в теме обслуживания UI.
В коде предполагается, что пакеты kafka-node
и uuid
установлены на npm
и что Kafka доступен на localhost:9092
const http = require('http');
const EventEmitter = require('events');
const kafka = require('kafka-node');
const uuidv4 = require('uuid/v4');
const kafkaProduceTopic = "req-res-topic";
const kafkaConsumeTopic = "req-res-topic";
class ResponseEventEmitter extends EventEmitter {}
const responseEventEmitter = new ResponseEventEmitter();
var HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client(),
producer = new HighLevelProducer(client);
var HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client(),
consumer = new HighLevelConsumer(
client,
[
{ topic: kafkaConsumeTopic }
],
{
groupId: 'my-group'
}
);
var s = http.createServer(function (req, res) {
// Generate a random UUID to be used as the request id that
// that is used to correlated request/response requests.
// The internal micro-services need to include this id in
// the "final" message that is pushed to Kafka and consumed
// by the ui service
var id = uuidv4();
// Send the request data to the internal back-end through Kafka
// In real code the Kafka message would be a JSON/protobuf/...
// message, but it needs to include the UUID generated by this
// function
payloads = [
{ topic: kafkaProduceTopic, messages: id},
];
producer.send(payloads, function (err, data) {
if(err != null) {
console.log("Error: ", err);
return;
}
});
responseEventEmitter.once(id, () => {
console.log("Got the response event for ", id);
res.write("Order " + id + " has been processed\n");
res.end();
})
});
s.timeout = 10000;
s.listen(8080);
// Listen to the Kafka topic that streams messages
// indicating that the request has been processed and
// emit an event to the request handler so it can finish.
// In this example the consumed Kafka message is simply
// the UUID of the request that has been processed (which
// is also the event name that the response handler is
// listening to).
//
// In real code the Kafka message would be a JSON/protobuf/... message
// which needs to contain the UUID the request handler generated.
// This Kafka consumer would then have to deserialize the incoming
// message and get the UUID from it.
consumer.on('message', function (message) {
responseEventEmitter.emit(message.value);
});
Ответ 5
Как я и ожидал - люди пытаются вписаться в концепцию, даже если она не подходит. Это не критика, это наблюдение из моего опыта и после прочтения вашего вопроса и других ответов.
Да, вы правы в том, что архитектура микросервисов основана на асинхронных шаблонах обмена сообщениями. Однако, когда мы говорим об пользовательском интерфейсе, в моем сознании есть два возможных случая:
-
Пользовательский интерфейс требует немедленного ответа (например, операции чтения или те команды, от которых пользователь сразу отвечает). Они не должны быть асинхронными. Почему вы добавляете накладные расходы на обмен сообщениями и асинхронность, если ответ сразу требуется на экране? Не имеет смысла. Архитектура Microservice должна решать проблемы, а не создавать новые, добавляя накладные расходы.
-
Пользовательский интерфейс может быть реорганизован, чтобы переносить задержку ответа (например, вместо ожидания результата пользовательский интерфейс может просто отправить команду, получить подтверждение и позволить пользователю делать что-то еще, пока готовят ответ). В этом случае вы можете ввести асинхронность. Служба шлюза (с которой пользовательский интерфейс взаимодействует напрямую) может организовать асинхронную обработку (ждет завершенных событий и т.д.), А когда она готова, она может связаться с пользовательским интерфейсом. В таких случаях я видел UI, использующий SignalR, а служба шлюза - это API, который принимал соединения сокетов. Если браузер не поддерживает сокеты, он должен идеально отходить от опроса. Во всяком случае, важно то, что это может работать только с непредвиденными обстоятельствами: пользовательский интерфейс может переносить отложенные ответы.
Если Microservices действительно релевантны в вашей ситуации (случай 2), тогда структура UI потока соответственно, и не должно быть проблем в микросервисах на внутреннем сервере. В этом случае ваш вопрос сводится к применению архитектуры, управляемой событиями, к набору сервисов (край - это микросервис шлюза, который связывает взаимодействие с событиями и взаимодействие с пользователем). Эта проблема (службы, управляемые событиями) разрешима, и вы это знаете. Вам просто нужно решить, можете ли вы переосмыслить, как работает ваш пользовательский интерфейс.
Ответ 6
Хороший вопрос. Мой ответ на этот вопрос - введение синхронных потоков в систему.
Я использую rabbitMq, поэтому я не знаю о кафке, но вы должны искать синхронный поток кафки.
WebSockets кажется одним overkiil.
Надеюсь, это поможет.