Кэширование асинхронных операций

Я ищу элегантный способ кэширования результатов моих асинхронных операций.

Сначала у меня был синхронный метод:

public String GetStuff(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = request.GetResponse())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return sr.ReadToEnd();
}

Затем я сделал асинхронным:

public async Task<String> GetStuffAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

Затем я решил, что я должен кэшировать результаты, поэтому мне не нужно часто запрашивать вне этого:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>();

public async Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, await GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

Затем я прочитал статью (смотрел видео) о том, как лучше кэшировать Task<T>, потому что их создание дороговато:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>();

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

И теперь проблема заключается в том, что если запрос не сработает (например, HTTP 401), кэш будет содержать сбой Task<String>, и мне придется использовать reset приложение, потому что будет невозможно отправить запрос повторно.

Есть ли элегантный способ использования ConcurrentDictionary<T1,T2> для кэширования только успешных задач и по-прежнему иметь атомное поведение?

Ответы

Ответ 1

Прежде всего, оба подхода неправильны, потому что они не сохраняют вас ни на какие запросы (хотя второй, по крайней мере, экономит ваше время).

Ваш первый код (тот, у которого await) делает следующее:

  • Сделайте запрос.
  • Дождитесь завершения запроса.
  • Если в кэше уже был результат, проигнорируйте результат запроса.

Второй код удаляет шаг 2, поэтому он быстрее, но вы все еще делаете много ненужных запросов.

Вместо этого вам следует использовать перегрузку GetOrAdd(), которая принимает делегат:

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync);
}

Это не полностью исключает возможность игнорирования запросов, но делает их гораздо менее вероятными. (Для этого вы можете попробовать отменить запросы, которые, как вы знаете, игнорируются, но я не думаю, что это стоит усилий здесь.)


Теперь на ваш реальный вопрос. Я думаю, что вам следует использовать метод AddOrUpdate(). Если значение еще не указано, добавьте его. Если он там, замените его, если он неисправен:

public Task<String> GetStuffAsync(String url)
{
    return _cache.AddOrUpdate(
        url, GetStuffInternalAsync, (u, task) =>
        {
            if (task.IsCanceled || task.IsFaulted)
                return GetStuffInternalAsync(u);
            return task;
        });
}

Ответ 2

Это действительно разумно (и в зависимости от вашего дизайна и производительности, что важно), чтобы сохранить эти неудавшиеся задачи как Negative Cache. В противном случае, если a url всегда терпит неудачу, использование его снова и снова поражает точку использования кэша вообще.

Что вам нужно, так это время, чтобы время от времени очищать кеш. Самый простой способ - установить таймер, заменяющий экземпляр ConcurrentDictionarry. Более надежным решением является создание собственного LruDictionary или чего-то подобного.

Ответ 3

Здесь можно кэшировать результаты асинхронных операций, которые не гарантируют пропусков в кэше.

Как упоминается в принятых ответах ответа, если один и тот же URL-адрес запрашивается много раз в цикле (в зависимости от SynchronizationContext) или из нескольких потоков, веб-запрос будет продолжать получать сообщения, пока не будет получен ответ, который будет кэшироваться, на в результате чего кеш начнет использоваться.

Метод ниже создает объект SemaphoreSlim для каждого уникального ключа. Это предотвратит запуск многократной работы async для одного и того же ключа, одновременно позволяя ему работать одновременно для разных ключей. Очевидно, что накладные расходы хранят объекты SemaphoreSlim, чтобы предотвратить пропуски кэша, поэтому, возможно, это не стоит в зависимости от варианта использования. Но если гарантировать отсутствие пропусков в кэше, важно, чем это достигается.

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>();

public async Task<string> GetSomethingAsync(string key)
{   
    string value;
    // get the semaphore specific to this key
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
    await keyLock.WaitAsync();
    try
    {
        // try to get value from cache
        if (!_cache.TryGetValue(key, out value))
        {
            // if value isn't cached, get it the long way asynchronously
            value = await GetSomethingTheLongWayAsync();

            // cache value
            _cache.TryAdd(key, value);
        }
    }
    finally
    {
        keyLock.Release();
    }
    return value;
}

Ответ 4

Я создал обертку для MemoryCache, которая в основном кэширует объекты Lazy<Task<T>> и работает так, что решаются все следующие проблемы:

  • Никакие параллельные или ненужные операции для получения значения не будут запущены. Несколько узлов или потоков вызовов могут ожидать того же значения из кеша.
  • Неудачные задачи не кэшируются. (Нет отрицательного кеширования.)
  • Пользователи кэша не могут получить недействительные результаты из кеша, даже если значение недействительно во время ожидания.

Решение далее объясняется в моем блоге, а полный рабочий код доступен по адресу GitHub.

Ответ 5

Эта работа для меня:

ObjectCache _cache = MemoryCache.Default;
static object _lockObject = new object();
public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class
{
    var task = (T)_cache[cacheKey];
    if (task != null) return task;          
    lock (_lockObject)
    {
        task = (T)_cache[cacheKey](cacheKey);
        if (task != null) return task;
        task = func();
        Set(cacheKey, task, cacheExpiration);
        task.ContinueWith(t => {
            if (t.Status != TaskStatus.RanToCompletion)
                _cache.Remove(cacheKey);
        });
    }
    return task;
} 

Ответ 6

Другим простым способом сделать это является расширение Lazy<T> до AsyncLazy<T>, например:

public class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
    { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

Затем вы можете сделать это:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache
    = new ConcurrentDictionary<string, AsyncLazy<string>>();

public async Task<string> GetStuffAsync(string url)
{
    return await _cache.GetOrAdd(url,
        new AsyncLazy<string>(
            () => GetStuffInternalAsync(url),
            LazyThreadSafetyMode.ExecutionAndPublication));
}