Эффективный уникальный по неупорядоченным элементам
Я хотел бы улучшить скорость работы моего метода подсчета ящиков, который я использую в фрактальном анализе.
Об этой задаче
У меня есть поток ints (около n = 2 ^ 24 long), и я должен рассчитать, сколько разных значений в потоке. Там не допускается верхняя граница и отрицательные значения (но количество отрицательных значений, возможно, меньше, чем sqrt (n)). Там небольшая корреляция в потоке, то есть фактический элемент, вероятно, будет равен или не слишком далеко от предыдущего. Во многих случаях у меня много одинаковых значений во всем диапазоне.
Методы, которые я уже пробовал
вектор, сортировка, uniqe
Моя первая реализация заключалась в том, чтобы поместить все элементы в вектор, а затем я применил std:: sort, а затем std:: unique.
Сложность этого метода O (n * log (n)), и я не думаю, что любой другой алгоритм может быть быстрее вообще, когда дело доходит до масштабирования. Но я уверен, что код должен существовать быстрее, чем это, но с теми же свойствами масштабирования - так быстрее, только с постоянным коэффициентом. Причины таковы:
- У меня есть много равных значений, хранящихся в векторе, поэтому сортировка не так эффективна, вектор чрезмерно большой
- В этом методе я не использую информацию о том, что фактический элемент и предыдущие близки друг к другу.
- Мне не нужна информация о том, каковы эти уникальные значения, мне нужно только количество различных элементов
набор, вставка, размер
Чтобы исключить первую точку неэффективности, я помещаю все элементы в набор с помощью set:: insert. И в конце я подсчитал количество элементов с set:: size.
Я ожидал, что этот код должен быть быстрее, потому что в наборе хранятся только уникальные значения, и ему не нужно сравнивать новые элементы с большим количеством равных значений. Но, к сожалению, этот метод был в 1,5 раза медленнее предыдущего.
set, emplace_hint, размер
Чтобы исключить вторую недействительную точку, я не только помещаю все элементы в набор, но и с помощью функции set:: emplace_hint. И каждый раз давал подсказку, чтобы добавить новый элемент рядом с предыдущим. И в конце я попросил размер набора с набором:: size
Я ожидал, что этот код будет быстрее предыдущего, потому что я могу угадать значение нового элемента, и это лучше, чем ничего. Но, к сожалению, этот метод был в 5 раз медленнее предыдущего.
Вопрос
Можно ли предложить какой-либо эффективный метод, который может вычислять количество различных элементов (ints) в потоке? Можете ли вы оптимизировать код, если известно, что
- существует измеримая корреляция в числах
- некоторые цифры отображаются повторно
Целевая архитектура - современный процессор x86 или x86-64 PC (с sse, sse2), и подходит только один код потока. Я предпочитаю не использовать boost, но С++ 11.
Решение
Во-первых, спасибо за многие предложения, терпение и понимание, и я сожалею, что не могу проверить все методы, и я также уверен, что эффективность зависит от деталей потока ints, что у меня нет предоставлена. Однако я разделяю результаты, полученные с помощью компилятора VS2013. (Код тестируется под gcc4.7, но не измеряется.) Этот вопрос стоит гораздо больше времени для исследования, но у меня есть решение, соответствующее моим потребностям.
![Time statistics for different methods]()
О методах:
- вектор bool: решение BitVector от Dieter Lücking
- двоичный поиск: метод, предложенный Tony D
- неупорядоченный набор: просто введите все элементы в std:: unordered_set, а затем спросите количество его элементов, как предложено Ixanezis
- векторная вставка отсортирована: используя Дитер Люкинг Сортированный векторный подход
- set insert: метод, описанный в форме вопроса
- radix sort: предложение Ixanezis, используя популярный алгоритм сортировки по вектору
- установить emplace hint: используя std:: emplace_hint, как описано в форме вопроса
Ответы
Ответ 1
Просто сравнивая разные подходы (не учитывая сортировку по методу radix):
#include <algorithm>
#include <deque>
#include <iostream>
#include <unordered_set>
#include <set>
#include <vector>
#include <chrono>
template <template <typename ...> class Container, typename T, typename ... A, typename Comp>
inline bool insert_sorted(Container<T, A...>& container, T const& e, Comp const& comp) {
auto const it = std::lower_bound(container.begin(), container.end(), e, comp);
if (it != container.end() and not comp(e, *it)) { return false; }
container.insert(it, e);
return true;
}
template <template <typename ...> class Container, typename T, typename ... A>
inline bool insert_sorted(Container<T, A...>& container, T const& e) {
return insert_sorted(container, e, std::less<T>{});
}
int main() {
using namespace std::chrono;
typedef std::vector<int> data_type;
const unsigned Size = unsigned(1) << 24;
const unsigned Limit = 1000;
data_type data;
data.reserve(Size);
for(unsigned i = 0; i < Size; ++i) {
int value = double(Limit) * std::rand() / RAND_MAX - 0.1;
data.push_back(value);
while(i < Size - 1 && rand() < RAND_MAX * 0.25) {
data.push_back(value);
++i;
}
}
std::cout
<< "Data\n"
<< "====\n"
<< " Size of data: " << Size << '\n';
std::cout
<< "Unorderd Set\n"
<< "============\n";
{
auto start = system_clock::now();
typedef std::unordered_set<int> set_type;
set_type set;
unsigned i = 0;
for( ; i < Size - 1; ++i) {
// Ignore a range of equal values
while(data[i] == data[i+1]) ++i;
set.insert(data[i]);
}
if(i < Size)
set.insert(data[i]);
auto stop = system_clock::now();
std::cout
<< "Number of different elements: "
<< set.size() << '\n';
std::cout
<< " Timing: "
<< duration_cast<duration<double>>(stop - start).count()
<< '\n';
}
std::cout
<< "Set\n"
<< "===\n";
{
auto start = system_clock::now();
typedef std::set<int> set_type;
set_type set;
unsigned i = 0;
for( ; i < Size - 1; ++i) {
// Ignore a range of equal values
while(data[i] == data[i+1]) ++i;
set.insert(data[i]);
}
if(i < Size)
set.insert(data[i]);
auto stop = system_clock::now();
std::cout
<< "Number of different elements: "
<< set.size() << '\n';
std::cout
<< " Timing: "
<< duration_cast<duration<double>>(stop - start).count()
<< '\n';
}
std::cout
<< "Sorted Vector\n"
<< "=============\n";
{
auto start = system_clock::now();
typedef std::vector<int> set_type;
set_type set;
unsigned i = 0;
for( ; i < Size - 1; ++i) {
// Ignore a range of equal values
while(data[i] == data[i+1]) ++i;
insert_sorted(set, data[i]);
}
if(i < Size)
insert_sorted(set, data[i]);
auto stop = system_clock::now();
std::cout
<< "Number of different elements: "
<< set.size() << '\n';
std::cout
<< " Timing: "
<< duration_cast<duration<double>>(stop - start).count()
<< '\n';
}
std::cout
<< "BitVector\n"
<< "=========\n";
{
auto start = system_clock::now();
typedef std::vector<bool> set_type;
set_type set(Limit);
unsigned i = 0;
unsigned elements = 0;
for( ; i < Size; ++i) {
if( ! set[data[i]]) {
set[data[i]] = true;
++elements;
}
}
auto stop = system_clock::now();
std::cout
<< "Number of different elements: "
<< elements << '\n';
std::cout
<< " Timing: "
<< duration_cast<duration<double>>(stop - start).count()
<< '\n';
}
std::cout
<< "Sorted Data\n"
<< "===========\n";
{
auto start = system_clock::now();
std::sort(data.begin(), data.end());
auto last = std::unique(data.begin(), data.end());
auto stop = system_clock::now();
std::cout
<< "Number of different elements: "
<< last - data.begin() << '\n';
std::cout
<< " Timing: "
<< duration_cast<duration<double>>(stop - start).count()
<< '\n';
}
return 0;
}
Скомпилированный с g++ -std = С++ 11 -O3 дает:
Data
====
Size of data: 16777216
Unorderd Set
============
Number of different elements: 1000
Timing: 0.269752
Set
===
Number of different elements: 1000
Timing: 1.23478
Sorted Vector
=============
Number of different elements: 1000
Timing: 1.13783
BitVector
=========
Number of different elements: 1000
Timing: 0.038408
Sorted Data
===========
Number of different elements: 1000
Timing: 1.32827
Следовательно, если память не проблема или диапазон чисел ограничен, бит является лучшим выбором. В противном случае unordered_set является хорошим.
Ответ 2
Поскольку вы работаете только с ограниченным диапазоном целочисленных чисел, здесь можно эффективно использовать алгоритм radix sort, уменьшая часть log(N)
сложности. Вы можете выбрать любую действительно быструю реализацию где-то в Интернете. Некоторые из них требуют поддержки SSE, другие - многопоточные или даже кодированные для работы на GPU.
Если у вас есть возможность использовать boost::unordered_set
или C++11
std::unordered_set
, то ваш второй подход может быть легко изменен, если вы его используете, что также приводит к алгоритму линейной сложности. Однако, если у вас есть хотя бы несколько миллионов номеров в потоке, я считаю, что первый метод будет быстрее.
Ответ 3
Предполагая, что 32-разрядный int
s, худший вариант сценария состоит в том, что вам нужно 2 ^ 32 бит для отслеживания видимого/неизвестного состояния каждого числа, которое вы можете увидеть. Это 4 миллиарда бит, или 512 миллионов байт - 512 мегабайт - не запретительно для современного настольного компьютера. Вы можете в основном индексировать байт [n/8]
в массив, а затем побитовое - или или - или с помощью 1 << (n % 8)
, чтобы установить или проверить состояние увиденного числа. Поскольку вы говорите, что числа, близкие к входным данным, имеют тенденцию быть близкими друг к другу по значению, использование кэша должно быть довольно хорошим. Вы можете проверить только что просмотренные числа и обойти обработку битовых массивов.
Если вам известно, что у вас есть менее 2 ^ 32 различных номеров для отслеживания на входе, вы должны, естественно, уменьшить размер бит, установленного соответственно. (Просто прочитайте свой комментарий "Отрицательные числа разрешены, но это очень редко (вероятность меньше 1/n)". - В этом случае вы можете использовать set
для отрицательных чисел и использовать половину памяти для положительных результатов).
(Если вас беспокоит окончательная итерация на многих страницах памяти, которые могут вообще не иметь битов, вы можете создать дополнительный индекс "грязной страницы" с бит на страницу для руководства такой итерацией, но с учетом количества ввода, если этот вход сильно распространен в int
числовом диапазоне, который может быть незначительным или даже контрпродуктивным.)
EDIT/- дополнительные пояснения, как указано в комментарии. Во-первых, реализация:
template <size_t N>
class Tracker
{
public:
Tracker() { std::fill_n(&data_[0], words, 0); }
void set(int n) { data_[n / 32] |= (1u << (n % 8)); }
bool test(int n) const { return data_[n / 32] &= (1u << (n % 8)); }
template <typename Visitor>
void visit(Visitor& visitor)
{
for (size_t word = 0, word_n = 0; word < words; ++word, word_n += 32)
if (data_[word])
for (uint32_t n = 0, value = 1; n < 32; ++n, value *= 2)
if (data_[word] & value)
visitor(word_n + n);
}
private:
static const int words = N / 32 + (N % 32 ? 1 : 0);
uint32_t data_[words];
};
Использование:
Tracker<size_t(std::numeric_limits<int>::max()) + 1> my_tracker;
int n;
while (std::cin >> n)
my_tracker.set(n);
my_tracker.visit([](unsigned n) { std::cout << n << '\n'; });
(не проверено... возможно, несколько небольших проблем)
Можете ли вы объяснить свой ответ более подробно?
Все это создает то, что концептуально просто массив bool have_seen[]
, который может быть напрямую проиндексирован любым интересующим вас целым числом: вы просто просматриваете входные данные для булевых элементов по индексам, которые вы видите во входном значении true. Если вы установили что-то истинное два или более раз - кого это волнует? Чисто сохранить память и получить скорость для поиска заданных битов (и, например, fill/clear), она вручную упаковывает значения bool
в биты в более крупном интегральном типе данных.
Я думаю, что могу потрудиться с отрицательными значениями, потому что могу вычислить самые большие и наименьшие значения для общей стоимости O (n).
Ну, может быть, но может быть быстрее или медленнее сделать два прохода. При подходе, который я документировал, вам не нужно дважды перебирать данные... вы можете подготовить ответ во время первой итерации. Конечно, если начальная итерация выполняется быстро (например, с SSD-носителя), и вы достаточно плотно поместили в память, что хотите сделать фактический анализ только для фактического диапазона данных, а затем идите.
Это также облегчает сжатие ширины int в правильном значении, поэтому более половины страниц будет непустым.
Не уверен, что вы имеете в виду.
Ответ 4
стандартная структура данных для этой задачи - это хеш-набор, aka std::unordered_set
в stl (btw, google dense_hash_set обычно выполняет немного лучше)
вам не нужны уникальные значения, которые нужно отсортировать, поэтому std::set
для вашего прецедента является незаметно медленным.
Как и другие, вы также можете использовать битвектор, если ваш юниверс (возможные значения) не слишком велик. Если у вас есть отрицательные значения, вы можете просто сбрасывать в unsigned и относиться к ним как к действительно большим числам.
Ответ 5
Помогло бы просто сравнить текущий элемент с предыдущим, прежде чем передавать его методу подсчета, что бы это ни было?
Или сохранить небольшой/быстрый кеш из, например, последних 10 элементов, для удаления дубликатов коротких ссылок?
Или делать подсчет в партиях (рассчитывать на последовательность из 100 с временными счетчиками, затем сливаться с предыдущими счетами)?
Ответ 6
Я утверждаю, что пытаюсь использовать контейнеры STL (set
, unordered_set
,...), но, к сожалению, вы платите за них штраф: их стабильность в памяти и требования к меньшим итераторам потребовали, чтобы они были реализованы как node - основанные на контейнерах, с огромными (относительно говоря) служебными данными для каждого элемента.
Вместо этого я бы предложил два метода:
- Придерживайтесь
vector
(только эффективный с низкой долей уникальных элементов)
- Реализация хэширования Robin-Hood
- Использовать вероятностный подход
Отсортированный вектор
Для подхода vector
: ничто не мешает вам сохранить vector
, отсортированную по мере вставки, и, таким образом, избежать вставки повторяющихся элементов. Пример здесь:
#include <iostream>
#include <algorithm>
#include <vector>
template <typename T, typename Comp>
void insert_sorted(std::vector<T>& vec, T const& e, Comp const& comp) {
auto const it = std::lower_bound(vec.begin(), vec.end(), e, comp);
if (it != vec.end() and not comp(e, *it)) { return; }
vec.insert(it, e);
}
template <typename T>
void insert_sorted(std::vector<T>& vec, T const& e) {
insert_sorted(vec, e, std::less<T>{});
}
int main() {
int const array[] = { 4, 3, 6, 2, 3, 6, 8, 4 };
std::vector<int> vec;
for (int a: array) {
insert_sorted(vec, a);
}
std::cout << vec.size() << ":";
for (int a: vec) { std::cout << " " << a; }
std::cout << "\n";
return 0;
}
Отображается: 5: 2 3 4 6 8
.
Это все равно O (n log n), очевидно, однако для этого требуется меньше памяти:
- меньше памяти, больше вектор находится в кеше
- предыдущий элемент, близкий к его преемнику, используется бинарным поиском
lower_bound
, проходящим через почти те же строки кэша
Это должно быть уже значительное улучшение.
Примечание: было указано, что вставка в середине вектора неэффективна. Конечно, поскольку это включает в себя перетасовку половины уже существующих элементов (в среднем). Тем не менее benchmark предполагает, что он может превзойти текущее решение vector
, когда количество уникальных элементов невелико (0.1% - это мой тест).
Робин Гуд Хешинг
Более активное участие, но хеширование Робин Гуда имеет очень хорошие характеристики, и таким образом, производительность. Наиболее заметно, что он реализован поверх одного динамического массива (например, vector
), поэтому демонстрирует хорошую локальность памяти.
Ржавчина переключилась на хеширование Робин Гуда для реализации хэш-таблиц по умолчанию и очень довольна им.
Примечание. Из быстрых тестов даже unordered_set
удаляет штаны с магазина и сохраняет их, а наивная хэш-таблица с открытым адресом на 25% быстрее.
Вероятностный подход
Для очень больших проблем очень хорошо известен алгоритм HyperLogLog. Недавно он был реализован в Redis.
Он имеет очень хорошее соотношение используемой памяти и частоты ошибок и относительно просто реализовать (особенно после кода Antirez).
Бросить больше аппаратного обеспечения на проблему
Обратите внимание, что это неловко параллельная проблема, поэтому вы можете легко иметь несколько потоков:
- выбор пачки идентификаторов из потока (скажем: 2 ** 10 сразу)
- объединение их в локальный набор уникальных идентификаторов (независимо от его реализации)
- цикл до тех пор, пока поток не станет пустым
- и, наконец, объединить результаты.
И вы можете получить ускорение близко к числу потоков (есть некоторые накладные расходы, очевидно).
Примечание. Не случайно оба подхода легко адаптируются с помощью этого метода, и оба поддерживают эффективные слияния.