Кэширование асинхронных операций
Я ищу элегантный способ кэширования результатов моих асинхронных операций.
Сначала у меня был синхронный метод:
public String GetStuff(String url)
{
WebRequest request = WebRequest.Create(url);
using (var response = request.GetResponse())
using (var sr = new StreamReader(response.GetResponseStream()))
return sr.ReadToEnd();
}
Затем я сделал асинхронным:
public async Task<String> GetStuffAsync(String url)
{
WebRequest request = WebRequest.Create(url);
using (var response = await request.GetResponseAsync())
using (var sr = new StreamReader(response.GetResponseStream()))
return await sr.ReadToEndAsync();
}
Затем я решил, что я должен кэшировать результаты, поэтому мне не нужно часто запрашивать вне этого:
ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>();
public async Task<String> GetStuffAsync(String url)
{
return _cache.GetOrAdd(url, await GetStuffInternalAsync(url));
}
private async Task<String> GetStuffInternalAsync(String url)
{
WebRequest request = WebRequest.Create(url);
using (var response = await request.GetResponseAsync())
using (var sr = new StreamReader(response.GetResponseStream()))
return await sr.ReadToEndAsync();
}
Затем я прочитал статью (смотрел видео) о том, как лучше кэшировать Task<T>
, потому что их создание дороговато:
ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>();
public Task<String> GetStuffAsync(String url)
{
return _cache.GetOrAdd(url, GetStuffInternalAsync(url));
}
private async Task<String> GetStuffInternalAsync(String url)
{
WebRequest request = WebRequest.Create(url);
using (var response = await request.GetResponseAsync())
using (var sr = new StreamReader(response.GetResponseStream()))
return await sr.ReadToEndAsync();
}
И теперь проблема заключается в том, что если запрос не сработает (например, HTTP 401), кэш будет содержать сбой Task<String>
, и мне придется использовать reset приложение, потому что будет невозможно отправить запрос повторно.
Есть ли элегантный способ использования ConcurrentDictionary<T1,T2>
для кэширования только успешных задач и по-прежнему иметь атомное поведение?
Ответы
Ответ 1
Прежде всего, оба подхода неправильны, потому что они не сохраняют вас ни на какие запросы (хотя второй, по крайней мере, экономит ваше время).
Ваш первый код (тот, у которого await
) делает следующее:
- Сделайте запрос.
- Дождитесь завершения запроса.
- Если в кэше уже был результат, проигнорируйте результат запроса.
Второй код удаляет шаг 2, поэтому он быстрее, но вы все еще делаете много ненужных запросов.
Вместо этого вам следует использовать перегрузку GetOrAdd()
, которая принимает делегат:
public Task<String> GetStuffAsync(String url)
{
return _cache.GetOrAdd(url, GetStuffInternalAsync);
}
Это не полностью исключает возможность игнорирования запросов, но делает их гораздо менее вероятными. (Для этого вы можете попробовать отменить запросы, которые, как вы знаете, игнорируются, но я не думаю, что это стоит усилий здесь.)
Теперь на ваш реальный вопрос. Я думаю, что вам следует использовать метод AddOrUpdate()
. Если значение еще не указано, добавьте его. Если он там, замените его, если он неисправен:
public Task<String> GetStuffAsync(String url)
{
return _cache.AddOrUpdate(
url, GetStuffInternalAsync, (u, task) =>
{
if (task.IsCanceled || task.IsFaulted)
return GetStuffInternalAsync(u);
return task;
});
}
Ответ 2
Это действительно разумно (и в зависимости от вашего дизайна и производительности, что важно), чтобы сохранить эти неудавшиеся задачи как Negative Cache. В противном случае, если a url
всегда терпит неудачу, использование его снова и снова поражает точку использования кэша вообще.
Что вам нужно, так это время, чтобы время от времени очищать кеш. Самый простой способ - установить таймер, заменяющий экземпляр ConcurrentDictionarry
. Более надежным решением является создание собственного LruDictionary
или чего-то подобного.
Ответ 3
Здесь можно кэшировать результаты асинхронных операций, которые не гарантируют пропусков в кэше.
Как упоминается в принятых ответах ответа, если один и тот же URL-адрес запрашивается много раз в цикле (в зависимости от SynchronizationContext) или из нескольких потоков, веб-запрос будет продолжать получать сообщения, пока не будет получен ответ, который будет кэшироваться, на в результате чего кеш начнет использоваться.
Метод ниже создает объект SemaphoreSlim для каждого уникального ключа. Это предотвратит запуск многократной работы async для одного и того же ключа, одновременно позволяя ему работать одновременно для разных ключей. Очевидно, что накладные расходы хранят объекты SemaphoreSlim, чтобы предотвратить пропуски кэша, поэтому, возможно, это не стоит в зависимости от варианта использования. Но если гарантировать отсутствие пропусков в кэше, важно, чем это достигается.
private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>();
public async Task<string> GetSomethingAsync(string key)
{
string value;
// get the semaphore specific to this key
var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
await keyLock.WaitAsync();
try
{
// try to get value from cache
if (!_cache.TryGetValue(key, out value))
{
// if value isn't cached, get it the long way asynchronously
value = await GetSomethingTheLongWayAsync();
// cache value
_cache.TryAdd(key, value);
}
}
finally
{
keyLock.Release();
}
return value;
}
Ответ 4
Я создал обертку для MemoryCache, которая в основном кэширует объекты Lazy<Task<T>>
и работает так, что решаются все следующие проблемы:
- Никакие параллельные или ненужные операции для получения значения не будут запущены. Несколько узлов или потоков вызовов могут ожидать того же значения из кеша.
- Неудачные задачи не кэшируются. (Нет отрицательного кеширования.)
- Пользователи кэша не могут получить недействительные результаты из кеша, даже если значение недействительно во время ожидания.
Решение далее объясняется в моем блоге, а полный рабочий код доступен по адресу GitHub.
Ответ 5
Эта работа для меня:
ObjectCache _cache = MemoryCache.Default;
static object _lockObject = new object();
public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class
{
var task = (T)_cache[cacheKey];
if (task != null) return task;
lock (_lockObject)
{
task = (T)_cache[cacheKey](cacheKey);
if (task != null) return task;
task = func();
Set(cacheKey, task, cacheExpiration);
task.ContinueWith(t => {
if (t.Status != TaskStatus.RanToCompletion)
_cache.Remove(cacheKey);
});
}
return task;
}
Ответ 6
Другим простым способом сделать это является расширение Lazy<T>
до AsyncLazy<T>
, например:
public class AsyncLazy<T> : Lazy<Task<T>>
{
public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
{ }
public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}
Затем вы можете сделать это:
private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache
= new ConcurrentDictionary<string, AsyncLazy<string>>();
public async Task<string> GetStuffAsync(string url)
{
return await _cache.GetOrAdd(url,
new AsyncLazy<string>(
() => GetStuffInternalAsync(url),
LazyThreadSafetyMode.ExecutionAndPublication));
}