Ответ 1
O (log n) и O (n log n) предварительная обработка/пространство могут быть достигнуты путем нахождения и использования интервалов мажорирования со следующими свойствами:
- Для каждого значения из входного массива может быть один или несколько интервалов мажориты (или может быть нет, если элементы с этими значениями слишком разрежены, нам не нужны мажоритарные интервалы длины 1, поскольку они могут быть полезны только для запроса интервалы размера 1, которые лучше обрабатываются как особый случай).
- Если интервал запросов полностью лежит внутри одного из этих интервалов с мажоритом, соответствующее значение может быть элементом большинства этого интервала запросов.
- Если интервал большинства, полностью содержащий интервал запросов, соответствующее значение не может быть элементом большинства этого интервала запросов.
- Каждый элемент входного массива покрывается малыми интервалами O (log n).
Другими словами, единственная цель большинства интервалов состоит в том, чтобы предоставить кандидатам элемента O (log n) для любого интервала запроса.
Этот алгоритм использует следующие структуры данных:
- Список позиций для каждого значения из входного массива (
map<Value, vector<Position>>
). Альтернативноunordered_map
может использоваться здесь для повышения производительности (но нам нужно будет извлечь все ключи и отсортировать их так, чтобы структура № 3 была заполнена в правильном порядке). - Список большинства интервалов для каждого значения (
vector<Interval>
). - Структура данных для обработки запросов (
vector<small_map<Value, Data>>
). ГдеData
содержит два индекса соответствующего вектора из структуры # 1, указывающие на следующие/предыдущие позиции элементов с заданным значением. Обновление:. Благодаря @justhalf лучше хранить вData
совокупные частоты элементов с заданным значением.small_map
может быть реализован как отсортированный вектор пар - предварительная обработка будет добавлять элементы уже в отсортированном порядке, и запрос будет использоватьsmall_map
только для линейного поиска.
Препроцессирование:
- Сканировать массив ввода и нажать текущую позицию на соответствующий вектор в структуре # 1.
- Выполните шаги 3.. 4 для каждого вектора в структуре # 1.
- Преобразовать список позиций в список мажоритарных интервалов. Подробнее см. Ниже.
- Для каждого индекса входного массива, охватываемого одним из большинства интервалов, вставьте данные в соответствующий элемент структуры № 3: значение и позиции предыдущих/следующих элементов с этим значением (или кумулятивной частотой этого значения).
Query:
- Если длина интервала запроса равна 1, верните соответствующий элемент исходного массива.
- Для начальной точки интервала запроса получаем соответствующий элемент вектора 3-й структуры. Для каждого элемента карты выполните шаг 3. Сканируйте все элементы карты, соответствующие конечной точке интервала запроса, параллельно этой карте, чтобы разрешить сложность O (1) для шага 3 (вместо O (log log n)).
- Если карта, соответствующая конечной точке интервала запроса, содержит значение соответствия, вычислите
s3[stop][value].prev - s3[start][value].next + 1
. Если оно превышает половину интервала запроса, возвращайте значение. Если вместо следующих/предыдущих индексов используются кумулятивные частоты, вместо этого вычислитеs3[stop+1][value].freq - s3[start][value].freq
. - Если ничего не найдено на шаге 3, верните "Nothing".
Основная часть алгоритма получает большинство интервалов из списка позиций:
- Назначьте вес каждой позиции в списке:
number_of_matching_values_to_the_left - number_of_nonmatching_values_to_the_left
. - Отфильтруйте только веса в строго убывающем порядке (с жадностью) до массива "префикс" :
for (auto x: positions) if (x < prefix.back()) prefix.push_back(x);
. - Отфильтруйте только весы в строго возрастающем порядке (жадно, назад) до массива "суффикс" :
reverse(positions); for (auto x: positions) if (x > suffix.back()) suffix.push_back(x);
. - Сканируйте массивы "префикс" и "суффикс" вместе и найдите интервалы от каждого элемента "префикс" до соответствующего места в массиве "суффикс" и от каждого элемента "суффикса" до соответствующего места в массиве "префикс" . (Если все "суффиксные" элементы "веса" меньше заданного "префиксного" элемента или их позиция не правее от него, не генерируется интервал, если нет элемента "суффикса" с точно весом данного элемента "префикс" , получить ближайший элемент "суффикса" с большим весом и увеличить интервал с этой разницей в весе вправо).
- Объединить перекрывающиеся интервалы.
Свойства 1.. 3 для большинства интервалов гарантируются этим алгоритмом. Что касается свойства # 4, то единственный способ, который я мог бы представить, чтобы покрыть некоторый элемент с максимальным количеством интервалов мажоритарного значения, выглядит следующим образом: 11111111222233455666677777777
. Здесь элемент 4
покрывается интервалами 2 * log n
, поэтому это свойство, кажется, выполняется. См. Более формальное доказательство этого свойства в конце этого сообщения.
Пример:
Для входного массива "0 1 2 0 0 1 1 0" будут созданы следующие списки позиций:
value positions
0 0 3 4 7
1 1 5 6
2 2
Позиции для значения 0
получат следующие свойства:
weights: 0:1 3:0 4:1 7:0
prefix: 0:1 3:0 (strictly decreasing)
suffix: 4:1 7:0 (strictly increasing when scanning backwards)
intervals: 0->4 3->7 4->0 7->3
merged intervals: 0-7
Позиции для значения 1
получат следующие свойства:
weights: 1:0 5:-2 6:-1
prefix: 1:0 5:-2
suffix: 1:0 6:-1
intervals: 1->none 5->6+1 6->5-1 1->none
merged intervals: 4-7
Структура данных запроса:
positions value next prev
0 0 0 x
1..2 0 1 0
3 0 1 1
4 0 2 2
4 1 1 x
5 0 3 2
...
Запрос [0,4]:
prev[4][0]-next[0][0]+1=2-0+1=3
query size=5
3>2.5, returned result 0
Запрос [2,5]:
prev[5][0]-next[2][0]+1=2-1+1=2
query size=4
2=2, returned result "none"
Обратите внимание, что нет попытки проверить элемент "1", потому что его основной интервал не включает ни один из этих интервалов.
Доказательство свойства # 4:
Интервалы большинства построены таким образом, что строго более 1/3 всех их элементов имеют соответствующее значение. Это отношение ближе всего к 1/3 для подмассивов типа any*(m-1) value*m any*m
, например, 01234444456789
.
Чтобы сделать это доказательство более очевидным, мы могли бы представлять каждый интервал как точку в 2D: каждую возможную отправную точку, представленную горизонтальной осью, и каждую возможную конечную точку, представленную вертикальной осью (см. диаграмму ниже).
Все допустимые интервалы расположены на диагонали или над ней. Белый прямоугольник представляет все интервалы, охватывающие некоторый элемент массива (представленный как единичный размер в нижнем правом углу).
Позвольте покрыть этот белый прямоугольник квадратами размером 1, 2, 4, 8, 16,..., используя один и тот же правый нижний угол. Это делит белую область на области O (log n), похожие на желтую (и один квадрат размера 1, содержащий один интервал размера 1, который игнорируется этим алгоритмом).
Позвольте подсчитать, сколько интервалов большинства может быть помещено в желтую область. Один интервал (расположенный в ближайшем к диагональном углу) занимает 1/4 элементов, принадлежащих интервалу в самом дальнем от диагонального угла (и этот самый большой интервал содержит все элементы, принадлежащие любому интервалу в желтой области). Это означает, что минимальный интервал содержит строго более 1/12 значений, доступных для всей желтой области. Поэтому, если мы попытаемся поместить 12 интервалов в желтую область, у нас недостаточно элементов для разных значений. Таким образом, желтая область не может содержать более 11 основных интервалов. И белый прямоугольник не может содержать больше, чем 11 * log n
. Доказательство завершено.
11 * log n
является переоценкой. Как я уже говорил ранее, трудно представить более чем 2 * log n
интервалы большинства, охватывающие некоторый элемент. И даже это значение намного больше среднего числа охватывающих большинства интервалов.
Реализация С++ 11. Посмотрите на ideone или здесь:
#include <iostream>
#include <vector>
#include <map>
#include <algorithm>
#include <functional>
#include <random>
constexpr int SrcSize = 1000000;
constexpr int NQueries = 100000;
using src_vec_t = std::vector<int>;
using index_vec_t = std::vector<int>;
using weight_vec_t = std::vector<int>;
using pair_vec_t = std::vector<std::pair<int, int>>;
using index_map_t = std::map<int, index_vec_t>;
using interval_t = std::pair<int, int>;
using interval_vec_t = std::vector<interval_t>;
using small_map_t = std::vector<std::pair<int, int>>;
using query_vec_t = std::vector<small_map_t>;
constexpr int None = -1;
constexpr int Junk = -2;
src_vec_t generate_e()
{ // good query length = 3
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
for (int i = 0; i < SrcSize; ++i)
{
int x = exp();
src.push_back(x);
//std::cout << x << ' ';
}
return src;
}
src_vec_t generate_ep()
{ // good query length = 500
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
auto poisson = std::bind(std::poisson_distribution<int>{100}, eng);
while (int(src.size()) < SrcSize)
{
int x = exp();
int n = poisson();
for (int i = 0; i < n; ++i)
{
src.push_back(x);
//std::cout << x << ' ';
}
}
return src;
}
src_vec_t generate()
{
//return generate_e();
return generate_ep();
}
int trivial(const src_vec_t& src, interval_t qi)
{
int count = 0;
int majorityElement = 0; // will be assigned before use for valid args
for (int i = qi.first; i <= qi.second; ++i)
{
if (count == 0)
majorityElement = src[i];
if (src[i] == majorityElement)
++count;
else
--count;
}
count = 0;
for (int i = qi.first; i <= qi.second; ++i)
{
if (src[i] == majorityElement)
count++;
}
if (2 * count > qi.second + 1 - qi.first)
return majorityElement;
else
return None;
}
index_map_t sort_ind(const src_vec_t& src)
{
int ind = 0;
index_map_t im;
for (auto x: src)
im[x].push_back(ind++);
return im;
}
weight_vec_t get_weights(const index_vec_t& indexes)
{
weight_vec_t weights;
for (int i = 0; i != int(indexes.size()); ++i)
weights.push_back(2 * i - indexes[i]);
return weights;
}
pair_vec_t get_prefix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t prefix;
for (int i = 0; i != int(indexes.size()); ++i)
if (prefix.empty() || weights[i] < prefix.back().second)
prefix.emplace_back(indexes[i], weights[i]);
return prefix;
}
pair_vec_t get_suffix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t suffix;
for (int i = indexes.size() - 1; i >= 0; --i)
if (suffix.empty() || weights[i] > suffix.back().second)
suffix.emplace_back(indexes[i], weights[i]);
std::reverse(suffix.begin(), suffix.end());
return suffix;
}
interval_vec_t get_intervals(const pair_vec_t& prefix, const pair_vec_t& suffix)
{
interval_vec_t intervals;
int prev_suffix_index = 0; // will be assigned before use for correct args
int prev_suffix_weight = 0; // same assumptions
for (int ind_pref = 0, ind_suff = 0; ind_pref != int(prefix.size());)
{
auto i_pref = prefix[ind_pref].first;
auto w_pref = prefix[ind_pref].second;
if (ind_suff != int(suffix.size()))
{
auto i_suff = suffix[ind_suff].first;
auto w_suff = suffix[ind_suff].second;
if (w_pref <= w_suff)
{
auto beg = std::max(0, i_pref + w_pref - w_suff);
if (i_pref < i_suff)
intervals.emplace_back(beg, i_suff + 1);
if (w_pref == w_suff)
++ind_pref;
++ind_suff;
prev_suffix_index = i_suff;
prev_suffix_weight = w_suff;
continue;
}
}
// ind_suff out of bounds or w_pref > w_suff:
auto end = prev_suffix_index + prev_suffix_weight - w_pref + 1;
// end may be out-of-bounds; that OK if overflow is not possible
intervals.emplace_back(i_pref, end);
++ind_pref;
}
return intervals;
}
interval_vec_t merge(const interval_vec_t& from)
{
using endpoints_t = std::vector<std::pair<int, bool>>;
endpoints_t ep(2 * from.size());
std::transform(from.begin(), from.end(), ep.begin(),
[](interval_t x){ return std::make_pair(x.first, true); });
std::transform(from.begin(), from.end(), ep.begin() + from.size(),
[](interval_t x){ return std::make_pair(x.second, false); });
std::sort(ep.begin(), ep.end());
interval_vec_t to;
int start; // will be assigned before use for correct args
int overlaps = 0;
for (auto& x: ep)
{
if (x.second) // begin
{
if (overlaps++ == 0)
start = x.first;
}
else // end
{
if (--overlaps == 0)
to.emplace_back(start, x.first);
}
}
return to;
}
interval_vec_t get_intervals(const index_vec_t& indexes)
{
auto weights = get_weights(indexes);
auto prefix = get_prefix(indexes, weights);
auto suffix = get_suffix(indexes, weights);
auto intervals = get_intervals(prefix, suffix);
return merge(intervals);
}
void update_qv(
query_vec_t& qv,
int value,
const interval_vec_t& intervals,
const index_vec_t& iv)
{
int iv_ind = 0;
int qv_ind = 0;
int accum = 0;
for (auto& interval: intervals)
{
int i_begin = interval.first;
int i_end = std::min<int>(interval.second, qv.size() - 1);
while (iv[iv_ind] < i_begin)
{
++accum;
++iv_ind;
}
qv_ind = std::max(qv_ind, i_begin);
while (qv_ind <= i_end)
{
qv[qv_ind].emplace_back(value, accum);
if (iv[iv_ind] == qv_ind)
{
++accum;
++iv_ind;
}
++qv_ind;
}
}
}
void print_preprocess_stat(const index_map_t& im, const query_vec_t& qv)
{
double sum_coverage = 0.;
int max_coverage = 0;
for (auto& x: qv)
{
sum_coverage += x.size();
max_coverage = std::max<int>(max_coverage, x.size());
}
std::cout << " size = " << qv.size() - 1 << '\n';
std::cout << " values = " << im.size() << '\n';
std::cout << " max coverage = " << max_coverage << '\n';
std::cout << " avg coverage = " << sum_coverage / qv.size() << '\n';
}
query_vec_t preprocess(const src_vec_t& src)
{
query_vec_t qv(src.size() + 1);
auto im = sort_ind(src);
for (auto& val: im)
{
auto intervals = get_intervals(val.second);
update_qv(qv, val.first, intervals, val.second);
}
print_preprocess_stat(im, qv);
return qv;
}
int do_query(const src_vec_t& src, const query_vec_t& qv, interval_t qi)
{
if (qi.first == qi.second)
return src[qi.first];
auto b = qv[qi.first].begin();
auto e = qv[qi.second + 1].begin();
while (b != qv[qi.first].end() && e != qv[qi.second + 1].end())
{
if (b->first < e->first)
{
++b;
}
else if (e->first < b->first)
{
++e;
}
else // if (e->first == b->first)
{
// hope this doesn't overflow
if (2 * (e->second - b->second) > qi.second + 1 - qi.first)
return b->first;
++b;
++e;
}
}
return None;
}
int main()
{
std::random_device rd;
std::default_random_engine eng{rd()};
auto poisson = std::bind(std::poisson_distribution<int>{500}, eng);
int majority = 0;
int nonzero = 0;
int failed = 0;
auto src = generate();
auto qv = preprocess(src);
for (int i = 0; i < NQueries; ++i)
{
int size = poisson();
auto ud = std::uniform_int_distribution<int>(0, src.size() - size - 1);
int start = ud(eng);
int stop = start + size;
auto res1 = do_query(src, qv, {start, stop});
auto res2 = trivial(src, {start, stop});
//std::cout << size << ": " << res1 << ' ' << res2 << '\n';
if (res2 != res1)
++failed;
if (res2 != None)
{
++majority;
if (res2 != 0)
++nonzero;
}
}
std::cout << "majority elements = " << 100. * majority / NQueries << "%\n";
std::cout << " nonzero elements = " << 100. * nonzero / NQueries << "%\n";
std::cout << " queries = " << NQueries << '\n';
std::cout << " failed = " << failed << '\n';
return 0;
}
Связанные работы:
Как указано в другом ответе на этот вопрос, есть другая работа, в которой эта проблема уже решена: "Максимальное количество пробелов в постоянном времени и линейном пространстве" С. Дюрора, М. Он, я Манро, ПК Николсон, М. Scala.
Алгоритм, представленный в этой статье, имеет лучшие асимптотические сложности для времени запроса: O(1)
вместо O(log n)
и для пробела: O(n)
вместо O(n log n)
.
Лучшая сложность пространства позволяет этому алгоритму обрабатывать большие наборы данных (по сравнению с алгоритмом, предложенным в этом ответе). Меньшая память, необходимая для предварительно обработанных данных и более регулярный шаблон доступа к данным, скорее всего, позволит этому алгоритму быстрее обрабатывать данные. Но время запроса не так просто...
Предположим, что у нас есть исходные данные, наиболее благоприятные для алгоритма из статьи: n = 1000000000 (трудно представить систему с объемом памяти более 10..30 гигабайт, в 2013 году).
Алгоритм, предложенный в этом ответе, должен обрабатывать до 120 (или 2 запросов границ * 2 * log n) элементов для каждого запроса. Но он выполняет очень простые операции, похожие на линейный поиск. И он последовательно обращается к двум смежным областям памяти, поэтому он не похож на кеш.
Алгоритм из бумаги должен выполнять до 20 операций (или 2 границы запросов * 5 кандидатов * 2 уровня вейвлет-деревьев) для каждого запроса. Это в 6 раз меньше. Но каждая операция сложнее. Каждый запрос для краткого представления самих счетчиков бит содержит линейный поиск (что означает 20 линейных поисков вместо одного). Хуже всего, каждая такая операция должна иметь доступ к нескольким независимым областям памяти (если размер запроса и, следовательно, размер в четыре раза невелик), запрос является недружественным. Это означает, что каждый запрос (в то время как операция с постоянным временем) довольно медленный, вероятно, медленнее, чем в предложенном здесь алгоритме. Если мы уменьшим размер входного массива, увеличьте вероятность того, что предложенный здесь алгоритм будет быстрее.
Практическим недостатком алгоритма в работе является вейвлет-дерево и реализация сжатого битового счетчика. Реализация их с нуля может занять много времени. Использование ранее существующей реализации не всегда удобно.