中文字幕第五页-中文字幕第页-中文字幕韩国-中文字幕最新-国产尤物二区三区在线观看-国产尤物福利视频一区二区

zookeeper(12)源碼分析-請(qǐng)求處理鏈(2)

SyncRequestProcessor,該處理器將請(qǐng)求存入磁盤,其將請(qǐng)求批量的存入磁盤以提高效率,請(qǐng)求在寫入磁盤之前是不會(huì)被轉(zhuǎn)發(fā)到下個(gè)處理器的。

創(chuàng)新互聯(lián)為您提適合企業(yè)的網(wǎng)站設(shè)計(jì)?讓您的網(wǎng)站在搜索引擎具有高度排名,讓您的網(wǎng)站具備超強(qiáng)的網(wǎng)絡(luò)競爭力!結(jié)合企業(yè)自身,進(jìn)行網(wǎng)站設(shè)計(jì)及把握,最后結(jié)合企業(yè)文化和具體宗旨等,才能創(chuàng)作出一份性化解決方案。從網(wǎng)站策劃到網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作, 我們的網(wǎng)頁設(shè)計(jì)師為您提供的解決方案。

類的核心屬性

SyncRequestProcessor維護(hù)了ZooKeeperServer實(shí)例,其用于獲取ZooKeeper的數(shù)據(jù)庫和其他信息;維護(hù)了一個(gè)處理請(qǐng)求的隊(duì)列,其用于存放請(qǐng)求;維護(hù)了一個(gè)處理快照的線程,用于處理快照;維護(hù)了一個(gè)running標(biāo)識(shí),標(biāo)識(shí)SyncRequestProcessor是否在運(yùn)行;同時(shí)還維護(hù)了一個(gè)等待被刷新到磁盤的請(qǐng)求隊(duì)列。

// Zookeeper服務(wù)器
    private final ZooKeeperServer zks;
    // 請(qǐng)求隊(duì)列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
    // 下個(gè)處理器
    private final RequestProcessor nextProcessor;
    // 快照處理線程
    private Thread snapInProcess = null;
    // 是否在運(yùn)行中
    volatile private boolean running;

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    // 等待被刷新到磁盤的請(qǐng)求隊(duì)列
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    // 隨機(jī)數(shù)生成器
    private final Random r = new Random();
    /**
     * The number of log entries to log before starting a snapshot
     */
    // 快照個(gè)數(shù)
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // 結(jié)束請(qǐng)求標(biāo)識(shí)
    private final Request requestOfDeath = Request.requestOfDeath;

構(gòu)造函數(shù)

構(gòu)造函數(shù)首先會(huì)調(diào)用父類的構(gòu)造函數(shù),然后根據(jù)構(gòu)造函數(shù)參數(shù)給類的屬性賦值,其中會(huì)確定下個(gè)處理器,并會(huì)設(shè)置該處理器正在運(yùn)行的標(biāo)識(shí)。

public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

核心方法

1、run

@Override
    public void run() {
        try {
            // 寫日志數(shù)量初始化為0
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            // 防止集群中所有機(jī)器在同一時(shí)刻進(jìn)行數(shù)據(jù)快照,對(duì)是否進(jìn)行數(shù)據(jù)快照增加隨機(jī)因素
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                // 沒有需要刷新到磁盤的請(qǐng)求
                if (toFlush.isEmpty()) {
                    // 從請(qǐng)求隊(duì)列中取出一個(gè)請(qǐng)求,若queuedRequests隊(duì)列為空會(huì)阻塞
                    si = queuedRequests.take();
                } else {
                    // 從請(qǐng)求隊(duì)列中取出一個(gè)請(qǐng)求,若queuedRequests隊(duì)列為空,則返回空,不會(huì)阻塞
                    si = queuedRequests.poll();
                    // 取出的請(qǐng)求為空
                    if (si == null) {
                        // 刷新數(shù)據(jù)磁盤
                        flush(toFlush);
                        continue;
                    }
                }
                // 在關(guān)閉處理器之后,會(huì)添加requestOfDeath請(qǐng)求到queuedRequests隊(duì)列,表示關(guān)閉后不再處理請(qǐng)求
                if (si == requestOfDeath) {
                    break;
                }
                // 請(qǐng)求不為空,處理請(qǐng)求
                if (si != null) {
                    // track the number of records written to the log
                    // 將寫請(qǐng)求添加至事務(wù)日志文件 FileTxnSnapLog.append(si)
                    if (zks.getZKDatabase().append(si)) {
                        // 日志寫入,logCount加1
                        logCount++;
                                                //確定是否需要進(jìn)行數(shù)據(jù)快照
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            // 滾動(dòng)日志,從當(dāng)前日志文件滾到下一個(gè)日志文件,不是回滾
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {  // 正在進(jìn)行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                // 創(chuàng)建線程來處理快照
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                // 進(jìn)行快照
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                // 開始快照線程處理
                                snapInProcess.start();
                            }
                            // 重置為0
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {// 讀請(qǐng)求會(huì)走到這里,查看此時(shí)toFlush是否為空,如果為空,說明近段時(shí)間讀多寫少,直接響應(yīng)
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            // 下個(gè)處理器開始處理請(qǐng)求
                            nextProcessor.proce***equest(si);
                            // 處理器是Flushable的,刷新數(shù)據(jù)到磁盤
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // 將請(qǐng)求添加至被刷新至磁盤隊(duì)列
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {// 隊(duì)列大小大于1000,直接刷新到磁盤
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

2、flush

flush將toFlush隊(duì)列中的請(qǐng)求刷新到磁盤中。

 private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        // 提交事務(wù)至ZK數(shù)據(jù)庫
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            // 從隊(duì)列移除請(qǐng)求
            Request i = toFlush.remove();
            // 下個(gè)處理器開始處理請(qǐng)求
            if (nextProcessor != null) {
                nextProcessor.proce***equest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

3、shutdown

函數(shù)用于關(guān)閉SyncRequestProcessor處理器,其首先會(huì)在queuedRequests隊(duì)列中添加一個(gè)結(jié)束請(qǐng)求requestOfDeath,然后再判斷SyncRequestProcessor是否還在運(yùn)行,若是,則會(huì)等待其結(jié)束;之后判斷toFlush隊(duì)列是否為空,若不為空,則刷新到磁盤中

public void shutdown() {
        LOG.info("Shutting down");
        // 添加結(jié)束請(qǐng)求請(qǐng)求至隊(duì)列
        queuedRequests.add(requestOfDeath);
        try {
            // 還在運(yùn)行
            if(running){
                this.join();// 等待該線程終止
            }
            if (!toFlush.isEmpty()) {// 隊(duì)列不為空,刷新到磁盤
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

當(dāng)前名稱:zookeeper(12)源碼分析-請(qǐng)求處理鏈(2)
本文來源:http://m.2m8n56k.cn/article48/jdciep.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)用戶體驗(yàn)品牌網(wǎng)站制作網(wǎng)站維護(hù)營銷型網(wǎng)站建設(shè)搜索引擎優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
主站蜘蛛池模板: 国产高清厕所盗摄视频 | 精品国产精品国产 | 玖玖玖视频在线观看视频6 玖玖影院在线观看 | 正能量www正能量免费网站 | 黄色三级三级三级 | 中文字幕日韩在线 | 成人免费视频播放 | 亚洲精美视频 | 三级毛片免费看 | 免费观看国产精品 | 色综合久久久 | 欧美成人午夜做爰视频在线观看 | 欧美一级片免费 | 在线精品视频免费观看 | 欧美成人全部视频 | 亚洲国产精品一区二区不卡 | 日韩欧美一区二区在线 | 一区二区中文字幕亚洲精品 | 亚洲狠狠综合久久 | 97视频在线观看免费视频 | 国产亚洲一区呦系列 | 亚洲欧美一区二区三区孕妇 | 亚洲欧美日韩国产专区一区 | 国产视频自拍一区 | 亚洲 欧美 视频 | 亚洲国产99 | 99精品国产在现线免费 | 国产玖玖在线 | 一级毛片美国aaj毛片 | 久久精品综合免费观看 | 成年免费a级毛片 | 日韩美a一级毛片 | 国产欧美成人 | 久久综合88| 99在线在线视频免费视频观看 | 国产一级片大全 | 欧美国产永久免费看片 | 国产不卡在线视频 | 久久久久国产一级毛片高清板 | 一个人看的www日本视频 | 国产成人午夜性视频影院 |