基于EHcache实现高并发数据缓存池
jopen
11年前
在高并发的场景里面经常会使用到localcache内容,但是一直没有一个很好的内存管理工具。在开发的时候发现了ehcache,这么一个开源的工具。唯一的缺点就是无法对于多块数据单元进行一个有效的管理,并且在数据过期的时候无法提供有效的更新机制,所以这里写了一个数据缓存池来满足这个需求。
下面是设计组织结构:
这里主要是在数据实体内部封装了数据更新器,这样在数据过期的时候可以调用更新器的方法。
1. Ehcache数据缓冲的具体代码:(主要是get方法内部进行数据更新,使用对象锁的方式来进行数据过期的并发控制,缺点是可能在非常高的并发里面会出现数据阻塞的现象,但是因为这里大部分都是内存的运算操作,所以相对来说阻塞的效果还好)
package com.tmall.lafite.core.manager.localcache; import java.util.List; import net.sf.ehcache.Cache; import net.sf.ehcache.CacheManager; import net.sf.ehcache.Element; import net.sf.ehcache.concurrent.LockType; import net.sf.ehcache.concurrent.ReadWriteLockSync; import net.sf.ehcache.config.CacheConfiguration; import net.sf.ehcache.store.MemoryStoreEvictionPolicy; import com.tmall.lafite.core.LafiteResult; import com.tmall.lafite.core.ResultCode; /** * cache数据实体 * @author wangxiao * */ public class LafiteCache { private CacheManager cacheManager = null; private Cache cacheImpl = null; ReadWriteLockSync rwLock = new ReadWriteLockSync(); private int capability = 30; private long expireTime = 30; public static final int DEFAULT_CAPABILITY = 30; public static final int DEFAULT_EXPIRETIME = 30; private String cacheName = "Tair Local Cache"; public LafiteCache(String id, int capability, long expireTimeMS) { this.cacheName = id; this.capability = capability; this.expireTime = expireTimeMS; } public void setExpireTime(long expireTimeMS) { this.expireTime = expireTimeMS; } public void setCapacity(int cap) { cacheImpl.getCacheConfiguration().setMaxEntriesLocalHeap(cap); } public long getExpireTime() { return expireTime; } @SuppressWarnings("deprecation") public void initialize() { CacheConfiguration cacheConfiguration = new CacheConfiguration(); cacheConfiguration.setDiskPersistent(false); cacheConfiguration.name(cacheName) .maxEntriesLocalHeap(capability) .diskPersistent(false) .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU); // .timeToLiveSeconds(expireTime); //cacheConfiguration.transactionalMode("LOCAL"); //Configuration config = new Configuration().name(cacheName).cache(cacheConfiguration); cacheManager = CacheManager.create(); cacheImpl = new Cache(cacheConfiguration); cacheManager.addCache(cacheImpl); //cache = MemoryStore.create(cacheImpl, new UnboundedPool()); } public int size() { return (int) cacheImpl.getSize(); } public void destroy() { cacheImpl.dispose(); //cacheManager.removeCache(cacheName); cacheManager.shutdown(); } public void clear() { rwLock.lock(LockType.WRITE); try { cacheImpl.removeAll(); } finally { rwLock.unlock(LockType.WRITE); } } public void del(Object key) { cacheImpl.remove(key); return; } public void put(Object key, Object value) { cacheImpl.put(new Element(key, value)); return ; } public LafiteResult get(Object key) { LafiteResult lafiteResult = new LafiteResult(); Element element = cacheImpl.get(key); if (element == null) { lafiteResult.setError(ResultCode.Error.Cache.NO_DATA); return lafiteResult; } long now = System.currentTimeMillis(); long pastTime = now - element.getLastUpdateTime(); if (pastTime >= expireTime) { // double check synchronized (element) { pastTime = now - element.getLastUpdateTime(); if (pastTime >= expireTime) { // expired, update entry element.updateUpdateStatistics(); lafiteResult.setError(ResultCode.Error.Cache.DATA_OVERDUE); } } } // element object value never null; lafiteResult.setDefaultModel(element.getObjectValue()); return lafiteResult; } @SuppressWarnings("unchecked") public List<Object> getKeys() { List<Object> keys = cacheImpl.getKeys(); return keys; } }
2. 数据逻辑单元定义(这里封装了数据容器单元,把原来的数据池方法传入,在数据更新的时候使用namespace来获取内存实体,进而来获取数据。)
(注:这里目前还没有想好是否把数据的初始化放在容器当中,这里暂时不放入。只是在容器里面进行初始化方法的调用,真正的数据设置方式由逻辑单元获取数据池进行自身的put)
package com.tmall.lafite.core.manager.localcache.util; import org.springframework.beans.factory.annotation.Autowired; import com.tmall.lafite.core.LafiteResult; import com.tmall.lafite.core.manager.localcache.LafiteContainer; /** * 缓存逻辑单元 * @author wangxiao * */ public abstract class LogicCenter { @Autowired private LafiteContainer lafiteContainer; /** * 初始化方法 * @return */ public abstract Object initialize(); /** * 回调函数 * @param lafiteCache * @param key * @return */ public abstract Object callBack(String namespace, Object key, LafiteResult lafiteResult); public LafiteContainer getLafiteContainer() { return lafiteContainer; } }3. 数据池(数据池,使用init来循环调用内存实体里的逻辑单元进行数据的初始化)
package com.tmall.lafite.core.manager.localcache; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.tmall.lafite.core.LafiteResult; import com.tmall.lafite.core.ResultCode; import com.tmall.lafite.core.manager.localcache.entity.CacheEntity; import com.tmall.lafite.core.manager.localcache.util.LogicCenter; /** * cache容器 * @author wangxiao * */ public class LafiteContainer { protected final Logger logger = LoggerFactory.getLogger(LafiteContainer.class); private Map<String, CacheEntity> cacheMap = new ConcurrentHashMap<String, CacheEntity>(); /** * 注册缓存对象 * @param namespace * @param key * @param lafiteCache 缓存对象 * @param logicCenter 逻辑对象 (包含:初始化方法和callback方法) * @return */ // public String register(String namespace, String key, LafiteCache lafiteCache, LogicCenter logicCenter) { // if(namespace != null && StringUtils.isEmpty(key) && lafiteCache != null) { // if(cacheMap.containsKey(namespace)) { // return ResultCode.Error.Cache.NAMESPACE_REPETITION; // } // // CacheEntity cacheEntity = new CacheEntity(lafiteCache, logicCenter); // try { // cacheEntity.initialize(); // } catch (Exception e) { // logger.error("LafiteContainer.register ", e); // } // cacheMap.put(namespace, cacheEntity); // return null; // } // return ResultCode.Error.Cache.COMMON_PARAM_LOST; // } /** * 获取cache内的数据 * @param namespace * @param key * @return */ public LafiteResult get(String namespace, Object key) { LafiteResult lafiteResult = new LafiteResult(); CacheEntity cacheEntity = cacheMap.get(namespace);//获取缓存实体 if(cacheEntity == null) { lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY); } else { LafiteCache lafiteCache = cacheEntity.getLafiteCache(); LafiteResult result = lafiteCache.get(key); if(ResultCode.Error.Cache.NO_DATA.equals(result.getError()) || ResultCode.Error.Cache.DATA_OVERDUE.equals(result.getError())) { cacheEntity.getLogicCenter().callBack(namespace, key, result);//数据过期触发callback事件 } lafiteResult = result; } return lafiteResult; } /** * 获取指定命名空间的全部数据 * 这里采用的是逐条遍历的方式 * @param namespace * @return */ public LafiteResult getAll(String namespace) { LafiteResult lafiteResult = new LafiteResult(); List<Object> objects = new ArrayList<Object>(); CacheEntity cacheEntity = cacheMap.get(namespace); if(cacheEntity == null) { lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY); } else { LafiteCache lafiteCache = cacheEntity.getLafiteCache(); List<Object> keys = lafiteCache.getKeys(); if(keys.isEmpty()) { lafiteResult.setError(ResultCode.Error.Cache.NO_DATA); } else { for(Object key : keys) { LafiteResult lr = get(namespace, key); objects.add(lr.getDefaultModel()); } } } lafiteResult.setDefaultModel(objects); return lafiteResult; } /** * 设置数据对象内容 * @param namespace * @param key * @param value * @return */ public LafiteResult put(String namespace, Object key, Object value) { LafiteResult lafiteResult = new LafiteResult(); CacheEntity cacheEntity = cacheMap.get(namespace); if(cacheEntity == null) { lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY); } else { LafiteCache lafiteCache = cacheEntity.getLafiteCache(); lafiteCache.put(key, value); } return lafiteResult; } public void setCacheMap(Map<String, CacheEntity> cacheMap) { this.cacheMap = cacheMap; } public Map<String, CacheEntity> getCacheMap() { return cacheMap; } public void initialize() { new Thread(new Runnable() { @Override public void run() { for(String key : cacheMap.keySet()) { try { CacheEntity cacheEntity = cacheMap.get(key); if(cacheEntity != null) { LogicCenter logicCenter = cacheEntity.getLogicCenter(); if(logicCenter != null) { logicCenter.initialize(); } } }catch (Exception e) { e.printStackTrace(); } } } }).start(); } }4. spring初始化方式
<!-- 权限角色缓存 <bean id="permitRoleCache" class="com.tmall.lafite.core.manager.localcache.LafiteCache" init-method="initialize"> <constructor-arg value="_permit_role_cache_"/> <constructor-arg value="100"/> <constructor-arg value="2000"/> </bean> <bean id="permitRoleLogicCenter" class="com.tmall.lafite.core.manager.permit.cache.PermitRoleLogicCenter"/> <bean id="permitRoleCacheEntity" class="com.tmall.lafite.core.manager.localcache.entity.CacheEntity"> <property name="logicCenter" ref="permitRoleLogicCenter" /> <property name="lafiteCache" ref="permitRoleCache" /> </bean> --> <!-- 缓存容器 --> <bean id="lafiteContainer" class="com.tmall.lafite.core.manager.localcache.LafiteContainer" init-method="initialize"> <!-- <property name="cacheMap"> <map> <entry key="PermitCommon" value-ref="permitCommonCacheEntity" /> <entry key="PermitAlgorithm" value-ref="permitAlgorithmCacheEntity"/> <entry key="PermitRole" value-ref="permitRoleCacheEntity"/> </map> </property> --> </bean>
5. 使用示例
@Autowired private LafiteContainer lafiteContainer; private String namespace = LafiteNameSpace.PermitCommon; @SuppressWarnings("unchecked") @Transactional public List<PermitCommonDO> getPermitDOCache() { LafiteResult lafiteResult = lafiteContainer.getAll(namespace); if(lafiteResult.getDefaultModel() != null) { return (List<PermitCommonDO>) lafiteResult.getDefaultModel(); } return null; }