我的编程空间,编程开发者的网络收藏夹
学习永远不晚

spring aop + xmemcached 配置service层缓存策略

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

spring aop + xmemcached 配置service层缓存策略

Memcached 作用与使用 基本介绍

1,对于缓存的存取方式,简言之,就是以键值对的形式将数据保存在内存中。在日常业务中涉及的操作无非就是增删改查。加入缓存机制后,查询的时候,对数据进行缓存,增删改的时候,清除缓存即可。这其中对于缓存的闭合就非常重要,如果缓存没有及时得到更新,那用户就会获取到过期数据,就会产生问题。

2,对于单一业务的缓存管理(数据库中只操作单表),只需生成一个key,查询时,使用key,置入缓存;增删改时,使用key,清除缓存。将key与表绑定,操作相对简单。

3,但是在现实业务中,更多的是对关联表的增删改查(数据库多表操作),业务之间互相关联,数据库中的某张表不止一个业务再进行操作,将缓存拦截在service层,对业务进行缓存,对多表进行缓存。

4,业务层缓存实现策略:

  4.1,在缓存中建立一个key为"union_query",value为“hashmap<prefix,uqversion>(‘简称uqmap’)“的缓存,prefix保存的是当前业务操作涉及到的数据库表名的组合(数据库表名的唯一性),使用‘|’分隔(例 prefix=“A|B”,此次业务将操作A表与B表),uqversion是业务版本号,从0开始递增。

  4.2,调用一个查询业务时,对数据进行缓存,设置operation为1,告诉cache对象,这是一个缓存操作,例如调用 queryAB(args[])方法时,cache对象切入,将prefix(即”A|B“)与uqversion(初始化为0),存入uqmap中进行缓存。

  4.3,将prefix,uqversion,方法明+参数,进行拼接,使用md5进行加密后作为一个key,将方法的结果集作为value,进行缓存。至此缓存成功。

  4.4,当第二个请求来调用queryAB时,cache对象切入,首先,查询uqmap对象,使用prefix找到对应的uqversion,然后,通过拼接加密获取key,最后取得结果集进行返回。

  4.5,当有一个updateA方法被调用时,设置operation为4,告诉cache对象,这是一个删除缓存的操作,此时prefix的值为“A”,cache对象切入,获取全局的uqmap,遍历其中的prefix,是否包含了表A的名称:如果包含,则更新此prefix的uqversion进行自增,uqversion一旦发生变化,4.3中组合的key将不复存在,业务缓存也就消失了。(对于复杂的updateAB方法,遍历prefix要复杂一点,可以实现)

  4.6,当第三个请求来调用queryAB时,可以获取到uqversion,组合成key后,但是没有对应的value。此时确定缓存不存在时,继续正常执行方法,获取结果集,返回给客户的同时,将结果集进行缓存。

5,对于缓存的操作,网上有三种api可以选择(memcached client forjava、spymemcached、xmemcached),具体的好坏,本人在这就不做分析。本人使用的是XMemcached api。

具体实现细节:

1,新建 @interface Annotation{ } 定义一个注解 @Annotation,一个注解是一个类。定义缓存策略。

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface XmemCache{

	
	String prefix() default "";
	
	
	int interval() default 3600; 
	
	
	int operation() default 1; 
}

2,memcache基础操作类,对一些常用方法进行封装,对memcachedclient进行配置

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;

import com.node.hlhw.rbac.api.constant.Constant;

import net.rubyeye.xmemcached.GetsResponse;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.command.BinaryCommandFactory;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
import net.rubyeye.xmemcached.utils.AddrUtil;


public class MemcachedOperate implements DisposableBean {

	
	private static final Logger logger = LoggerFactory.getLogger(MemcachedOperate.class);
	
	private static Properties PROPERTIES = new Properties();
	
	private static String MEMCACHED_SETTING = "memcached.properties";
	
	private static MemcachedClient memcachedClient;
	

	public static MemcachedClient getClient(){
		return memcachedClient;
	}
	
	
	
	static {
		InputStream in = Object.class.getResourceAsStream("/" + MEMCACHED_SETTING);
		try {
			PROPERTIES.load(in);
		} catch (IOException e) {
			e.printStackTrace();
		}
		String servers = PROPERTIES.getProperty("memcached.servers", "");
		if (null != servers && !"".equals(servers)) {
			try {
				logger.debug("启动memcached连接");
				MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers));
				builder.setConnectionPoolSize(100);
				builder.setFailureMode(true);
				builder.setCommandFactory(new BinaryCommandFactory());
				builder.setSessionLocator(new KetamaMemcachedSessionLocator());
				builder.setTranscoder(new SerializingTranscoder());
				memcachedClient = builder.build();
				memcachedClient.setEnableHeartBeat(false); // 关闭心跳
				memcachedClient.flushAll(); // 清空缓存
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (MemcachedException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	
	public static Object get(String key) {
		Object object = null;
		try {
			object = memcachedClient.get(key);
		} catch (TimeoutException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (MemcachedException e) {
			e.printStackTrace();
		}
		return object;
	}


	public static void setWithNoReply(String key, int exp, Object value) {
		try {
			memcachedClient.setWithNoReply(key, exp, value);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (MemcachedException e) {
			e.printStackTrace();
		}
	}


	
	@SuppressWarnings("unchecked")

	public static Long getUnionQueryVersion(String prefix) {
		try {
			Map<String, Long> uqmap = null;
			GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY);
			if (getsresponse == null) {
				uqmap = new HashMap<String, Long>();
				Long uqversion = new Long(1); // 初始化版本号
				uqmap.put(prefix, uqversion);
				if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, 0)) { // 检测插入之前是否被修改过
					return uqversion; // 插入成功
				} else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询
					return getUnionQueryVersion(prefix);
				}
			} else {

				long cas = getsresponse.getCas();
				Object uqobj = getsresponse.getValue();
				if (uqobj == null) { // 不存在对象
					uqmap = new HashMap<String, Long>();
					Long uqversion = new Long(1); // 初始化版本号
					uqmap.put(prefix, uqversion);
					if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过
						return uqversion; // 插入成功
					} else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询
						return getUnionQueryVersion(prefix);
					}
				} else {
					uqmap = (Map<String, Long>) uqobj;
					Long uqversion = uqmap.get(prefix);
					if (uqversion == null) { // 不存在此业务版本
						uqversion = new Long(1); // 初始化版本号
						uqmap.put(prefix, uqversion);
						if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过
							return uqversion; // 插入成功
						} else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询
							return getUnionQueryVersion(prefix);
						}
					} else {
						return uqversion;
					}

				}
			}
		} catch (TimeoutException | InterruptedException | MemcachedException e) {
			e.printStackTrace();
			System.err.println("getUnionQueryVersion---Exception");
		}
		return 1L;
	}

	
	public static Long getVersion(String prefix) {
		try {
			GetsResponse<Object> getsresponse = memcachedClient.gets(prefix);
			if (getsresponse == null) {
				Long pfversion = new Long(1);
				if (memcachedClient.cas(prefix, 0, pfversion, 0)) {
					return pfversion;
				} else {
					return getVersion(prefix);
				}
			} else {
				Object pfobj = getsresponse.getValue();
				long cas = getsresponse.getCas();
				if (pfobj == null) {
					Long pfversion = new Long(1);
					if (memcachedClient.cas(prefix, 0, pfversion, cas)) {
						return pfversion;
					} else {
						return getVersion(prefix);
					}
				} else {
					return (Long) pfobj;
				}
			}
		} catch (TimeoutException | InterruptedException | MemcachedException e) {
			e.printStackTrace();
			System.err.println("getVersion---Exception");
		}

		return 1L;
	}

	
	@SuppressWarnings("unchecked")
	public static void updateUnionQueryVersion(String prefix) {

		try {
			Map<String, Long> uqmap = null;
			GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY);
			if (getsresponse == null) {
				return;
			} else {
				Object uqobj = getsresponse.getValue();
				long cas = getsresponse.getCas();
				if (uqobj == null) {
					return;
				} else {
					uqmap = (HashMap<String, Long>) uqobj;
					Set<String> uqset = uqmap.keySet(); // 遍历unionquery中的key
					Iterator<String> quit = uqset.iterator();
					String uqkey = "";
					boolean uqflag = false;
					while (quit.hasNext()) {
						uqkey = quit.next();
						if (("|" + uqkey + "|").contains("|" + prefix + "|")) { // key中包含prefix
							uqmap.put(uqkey, uqmap.get(uqkey) + 1);		// 更新map
							uqflag = true;
						}
					}
					if (uqflag) {
						if (!memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) {
							updateUnionQueryVersion(prefix);
						}
					}
				}
			}
		} catch (TimeoutException | InterruptedException | MemcachedException e) {
			e.printStackTrace();
			System.err.println("updateUnionQueryVersion---Exception");
		}
	}

	
	public static void updateVersion(String prefix) {

		try {
			GetsResponse<Object> getsresponse;
			getsresponse = memcachedClient.gets(prefix);

			if (getsresponse == null) {
				return ;
			} else {
				Object pfobj = getsresponse.getValue();
				long cas = getsresponse.getCas();
				if (pfobj == null) {
					return ;
				} else {
					Long pfversion = (Long) pfobj;
					pfversion += 1;
					if (!memcachedClient.cas(prefix, 0, pfversion, cas)) {
						updateVersion(prefix);
					}
				}
			}
		} catch (TimeoutException | InterruptedException | MemcachedException e) {
			e.printStackTrace();
			System.err.println("updateVersion---Exception");
		}
	}

	public void shutdown() {
		try {
			memcachedClient.shutdown();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void destroy() throws Exception {
		shutdown();
	}

}

3,结合spring aop 配置缓存,使用spring aop来切入业务层加入缓存,与业务进行解耦。使用注解进行方便配置。 

import java.lang.reflect.Method;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.node.hlhw.common.cache.XmemCache;
import com.node.hlhw.common.digest.Md5Utils;

@Component
@Aspect
public class MemcachedAop {

	@Pointcut("execution (* com.node.hlhw.*.service.impl.*.*(..))")
	public void pointcut() {
	}

	// 方法执行前调用
	@Before("pointcut()")
	public void before() {
	}

	// 方法执行的前后调用
	
	@Around("pointcut()")
	public Object doAround(ProceedingJoinPoint call) throws Throwable {
		Object result = null;

		// 检测是否存在memcached客户端实例
		if (MemcachedOperate.getClient() == null) {
			System.err.println("memcached client not exist");
			result = call.proceed();
			return result;
		}
		Signature signature = call.getSignature();
		MethodSignature methodSignature = (MethodSignature) signature;
		Method method = methodSignature.getMethod();
		
		if(!method.isAnnotationPresent(XmemCache.class)){
			result = call.proceed();
			return result;
		}
		XmemCache xmemcache = method.getAnnotation(XmemCache.class);
		
		// 获取操作方法
		int operation = xmemcache.operation();
		// 获取注解前缀,实际使用是为各个业务包名称,一般为表名
		String prefix = xmemcache.prefix();
		// 无前缀
		if(prefix==null||"".equals(prefix)){
			result = call.proceed();
			return result;
		}

		// 获取注解配置memcached死亡时间 秒单位
		int interval = xmemcache.interval();
		switch (operation) {
		case 1: // 1 从cache里取值,如果未置入cache,则置入
			// 判断prefix是否涉及多表,查看是否包含|
			if (prefix.contains("|")) {
				Long uqversion = MemcachedOperate.getUnionQueryVersion(prefix);
				String combinedkey = generCombinedKey(prefix, uqversion, method, call.getArgs());
				Object resultobj = MemcachedOperate.get(combinedkey);
				if(resultobj == null){
					result = call.proceed();
					MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据
				}else{
					Class<?> returnType = ((MethodSignature) signature).getReturnType(); 
					result = JSON.parseObject(resultobj.toString(), returnType);
				}
				
			} else { // 单表操作
				
				Long pfversion = MemcachedOperate.getVersion(prefix);
				String combinedkey = generCombinedKey(prefix, pfversion, method, call.getArgs());
				Object resultobj  = MemcachedOperate.get(combinedkey);
				if(resultobj == null){
					result = call.proceed();
					MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据
				}else{
					Class<?> returnType = ((MethodSignature) signature).getReturnType(); 
					result = JSON.parseObject(resultobj.toString(), returnType);
				}
			}
			break;
		case 2: // 2 replace cache value
			break;
		case 3:
			break;
		case 4: // 4 remove cache key 从cache里删除对应 业务版本的缓存
			
			if (prefix.contains("|")) {  // 表示涉及到多表,需要清除 单表的缓存,与联表中 包含 当前部分的 缓存
				String[] prefixs = prefix.split("\\|");	 // 0.切割 prefix为p1、p2、p3
				for(String pf : prefixs){		
					MemcachedOperate.updateVersion(pf);  // 1,更新prefix为p1或p2或p3的version
					MemcachedOperate.updateUnionQueryVersion(pf);
				}
			}else{	//  没有涉及到多表的时候
				MemcachedOperate.updateVersion(prefix);
				MemcachedOperate.updateUnionQueryVersion(prefix);
			}
			result = call.proceed();
			break;
		default:
			result = call.proceed();
			break;
		}
		return result;
	}

	
	private String generCombinedKey(String key, Long version, Method method, Object[] args) {
		StringBuffer sb = new StringBuffer();
		// 获取方法名
		String methodName = method.getName();
		// 获取参数类型
		Object[] classTemps = method.getParameterTypes();
		// 存入方法名
		sb.append(methodName);

		for (int i = 0; i < args.length; i++) {
			sb.append(classTemps[i] + "&");
			if (null == args[i]) {
				sb.append("null");
			} else if ("".equals(args[i])) {
				sb.append("*");
			} else {
				String tt = JSON.toJSONString(args[i]);
				sb.append(tt);
			}
		}
		sb.append(key);
		sb.append(version.toString());
		String temp = Md5Utils.getMD5(sb.toString());
		return temp;

	}
}

4,properties文件中配置memcached服务器地址

#host1:port1,host2:port2
memcached.servers=192.168.1.1:11211,192.168.1.2:11211

5,修改spring配置文件,声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面。

<aop:aspectj-autoproxy proxy-target-class="true"/>

 6,service层使用注解方式切入缓存

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.ibatis.session.RowBounds;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Service;
import com.node.hlhw.common.cache.XmemCache;
import com.node.hlhw.common.digest.ApplicationUtils;
import com.node.hlhw.core.service.BaseService;
import com.node.hlhw.core.store.IBaseStore;
import com.node.hlhw.core.store.PageParam;
import com.node.hlhw.rbac.api.dao.UserRoleDao;
import com.node.hlhw.rbac.api.entity.UserRole;
import com.node.hlhw.rbac.api.service.UserRoleService;


@Service(version = "1.0.0")
public class UserRoleServiceImpl extends BaseService<UserRole> implements
		UserRoleService {

	private static final Logger logger = Logger
			.getLogger(UserRoleServiceImpl.class);

	@Autowired
	public UserRoleDao userRoleDao;

	@Override
	protected IBaseStore<UserRole> getBaseDao() {
		return userRoleDao;
	}

	
	
	@XmemCache(prefix="userrole",operation=4)
	public void insertUserRole(UserRole userRole) throws Exception {
		userRoleDao.insertUserRole(userRole);
		logger.info("插入用户角色数据");
	}

	
	@Override
	@XmemCache(prefix="role|userrole",interval=3600 , operation=1)
	public List<Map<String, Object>> selectUserRoleList(UserRole userrole, PageParam pageParam) throws Exception {
		RowBounds rowBounds = new RowBounds(pageParam.getOffset(),pageParam.getLimit());
		 List<Map<String, Object>>  list = userRoleDao.selectUserRoleList(userrole,rowBounds);
		return  list ;
	}

	@Override
	@XmemCache(prefix="userrole" , operation=4)
	public void modifyUserRole(UserRole userrole, String[] roleids)throws Exception {
		
		//删除所包含的角色
		userRoleDao.deleteByUserRole(userrole);
		for(String roleid : roleids){
			if(!StringUtils.isEmpty(roleid)){
				userrole.setCreatetime(new Date());
				userrole.setRoleid(roleid);
				userrole.setUuid(ApplicationUtils.getUUID());
				userRoleDao.insertUserRole(userrole);
			}
		}
		
	}

	@Override
	@XmemCache(prefix="userrole" , operation=1)
	public boolean existsRef(String roleids)throws Exception {
		String [] roleid = roleids.split(",");
		List<String> roleidlist = Arrays.asList(roleid);
		return userRoleDao.existsRef(roleidlist)>0?true:false;
	}
}


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

spring aop + xmemcached 配置service层缓存策略

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

redis 的 maxmemory 配置以及 缓存淘汰策略

1. maxmemory 相关介绍maxmemory 的作用设置 redis 可用内存的上限。maxmemory 的配置将 maxmemory 设置为零将导致没有内存限制。这是 64 位系统的默认行为,而32位系统使用 3GB 的隐式内存限制。maxmemor
redis 的 maxmemory 配置以及 缓存淘汰策略
2015-05-05

Docker中Nginx的缓存策略与性能优化(如何在Docker中为Nginx配置高效的缓存策略?)

Docker中Nginx的缓存策略和性能优化至关重要。通过HTTP缓存、代理缓存、反向代理缓存和数据库缓存的合理配置,可以显著提升响应时间和减少服务器负载。此外,禁用未使用功能、优化工作进程配置、启用GZIP压缩、使用CDN和持续监控等性能优化措施可进一步增强Nginx的表现。定期调整策略和优化配置,确保在Docker环境中高效运行Nginx,提供最佳的Web体验。
Docker中Nginx的缓存策略与性能优化(如何在Docker中为Nginx配置高效的缓存策略?)
2024-04-02

编程热搜

目录