Commit 8970f06f by 阮思源

优化收消息处理

parent 7d3bfa99
...@@ -30,6 +30,7 @@ import com.pcloud.book.personalstage.entity.PersonalStageWakeup; ...@@ -30,6 +30,7 @@ import com.pcloud.book.personalstage.entity.PersonalStageWakeup;
import com.pcloud.book.personalstage.enums.JumpTypeEnum; import com.pcloud.book.personalstage.enums.JumpTypeEnum;
import com.pcloud.book.personalstage.enums.PersonalStageUserStateEnum; import com.pcloud.book.personalstage.enums.PersonalStageUserStateEnum;
import com.pcloud.book.personalstage.enums.StageReplyRelevEnum; import com.pcloud.book.personalstage.enums.StageReplyRelevEnum;
import com.pcloud.book.personalstage.utils.CacheUtils;
import com.pcloud.common.core.aspect.ParamLog; import com.pcloud.common.core.aspect.ParamLog;
import com.pcloud.common.core.mq.DelayQueueDTO; import com.pcloud.common.core.mq.DelayQueueDTO;
import com.pcloud.common.utils.ListUtils; import com.pcloud.common.utils.ListUtils;
...@@ -274,7 +275,8 @@ public class PersonalStageBizImpl implements PersonalStageBiz { ...@@ -274,7 +275,8 @@ public class PersonalStageBizImpl implements PersonalStageBiz {
if (!iskeyword&&hasRecord){ if (!iskeyword&&hasRecord){
//判断是否已经达到熔断标准 //判断是否已经达到熔断标准
if (personalStage!=null){ if (personalStage!=null){
if ((last.getNotKeywordSendCount()!=null&&last.getNotKeywordSendCount()>=personalStage.getNotKeywordFusingCount()) Integer notKeywordSendCount =getSendNKeywordCountFromCache(personalStage.getId(),userWxId,robotId);
if ((notKeywordSendCount!=null&&notKeywordSendCount>=personalStage.getNotKeywordFusingCount())
||PersonalStageUserStateEnum.FUSING.value.equals(last.getState())){ ||PersonalStageUserStateEnum.FUSING.value.equals(last.getState())){
// 走熔断 // 走熔断
LOGGER.info("走熔断"); LOGGER.info("走熔断");
...@@ -285,10 +287,10 @@ public class PersonalStageBizImpl implements PersonalStageBiz { ...@@ -285,10 +287,10 @@ public class PersonalStageBizImpl implements PersonalStageBiz {
||PersonalStageUserStateEnum.WAKEUP.value.equals(last.getState())){ ||PersonalStageUserStateEnum.WAKEUP.value.equals(last.getState())){
LOGGER.info("走正常逻辑非关键词回复并且更新记录"); LOGGER.info("走正常逻辑非关键词回复并且更新记录");
//如果是正常状态或者唤醒状态才回复,并且更新状态为正常状态 //如果是正常状态或者唤醒状态才回复,并且更新状态为正常状态
sendNotKeywordReply(robotId,userWxId,ip,personalStage.getId()); last.setNotKeywordSendCount(notKeywordSendCount+1);
last.setNotKeywordSendCount(last.getNotKeywordSendCount()+1);
last.setState(PersonalStageUserStateEnum.NORMAL.value); last.setState(PersonalStageUserStateEnum.NORMAL.value);
personalStageUserDao.update(last); personalStageUserDao.update(last);
sendNotKeywordReply(robotId,userWxId,ip,personalStage.getId());
} }
} }
} }
...@@ -312,6 +314,32 @@ public class PersonalStageBizImpl implements PersonalStageBiz { ...@@ -312,6 +314,32 @@ public class PersonalStageBizImpl implements PersonalStageBiz {
} }
} }
@ParamLog("从缓存中获取发送非关键词次数")
private synchronized Integer getSendNKeywordCountFromCache(Long personalStageId, String wxId, String robotId) {
String key = PersonalStageConstant.NOT_SEND_KEYWORD_COUNT_LOCK + personalStageId;
Integer count = 0;
try {
while (!CacheUtils.lock(key)) {
Thread.sleep(100);
}
String cacheKey = PersonalStageConstant.USER_SEND_KEYWORD_COUNT_LOCK + personalStageId + "_" + wxId;
long value = JedisClusterUtils.incr(cacheKey);
if (value != 1) {
return (int)value - 1;
}
count = personalStageUserDao.getNotKeywordSendCount(wxId, robotId, null);
count = count == null ? 0 : count;
JedisClusterUtils.set(cacheKey, (count.longValue() + 1) + "", 10 * 60);
return count;
} catch (Exception e) {
LOGGER.warn("从缓存中获取发送非关键词次数失败,personalStageId=" + personalStageId +"wxId="+wxId+"robotId="+robotId);
} finally {
CacheUtils.unlock(key);
}
return count;
}
@Override @Override
@ParamLog("判断是否是定制化用户") @ParamLog("判断是否是定制化用户")
public Boolean isPersonalStageUser(String robotId){ public Boolean isPersonalStageUser(String robotId){
...@@ -529,8 +557,8 @@ public class PersonalStageBizImpl implements PersonalStageBiz { ...@@ -529,8 +557,8 @@ public class PersonalStageBizImpl implements PersonalStageBiz {
String key = "BOOK:PERSONAL_STAGE:FUSING:" + last.getId(); String key = "BOOK:PERSONAL_STAGE:FUSING:" + last.getId();
if (!JedisClusterUtils.exists(key)) { if (!JedisClusterUtils.exists(key)) {
// 发送熔断回复消息 // 发送熔断回复消息
sendNotKeywordFusingReply(robotId, userWxId, ip, personalStage.getId());
JedisClusterUtils.set(key, "true", 60); JedisClusterUtils.set(key, "true", 60);
sendNotKeywordFusingReply(robotId, userWxId, ip, personalStage.getId());
} }
} }
...@@ -556,5 +584,7 @@ public class PersonalStageBizImpl implements PersonalStageBiz { ...@@ -556,5 +584,7 @@ public class PersonalStageBizImpl implements PersonalStageBiz {
user.setState(PersonalStageUserStateEnum.NORMAL.value); user.setState(PersonalStageUserStateEnum.NORMAL.value);
user.setNotKeywordSendCount(0); user.setNotKeywordSendCount(0);
personalStageUserDao.update(user); personalStageUserDao.update(user);
//刪除发送非关键词次数緩存
JedisClusterUtils.del(PersonalStageConstant.USER_SEND_KEYWORD_COUNT_LOCK + personalStageId + "_" + wxId);
} }
} }
...@@ -4,4 +4,9 @@ public class PersonalStageConstant { ...@@ -4,4 +4,9 @@ public class PersonalStageConstant {
public static final String PERSONALSTAGE_DELAY_WAKEUP="PERSONALSTAGE_DELAY_WAKEUP"; public static final String PERSONALSTAGE_DELAY_WAKEUP="PERSONALSTAGE_DELAY_WAKEUP";
public static final String PERSONALSTAGE_DELAY_FUSING="PERSONALSTAGE_DELAY_FUSING"; public static final String PERSONALSTAGE_DELAY_FUSING="PERSONALSTAGE_DELAY_FUSING";
public static final String NOT_SEND_KEYWORD_COUNT_LOCK="NOT_SEND_KEYWORD_COUNT_LOCK";
public static final String USER_SEND_KEYWORD_COUNT_LOCK="USER_SEND_KEYWORD_COUNT_LOCK";
} }
...@@ -7,4 +7,5 @@ public interface PersonalStageUserDao extends BaseDao<PersonalStageUser> { ...@@ -7,4 +7,5 @@ public interface PersonalStageUserDao extends BaseDao<PersonalStageUser> {
PersonalStageUser getLast(String wxId, String robotId, Long personalStageId); PersonalStageUser getLast(String wxId, String robotId, Long personalStageId);
Integer getNotKeywordSendCount(String wxId, String robotId, Long personalStageId);
} }
\ No newline at end of file
...@@ -19,4 +19,13 @@ public class PersonalStageUserDaoImpl extends BaseDaoImpl<PersonalStageUser> imp ...@@ -19,4 +19,13 @@ public class PersonalStageUserDaoImpl extends BaseDaoImpl<PersonalStageUser> imp
map.put("personalStageId", personalStageId); map.put("personalStageId", personalStageId);
return super.getSessionTemplate().selectOne(getStatement("getLast"), map); return super.getSessionTemplate().selectOne(getStatement("getLast"), map);
} }
@Override
public Integer getNotKeywordSendCount(String wxId, String robotId, Long personalStageId) {
Map<String, Object> map = new HashMap<>();
map.put("wxId", wxId);
map.put("robotId", robotId);
map.put("personalStageId", personalStageId);
return super.getSessionTemplate().selectOne(getStatement("getNotKeywordSendCount"), map);
}
} }
/**
*
*/
package com.pcloud.book.personalstage.utils;
import com.pcloud.common.utils.cache.redis.JedisClusterUtils;
public class CacheUtils {
/**
* 加锁
* @param key
* @return
*/
public static boolean lock(String key){
//加锁,如果返回1为true,表示已经加锁,如果再来存储同一个key会返回0为false
Boolean flag = JedisClusterUtils.setnx(key,"lock");
//如果为true的话,设置一个过期时间,防止锁释放不掉
if(flag){
JedisClusterUtils.expire(key,10);
}
return flag;
}
/**
* 释放锁
*/
public static void unlock(String key){
JedisClusterUtils.del(key);
}
}
...@@ -97,4 +97,21 @@ ...@@ -97,4 +97,21 @@
LIMIT 0,1 LIMIT 0,1
</select> </select>
<select id="getNotKeywordSendCount" parameterType="map" resultType="integer">
select not_keyword_send_count
from personal_stage_user
where 1=1
<if test="wxId!=null">
and wx_id=#{wxId}
</if>
<if test="robotId!=null">
and robot_id = #{robotId}
</if>
<if test="personalStageId!=null">
and personal_stage_id=#{personalStageId}
</if>
order by create_time desc
LIMIT 0,1
</select>
</mapper> </mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment