Ответ 1
Основная проблема здесь заключалась в том, чтобы понять, как формализовать справедливость. В вопросе я уже упомянул рабочую аналогию. Выяснилось, что очевидными критериями справедливости является выбор потока, который генерировал меньше событий, чем другие, или принимал еще больше: кто генерировал потоки, ожидал меньше времени.
После этого было довольно тривиально формализовать желаемый результат с использованием денотационной семантики: код находится в GitHub
У меня не было времени для разработки денотационных комбинаторов, чтобы включить withStateMachine
из Bacon.js, поэтому следующим шагом было его переопределение в JavaScript с помощью Bacon.js. Полное управляемое решение доступно как сущность.
Идея состоит в том, чтобы создать конечный автомат с
- затраты на поток и очереди как состояние
- потоки и дополнительный поток обратной связи в качестве входных данных
Когда вывод всей системы будет возвращен, мы можем удалить событие следующего события, когда закончится предыдущий потоковый поток.
Для этого мне пришлось сделать немного уродливый rec
combinator
function rec(f) {
var bus = new Bacon.Bus();
var result = f(bus);
bus.plug(result);
return result;
}
Тип (EventStream a -> EventStream a) -> EventStream a
- тип напоминает другие комбинаторы рекурсии, например. fix
.
Это можно сделать с улучшенным общесистемным поведением, так как Bus нарушает распространение подписки. Мы должны работать над этим.
Вторая вспомогательная функция stateMachine
, которая берет массив потоков и превращает их в единый конечный автомат. По существу это .withStateMachine ∘ mergeAll ∘ zipWithIndex
.
function stateMachine(inputs, initState, f) {
var mapped = inputs.map(function (input, i) {
return input.map(function (x) {
return [i, x];
})
});
return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
if (p.hasValue()) {
p = p.value();
return f(state, p[0], p[1]);
} else {
return [state, p];
}
});
}
Используя эти два помощника, мы можем написать не столь сложный честный планировщик:
function fairScheduler(streams, fn) {
var streamsCount = streams.length;
return rec(function (res) {
return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
// console.log("FAIR: " + JSON.stringify(state), i, x);
// END event
if (i == streamsCount && x.end) {
var additionalCost = new Date().getTime() - x.started;
// add cost to input stream cost center
var updatedState = _.extend({}, state, {
costs: updateArray(
state.costs,
x.idx, function (cost) { return cost + additionalCost; }),
});
if (state.queues.every(function (q) { return q.length === 0; })) {
// if queues are empty, set running: false and don't emit any events
return [_.extend({}, updatedState, { running: false }), []];
} else {
// otherwise pick a stream with
// - non-empty queue
// - minimal cost
var minQueueIdx = _.chain(state.queues)
.map(function (q, i) {
return [q, i];
})
.filter(function (p) {
return p[0].length !== 0;
})
.sortBy(function (p) {
return state.costs[p[1]];
})
.value()[0][1];
// emit an event from that stream
return [
_.extend({}, updatedState, {
queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
running: true,
}),
[new Bacon.Next({
value: state.queues[minQueueIdx][0],
idx: minQueueIdx,
})],
];
}
} else if (i < streamsCount) {
// event from input stream
if (state.running) {
// if worker is running, just enquee the event
return [
_.extend({}, state, {
queues: updateArray(state.queues, i, function (q) { return q .concat([x]); }),
}),
[],
];
} else {
// if worker isn't running, start it right away
return [
_.extend({}, state, {
running: true,
}),
[new Bacon.Next({ value: x, idx: i})],
]
}
} else {
return [state, []];
}
})
.flatMapConcat(function (x) {
// map passed thru events,
// and append special "end" event
return fn(x).concat(Bacon.once({
end: true,
idx: x.idx,
started: new Date().getTime(),
}));
});
})
.filter(function (x) {
// filter out END events
return !x.end;
})
.map(".value"); // and return only value field
}
Остальная часть кода в сущности довольно прямолинейна.