Нужен эффективный кеш в памяти, который может обрабатывать запросы от 4k до 7k или записи в секунду
У меня есть эффективное приложение С#, которое получает 80 байт данных со скоростью от 5k до 10k записей в секунду на многопоточном CPU.
Мне нужно настроить a в кэше памяти, чтобы обнаруживать и фильтровать повторяющиеся записи, чтобы я мог их подавить от дальнейшего перемещения по трубопроводу.
Характеристики кэша (максимальные пороги)
- 80 байт данных
- 10 000 записей в секунду
- 60 секунд кеша = количество клавиш = 60 000
- (общая сумма 48000000 байт = 48 МБ)
- Идеальный размер кеша = 5 минут (или 240 Мб)
- Допустимый размер кэша времени выполнения bloat = 1 ГБ
Вопрос
Каков наилучший способ настройки кэша в памяти, словаря, хеш-таблицы, массива и т.д., который позволит наиболее эффективный поиск, очистку старых данных кеша и предотвращение истечения удаленных данных.
Я посмотрел кэш ASP.Net, System.Runtime.MemoryCache, но думаю, что мне нужно что-то более легкое и настроенное для достижения правильной пропускной способности. Я также рассматриваю System.Collections.Concurrent в качестве альтернативы и этот связанный документ.
Есть ли у кого-нибудь предложения относительно наилучшего подхода?
Ответы
Ответ 1
Помните, преждевременно не оптимизируйте!
Там может быть достаточно сжатый способ сделать это, не прибегая к неуправляемому коду, указателям и т.д.
Быстрый тест на моем старом, обычном ноутбуке показывает, что вы можете добавить 1 000 000 записей в HashSet
, удалив 100 000 записей в ~ 100 мс. Затем вы можете повторить это с теми же 1000000 значениями в ~ 60 мс. Это для работы с просто длинными - 80-байтовые структуры данных, очевидно, больше, но простой тест в порядке.
Мои рекомендации:
-
Внедрить 'lookup' и 'duplicate detection' в качестве HashSet
, что очень быстро для вставки, удаления и поиска.
-
Реализовать фактический буфер (который получает новые события и заканчивает старые) в качестве достаточно большого кругового/кольцевого буфера. Это позволит избежать выделения памяти и освобождения памяти и может добавлять записи на передний план и удалять их со спины. Вот некоторые полезные ссылки, включая один (второй), который описывает алгоритмы для истечения элементов в кеше:
Круговой буфер для .NET
Быстрый расчет минимального, максимального и среднего числа входящих номеров
Общий С# RingBuffer
Как бы вы закодировали эффективный циркулярный буфер в Java или С#
-
Обратите внимание, что круговой буфер еще лучше, если вы хотите, чтобы ваш кеш был ограничен числом элементов (например, 100 000), а не временем событий (например, последние 5 минут).
-
Когда элементы удаляются из буфера (который сначала начинается с конца), их также можно удалить из HashSet
. Не нужно делать обе структуры данных одинаковыми.
-
Избегайте многопоточности, пока вам это не понадобится! У вас есть "серийная" рабочая нагрузка. Если вы не знаете, что один из ваших потоков ЦП не может обрабатывать скорость, держите его в одном потоке. Это позволяет избежать конфликтов, блокировок, пропусков кэша процессора и других многопоточных головных болей, которые замедляют работу для рабочих нагрузок, которые не являются неловко параллельными. Мое главное предостережение здесь в том, что вы можете отключить "получение" событий в другой поток от их обработки.
-
Вышеуказанная рекомендация является основной идеей Staged event-driven architecture (SEDA), которая используется в качестве основы для высоких -результаты и стабильные системы управления событиями (например, очереди сообщений).
Вышеупомянутая конструкция может быть обернута чистой и попытаться достичь необработанной производительности, требуемой с минимальной сложностью. Это только обеспечивает достойную базовую линию, из которой эффективность теперь может быть извлечена и измерена.
( Примечание. Если вам нужна настойчивость для кеша, посмотрите Киотский кабинет. кеш, чтобы быть видимым для других пользователей или распространяться, посмотрите Redis.
Ответ 2
У меня нет ничего, чтобы поддержать это, но мне нравится практика на выходных:)
Чтобы решить проблему очистки, вы можете использовать круговой кеш, где последние значения перезаписывают самые старые (у вас точно не будет кеша в течение n минут), поэтому вам нужно только запомнить смещение, в котором была ваша последняя запись. Вы можете инициализировать кеш, заполнив его копиями первой записи, чтобы предотвратить совпадение записи с 0 с неинициализированными данными кэша.
Затем вы можете просто начать сопоставление с первым байтом, если запись не соответствует, пропустить эту запись, оставшуюся в байтах, и попытаться сопоставить следующую информацию до конца кеша.
Если записи содержат заголовок, за которым следуют данные, вы можете захотеть сопоставить их назад, чтобы увеличить скорость, с которой вы находите нечеткие данные.
Ответ 3
Вот пример, который будет работать с одним потоком. В коде используются два словаря для отслеживания данных. Один словарь используется для отслеживания записей за интервал hashDuplicateTracker
и второго словаря для устаревания определенных значений словаря HashesByDate
Ошибка: CheckDataFreshness имеет несколько ошибок, связанных с ElementAt()... Я работаю над этим.
Некоторые улучшения, которые я должен сделать
- Заменить оператор Linq ElementAt (x) на что-то еще
- Убедитесь, что CheckDataFreshness выполняется не чаще одного раза за интервал
Чтобы сделать это многопоточным
- Заменить словарь с ConcurrentDictionary для FrequencyOfMatchedHash, DecrementRecordHash,
- Получить отсортированную версию ConcurrentDictionary или использовать блокировки для
HashesByDate
public class FrequencyOfMatchedHash : Dictionary<int,int>
{
public void AddRecordHash(int hashCode)
{
if (this.ContainsKey(hashCode))
{
this[hashCode]++;
}
else
{
this.Add(hashCode, 1);
}
}
public void DecrementRecordHash(int hashCode)
{
if (this.ContainsKey(hashCode))
{
var val = this[hashCode];
if (val <= 1)
{
this.Remove(hashCode);
}
}
}
public override string ToString()
{
return this.Count + " records";
}
}
public class HashDuplicateTracker : Dictionary<int, int >
{
internal void AddRecord(int recordHash)
{
if (this.ContainsKey(recordHash))
{
this[recordHash]++;
}
else
{
this.Add(recordHash, 1);
}
}
}
public class HashesByDate : SortedDictionary<DateTime, FrequencyOfMatchedHash>
{
internal void AddRecord(DateTime dt, int recordHash)
{
if (this.ContainsKey(dt))
{
this[dt].AddRecordHash(recordHash);
}
else
{
var tmp = new FrequencyOfMatchedHash();
tmp.AddRecordHash(recordHash);
var tmp2 = new FrequencyOfMatchedHash();
tmp2.AddRecordHash(recordHash);
this.Add(dt, tmp);
}
}
}
public class DuplicateTracker
{
HashDuplicateTracker hashDuplicateTracker = new HashDuplicateTracker();
// track all the hashes by date
HashesByDate hashesByDate = new HashesByDate();
private TimeSpan maxRange;
private int average;
public DuplicateTracker(TimeSpan range)
{
this.maxRange = range;
}
public void AddRecordHash(DateTime dt, int recordHash)
{
if (hashesByDate.Count == 0)
{
hashDuplicateTracker.AddRecord(recordHash);
hashesByDate.AddRecord(dt, recordHash);
return;
}
else
{
// Cleanup old data
DateTime maxDate = hashesByDate.ElementAt(hashesByDate.Count - 1).Key;
DateTime oldestPermittedEntry = maxDate - maxRange;
if (dt >= oldestPermittedEntry)
try
{
hashDuplicateTracker.AddRecord(recordHash);
hashesByDate.AddRecord(dt, recordHash);
CheckDataFreshness(oldestPermittedEntry);
}
catch (ArgumentException e)
{
// An entry with the same key already exists.
// Increment count/freshness
hashesByDate[dt].AddRecordHash(recordHash);
hashDuplicateTracker[recordHash]++;
CheckDataFreshness(oldestPermittedEntry);
}
}
}
/// <summary>
/// This should be called anytime data is added to the collection
///
/// If threading issues are addressed, a more optimal solution would be to run this on an independent thread.
/// </summary>
/// <param name="oldestEntry"></param>
private void CheckDataFreshness(DateTime oldestEntry)
{
while (hashesByDate.Count > 0)
{
DateTime currentDate = hashesByDate.ElementAt(0).Key;
if (currentDate < oldestEntry)
{
var hashesToDecrement = hashesByDate.ElementAt(0).Value;
for (int i = 0; i < hashesToDecrement.Count; i++)
{
int currentHash = hashesToDecrement.ElementAt(0).Key;
int currentValue = hashesToDecrement.ElementAt(0).Value;
// decrement counter for hash
int tmpResult = hashDuplicateTracker[currentHash] - currentValue;
if (tmpResult == 0)
{
// prevent endless memory growth.
// For performance this might be deferred
hashDuplicateTracker.Remove(tmpResult);
}
else
{
hashDuplicateTracker[currentHash] = tmpResult;
}
// remove item
continue;
}
hashesByDate.Remove(currentDate);
}
else
break;
}
}
}