Commit 9f9603d2 by 田超

Merge branch 'featur/zipkinSamplerConfig' into 'master'

支持zipkin采样率动态配置

See merge request rays/pcloud-common-parent!138
parents a2c7d964 40f0a717
package com.pcloud.common.core.zipkin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.Span;
import java.util.BitSet;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 自定义采样率Sampler
*/
@Slf4j
public class PercentageLocalSampler implements Sampler {
//采样率最小值
private static final Float MIN_PERCENTAGE = 0.0f;
//采样率最大值
private static final Float MAX_PERCENTAGE = 1.0f;
private Map<String, BitSet> sampleDecisionsMap;
//本地采样器
private final SamplerLocalProperties configuration;
private Float globalPercentage;
private static final String all = "all";
//替换访问地址前缀正则表达式
private static final String regex = "^(http://www\\.|http://|www\\.|http:)";
//uri访问次数
private final Map<String, AtomicInteger> concurrentSampleCount;
//刷新本地采样器和读取采样器配置使用读写锁,该场景读大于写,后期可优化为类似eureka注册表多级缓存
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//读锁
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
//写锁
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
public PercentageLocalSampler(SamplerLocalProperties configuration) {
this.configuration = configuration;
sampleDecisionsMap = buildRandomBit();
concurrentSampleCount = new ConcurrentHashMap<>();
concurrentSampleCount.put(all, new AtomicInteger(0));
}
public Map<String, AtomicInteger> getConcurrentSampleCount() {
return this.concurrentSampleCount;
}
@Override
public boolean isSampled(Span currentSpan) {
try {
readLock.lock();
if (currentSpan == null) {
return false;
}
// 获取span中的请求uri
String uri = currentSpan.getName();
uri = uri.replaceFirst(regex, "");
AtomicInteger count = this.concurrentSampleCount.get(all);
// 获取全局的bitSet
BitSet bitSet = this.sampleDecisionsMap.get(all);
// 获取全局的采样率
float percentage = this.configuration.getPercentage();
for (UriSampleProperties sampleProperties : configuration.getUriSamples()) {
// 正则匹配
if (uri.matches(sampleProperties.getUriRegex())) {
//匹配上了自定义采样率的正则
// 多个线程会有并发问题,加个局部锁
synchronized (this) {
// 判断当前uri是否在map中
if (!concurrentSampleCount.containsKey(uri)) {
concurrentSampleCount.put(uri, new AtomicInteger(0));
}
}
// 获取当前URI对应的访问次数
count = concurrentSampleCount.get(uri);
// 获取当前URI对应的bitSet
bitSet = sampleDecisionsMap.get(sampleProperties.getUriRegex());
// 获取当前URI对应的采样率
percentage = sampleProperties.getUriPercentage();
break;
}
}
log.warn("replace uri {} , percentage {}", uri, percentage);
// 如果采样率是0 ,直接返回false
if (percentage == MIN_PERCENTAGE) {
return false;
} else if (percentage == MAX_PERCENTAGE) { // 如果采样率是1 ,那么直接返回true
return true;
}
synchronized (this) {
// 访问次数加1
final int i = count.getAndIncrement();
// 判断当前的访问 次数是否在 bitSet中,存在则返回true
boolean result = bitSet.get(i);
// 等于99的时候,重新设置为0
if (i == 99) {
count.set(0);
}
return result;
}
} finally {
readLock.unlock();
}
}
private static BitSet randomBitSet(int size, int cardinality, Random rnd) {
BitSet result = new BitSet(size);
int[] chosen = new int[cardinality];
int i;
for (i = 0; i < cardinality; ++i) {
chosen[i] = i;
result.set(i);
}
for (; i < size; ++i) {
int j = rnd.nextInt(i + 1);
if (j < cardinality) {
result.clear(chosen[j]);
result.set(i);
chosen[j] = i;
}
}
return result;
}
private Map<String, BitSet> buildRandomBit() {
Map<String, BitSet> map = new ConcurrentHashMap<>();
int size = 100;
// 设置全局的采样率
int outOf100 = (int) (configuration.getPercentage() * size);
map.put(all, randomBitSet(size, outOf100, new Random()));
if (CollectionUtils.isNotEmpty(configuration.getUriSamples())) {
for (UriSampleProperties sampleProperties : configuration.getUriSamples()) {
// 设置个性化的采样率
map.put(sampleProperties.getUriRegex(), randomBitSet(size,
(int) (sampleProperties.getUriPercentage() * size), new Random()));
}
}
return map;
}
public void initPercentage(SamplerLocalProperties samplerRemoteProperties) {
try {
writeLock.lock();
configuration.setPercentage(samplerRemoteProperties.getPercentage());
configuration.setUriSamples(samplerRemoteProperties.getUriSamples());
sampleDecisionsMap = buildRandomBit();
} finally {
writeLock.unlock();
}
}
public SamplerLocalProperties getConfiguration() {
return configuration;
}
}
package com.pcloud.common.core.zipkin;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.pcloud.common.utils.HttpKit;
import com.pcloud.common.utils.string.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bootstrap.config.PropertySourceLocator;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.stereotype.Component;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 刷新本地采样率
*/
@Slf4j
@Component
public class RefreshSamplerLocalProperties implements ApplicationContextAware, InitializingBean {
@Autowired(required = false)
private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();
@Autowired
private PercentageLocalSampler percentageLocalSampler;
//线程休眠时间
private static final Long REFRESH_SLEEP_TIME = 60 * 1000L;
//个性化采样率map的key
private static final String SPRING_ZIPKIN_URISAMPLEMAPJSONSTR = "spring.zipkin.uriSampleMapJsonStr";
//个性化设置全局采样率的key
private static final String SPRING_ZIPKIN_GLOBALPERCENTAGE = "spring.zipkin.globalPercentage";
//容器上下文
private ApplicationContext applicationContext;
//个性化采样率默认值
private static final String DEFAULT_URISAMPLEMAPJSONSTR = "{}";
//全局采样率默认值
private static final Float DEFAULT_GLOBALPERCENTAGE = 0.01f;
private Environment environment;
private static final String STATUS_DELIMITER="-";
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
environment = applicationContext.getEnvironment();
}
@Override
public void afterPropertiesSet() throws Exception {
new RefreshRequest().start();
}
public void setPropertySourceLocators(
Collection<PropertySourceLocator> propertySourceLocators) {
this.propertySourceLocators = new ArrayList<>(propertySourceLocators);
}
/**
* 读取git配置文件线程
*/
class RefreshRequest extends Thread {
@Override
public void run() {
while (true) {
try {
for (PropertySourceLocator locator : propertySourceLocators) {
PropertySource<?> source = null;
source = locator.locate(environment);
if (source == null) {
continue;
}
float globalPercentage = source.getProperty(SPRING_ZIPKIN_GLOBALPERCENTAGE) != null ? Float.parseFloat(source.getProperty(SPRING_ZIPKIN_GLOBALPERCENTAGE).toString()) : DEFAULT_GLOBALPERCENTAGE;
String uriSampleMapJsonStr = source.getProperty(SPRING_ZIPKIN_URISAMPLEMAPJSONSTR) != null ? source.getProperty(SPRING_ZIPKIN_URISAMPLEMAPJSONSTR).toString() : DEFAULT_URISAMPLEMAPJSONSTR;
log.warn("读取git配置文件 globalPercentage {} ,globalPercentageJsonStr {}", globalPercentage, uriSampleMapJsonStr);
if (!uriSampleMapJsonStr.equals(DEFAULT_URISAMPLEMAPJSONSTR)) {
loadSamplerProperties(globalPercentage,uriSampleMapJsonStr);
}
}
Thread.sleep(REFRESH_SLEEP_TIME);
} catch (Exception e) {
log.error("刷新采样率异常 {}", e);
}
}
}
}
/**
* 刷新PercentageLocalSampler
*
* @param globalPercentage 全局采样率
* @param uriSampleMapJsonStr url正则表达式和个性化采样率
*/
public void loadSamplerProperties(Float globalPercentage, String uriSampleMapJsonStr) {
//获取容器的percentageLocalSampler
SamplerLocalProperties samplerLocalProperties = percentageLocalSampler.getConfiguration();
SamplerLocalProperties samplerRemoteLocalProperties = new SamplerLocalProperties();
//全局采样率配置更新
if (globalPercentage != null && globalPercentage != DEFAULT_GLOBALPERCENTAGE) {
samplerRemoteLocalProperties.setPercentage(globalPercentage);
}
if (!StringUtil.isEmpty(uriSampleMapJsonStr) && !uriSampleMapJsonStr.equals(DEFAULT_URISAMPLEMAPJSONSTR)) {
List<UriSampleProperties> uriSamples = new ArrayList<>();
Map<Float, ArrayList<String>> uriSampleMap = (HashMap<Float, ArrayList<String>>) JSON.parseObject(uriSampleMapJsonStr, new TypeReference<HashMap<Float, ArrayList<String>>>() {
});
if (uriSampleMap != null && uriSampleMap.size() > 0) {
for (Map.Entry<Float, ArrayList<String>> entry : uriSampleMap.entrySet()) {
float percentage = entry.getKey();
for (String uri : entry.getValue()) {
UriSampleProperties uriSampleProperties = new UriSampleProperties();
uriSampleProperties.setUriPercentage(percentage);
uriSampleProperties.setUriRegex(uri);
uriSamples.add(uriSampleProperties);
}
}
}
samplerRemoteLocalProperties.setUriSamples(uriSamples);
}
if(!getPercentageSamplerHashCode(samplerRemoteLocalProperties).equals(getPercentageSamplerHashCode(samplerLocalProperties))){
log.warn("更新采样率 globalPercentage {} ,globalPercentageJsonStr {}", globalPercentage, uriSampleMapJsonStr);
percentageLocalSampler.initPercentage(samplerRemoteLocalProperties);
}
}
/**
* 计算计算hash值
*
* @param samplerLocalProperties
* @return
*/
public static String getPercentageSamplerHashCode(SamplerLocalProperties samplerLocalProperties) {
StringBuffer reconcileHashCode = new StringBuffer((samplerLocalProperties.getPercentage() + STATUS_DELIMITER).hashCode());
for (UriSampleProperties uriSampleProperties : samplerLocalProperties.getUriSamples()) {
reconcileHashCode.append(uriSampleProperties.getUriRegex().hashCode() + STATUS_DELIMITER.hashCode());
reconcileHashCode.append((uriSampleProperties.getUriPercentage() + STATUS_DELIMITER).hashCode());
}
return reconcileHashCode.toString();
}
}
package com.pcloud.common.core.zipkin;
import java.util.ArrayList;
import java.util.List;
/**
* 本地采样器集合
*/
public class SamplerLocalProperties {
//采样器集合
private List<UriSampleProperties> uriSamples = new ArrayList<>(10);
//全局采样率
private float percentage = 0.1f;
public List<UriSampleProperties> getUriSamples() {
return uriSamples;
}
public void setUriSamples(List<UriSampleProperties> uriSamples) {
this.uriSamples = uriSamples;
}
public float getPercentage() {
return percentage;
}
public void setPercentage(float percentage) {
this.percentage = percentage;
}
}
package com.pcloud.common.core.zipkin;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
/**
* 本地取样器
*/
public class UriSampleProperties {
//url正则表达式表达式
private String uriRegex;
//采样率
private float uriPercentage = 0.01f;
public String getUriRegex() {
return uriRegex;
}
public void setUriRegex(String uriRegex) {
this.uriRegex = uriRegex;
}
public float getUriPercentage() {
return uriPercentage;
}
public void setUriPercentage(float uriPercentage) {
this.uriPercentage = uriPercentage;
}
}
package com.pcloud.common.core.zipkin;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 重新配置自定义采样率
*/
@Configuration
public class ZipkinConfig {
//ZipkinAutoConfiguration使用@ConditionalOnMissingBean注解的,也就是容器中不存在这个Bean的时候,才初始化他自己默认的配置,可以重写他的配置
@Bean
public Sampler percentageLocalSampler(){
SamplerLocalProperties samplerLocalProperties = new SamplerLocalProperties();
return new PercentageLocalSampler(samplerLocalProperties);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment