webmagic源碼淺析
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??夢想家haima
來源 |? urlify.cn/imyqMb
66套java從入門到精通實(shí)戰(zhàn)課程分享
webmagic簡介
webmagic可以說是中國傳播度最廣的Java爬蟲框架,https://github.com/code4craft/webmagic,閱讀相關(guān)源碼,獲益良多。閱讀作者博客【代碼工匠】,能夠領(lǐng)略到一個(gè)IT工作者的工匠精神,希望以后成為他這樣的開源貢獻(xiàn)者。Webmagic的文檔也是寫得非常漂亮,這里就不具體講它的使用方法了,見官方文檔
webmagic核心架構(gòu)
webmagic幫我們做了幾個(gè)核心的事情:
1.線程池封裝,不用手動控制采集線程
2.url調(diào)度,實(shí)現(xiàn)了生產(chǎn)者消費(fèi)者模型
3.封裝下載器組件(downloader),解析組件,持久化。見官方文檔
4.支持注解
簡單案例
借用一段官方案例,快速入門,便于后面的理解。開啟一個(gè)爬蟲,只需要簡單幾步,編寫頁面解析器,寫具體的解析方法。新建Spider實(shí)例,添加至少一個(gè)種子URL,設(shè)置其他可選屬性,最后調(diào)用run()方法,或者start(),start()方法內(nèi)部會為spider單獨(dú)開啟一個(gè)線程,使得爬蟲與主線程異步。
import?us.codecraft.webmagic.Page;
import?us.codecraft.webmagic.Site;
import?us.codecraft.webmagic.Spider;
import?us.codecraft.webmagic.processor.PageProcessor;
public?class?GithubRepoPageProcessor?implements?PageProcessor?{
????private?Site?site?=?Site.me().setRetryTimes(3).setSleepTime(100);
????@Override
????public?void?process(Page?page)?{
????????//將提取的url加入page對象暫存,最終會加入到
????????page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+/\\w+)").all());
????????page.putField("author",?page.getUrl().regex("https://github\\.com/(\\w+)/.*").toString());
????????page.putField("name",?page.getHtml().xpath("http://h1[@class='entry-title?public']/strong/a/text()").toString());
????????if?(page.getResultItems().get("name")==null){
????????????//skip?this?page
????????????page.setSkip(true);
????????}
????????page.putField("readme",?page.getHtml().xpath("http://div[@id='readme']/tidyText()"));
????}
?
????@Override
????public?Site?getSite()?{
????????return?site;
????}
????public?static?void?main(String[]?args)?{
????????//官方鏈?zhǔn)秸{(diào)用,拆解到下面方便理解
????????//Spider.create(new?GithubRepoPageProcessor()).addUrl("https://github.com/code4craft").thread(5).run();
????????
????????//創(chuàng)建線程
????????Spider?spider?=?Spider.create(new?GithubRepoPageProcessor());
????????//添加采集種子URL
????????spider.addUrl("https://github.com/code4craft");
????????//設(shè)置線程數(shù)
????????spider.thread(5);
????????//啟動爬蟲//run()方法既可以看作多線程中的Runnable接口方法,也可以直接運(yùn)行,是爬蟲的核心方法
????????spider.run();
????}
}
Spider類屬性
爬蟲的核心是us.codecraft.webmagic.Spider類,看看Spider類中都有哪些重要屬性
屬性列表:
public?class?Spider?implements?Runnable,?Task?{
????//下載器對象
????protected?Downloader?downloader;
????//持久化統(tǒng)一處理器,可以有多個(gè)
????protected?List?pipelines?=?new?ArrayList();
????//頁面解析器
????protected?PageProcessor?pageProcessor;
????//種子請求(這個(gè)地方看著種子請求也不是很對,因?yàn)閟pider對象在沒開始運(yùn)行時(shí),仍然可以使用addRequest,addUrl添加url???)
????protected?List?startRequests;
????//瀏覽器信息對象
????protected?Site?site;
????//爬蟲任務(wù)標(biāo)識
????protected?String?uuid;
?????//任務(wù)調(diào)度器,默認(rèn)是JDK中的LinkedBlockingQueue的實(shí)現(xiàn)
????protected?Scheduler?scheduler?=?new?QueueScheduler();
?
????protected?Logger?logger?=?LoggerFactory.getLogger(getClass());
?????//線程池(自己封裝的一個(gè)模型,內(nèi)部的execute方法實(shí)際是executorService的execute實(shí)現(xiàn)添加線程的作用)
????protected?CountableThreadPool?threadPool;
?????//執(zhí)行管理器對象(和線程池配合使用)
????protected?ExecutorService?executorService;
????//線程數(shù),控制采集并發(fā)
????protected?int?threadNum?=?1;
????//爬蟲任務(wù)運(yùn)行狀態(tài)
????protected?AtomicInteger?stat?=?new?AtomicInteger(STAT_INIT);
???//是否采集完成退出
????protected?boolean?exitWhenComplete?=?true;
?
????protected?final?static?int?STAT_INIT?=?0;
?
????protected?final?static?int?STAT_RUNNING?=?1;
?
????protected?final?static?int?STAT_STOPPED?=?2;
?
????//是否回流url,spawn產(chǎn)卵的意思。個(gè)人覺得這個(gè)參數(shù)很多余,不想采集繼續(xù)下去,可以別把url加入隊(duì)列
????protected?boolean?spawnUrl?=?true;
????//退出時(shí)是否回收處理
????protected?boolean?destroyWhenExit?=?true;
?
????//控制新生成url鎖
????private?ReentrantLock?newUrlLock?=?new?ReentrantLock();
???//控制新生成url鎖,配合newUrlLock?使用
????private?Condition?newUrlCondition?=?newUrlLock.newCondition();
?????//監(jiān)聽器集合,請求爬去成功或者失敗時(shí),可以通過注入監(jiān)聽器分別實(shí)現(xiàn)onSuccess和onError方法
????private?List?spiderListeners;
?????//采集頁面數(shù)統(tǒng)計(jì)(只代表請求的次數(shù),不代表成功抓取數(shù))
????private?final?AtomicLong?pageCount?=?new?AtomicLong(0);
?????//爬取開始時(shí)間
????private?Date?startTime;
?????//調(diào)度器隊(duì)列中的URL已經(jīng)被消費(fèi)光,且采集線程未執(zhí)行完成,仍然可能生產(chǎn)URL到調(diào)度器隊(duì)列中時(shí),線程最多wait?30秒
????private?int?emptySleepTime?=?30000;
threadNum?這里Spider本身實(shí)現(xiàn)了Runnable接口,可以作為一個(gè)獨(dú)立的線程開啟,當(dāng)然它的線程控制不僅于此,這里有一個(gè)屬性threadNum才是控制采集線程數(shù)的,后面再細(xì)說。
scheduler?對象做為調(diào)度器,內(nèi)部采用隊(duì)列維護(hù)了一個(gè)實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型,爬取的過程中,可以將采集的url提取到scheduler的隊(duì)列中,線程會持續(xù)不斷的消費(fèi)scheduler 的隊(duì)列中消費(fèi)。
pageProcessor?用于用戶自定義頁面解析規(guī)則,定義具體的解析邏輯,新建Spider實(shí)例的方式僅兩種,public static Spider create(PageProcessor pageProcessor)和構(gòu)造方法public Spider(PageProcessor pageProcessor)create方法內(nèi)部只是調(diào)用了一下構(gòu)造方法。構(gòu)造一個(gè)spider對象都需要一個(gè)自定義的解析器,不同頁面,解析邏輯不相同,PageProcessor接口中。spider會調(diào)用PageProcessor的process方法,這是一個(gè)策略設(shè)計(jì)模式。
uuid?這個(gè)名字可能讓人誤會,和平時(shí)uuid不是一個(gè)含義,這個(gè)屬性是一個(gè)爬蟲進(jìn)程的唯一標(biāo)識
其他屬性?比較重要的屬性還包括threadPool,executorService,控制多線程并發(fā),瀏覽器對象site,對于有些反爬策略的網(wǎng)站,該對象可以用于模擬瀏覽器,達(dá)到反反爬蟲的作用。
Spider核心方法run()
@Override
????public?void?run()?{
????????checkRunningStat();//檢查爬蟲運(yùn)行狀態(tài),防止run方法被調(diào)用多次
????????initComponent();//初始化
????????logger.info("Spider?{}?started!",getUUID());
????????while?(!Thread.currentThread().isInterrupted()?&&?stat.get()?==?STAT_RUNNING)?{
????????????//循環(huán)消費(fèi)Request,url在放入scheduler時(shí),已經(jīng)封裝為Request對象了
????????????final?Request?request?=?scheduler.poll(this);
????????????if?(request?==?null)?{
????????????????if?(threadPool.getThreadAlive()?==?0?&&?exitWhenComplete)?{
????????????????????//threadPool.getThreadAlive()線程池中仍然還有存活線程,那么存活線程可能會生產(chǎn)出新的url來
????????????????????//exitWhenComplete默認(rèn)為true,
????????????????????//exitWhenComplete如果為false,線程等待新URL,
????????????????????//如果隊(duì)列(自定義隊(duì)列)能實(shí)現(xiàn)動態(tài)添加url,那就可以實(shí)現(xiàn)動態(tài)添加采集任務(wù)的功能
????????????????????break;
????????????????}
????????????????//?wait?until?new?url?added?
????????????????//等待存活的線程生產(chǎn)新的url
????????????????waitNewUrl();
????????????}?else?{
????????????????//將request封裝為線程,加入線程隊(duì)列,線程池會根據(jù)設(shè)置的并行參數(shù)threadNum,并行執(zhí)行
????????????????threadPool.execute(new?Runnable()?{
????????????????????@Override
????????????????????public?void?run()?{
????????????????????????try?{
????????????????????????????processRequest(request);//執(zhí)行請求
????????????????????????????onSuccess(request);//調(diào)用執(zhí)行成功的方法
????????????????????????}?catch?(Exception?e)?{
????????????????????????????onError(request);
????????????????????????????logger.error("process?request?"?+?request?+?"?error",?e);
????????????????????????}?finally?{
????????????????????????????pageCount.incrementAndGet();
????????????????????????????signalNewUrl();
????????????????????????}
????????????????????}
????????????????});
????????????}
????????}
????????stat.set(STAT_STOPPED);
????????//?release?some?resources
????????if?(destroyWhenExit)?{
????????????close();
????????}
????????logger.info("Spider?{}?closed!?{}?pages?downloaded.",?getUUID(),?pageCount.get());
????}
核心方法的流程還是比較簡答的,checkRunningStat()會先檢查一下爬蟲是否已經(jīng)啟動,這有點(diǎn)兒像多線程中的開啟線程的start()方法,兩次開啟是不允許的。
然后初始化方法initComponent()各種組件,在initComponent()方法中,加入startRequests中的Request,實(shí)際上在Spider啟動之前可以調(diào)用addUrl(String... urls)和addRequest(Request... requests)方法直接將請求加入到隊(duì)列中,startRequests和后面那種添加url的方法缺少了一定的一致性。
后面一個(gè)循環(huán)消費(fèi)的過程,正如我注釋里寫的那樣,如果隊(duì)列中url被消費(fèi)完畢,且沒有正在被消費(fèi)的存活的線程了,且完成采集退出屬性exitWhenComplete為true(exitWhenComplete默認(rèn)為true,設(shè)置為false則進(jìn)程將會一直掛起),就會跳出死循環(huán),采集結(jié)束,反之,如果依然有線程存活,或者exitWhenComplete為false,那么線程waitNewUrl()等待,在exitWhenComplete為false的情況,進(jìn)程就不會自動停止了,除非強(qiáng)殺了,這種設(shè)計(jì)在分布式的模式下才顯得有意義,可以動態(tài)添加url到隊(duì)列中去。
private?void?waitNewUrl()?{
????????newUrlLock.lock();
????????try?{
????????????//?double?check?
????????????if?(threadPool.getThreadAlive()?==?0?&&?exitWhenComplete)?{
????????????????return;
????????????}
????????????//默認(rèn)是30秒后自動蘇醒,可以通過設(shè)置emptySleepTime屬性,控制自動蘇醒的時(shí)間
????????????newUrlCondition.await(emptySleepTime,?TimeUnit.MILLISECONDS);
????????}?catch?(InterruptedException?e)?{
????????????logger.warn("waitNewUrl?-?interrupted,?error?{}",?e);
????????}?finally?{
????????????newUrlLock.unlock();
????????}
????}
后面使用threadPool執(zhí)行一個(gè)新的子線程。new Runnable構(gòu)造的匿名內(nèi)部類會通過threadPool開啟一個(gè)新的子線程,執(zhí)行請求processRequest(request),執(zhí)行成功就調(diào)用onSuccess(request),失敗就調(diào)用onError(request),接著finally代碼塊中的內(nèi)容是非常重要的,統(tǒng)計(jì)請求的頁面次數(shù)(無論失敗或者成功),signalNewUrl()喚醒等待的線程,這里要和前面waitNewUrl()結(jié)合起來看,兩者使用同一個(gè)鎖,waitNewUrl()作為父線程,默認(rèn)會自動蘇醒,但調(diào)用signalNewUrl()的用意在于,可能這個(gè)子線程已經(jīng)又生成新的URL放到隊(duì)列中了,就不用再等30秒了。
private?void?signalNewUrl()?{
????????try?{
????????????newUrlLock.lock();
????????????newUrlCondition.signalAll();
????????}?finally?{
????????????newUrlLock.unlock();
????????}
????}
后面的代碼則是爬蟲結(jié)束的操作,這種情況只有前文提到的跳出死循環(huán),采集結(jié)束,結(jié)束前設(shè)置了一下狀態(tài),做了一下close()操作
調(diào)度器Scheduler
調(diào)度器在webmagic中扮演的角色是非常重要的,說來功能也不算太復(fù)雜,實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式,順便去重。scheduler默認(rèn)為QueueScheduler ,在scheduler聲明的時(shí)候就直接新建了這個(gè)實(shí)例
public?class?QueueScheduler?extends?DuplicateRemovedScheduler?implements?MonitorableScheduler?{
?
????//LinkedBlockingQueue隊(duì)列存url
????private?BlockingQueue?queue?=?new?LinkedBlockingQueue();
?
????@Override????覆蓋父類DuplicateRemovedScheduler?的方法
????public?void?pushWhenNoDuplicate(Request?request,?Task?task)?{
????????queue.add(request);
????}
?
????@Override???實(shí)現(xiàn)DuplicateRemovedScheduler?不完全實(shí)現(xiàn)Scheduler的poll方法
????public?Request?poll(Task?task)?{
????????return?queue.poll();
????}
?
????@Override?????//實(shí)現(xiàn)MonitorableScheduler?的方法
????public?int?getLeftRequestsCount(Task?task)?{
????????return?queue.size();
????}
?
????@Override????//實(shí)現(xiàn)MonitorableScheduler?的方法
????public?int?getTotalRequestsCount(Task?task)?{
????????return?getDuplicateRemover().getTotalRequestsCount(task);
????}
}
以上代碼,可以看到QueueScheduler的構(gòu)成,QueueScheduler繼承了抽象類DuplicateRemovedScheduler 實(shí)現(xiàn)了接口MonitorableScheduler 接口,DuplicateRemovedScheduler 又實(shí)現(xiàn)了Scheduler,DuplicateRemovedScheduler 為抽象類,僅僅實(shí)現(xiàn)了push邏輯(生產(chǎn)者),而poll是QueueScheduler自己實(shí)現(xiàn)的(消費(fèi)者)。push()方法使用去重器,判斷該請求有沒有被采集過。這里要注意,默認(rèn)Post請求是不去重的,能直接打開的請求都是get的?,官方文檔也有特別說明
//DuplicateRemovedScheduler??源碼
private?DuplicateRemover?duplicatedRemover?=?new?HashSetDuplicateRemover();
@Override
????public?void?push(Request?request,?Task?task)?{
????????logger.trace("get?a?candidate?url?{}",?request.getUrl());
????????//duplicatedRemover.isDuplicate(request,?task)?檢查是否采集過
????????if?(shouldReserved(request)?||?noNeedToRemoveDuplicate(request)?||?!duplicatedRemover.isDuplicate(request,?task))?{
????????????logger.debug("push?to?queue?{}",?request.getUrl());
????????????pushWhenNoDuplicate(request,?task);
????????}
????}
//是否需要去重,POST請求則不需要去重
?protected?boolean?noNeedToRemoveDuplicate(Request?request)?{
????????return?HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
??}
public?class?HashSetDuplicateRemover?implements?DuplicateRemover?{
?
????private?Set?urls?=?Collections.newSetFromMap(new?ConcurrentHashMap());
?
????@Override
????public?boolean?isDuplicate(Request?request,?Task?task)?{
????????//add成功,說明沒有添加過這條請求,返回true
????????return?!urls.add(getUrl(request));
????}
?
????protected?String?getUrl(Request?request)?{
????????return?request.getUrl();
????}
?
????@Override
????public?void?resetDuplicateCheck(Task?task)?{
????????urls.clear();
????}
?
????@Override
????public?int?getTotalRequestsCount(Task?task)?{
????????return?urls.size();
????}
}
粉絲福利:實(shí)戰(zhàn)springboot+CAS單點(diǎn)登錄系統(tǒng)視頻教程免費(fèi)領(lǐng)取
???
?長按上方微信二維碼?2 秒 即可獲取資料
感謝點(diǎn)贊支持下哈?
