Distributed Cache Improvement

This topic was automatically closed 365 days after the last reply. New replies are no longer allowed.
1 ano atrás
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.GZip;
using Microsoft.Extensions.Caching.Distributed;
using Newtonsoft.Json;
using Nito.AsyncEx;
using Nop.Core.Configuration;
using Nop.Core.Infrastructure;
using StackExchange.Redis;

namespace Nop.Core.Caching
{
    /// <summary>
    /// Represents a distributed cache
    /// </summary>
    public partial class DistributedCacheManager : CacheKeyService, ILocker, IStaticCacheManager
    {
        #region Fields

        private readonly IDistributedCache _distributedCache;
        private readonly ConcurrentDictionary<CacheKey, object> _items;
        private static readonly List<string> _keys;
        private static readonly AsyncLock _locker;

        #endregion

        #region Ctor

        static DistributedCacheManager()
        {
            _locker = new AsyncLock();
            _keys = new List<string>();
        }

        public DistributedCacheManager(AppSettings appSettings, IDistributedCache distributedCache) : base(appSettings)
        {
            _distributedCache = distributedCache;
            _items = new ConcurrentDictionary<CacheKey, object>();
        }

        #endregion

        #region Utilities

        /// <summary>
        /// Prepare cache entry options for the passed key
        /// </summary>
        /// <param name="key">Cache key</param>
        /// <returns>Cache entry options</returns>
        private DistributedCacheEntryOptions PrepareEntryOptions(CacheKey key)
        {
            //set expiration time for the passed cache key
            var options = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(key.CacheTime)
            };

            return options;
        }

        /// <summary>
        /// Try to get the cached item
        /// </summary>
        /// <typeparam name="T">Type of cached item</typeparam>
        /// <param name="key">Cache key</param>
        /// <returns>
        /// A task that represents the asynchronous operation
        /// The task result contains the flag which indicate is the key exists in the cache, cached item or default value
        /// </returns>
        private async Task<(bool isSet, T item)> TryGetItemAsync<T>(CacheKey key)
        {
            //var json = await _distributedCache.GetStringAsync(key.Key);

            var json = await _distributedCache.GetAsync(key.Key);


            if (json == null)
                return (false, default);

            using var _ = await _locker.LockAsync();
            if (!_keys.Contains(key.Key))
                _keys.Add(key.Key);


            var decompressedJsonData = Unzip(json);
            return (true, JsonConvert.DeserializeObject<T>(decompressedJsonData));

            //return (true, JsonConvert.DeserializeObject<T>(json));
        }

        /// <summary>
        /// Try to get the cached item
        /// </summary>
        /// <typeparam name="T">Type of cached item</typeparam>
        /// <param name="key">Cache key</param>
        /// <returns>Flag which indicate is the key exists in the cache, cached item or default value</returns>
        private (bool isSet, T item) TryGetItem<T>(CacheKey key)
        {
            //var json = _distributedCache.GetString(key.Key);
            
            var json = _distributedCache.Get(key.Key);

            if (json==null)
                return (false, default);

            using var _ = _locker.Lock();
            if (!_keys.Contains(key.Key))
                _keys.Add(key.Key);


            var decompressedJsonData = Unzip(json);
            return (true, JsonConvert.DeserializeObject<T>(decompressedJsonData));

            //return (true, JsonConvert.DeserializeObject<T>(json));
        }

        /// <summary>
        /// Add the specified key and object to the cache
        /// </summary>
        /// <param name="key">Key of cached item</param>
        /// <param name="data">Value for caching</param>
        private void Set(CacheKey key, object data)
        {
            if ((key?.CacheTime ?? 0) <= 0 || data == null)
                return;

            var jsondata = JsonConvert.SerializeObject(data);
            var compressedJsonData = Zip(jsondata);
            _distributedCache.SetAsync(key.Key, compressedJsonData, PrepareEntryOptions(key));


            //_distributedCache.SetString(key.Key, compressedJsonData, PrepareEntryOptions(key));
            _items.TryAdd(key, data);

            using var _ = _locker.Lock();
            _keys.Add(key.Key);
        }

        #endregion

        #region Methods

        /// <summary>
        /// Performs application-defined tasks associated with freeing,
        /// releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
        }

        /// <summary>
        /// Get a cached item. If it's not in the cache yet, then load and cache it
        /// </summary>
        /// <typeparam name="T">Type of cached item</typeparam>
        /// <param name="key">Cache key</param>
        /// <param name="acquire">Function to load item if it's not in the cache yet</param>
        /// <returns>
        /// A task that represents the asynchronous operation
        /// The task result contains the cached value associated with the specified key
        /// </returns>
        public async Task<T> GetAsync<T>(CacheKey key, Func<Task<T>> acquire)
        {
            //little performance workaround here:
            //we use local dictionary to cache a loaded object in memory for the current HTTP request.
            //this way we won't connect to distributed cache server many times per HTTP request (e.g. each time to load a locale or setting)
            if (_items.ContainsKey(key))
                return (T)_items.GetOrAdd(key, acquire);

            if (key.CacheTime <= 0)
                return await acquire();

            var (isSet, item) = await TryGetItemAsync<T>(key);

            if (isSet)
            {
                if (item != null)
                    _items.TryAdd(key, item);

                return item;
            }

            var result = await acquire();

            if (result != null)
                await SetAsync(key, result);

            return result;
        }

        /// <summary>
        /// Get a cached item. If it's not in the cache yet, then load and cache it
        /// </summary>
        /// <typeparam name="T">Type of cached item</typeparam>
        /// <param name="key">Cache key</param>
        /// <param name="acquire">Function to load item if it's not in the cache yet</param>
        /// <returns>
        /// A task that represents the asynchronous operation
        /// The task result contains the cached value associated with the specified key
        /// </returns>
        public async Task<T> GetAsync<T>(CacheKey key, Func<T> acquire)
        {
            //little performance workaround here:
            //we use local dictionary to cache a loaded object in memory for the current HTTP request.
            //this way we won't connect to distributed cache server many times per HTTP request (e.g. each time to load a locale or setting)
            if (_items.ContainsKey(key))
                return (T)_items.GetOrAdd(key, acquire);

            if (key.CacheTime <= 0)
                return acquire();

            var (isSet, item) = await TryGetItemAsync<T>(key);

            if (isSet)
            {
                if (item != null)
                    _items.TryAdd(key, item);

                return item;
            }

            var result = acquire();

            if (result != null)
                await SetAsync(key, result);

            return result;
        }

        /// <summary>
        /// Get a cached item. If it's not in the cache yet, then load and cache it
        /// </summary>
        /// <typeparam name="T">Type of cached item</typeparam>
        /// <param name="key">Cache key</param>
        /// <param name="acquire">Function to load item if it's not in the cache yet</param>
        /// <returns>The cached value associated with the specified key</returns>
        public T Get<T>(CacheKey key, Func<T> acquire)
        {
            //little performance workaround here:
            //we use local dictionary to cache a loaded object in memory for the current HTTP request.
            //this way we won't connect to distributed cache server many times per HTTP request (e.g. each time to load a locale or setting)
            if (_items.ContainsKey(key))
                return (T)_items.GetOrAdd(key, acquire);

            if (key.CacheTime <= 0)
                return acquire();

            var (isSet, item) = TryGetItem<T>(key);

            if (isSet)
            {
                if (item != null)
                    _items.TryAdd(key, item);

                return item;
            }

            var result = acquire();

            if (result != null)
                Set(key, result);

            return result;
        }

        /// <summary>
        /// Remove the value with the specified key from the cache
        /// </summary>
        /// <param name="cacheKey">Cache key</param>
        /// <param name="cacheKeyParameters">Parameters to create cache key</param>
        /// <returns>A task that represents the asynchronous operation</returns>
        public async Task RemoveAsync(CacheKey cacheKey, params object[] cacheKeyParameters)
        {
            cacheKey = PrepareKey(cacheKey, cacheKeyParameters);

            await _distributedCache.RemoveAsync(cacheKey.Key);
            _items.TryRemove(cacheKey, out var _);

            using var _ = await _locker.LockAsync();
            _keys.Remove(cacheKey.Key);
        }

        /// <summary>
        /// Add the specified key and object to the cache
        /// </summary>
        /// <param name="key">Key of cached item</param>
        /// <param name="data">Value for caching</param>
        /// <returns>A task that represents the asynchronous operation</returns>
        public async Task SetAsync(CacheKey key, object data)
        {
            if ((key?.CacheTime ?? 0) <= 0 || data == null)
                return;

            var jsondata = JsonConvert.SerializeObject(data);
            var compressedJsonData = Zip(jsondata);
            await _distributedCache.SetAsync(key.Key, compressedJsonData, PrepareEntryOptions(key));

            //await _distributedCache.SetStringAsync(key.Key, JsonConvert.SerializeObject(data), PrepareEntryOptions(key));
            _items.TryAdd(key, data);

            using var _ = await _locker.LockAsync();
            _keys.Add(key.Key);
        }

        /// <summary>
        /// Remove items by cache key prefix
        /// </summary>
        /// <param name="prefix">Cache key prefix</param>
        /// <param name="prefixParameters">Parameters to create cache key prefix</param>
        /// <returns>A task that represents the asynchronous operation</returns>
        public async Task RemoveByPrefixAsync(string prefix, params object[] prefixParameters)
        {
            using var _ = await _locker.LockAsync();

            prefix = PrepareKeyPrefix(prefix, prefixParameters);

            var regex = new Regex(prefix,
                RegexOptions.Singleline | RegexOptions.Compiled | RegexOptions.IgnoreCase);

            var matchesKeys = new List<CacheKey>();

            //get cache keys that matches pattern
            matchesKeys.AddRange(_items.Keys.Where(key => regex.IsMatch(key.Key)).ToList());

            //remove matching values
            if (matchesKeys.Any())
                foreach (var key in matchesKeys)
                    _items.TryRemove(key, out var _);

            foreach (var key in _keys.Where(key => key.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase)).ToList())
            {
                await _distributedCache.RemoveAsync(key);
                _keys.Remove(key);
            }
        }

        /// <summary>
        /// Clear all cache data
        /// </summary>
        /// <returns>A task that represents the asynchronous operation</returns>
        public async Task ClearAsync()
        {
            using var _ = await _locker.LockAsync();
            try
            {
                  // Do not do this except by moving your application session somewhere else (something like dynamo db or sql).
                //var connectionMultiplexer = ConnectionMultiplexer.Connect(distributedCacheConfig.ConnectionString + ",allowAdmin=true");
                var connectionMultiplexer = ConnectionMultiplexer.Connect(_appSettings.Get<DistributedCacheConfig>().ConnectionString);
                var endpoints = connectionMultiplexer.GetEndPoints(false);
                foreach (var endpoint in endpoints)
                {
                    var server = connectionMultiplexer.GetServer(endpoint);
                    await server.FlushAllDatabasesAsync();
                }
                await connectionMultiplexer.CloseAsync(true);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

            _keys.Clear();
        }

        /// <summary>
        /// Perform asynchronous action with exclusive in-memory lock
        /// </summary>
        /// <param name="resource">The key we are locking on</param>
        /// <param name="expirationTime">The time after which the lock will automatically be expired</param>
        /// <param name="action">Action to be performed with locking</param>
        /// <returns>True if lock was acquired and action was performed; otherwise false</returns>
        public async Task<bool> PerformActionWithLockAsync(string resource, TimeSpan expirationTime, Func<Task> action)
        {
            if (!string.IsNullOrEmpty(await _distributedCache.GetStringAsync(resource)))
                return false;

            try
            {
                await _distributedCache.SetStringAsync(resource, resource, new DistributedCacheEntryOptions
                {
                    AbsoluteExpirationRelativeToNow = expirationTime
                });

                //perform action
                await action();

                return true;
            }
            finally
            {
                //release lock even if action fails
                await _distributedCache.RemoveAsync(resource);
            }
        }

        #endregion

        #region helper

        private byte[] Zip(string text)
        {
            if (text == null)
                return null;

            byte[] ret;
            using (var outputMemory = new MemoryStream())
            {
                using (var gz = new GZipStream(outputMemory, CompressionLevel.Optimal))
                {
                    using (var sw = new StreamWriter(gz, Encoding.UTF8))
                    {
                        sw.Write(text);
                    }
                }
                ret = outputMemory.ToArray();
            }
            return ret;
        }

        private string Unzip(byte[] bytes)
        {
            string ret = null;
            using (var inputMemory = new MemoryStream(bytes))
            {
                using (var gz = new GZipStream(inputMemory, CompressionMode.Decompress))
                {
                    using (var sr = new StreamReader(gz, Encoding.UTF8))
                    {
                        ret = sr.ReadToEnd();
                    }
                }
            }
            return ret;
        }
        #endregion
    }
}


This reduces response time compared to the existing code. The engine keeps large JSON payloads in the cache (shop categories, manufacturers, vendors, settings, locale string resources, discounts, and URL records — the size depends on data volume). Microsoft.Extensions.Caching.StackExchangeRedis then converts this large JSON into a byte array and stores it in the Redis cache database. By compressing that large byte array, the data volume is reduced and you get better performance than with the existing code.
I use SharpZipLib for compression, which supports both Linux (Ubuntu 18.0x) and Windows.
1 ano atrás
RE: "...better performance"
Do you have some comparisons?  (I'd think the zip/unzip could have some impact)
(Why did you change from PerRequestCache to ConcurrentDictionary?)

RE: "I use SharpZipLib ..."
Isn't your code using GZipStream from Microsoft's System.IO.Compression (also in your "using")
1 ano atrás
New York wrote:
RE: "...better performance"
Do you have some comparisons?  (I'd think the zip/unzip could have some impact)
(Why did you change from PerRequestCache to ConcurrentDictionary?)

RE: "I use SharpZipLib ..."
Isn't your code using GZipStream from Microsoft's System.IO.Compression (also in your "using")


1. Isn't your code using GZipStream from Microsoft's System.IO.Compression (also in your "using")
  ==> You are right. It is System.IO.Compression in this code. I forgot to post the updated code.


private byte[] Zip(string entryName)
        {

            string largeUncompressedText = entryName;
            byte[] targetByteArray = new byte[0];
            using (MemoryStream source = new MemoryStream(Encoding.UTF8.GetBytes(largeUncompressedText)))
            {
                using (MemoryStream target = new MemoryStream())
                {
                    GZip.Compress(source, target, true, 4096);
                    targetByteArray = target.ToArray();
                }
            }
            return targetByteArray;

        }

        private string Unzip(byte[] bytes)
        {
            string uncompressedString = string.Empty;
            using (MemoryStream source = new MemoryStream(bytes))
            {
                using (MemoryStream target = new MemoryStream())
                {
                    GZip.Decompress(source, target, true);
                    uncompressedString = Encoding.UTF8.GetString(target.ToArray());
                }
            }
            return uncompressedString;
        }

2. Do you have some comparisons?  (I'd think the zip/unzip could have some impact)
     ==> Do you think 1 MB and 500–700 KB are the same over the network?

3. (Why did you change from PerRequestCache to ConcurrentDictionary?)
    ===> https://github.com/nopSolutions/nopCommerce/blob/develop/src/Libraries/Nop.Core/Caching/DistributedCacheManager.cs
1 ano atrás
What I "think" isn't always correct ;)  It's best to do some benchmarks / comparisons.
P.S.  In the past I benchmarked SharpZipLib vs System.IO.Compression, and the latter is faster (~ 2x - 3x)
1 ano atrás
New York wrote:
What I "think" isn't always correct ;)  It's best to do some benchmarks / comparisons.
P.S.  In the past I benchmarked SharpZipLib vs System.IO.Compression, and the latter is faster (~ 2x - 3x)


You mean System.IO.Compression is better than SharpZipLib. It can be, but the problem is that System.IO.Compression compression works on either Linux or Windows, not both. That is why I chose SharpZipLib for compression.

Thank you for your suggestion. I will compare both compression and write here.
1 ano atrás
Have you considered caching the 'typed object' rather than JSON?
https://dejanstojanovic.net/aspnet/2018/may/using-idistributedcache-in-net-core-just-got-a-lot-easier/

(Another thought is ...  Can you just 'fire and forget' the SetAsync?  I.e. why 'block' the customer since the data is cached locally/in-memory anyway?)
1 ano atrás
New York wrote:
Have you considered caching the 'typed object' rather than JSON?
https://dejanstojanovic.net/aspnet/2018/may/using-idistributedcache-in-net-core-just-got-a-lot-easier/

(Another thought is ...  Can you just 'fire and forget' the SetAsync?  I.e. why 'block' the customer since the data is cached locally/in-memory anyway?)


No, I do not consider the  'typed object' rather than JSON.

(Another thought is ...  Can you just 'fire and forget' the SetAsync?  I.e. why 'block' the customer since the data is cached locally/in-memory anyway?)
  ==> I am sorry, I can not understand this one. But so far I understand that cache locally/in-memory can cause issues like shopping cart empty under the load balancer with auto-scaling as it is not guaranteed that the next HTTP request will serve from the same machine.
1 ano atrás
I hope you're using a 'sticky' load balancer ;)
But in any case, it's moot, because the multiple cache requests per 'session' will first try to get data from memory, as per the comment:
public async Task<T> GetAsync<T>(CacheKey key, Func<Task<T>> acquire)
{
//little performance workaround here:
//we use "PerRequestCache" to cache a loaded object in memory for the current HTTP request.
//this way we won't connect to Redis server many times per HTTP request (e.g. each time to load a locale or setting)


An example of "Fire and forget", in this case,  would mean to not 'await' the Set (...
Some refactoring may be needed, maybe a 'background task', but the general idea is that the current request does not need to wait for it.
1 ano atrás
New York wrote:
I hope you're using a 'sticky' load balancer ;)
But in any case, it's moot, because the multiple cache requests per 'session' will first try to get data from memory, as per the comment:
public async Task<T> GetAsync<T>(CacheKey key, Func<Task<T>> acquire)
{
//little performance workaround here:
//we use "PerRequestCache" to cache a loaded object in memory for the current HTTP request.
//this way we won't connect to Redis server many times per HTTP request (e.g. each time to load a locale or setting)


An example of "Fire and forget", in this case,  would mean to not 'await' the Set (...
Some refactoring may be needed, maybe a 'background task', but the general idea is that the current request does not need to wait for it.


I hope you're using a 'sticky' load balancer ;)
==> No, I am not :).

An example of "Fire and forget", in this case,  would mean to not 'await' the Set (...
Some refactoring may be needed, maybe a 'background task', but the general idea is that the current request does not need to wait for it.
===> Better to submit that to the nopCommerce Git repo as an improvement, because I do not change anything there except compressing and decompressing the data and saving it to Redis. Or you can share the code here so others can benefit from it like me. By the way, you are absolutely right — System.IO.Compression is far better than SharpZipLib.
This topic was automatically closed 365 days after the last reply. New replies are no longer allowed.