自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

記錄一次MySQL+Redis實(shí)現(xiàn)優(yōu)化百萬數(shù)據(jù)統(tǒng)計的方式

數(shù)據(jù)庫 MySQL
客戶在我司采購了WAF防火墻產(chǎn)品,用于攔截和阻斷非法請求和一些具有攻擊行為的請求。隨著系統(tǒng)的不斷運(yùn)作,數(shù)據(jù)量也隨之增長,這就導(dǎo)致客戶系統(tǒng)部分報表頁面加載時間過長,用戶體驗(yàn)極差。

提到歷史項目,大家對它的第一印象可能會是數(shù)據(jù)量大、技術(shù)老舊、文檔缺失、開發(fā)人員斷層、"屎山"等。剛好這幾天就接到了一個優(yōu)化老項目的需求,客戶反饋頁面數(shù)據(jù)加載緩慢甚至加載不出來,希望能夠做一些優(yōu)化。

剛接到這個任務(wù)后真的是一臉懵逼,因?yàn)榧葲]有文檔,也沒有相關(guān)的開發(fā)人員,甚至連需求都不了解。唯一的解決辦法就是向上面多要時間,有了足夠的時間就可以通過代碼梳理出業(yè)務(wù)邏輯。

背景

客戶在我司采購了WAF防火墻產(chǎn)品,用于攔截和阻斷非法請求和一些具有攻擊行為的請求。隨著系統(tǒng)的不斷運(yùn)作,數(shù)據(jù)量也隨之增長,這就導(dǎo)致客戶系統(tǒng)部分報表頁面加載時間過長,用戶體驗(yàn)極差。

技術(shù)棧

SSM + Gateway + Redis + Kafka + MySQL

其中Gateway負(fù)責(zé)安全防護(hù)和限流,當(dāng)請求經(jīng)過Gateway時,Gateway會將該請求的原參數(shù),以及安全狀態(tài),是否存在攻擊,請求ip等信息通過Kafka發(fā)送到后臺系統(tǒng)并當(dāng)作日志記錄到數(shù)據(jù)庫中。

優(yōu)化思路

當(dāng)我看到報表接口的第一眼,就被驚呆了。先不說業(yè)務(wù)邏輯,單單一個函數(shù)中的代碼行數(shù)將近1000行,在這1000行的代碼中依稀殘留著幾行簡潔而又模糊的注釋,并且函數(shù)內(nèi)對象的命名也是慘不忍睹,比如format1,data1,data2,collect1, collect2......。即使冒著涉密的風(fēng)險,我也要復(fù)制出來,與大家一起分享。

圖片圖片

if (pageResultDTO.isPresent()) {
    List<SecurityIncidentDTO> data = pageResultDTO.get().getData();
    Long count = Long.parseLong(pageResultDTO.get().getCount().toString());
    long normalCount = data.stream().filter(log -> log.getType().equals("正常")).count();
    response.setTotalCount(count);
    response.setNormalCount(normalCount);
    response.setAbNormalCount(count - normalCount);
    Map<String, List<SecurityIncidentDTO>> collect = data.stream()
            .filter(log -> log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    item -> new SimpleDateFormat(
                            "yyyy-MM-dd HH").format(
                            com.payegis.antispider.admin.common.utils.DateUtil
                                    .pars2Calender(item.getTime())
                                    .getTime())));

    Map<String, List<SecurityIncidentDTO>> collect1 = data.stream()
            .filter(log -> !log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    item -> new SimpleDateFormat(
                            "yyyy-MM-dd HH").format(
                            com.payegis.antispider.admin.common.utils.DateUtil
                                    .pars2Calender(item.getTime())
                                    .getTime())));
    
    Map<String, List<SecurityIncidentDTO>> ipMap = data.stream()
            .filter(log -> !log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    SecurityIncidentDTO::getSourceIp));
    for (String s : ipMap.keySet()) {
        List<SecurityIncidentDTO> tempList = ipMap.get(s);
        int size = tempList.size();
        ApiStatisticDataVO apiStatisticDataVO = new ApiStatisticDataVO();
        apiStatisticDataVO.setValue(size);
        apiStatisticDataVO.setMsg(s);
        apiStatisticDataVO.setId(s);
        ipList.add(apiStatisticDataVO);

    }
    List<ApiStatisticDataVO> collect3 = ipList.stream()
            .sorted(Comparator.comparing(ApiStatisticDataVO::getValue)
                    .reversed())
            .limit(5)
            .collect(Collectors.toList());
    ipList = new ArrayList<>(5);
    for (int i = 0; i < 5; i++) {
        ApiStatisticDataVO apiStatisticDataVO = new ApiStatisticDataVO();
        apiStatisticDataVO.setId(i + "");
        apiStatisticDataVO.setValue(0);
        apiStatisticDataVO.setMsg("");
        ipList.add(i, apiStatisticDataVO);
    }
    for (int i = 0; i < collect3.size(); i++) {
        ipList.set(i, collect3.get(i));
    }

    for (String hour2 : list) {
        boolean falg = false;
        for (String hour : collect.keySet()) {
            if (hour2.substring(0, 2).equals(hour.substring(hour.length() - 2))) {
                data1.add(collect.get(hour).size());
                falg = true;
            }
        }
        if (!falg) {
            data1.add(0);
        }
    }

    for (String hour2 : list) {
        boolean falg = false;
        for (String hour : collect1.keySet()) {
            if (hour2.substring(0, 2).equals(hour.substring(hour.length() - 2))) {
                data2.add(collect1.get(hour).size());
                falg = true;
            }
        }
        if (!falg) {
            data2.add(0);
        }
    }

吐槽完了,下面開始正式步入正題。經(jīng)過不知道多久的時間,該方法的邏輯也慢慢變得清晰起來,其主要實(shí)現(xiàn)的是:將Kafka接受并存儲在數(shù)據(jù)庫中的日志數(shù)據(jù),進(jìn)行分類統(tǒng)計,具體包括事件狀態(tài)統(tǒng)計(正常訪問量,異常訪問量,總訪問量),近12/24小時內(nèi)各時間段的事件統(tǒng)計,攻擊IP地址TOP5,接口訪問TOP5,安全類型分布等報表。

原有邏輯是直接查詢數(shù)據(jù)庫,通過sql來實(shí)現(xiàn)統(tǒng)計,這種方式如果在數(shù)據(jù)量小的情況下并不會出現(xiàn)什么問題并且實(shí)現(xiàn)方式也相對簡單。但是,當(dāng)數(shù)據(jù)量上去之后,sql的查詢效率就會隨之下降,即使通過優(yōu)化索引的方式也無濟(jì)于事。

那么在不能引入其他組件或框架情況下,該如何優(yōu)化查詢呢?

經(jīng)過短暫的思考后,決定以歸檔的方式進(jìn)行數(shù)據(jù)處理,即在存儲日志前,先對日志數(shù)據(jù)進(jìn)行分門別類的處理,比如需要統(tǒng)計每個時段的事件訪問量,那么就以小時和事件狀態(tài)為標(biāo)識進(jìn)行存儲,假設(shè)在12:30分有一條異常的訪問,那么在消費(fèi)端接收到消息后,先查詢數(shù)據(jù)庫中是否存在12點(diǎn)且訪問異常的數(shù)據(jù),如果存在,那么次數(shù)加一,否則將該數(shù)據(jù)插入到數(shù)據(jù)庫中,這樣在一小時內(nèi)統(tǒng)一時間狀態(tài)只會存在一條數(shù)據(jù)。

圖片圖片

上面的方式是可以減少一定的數(shù)據(jù)量并且可以提高查詢效率,但是如果請求量很大,消息在不斷的消費(fèi)那么就意味著需要不斷的查詢數(shù)據(jù)庫,更新數(shù)據(jù)庫,這樣就會造成一定的性能消耗,而且還會出現(xiàn)并發(fā)問題,造成數(shù)據(jù)重復(fù)。

本打算先用這種方式來解決的,有并發(fā)就加鎖。但是在劃了一小時水之后突然想到,當(dāng)前小時的數(shù)據(jù)是不是可以存到redis中?

經(jīng)過片刻的構(gòu)想,發(fā)現(xiàn)確實(shí)可以,畢竟變得只是一個數(shù)量,可以用redis自增去做。存到緩存后,定時在同步到數(shù)據(jù)庫中不就搞定了嗎,這樣既可以大大減少數(shù)據(jù)庫操作,還能提高查詢效率。

圖片圖片

/**
 * 將事件詳情按事件正常狀態(tài)進(jìn)行歸檔,將次數(shù)緩存到redis用于報表查詢
 *
 * @param log
 */
@Override
public void handleWebEventStatus(Log log) {
    String siteId = antispiderDetailLog.getSiteId();
    Date curr = new Date();
    DateTime beginOfHour = DateUtil.beginOfHour(curr);
    Integer eventStatus = log.getAntispiderRule().intValue() == 0 ? 0 : 1;
    // 不同站點(diǎn)事件(區(qū)分站點(diǎn))
    String cacheKey = StrUtil.format(RedisConstant.REPORT_WEB_TIME_EXIST, siteId, DateUtil.format(beginOfHour, timeFormat), eventStatus.intValue());
    // 所有站點(diǎn)事件(不區(qū)分站點(diǎn))
    String cacheKeyAll = StrUtil.format(RedisConstant.REPORT_WEN_TIME_ALL, DateUtil.format(beginOfHour, timeFormat), eventStatus.intValue());
    if (redisService.exist(cacheKeyAll)) {
        redisService.increment(cacheKeyAll, 1L);
    } else {
        redisService.setValueByHour(cacheKeyAll, 1, 2L);
    }
    if (redisService.exist(cacheKey)) {
        redisService.increment(cacheKey, 1L);
    } else {
        redisService.setValueByHour(cacheKey, 1, 2L);
    }
}


/**
 * 將當(dāng)前小時內(nèi)的數(shù)據(jù) 以及上一個小時內(nèi)的數(shù)據(jù)同步到數(shù)據(jù)庫
 */
@Scheduled(cron = "0 0/30 * * * ?")
public void synRedisDataToDB() {
    synchronized (lock) {
        reportWebEventStatusService.synRedisDataToDB();
        reportWebEventTopService.synRedisDataToDB();
        reportWebIpTopService.synRedisDataToDB();
    }
}

上面的搞完后,我突然又發(fā)現(xiàn),如果要統(tǒng)計24小時內(nèi)的數(shù)據(jù),那前23小時的數(shù)據(jù)肯定都已經(jīng)固定了,不會在發(fā)生變化了。那完全可以將前23小時的數(shù)據(jù)統(tǒng)計完后存入redis,查詢的時候只需要在數(shù)據(jù)庫中查詢當(dāng)前所屬小時的數(shù)據(jù)即可。更新緩存的時間可以設(shè)定為1小時1更新,這樣就可以保證到一個新時段時,可以保證緩存中的數(shù)據(jù)為近23小時內(nèi)的數(shù)據(jù)。

圖片圖片

/**
 * 每小時同步所有站點(diǎn)23小時內(nèi)的事件數(shù)據(jù)到緩存中
 */
@Scheduled(cron = "0 0 0/1 * * ?")
public void synAllSiteWebEventDataToRedis() {
    synchronized (lock) {
        synReportWebDataToRedis();
    }
}

現(xiàn)在經(jīng)過優(yōu)化以后,幾乎所有的數(shù)據(jù)都通過定時任務(wù)的方式來統(tǒng)計和存儲了,不在需要通過sql的方式實(shí)時統(tǒng)計了。最后還是會有個地方存在優(yōu)化的空間,由于原業(yè)務(wù)接口是將所有統(tǒng)計報表的數(shù)據(jù)放在一個接口里面返回的,那么在不改變原參數(shù)和不拆分接口的情況下,可以使用Future做異步處理,畢竟每個報表的數(shù)據(jù)查詢統(tǒng)計操作都是獨(dú)立的,可以按照預(yù)估的查詢效率做個排序。那么,最終的一個方法就是將上述幾個報表數(shù)據(jù)進(jìn)行組裝,并統(tǒng)一返回給前端。

@Override
public ApiDashboardResponse webDashboardV2(DashboardRequest request) throws Exception {
    ApiDashboardResponse response = new ApiDashboardResponse();

    // 1. 統(tǒng)計近12/24小時事件防護(hù)數(shù)量排名
    Future<ReportWebEventTopVo> reportWebEventTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebEventTopVo webEventTopVo = reportWebEventTopService.getWebEventTopVo(request.getSiteId(), request.getTimeType());
        return webEventTopVo;
    });

    // 2. 統(tǒng)計近12/24小時內(nèi)各時段安全事件狀態(tài)
    Future<ReportWebEventStatusVo> webEventTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebEventStatusVo reportWebEventStatus = reportWebEventStatusService.getReportWebEventStatus(request.getSiteId(), request.getTimeType());
        return reportWebEventStatus;
    });

    // 3.統(tǒng)計top5的攻擊ip地址
    Future<ReportWebIpTopVo> reportWebIpTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebIpTopVo reportWebIpTop5 = reportWebIpTopService.getReportWebIpTop5(request.getSiteId(), request.getTimeType());
        return reportWebIpTop5;
    });

    // 4. 統(tǒng)計訪問top5的站點(diǎn)
    Future<ReportWebSiteTopVo> reportWebSiteTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebSiteTopVo webSiteTop5 = reportWebSiteTopService.getWebSiteTop5Vo(request.getSiteId(), request.getTimeType());
        return webSiteTop5;
    });


    // 拼裝響應(yīng)數(shù)據(jù)
    // 站點(diǎn)訪問量的數(shù)據(jù)都存儲在redis處理速度應(yīng)該最快
    ReportWebSiteTopVo reportWebSiteTopVo = reportWebSiteTopVoFuture.get();

    ReportWebEventTopVo reportWebEventTopVo = reportWebEventTopVoFuture.get();

    ReportWebEventStatusVo reportWebEventStatusVo = webEventTopVoFuture.get();

    //攻擊源ip的數(shù)據(jù)可能相對較多
    ReportWebIpTopVo reportWebIpTopVo = reportWebIpTopVoFuture.get();

    //......
    return response;
}

小結(jié)

由于是公司項目的代碼,所以在這里只能粘貼一小部分。但代碼不是關(guān)鍵,關(guān)鍵在于如何在不借助其他數(shù)據(jù)處理的中間件的情況下,如何優(yōu)化大量數(shù)據(jù)查詢速度。數(shù)據(jù)分類歸檔確實(shí)是一種可行的解決方式,如果你的項目中有一些需要以月,以天,以人或者其他標(biāo)準(zhǔn)來進(jìn)行統(tǒng)計的話,不妨可以嘗試一下。

如果有更好的方法方式,可以忽略。下面的兩張圖是測試人員提供的優(yōu)化前后對比,發(fā)現(xiàn)150萬的日志量,查詢時間在1秒內(nèi),比老版本提高很多倍。

優(yōu)化前

圖片圖片

優(yōu)化后

圖片 圖片

責(zé)任編輯:武曉燕 來源: JAVA日知錄
相關(guān)推薦

2023-12-08 07:55:37

MySQL數(shù)據(jù)統(tǒng)計InnoDB

2023-03-29 09:36:32

2018-01-15 14:50:49

APP轉(zhuǎn)讓App賬號

2015-07-17 10:04:33

MKMapView優(yōu)化

2021-05-24 08:58:34

Redis Bitmap 數(shù)據(jù)統(tǒng)計

2021-03-11 10:55:41

MySQL數(shù)據(jù)庫索引

2021-06-08 08:51:50

Redis 數(shù)據(jù)類型數(shù)據(jù)統(tǒng)計

2023-11-29 12:12:24

Oceanbase數(shù)據(jù)庫

2023-11-06 07:45:42

單據(jù)圖片處理

2011-02-22 09:29:23

jQueryJavaScript

2018-02-23 13:41:05

數(shù)據(jù)庫MySQL數(shù)據(jù)恢復(fù)

2010-09-07 11:16:14

SQL語句

2010-11-04 15:43:49

DB2數(shù)據(jù)統(tǒng)計與分析系

2015-02-12 16:05:51

微信SDK

2015-02-12 15:45:05

微信SDK

2015-02-12 16:17:09

微信SDK

2022-04-07 07:30:47

InnoDBMySQL數(shù)據(jù)

2019-04-04 15:00:40

SQL索引數(shù)據(jù)庫

2022-09-15 10:02:58

測試軟件

2011-06-28 10:41:50

DBA
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號