JetCache :实现二级缓存准实时刷新

/ Java / 没有评论 / 1517浏览

JetCache :实现二级缓存准实时刷新

JetCache简介及特性

JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache接口用于手工缓存操作。 当前有四个实现,RedisCache、TairCache、CaffeineCache(in memory)和一个简易的LinkedHashMapCache(in memory)。

简单用法

JetCache提供了简单易用的注解。相较于springframework.cache提供的注解,JetCache增加了一些新的功能性注解:

通过JetCache注解实现的方法缓存,缓存value的数据类型统一为String。这块想想也没什么问题。比从复杂数据结构存取数据的效率还高。 (RedissonSpringCacheManager实现的spring cache方法级别缓存,是通过Hash的结构来缓存方法的返回值的)。

JetCache本地缓存有最大元素限制,默认是100个。可配置。基于LRU淘汰

JetCache存在问题:

二级缓存结构下:local cache的一致性问题

扩展实现

JetCache设计了缓存更新消息的发布机制。保障了架构的可扩展性。新版jetcache提供了操作缓存的监听接口,如下图

基于redis 订阅发布机制实现了 CacheMessagePublisher (如果项目又消息中间件的需求,可以使用MQ)

一下是jetcache配置,上代码

remoteCache: &remoteCache
  type: redis
  keyConvertor: fastjson
  valueEncoder: kryo
  valueDecoder: kryo
  host: ${spring.redis.host:localhost}
  port: ${spring.redis.port:6379}
  password: ${spring.redis.password}
  database: ${spring.redis.database}
  poolConfig:
    minIdle: 5
    maxIdle: 20
    maxTotal: 50

jetcache:
  statIntervalMinutes: 15
  areaInCacheName: false
  hidePackages: com.springboot.cloud
  local:
    # 默认1小时本地缓存
    default:
      type: caffeine
      keyConvertor: fastjson
      expireAfterWriteInMillis: 3600000
      expireAfterAccessInMillis: 1800000
    # 長時本地緩存,主要用于要求时效一般
    longTime:
      type: caffeine
      keyConvertor: fastjson
      expireAfterWriteInMillis: 300000
      expireAfterAccessInMillis: 180000
    # 短時本地緩存,主要用于要求时效较高的配置
    shortTime:
      type: caffeine
      keyConvertor: fastjson
      expireAfterWriteInMillis: 60000
      expireAfterAccessInMillis: 40000
  remote:
    # 默认1小时的远程缓存
    default:
      expireAfterWriteInMillis: 3600000
      <<: *remoteCache
      # uri格式:redis://密码@ip:端口/redis库名?timeout=5s  url[0]
      #uri: redis://${spring.redis.password}@${spring.redis.password}:${spring.redis.port:6379}/7?timeout=5s
    # 长时远程緩存,主要用于要求时效要求一般的集中式缓存
    longTime:
      expireAfterWriteInMillis: 7200000
      <<: *remoteCache
    # 短時远程緩存,主要用于要求时效较高的集中式缓存
    shortTime:
      expireAfterWriteInMillis: 300000
      <<: *remoteCache

  cacheMessageTopic:
    synCache: synCache

2. 消息发布实现

只有本地缓存或者二级缓存再发送消息。通过ConfigProvider 获取 getCacheManager().getCache获取Cache实例

package com.cc.core.jetcache.core.publisher;

import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.redis.RedisCacheConfig;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.CacheMessagePublisher;
import com.cc.core.jetcache.core.entity.LocalCacheMessageEntity;
import com.cc.core.redis.core.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

/**
 * @Author: jerry.chen
 * @Date: 2021/4/16 15:04
 */
@Component
@Primary
public class LocalCacheRedisPublisher implements CacheMessagePublisher {
    @Autowired
    ConfigProvider configProvider;
    @Autowired
    private RedisUtil redisUtil;
    @Value("${jetcache.cacheMessageTopic.synCache}")
    String topicName;

    @Override
    public void publish(String area, String cacheName, CacheMessage cacheMessage) {
        int type = cacheMessage.getType();
        /*写入不发送消息*/
        if (type == CacheMessage.TYPE_PUT || type == CacheMessage.TYPE_PUT_ALL) {
            return;
        }
        Cache cache = configProvider.getCacheManager().getCache(area, cacheName);
        /*如果是远程缓存 不处理*/
        if (cache.config() instanceof RedisCacheConfig) {
            return;
        }
        LocalCacheMessageEntity entity = new LocalCacheMessageEntity();
        entity.setCacheMessage(cacheMessage);
        entity.setArea(area);
        entity.setCacheName(cacheName);
        redisUtil.publish(topicName, entity);
    }
}

3. redis中的发布和订阅实现(使用RedisMessageListenerContainer 进行订阅)


/**
 * 发布信息
 *
 * @param channel 通道
 * @param message 信息
 */
public void publish(String channel, Object message) {
    redisTemplate.convertAndSend(channel, message);
}

@Autowired
RedisMessageListenerContainer redisMessageListenerContainer;

public void subscriber(String channel, MessageListener messageListener){
    redisMessageListenerContainer.addMessageListener(messageListener,new ChannelTopic(channel));//订阅指定频道
}

redis配置

package com.cc.core.redis.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @Author: jerry.chen
 * @Date: 2018/12/12 14:26
 */
@Configuration
public class RedisConfig {
    @Bean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }
    @Autowired
    private RedisTemplate redisTemplate;
    /*监听容器*/
    @Bean
    RedisMessageListenerContainer container() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        return container;
    }

}

4. 实现监听者

通过ConfigProvider 获取 getCacheManager().getCache获取Cache实例

package com.cc.core.jetcache.core.listener;

import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.support.CacheMessage;
import com.cc.core.jetcache.core.entity.LocalCacheMessageEntity;
import com.github.benmanes.caffeine.cache.Cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;


/**
 * Caffeine本地缓存过期策略
 *
 * @Author: jerry.chen
 * @Date: 2021/4/19 16:01
 */
@Component
public class LocalCacheRedisMessageListener implements MessageListener {

    @Autowired
    private ConfigProvider configProvider;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        /*redis 配置的时候 对象序列化使用的是 Jackson2JsonRedisSerializer*/
        LocalCacheMessageEntity entity = (LocalCacheMessageEntity) redisTemplate.getValueSerializer().deserialize(message.getBody());
        CacheMessage cacheMessage = entity.getCacheMessage();
        if (cacheMessage == null) {
            return;
        }
        Cache localCache = (Cache) configProvider.getCacheManager().getCache(entity.getArea(), entity.getCacheName()).unwrap(Cache.class);
        if (localCache == null) {
            return;
        }

        switch (cacheMessage.getType()) {
            case CacheMessage.TYPE_REMOVE: {
                invalidateLocalCaches(localCache, cacheMessage.getKeys());
                break;
            }
            case CacheMessage.TYPE_REMOVE_ALL: {
                localCache.invalidateAll();
                break;
            }
        }
    }

    private void invalidateLocalCaches(Cache localCache, Object[] keys) {
        for (Object key : keys) {
            Object value = localCache.getIfPresent(key);
            if (null != value) {
                localCache.invalidate(key);
            }
        }
    }
}

5. 实现消息订阅功能

监听者通过通道监听消息

package com.cc.core.jetcache.core.subscription;

import com.cc.core.jetcache.core.listener.LocalCacheRedisMessageListener;
import com.cc.core.redis.core.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @Author: jerry.chen
 * @Date: 2021/4/19 16:09
 */
@Component
public class LocalCacheSyncSubscription {
    @Value("${jetcache.cacheMessageTopic.synCache}")
    String topicName;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private LocalCacheRedisMessageListener localCacheRedisMessageListener;

    @PostConstruct
    public void initConsumer() {
        redisUtil.subscriber(topicName, localCacheRedisMessageListener);
    }
}

删除缓存的时候自动进程间同步删除