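I. Implementation Overview
According to the official wiki, the Sentinel dashboard keeps real-time monitoring data for only the most recent 5 minutes by default. To persist the data, you need to implement the relevant interface yourself.
https://github.com/alibaba/Sentinel/wiki/在生產環境中使用-Sentinel-控制檯 also outlines the steps:
1. Implement the MetricsRepository interface yourself;
2. Register the implementation as a Spring bean and select it by bean name with the @Qualifier annotation wherever the repository is injected.
This article uses the time-series database InfluxDB for persistence and, starting from the download, builds an InfluxDB-based storage implementation step by step.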
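To make the two steps more concrete, here is a minimal sketch of the contract being implemented. `MetricsRepositorySketch` is a simplified stand-in for the dashboard's `MetricsRepository<T>` interface, reduced to the four methods that the implementation later in this article overrides. `Sample` and `InMemorySketchRepository` are hypothetical names used only for illustration; they are not Sentinel classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the dashboard's MetricsRepository<T>;
// only the four methods overridden later in this article are listed.
interface MetricsRepositorySketch<T> {
    void save(T metric);
    void saveAll(Iterable<T> metrics);
    List<T> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime);
    List<String> listResourcesOfApp(String app);
}

// Hypothetical metric record, far smaller than the real MetricEntity.
class Sample {
    final String app;
    final String resource;
    final long timestamp;
    final long passQps;

    Sample(String app, String resource, long timestamp, long passQps) {
        this.app = app;
        this.resource = resource;
        this.timestamp = timestamp;
        this.passQps = passQps;
    }
}

// Hypothetical list-backed implementation; a persistent store would replace the list.
class InMemorySketchRepository implements MetricsRepositorySketch<Sample> {
    private final List<Sample> store = new ArrayList<>();

    @Override
    public void save(Sample metric) {
        if (metric != null) {
            store.add(metric);
        }
    }

    @Override
    public void saveAll(Iterable<Sample> metrics) {
        if (metrics == null) {
            return;
        }
        for (Sample metric : metrics) {
            save(metric);
        }
    }

    @Override
    public List<Sample> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
        List<Sample> results = new ArrayList<>();
        for (Sample s : store) {
            if (s.app.equals(app) && s.resource.equals(resource)
                    && s.timestamp >= startTime && s.timestamp <= endTime) {
                results.add(s);
            }
        }
        return results;
    }

    // Returns distinct resource names in insertion order (no QPS-based sorting here).
    @Override
    public List<String> listResourcesOfApp(String app) {
        Set<String> names = new LinkedHashSet<>();
        for (Sample s : store) {
            if (s.app.equals(app)) {
                names.add(s.resource);
            }
        }
        return new ArrayList<>(names);
    }
}
```

Any class with this shape can be swapped in behind the dashboard once it is registered as a bean; the rest of the article fills in the same four methods against InfluxDB.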
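II. InfluxDB: Introduction, Installation, and Usage
InfluxDB website: https://www.influxdata.com
Key points:
- High-performance time-series database
- Written in Go, with no external dependencies
- Supports reads and writes over an HTTP API
- Supports a SQL-like query syntax
- Supports automatic cleanup of historical data through retention policies
- Supports data archiving through continuous queries
Latest version at the time of writing: 1.6.4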
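Download
Windows: wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_windows_amd64.zip
Linux: wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_linux_amd64.tar.gz
Note: on Windows, wget can be downloaded from https://eternallybored.org/misc/wget/
On Windows, unzip the archive into D:\influxdb\influxdb-1.6.4-1.
Open a cmd window and, in D:\influxdb\influxdb-1.6.4-1, start the InfluxDB server with the command: influxd
Open a second cmd window and, in the same directory, start the client with: influx (optionally add -precision rfc3339 to display timestamps in RFC 3339 format)
show databases lists only the system databases, so create a new one named sentinel_db with the command: create database sentinel_db
use sentinel_db switches to the sentinel_db database
show measurements lists the measurements (tables) in the database
As you can see, these InfluxDB commands closely resemble their MySQL counterparts.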
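InfluxDB terminology:
database: a database // same as a database in a relational database
measurement: a table in the database // a table in a relational database
point: one row in a table // a row in a relational database
A point consists of 3 parts:
time: the timestamp of each record, which is also the primary index generated automatically by the database; // similar to a primary key
fields: the recorded values; // columns without an index
tags: indexed attributes // columns with an index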
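The three parts of a point map directly onto InfluxDB 1.x line protocol, whose general shape is `measurement,tag=value field=value timestamp`. The following is a minimal sketch of that mapping, not a replacement for a client library: `LineProtocolSketch` and its methods are hypothetical names, only numeric fields are handled, and the measurement name is assumed to contain no characters that need escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class LineProtocolSketch {

    // Tag and field keys (and tag values) must escape commas, equals signs and spaces.
    static String escapeTag(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Builds one line: measurement, then indexed tags, then fields, then the timestamp.
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Number> fields, long timestampNanos) {
        StringBuilder line = new StringBuilder(measurement);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            line.append(',').append(escapeTag(tag.getKey()))
                .append('=').append(escapeTag(tag.getValue()));
        }
        StringJoiner fieldPart = new StringJoiner(",");
        for (Map.Entry<String, Number> field : fields.entrySet()) {
            Number v = field.getValue();
            // Integer fields carry an "i" suffix; other numbers are written as floats.
            boolean integral = v instanceof Long || v instanceof Integer;
            String rendered = integral ? v.longValue() + "i" : String.valueOf(v.doubleValue());
            fieldPart.add(escapeTag(field.getKey()) + "=" + rendered);
        }
        return line.append(' ').append(fieldPart).append(' ').append(timestampNanos).toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("app", "demo");
        tags.put("resource", "/hello");
        Map<String, Number> fields = new LinkedHashMap<>();
        fields.put("passQps", 12L);
        fields.put("rt", 3.5);
        // prints: sentinel_metric,app=demo,resource=/hello passQps=12i,rt=3.5 1540000000000000000
        System.out.println(toLine("sentinel_metric", tags, fields, 1540000000000000000L));
    }
}
```

Because tags are indexed and fields are not, values used in WHERE filters (such as the application and resource names) belong in tags, while the measured numbers belong in fields.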
III. Modifying Sentinel Dashboard
The official GitHub organization provides a Java client library:
https://github.com/influxdata/influxdb-java
Add the Maven dependency to the pom.xml of sentinel-dashboard:
<code><dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.17</version>
</dependency></code>
Next, wrap the client in a utility class that holds the InfluxDB connection settings and makes calls convenient:
<code>package com.alibaba.csp.sentinel.dashboard.util;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author cdfive
 * @date 2018-10-19
 */
@Component
public class InfluxDBUtils {

    private static Logger logger = LoggerFactory.getLogger(InfluxDBUtils.class);

    private static String url;
    private static String username;
    private static String password;

    private static InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

    // The fields are static, so @Value is placed on instance setters instead.
    @Value("${influxdb.url}")
    public void setUrl(String url) {
        InfluxDBUtils.url = url;
    }

    @Value("${influxdb.username}")
    public void setUsername(String username) {
        InfluxDBUtils.username = username;
    }

    @Value("${influxdb.password}")
    public void setPassword(String password) {
        InfluxDBUtils.password = password;
    }

    public static void init(String url, String username, String password) {
        InfluxDBUtils.url = url;
        InfluxDBUtils.username = username;
        InfluxDBUtils.password = password;
    }

    // Opens a connection, runs the callback, and always closes the connection.
    // Exceptions are logged and swallowed, in which case null is returned.
    public static <T> T process(String database, InfluxDBCallback callback) {
        InfluxDB influxDB = null;
        T t = null;
        try {
            influxDB = InfluxDBFactory.connect(url, username, password);
            influxDB.setDatabase(database);
            t = callback.doCallBack(database, influxDB);
        } catch (Exception e) {
            logger.error("[process exception]", e);
        } finally {
            if (influxDB != null) {
                try {
                    influxDB.close();
                } catch (Exception e) {
                    logger.error("[influxDB.close exception]", e);
                }
            }
        }
        return t;
    }

    public static void insert(String database, InfluxDBInsertCallback influxDBInsertCallback) {
        process(database, new InfluxDBCallback() {
            @Override
            public <T> T doCallBack(String database, InfluxDB influxDB) {
                influxDBInsertCallback.doCallBack(database, influxDB);
                return null;
            }
        });
    }

    public static QueryResult query(String database, InfluxDBQueryCallback influxDBQueryCallback) {
        return process(database, new InfluxDBCallback() {
            @Override
            @SuppressWarnings("unchecked")
            public <T> T doCallBack(String database, InfluxDB influxDB) {
                QueryResult queryResult = influxDBQueryCallback.doCallBack(database, influxDB);
                return (T) queryResult;
            }
        });
    }

    public static <T> List<T> queryList(String database, String sql, Map<String, Object> paramMap, Class<T> clasz) {
        QueryResult queryResult = query(database, new InfluxDBQueryCallback() {
            @Override
            public QueryResult doCallBack(String database, InfluxDB influxDB) {
                BoundParameterQuery.QueryBuilder queryBuilder = BoundParameterQuery.QueryBuilder.newQuery(sql);
                queryBuilder.forDatabase(database);
                // Bind each $name placeholder in the query to its value.
                if (paramMap != null && paramMap.size() > 0) {
                    Set<Map.Entry<String, Object>> entries = paramMap.entrySet();
                    for (Map.Entry<String, Object> entry : entries) {
                        queryBuilder.bind(entry.getKey(), entry.getValue());
                    }
                }
                return influxDB.query(queryBuilder.create());
            }
        });
        return resultMapper.toPOJO(queryResult, clasz);
    }

    public interface InfluxDBCallback {
        <T> T doCallBack(String database, InfluxDB influxDB);
    }

    public interface InfluxDBInsertCallback {
        void doCallBack(String database, InfluxDB influxDB);
    }

    public interface InfluxDBQueryCallback {
        QueryResult doCallBack(String database, InfluxDB influxDB);
    }
}</code>
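In this class:
url, username, and password hold the InfluxDB connection URL, user name, and password. They are static fields, so the @Value annotation is placed on the setter methods to read the values from the configuration file;
resultMapper maps query results to entity classes;
init initializes url, username, and password;
process is the general-purpose handler: it opens and closes the connection and invokes the InfluxDBCallback callback;
insert writes data and is used together with the InfluxDBInsertCallback callback;
query is the general-purpose query method; it is used together with the InfluxDBQueryCallback callback and returns a QueryResult object;
queryList queries a list: it calls query to obtain a QueryResult, then converts it to a List through resultMapper.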
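The open-use-close structure of process can be seen in isolation in the sketch below. It assumes a hypothetical `FakeConnection` in place of the real InfluxDB client and uses `java.util.function.Function` instead of the article's callback interfaces, so it illustrates the pattern rather than reproducing the class above. Unlike process, it lets exceptions propagate instead of logging them and returning null.

```java
import java.util.function.Function;

// Hypothetical stand-in for a client connection; records whether it was closed.
class FakeConnection implements AutoCloseable {
    boolean closed = false;

    String run(String statement) {
        return "ran: " + statement;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class TemplateSketch {
    // Remembers the most recent connection so a caller can inspect it afterwards.
    static FakeConnection lastConnection;

    // Opens a connection, hands it to the callback, and always closes it.
    static <T> T process(Function<FakeConnection, T> callback) {
        try (FakeConnection connection = new FakeConnection()) {
            lastConnection = connection;
            return callback.apply(connection);
        }
    }

    public static void main(String[] args) {
        String result = process(c -> c.run("show measurements"));
        System.out.println(result + ", closed=" + lastConnection.closed);
    }
}
```

Keeping connection handling in one place means callers such as insert and query only describe what to do with an open connection, and cannot forget to close it.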
In the application.properties file under the resources directory, add the InfluxDB settings:
<code>influxdb.url=${influxdb.url}
influxdb.username=${influxdb.username}
influxdb.password=${influxdb.password}</code>
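The ${xxx} placeholders allow the values to be supplied per environment (development, test, production) through profiles in Maven's pom.xml, or to be read from a configuration center.
In the datasource.entity package, create the entity class MetricPO, which corresponds to the sentinel_metric measurement: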
<code>package com.alibaba.csp.sentinel.dashboard.datasource.entity;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import java.time.Instant;

/**
 * @author cdfive
 * @date 2018-10-19
 */
@Measurement(name = "sentinel_metric")
public class MetricPO {

    @Column(name = "time")
    private Instant time;

    @Column(name = "id")
    private Long id;

    @Column(name = "gmtCreate")
    private Long gmtCreate;

    @Column(name = "gmtModified")
    private Long gmtModified;

    @Column(name = "app", tag = true)
    private String app;

    @Column(name = "resource", tag = true)
    private String resource;

    @Column(name = "passQps")
    private Long passQps;

    @Column(name = "successQps")
    private Long successQps;

    @Column(name = "blockQps")
    private Long blockQps;

    @Column(name = "exceptionQps")
    private Long exceptionQps;

    @Column(name = "rt")
    private double rt;

    @Column(name = "count")
    private int count;

    @Column(name = "resourceCode")
    private int resourceCode;

    // getters and setters omitted
}</code>
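This class is modeled on MetricEntity, with the annotations provided by the influxdb-java library added. @Measurement(name = "sentinel_metric") specifies the measurement name;
time is the time column of the time-series database;
app and resource are tag columns, marked with tag = true in the annotation;
all other columns are field columns.
Next, in the repository.metric package where InMemoryMetricsRepository lives, create the InfluxDBMetricsRepository class, which implements the MetricsRepository<MetricEntity> interface: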
<code>package com.alibaba.csp.sentinel.dashboard.repository.metric;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang.time.DateFormatUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Repository;
import org.springframework.util.CollectionUtils;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricPO;
import com.alibaba.csp.sentinel.dashboard.util.InfluxDBUtils;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
 * InfluxDB-backed storage implementation for metrics data
 * @author cdfive
 * @date 2018-10-19
 */
@Repository("influxDBMetricsRepository")
public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {

    /** Time format */
    private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    /** Database name */
    private static final String SENTINEL_DATABASE = "sentinel_db";

    /** Measurement name */
    private static final String METRIC_MEASUREMENT = "sentinel_metric";

    /** Beijing time is 8 hours ahead of UTC (Coordinated Universal Time) */
    private static final Integer UTC_8 = 8;

    @Override
    public void save(MetricEntity metric) {
        if (metric == null || StringUtil.isBlank(metric.getApp())) {
            return;
        }

        InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
            @Override
            public void doCallBack(String database, InfluxDB influxDB) {
                if (metric.getId() == null) {
                    metric.setId(System.currentTimeMillis());
                }
                doSave(influxDB, metric);
            }
        });
    }

    @Override
    public void saveAll(Iterable<MetricEntity> metrics) {
        if (metrics == null) {
            return;
        }

        Iterator<MetricEntity> iterator = metrics.iterator();
        boolean next = iterator.hasNext();
        if (!next) {
            return;
        }

        // Iterate inside the callback so the connection is opened and closed only once.
        InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
            @Override
            public void doCallBack(String database, InfluxDB influxDB) {
                while (iterator.hasNext()) {
                    MetricEntity metric = iterator.next();
                    if (metric.getId() == null) {
                        metric.setId(System.currentTimeMillis());
                    }
                    doSave(influxDB, metric);
                }
            }
        });
    }

    @Override
    public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
        List<MetricEntity> results = new ArrayList<MetricEntity>();
        if (StringUtil.isBlank(app)) {
            return results;
        }
        if (StringUtil.isBlank(resource)) {
            return results;
        }

        // Subtract 8 hours from the query start and end times: InfluxDB uses UTC,
        // and Beijing time is 8 hours ahead of UTC. The strings below are formatted
        // in the JVM's default time zone, so this assumes the JVM runs in UTC+8.
        endTime = endTime - UTC_8 * 60 * 60 * 1000;
        startTime = startTime - UTC_8 * 60 * 60 * 1000;

        StringBuilder sql = new StringBuilder();
        sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
        sql.append(" WHERE app=$app");
        sql.append(" AND resource=$resource");
        sql.append(" AND time>=$startTime");
        sql.append(" AND time<=$endTime");

        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("app", app);
        paramMap.put("resource", resource);
        paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
        paramMap.put("endTime", DateFormatUtils.format(new Date(endTime), DATE_FORMAT_PATTERN));

        List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
        if (CollectionUtils.isEmpty(metricPOS)) {
            return results;
        }

        for (MetricPO metricPO : metricPOS) {
            results.add(convertToMetricEntity(metricPO));
        }
        return results;
    }

    @Override
    public List<String> listResourcesOfApp(String app) {
        List<String> results = new ArrayList<>();
        if (StringUtil.isBlank(app)) {
            return results;
        }

        StringBuilder sql = new StringBuilder();
        sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
        sql.append(" WHERE app=$app");
        sql.append(" AND time>=$startTime");

        Map<String, Object> paramMap = new HashMap<String, Object>();
        // Only look at the last minute of data.
        long startTime = System.currentTimeMillis() - 1000 * 60;
        // Subtract 8 hours from the query start time: InfluxDB uses UTC,
        // and Beijing time is 8 hours ahead of UTC.
        startTime = startTime - UTC_8 * 60 * 60 * 1000;
        paramMap.put("app", app);
        paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));

        List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);
        if (CollectionUtils.isEmpty(metricPOS)) {
            return results;
        }

        List<MetricEntity> metricEntities = new ArrayList<MetricEntity>();
        for (MetricPO metricPO : metricPOS) {
            metricEntities.add(convertToMetricEntity(metricPO));
        }

        // Aggregate the metrics per resource.
        Map<String, MetricEntity> resourceCount = new HashMap<>(32);
        for (MetricEntity metricEntity : metricEntities) {
            String resource = metricEntity.getResource();
            if (resourceCount.containsKey(resource)) {
                MetricEntity oldEntity = resourceCount.get(resource);
                oldEntity.addPassQps(metricEntity.getPassQps());
                oldEntity.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps());
                oldEntity.addBlockQps(metricEntity.getBlockQps());
                oldEntity.addExceptionQps(metricEntity.getExceptionQps());
                oldEntity.addCount(1);
            } else {
                resourceCount.put(resource, MetricEntity.copyOf(metricEntity));
            }
        }

        // Order by last minute b_qps DESC.
        return resourceCount.entrySet()
            .stream()
            .sorted((o1, o2) -> {
                MetricEntity e1 = o1.getValue();
                MetricEntity e2 = o2.getValue();
                int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                if (t != 0) {
                    return t;
                }
                return e2.getPassQps().compareTo(e1.getPassQps());
            })
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    private MetricEntity convertToMetricEntity(MetricPO metricPO) {
        MetricEntity metricEntity = new MetricEntity();
        metricEntity.setId(metricPO.getId());
        metricEntity.setGmtCreate(new Date(metricPO.getGmtCreate()));
        metricEntity.setGmtModified(new Date(metricPO.getGmtModified()));
        metricEntity.setApp(metricPO.getApp());
        metricEntity.setTimestamp(Date.from(metricPO.getTime()));
        metricEntity.setResource(metricPO.getResource());
        metricEntity.setPassQps(metricPO.getPassQps());
        metricEntity.setSuccessQps(metricPO.getSuccessQps());
        metricEntity.setBlockQps(metricPO.getBlockQps());
        metricEntity.setExceptionQps(metricPO.getExceptionQps());
        metricEntity.setRt(metricPO.getRt());
        metricEntity.setCount(metricPO.getCount());
        return metricEntity;
    }

    private void doSave(InfluxDB influxDB, MetricEntity metric) {
        // app and resource are written as tags (indexed); everything else as fields.
        influxDB.write(Point.measurement(METRIC_MEASUREMENT)
            .time(metric.getTimestamp().getTime(), TimeUnit.MILLISECONDS)
            .tag("app", metric.getApp())
            .tag("resource", metric.getResource())
            .addField("id", metric.getId())
            .addField("gmtCreate", metric.getGmtCreate().getTime())
            .addField("gmtModified", metric.getGmtModified().getTime())
            .addField("passQps", metric.getPassQps())
            .addField("successQps", metric.getSuccessQps())
            .addField("blockQps", metric.getBlockQps())
            .addField("exceptionQps", metric.getExceptionQps())
            .addField("rt", metric.getRt())
            .addField("count", metric.getCount())
            .addField("resourceCode", metric.getResourceCode())
            .build());
    }
}</code>
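In this class:
save and saveAll write to the sentinel_metric measurement in the sentinel_db database by calling InfluxDBUtils.insert with an InfluxDBInsertCallback;
saveAll does not call save in a loop. It iterates over the Iterable<MetricEntity> metrics inside the callback, so the connection obtained from InfluxDBFactory.connect is opened and closed only once;
doSave writes the metric timestamp to the time column unchanged, via .time(metric.getTimestamp().getTime(), TimeUnit.MILLISECONDS). The original article instead added 8 hours at this point, using .time(DateUtils.addHours(metric.getTimestamp(), 8).getTime(), TimeUnit.MILLISECONDS), because no way had been found to change InfluxDB's use of UTC;
the code above stores the real timestamp and compensates on the query side by subtracting 8 hours from the query bounds, which is the time-zone fix described in the note at the end of this article;
the queries in queryByAppAndResourceBetween and listResourcesOfApp are simply written in the SQL-like syntax that InfluxDB provides.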
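The fixed 8-hour subtraction produces the right UTC string only when the JVM's default time zone is UTC+8. As a hedged alternative, the sketch below formats epoch milliseconds directly in UTC with java.time, using the same pattern as DATE_FORMAT_PATTERN. `UtcTimeSketch` and `formatUtc` are hypothetical names, and swapping this in for DateFormatUtils.format (while dropping the subtraction) is an assumption that has not been tested against the dashboard.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class UtcTimeSketch {
    // Same pattern as DATE_FORMAT_PATTERN, pinned to UTC rather than the JVM default zone.
    private static final DateTimeFormatter UTC_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // Renders epoch milliseconds as a UTC date-time string.
    static String formatUtc(long epochMillis) {
        return UTC_FORMATTER.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatUtc(0L)); // prints 1970-01-01 00:00:00.000
    }
}
```

With a helper like this, the query parameters would be built from the unmodified startTime and endTime, and the result would not depend on where the dashboard is deployed.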
As the last step, find the metricStore field in the MetricController and MetricFetcher classes and add a @Qualifier("influxDBMetricsRepository") annotation above the @Autowired annotation:
<code>@Qualifier("influxDBMetricsRepository")
@Autowired
private MetricsRepository<MetricEntity> metricStore;</code>
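IV. Verifying the Result
Set the startup parameters of the sentinel-dashboard project: -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard
Start the project, open http://localhost:8080, and check that every page displays normally.
In the InfluxDB command-line client, run show measurements; the sentinel_metric measurement should now exist.
Count the rows: select count(id) from sentinel_metric
Query the latest 5 rows: select * from sentinel_metric order by time desc limit 5
Note: statements in the command-line client do not need a trailing semicolon.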
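Reference code: https://github.com/cdfive/Sentinel/tree/winxuan_develop/sentinel-dashboard
Possible extensions:
1. Decide at what time granularity historical data should be archived;
2. Use Grafana to aggregate and present the monitoring data along multiple dimensions.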
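For the first extension, the retention policies and continuous queries mentioned in section II are the natural tools. The sketch below only assembles InfluxQL 1.x statements as strings, for example to paste into the influx client. The names `rp_1y`, `cq_sentinel_1h`, and `sentinel_metric_1h`, the 30-day and 52-week durations, and the choice of sum as the aggregate are all illustrative assumptions, not part of the original setup.

```java
class ArchiveStatementsSketch {

    // Shortens how long the default "autogen" policy keeps raw points (for example "30d").
    static String limitRawData(String database, String duration) {
        return "ALTER RETENTION POLICY \"autogen\" ON \"" + database + "\" DURATION " + duration;
    }

    // Creates a second, longer-lived policy that holds the archived roll-ups.
    static String createArchivePolicy(String name, String database, String duration) {
        return "CREATE RETENTION POLICY \"" + name + "\" ON \"" + database + "\""
                + " DURATION " + duration + " REPLICATION 1";
    }

    // Rolls raw points up into hourly sums, grouped by every tag, inside the archive policy.
    static String createHourlyRollup(String name, String database, String policy,
                                     String source, String target) {
        return "CREATE CONTINUOUS QUERY \"" + name + "\" ON \"" + database + "\""
                + " BEGIN SELECT sum(\"passQps\") AS \"passQps\", sum(\"blockQps\") AS \"blockQps\""
                + " INTO \"" + policy + "\".\"" + target + "\" FROM \"" + source + "\""
                + " GROUP BY time(1h), * END";
    }

    public static void main(String[] args) {
        System.out.println(limitRawData("sentinel_db", "30d"));
        System.out.println(createArchivePolicy("rp_1y", "sentinel_db", "52w"));
        System.out.println(createHourlyRollup("cq_sentinel_1h", "sentinel_db", "rp_1y",
                "sentinel_metric", "sentinel_metric_1h"));
    }
}
```

Writing the roll-ups into a separate policy matters: if they stayed in the same policy as the raw points, they would expire together with them.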
V. References
Sentinel official documentation:
https://github.com/alibaba/Sentinel/wiki/控制檯
https://github.com/alibaba/Sentinel/wiki/在生產環境中使用-Sentinel-控制檯
InfluxDB official documentation: https://docs.influxdata.com/influxdb/v1.6/introduction/getting-started/
InfluxDB concise handbook: https://xtutu.gitbooks.io/influxdb-handbook/content/
Original article: https://www.cnblogs.com/cdfive2018/p/9914838.html
Note: building on the original article, this version fixes a time-zone problem in which the Sentinel dashboard displayed the monitoring data correctly but Grafana could not.