hadoop[1].源碼閱讀總結
《hadoop[1].源碼閱讀總結》由會員分享,可在線閱讀,更多相關《hadoop[1].源碼閱讀總結(11頁珍藏版)》請在裝配圖網(wǎng)上搜索。
1、CommonIPC/RPC Server大概流程基于NIO,Listener關注OP_ACCEPT事件,當有客戶端連接過來,Accept后,從readers中選取一個Reader將客戶端Channel注冊到Reader中的NIO selector,并新建一個Connection對象關聯(lián)客戶端Channel,Reader關注OP_READ事件.客戶端建立連接后,首先發(fā)送的是ConnnectionHeader包含協(xié)議名,用戶組信息,驗證方法,Connection會根據(jù)以上信息進行校驗.之后將是先讀取4位的長度代表這次請求的數(shù)據(jù)的長度,然后一直等待事件觸發(fā)讀取夠長度,將讀取的數(shù)據(jù) 解碼為調用id和p
2、aram,新建一個Call對象(關聯(lián)Connection)放入call隊列中,handlers中的Handler會將Call中callQuene中取走,Call中的param實際為Invocation對象,包含調用方法名,參數(shù)名,參數(shù)類型,由這些信息使用Java反射API調用Server的instance對象,獲取返回值,組織返回數(shù)據(jù),寫入Call的response屬性中,馬上調用responder的doRespond方法,將Call加入到Connection的responseQuene中,如果responseQuene長度等于1做一次NIO寫操作,如果不能一次性能夠將數(shù)據(jù)寫完,將客戶端cha
3、nnel注冊到responder關注寫事件OP_WRITE,下一次繼續(xù)寫,如果長度不為1證明該channel已經注冊到了responder了直接加入隊列,由responder線程后續(xù)處理.NOTE:客戶端關閉流后出發(fā)一次讀操作,返回為-1,Server關閉連接CurCall與獲取客戶端IPHandler獲取一個Call后,會將Server的curCall(ThreadLocal類型)設置為當前的Call,調用Instance方法實際是在Handler線程中,在Instance的方法內就可以使用Server提供的方法來獲取客戶端IPPing在上述過程中如果讀取的4位長度為-1代表是客戶端PING
4、操作清理空閑連接如果一定時間ipc.client.connection.maxidletime沒有讀取到數(shù)據(jù)并且當前連接現(xiàn)有Call數(shù)目為0,則視為空閑連接,Listener會在每次接受完新連接之后進行一次清理,最多清理ipc.client.kill.max個連接,如果出現(xiàn)OutOfMemoryError則強制清理全部空閑連接DataNode,TaskTractor設置的心跳時間需小于空閑時間.清理長時間未發(fā)送到客戶端的響應注冊到responder的Call如果長時間沒有發(fā)送到客戶端,每隔一段時間會清理掉涉及參數(shù)ipc.server.handler.queue.sizecallQuene隊列大
5、小,隨集群增大而增大,ipc.server.max.response.size如果返回的結果序列化后大小大于這個值,重置緩沖區(qū)ByteArrayOutputStream釋放內存.ipc.server.read.threadpool.sizereaders個數(shù)ipc.server.tcpnodelayTCP優(yōu)化參數(shù)Nagle's algorithmhandlerCounthandler個數(shù),由構造參數(shù)指定,在DN,TT中配置socketSendBufferSizesocket設置ipc.server.listen.queue.sizesocket設置socket.bind(address
6、, backlog)ipc.client.idlethreshold總連接數(shù)超過多少后,開始清理空閑連接ipc.client.kill.max一次最多清理多少個空閑連接IPC/RPC ClientClient代理模式,調用RPC.getProxy實際上返回的一個代理對象,當調用方法的時候實際調用的是Invoker, Invoker將協(xié)議,調用的方法名,參數(shù),參數(shù)類型封裝成Invocation對象經過client發(fā)送到server,并讀取返回流,根據(jù)流中的id,判斷是服務器返回的是那次調用的結果.Connection線程負責讀取Server返回值,在讀取的過程中,調用Client的線程會wait
7、直到Connection獲取到返回值.讀取時候如果超時(ipc.ping.interval)就發(fā)送一次ping,如果沒有出現(xiàn)IOException就繼續(xù)讀取,Conneciton可以根據(jù)標識(地址,用戶組信息,協(xié)議)共用.IPC/RPC AuthingHDFSName協(xié)議ClientProtocol:客戶端調用協(xié)議,涉及文件操作,DFS管理,升級(DFSAdmin)DatanodeProtocol:DN與NN通訊協(xié)議,注冊,BlockReport,心跳,升級NamenodeProtocol: SN和NN通訊協(xié)議,通知NN使用新的fsimage和editRefreshAuthorizationP
8、olicyProtocol, RefreshUserMappingsProtocol FSNamesystem數(shù)據(jù)結構LightWeightGSetGset 類似Set但提供get方法,主要用于存儲BlockInfo,根據(jù)Block找到BlockInfo其中一個實現(xiàn)LightWeightGSet,利用數(shù)組存儲元素,用鏈表解決沖突問題,類似HashMap但是沒有ReHash操作BlocksMap初始化LightWeightGSet時候,會根據(jù)JVM內存將數(shù)組的大小初始為最大能占用的內存數(shù)(4194304 -Xmx1024M)加上高效的hash映射算法, LightWeightGSet在Block
9、Info數(shù)量比較小的時候get性能逼近數(shù)組.BlockInfo繼承Block,沒有重寫hashCode和equals方法,在Block中equals方法只要求傳入的對象是Block實例并且blockId相等,就認為兩個對象相等,故存儲BlockInfo時候分配的在數(shù)組中的Index和Get時候由Block的hashCode定位是一致的.BlockMapsBlockMaps負責管理Block以及Block相關的元數(shù)據(jù)Block 有3個long型的屬性blockId(隨機生成)numBytes(塊大小),generationStampBlockInfo繼承Block添加了2個屬性,實現(xiàn)了用戶Lig
10、htWeightGSet的LinkedElement接口inode:引用文件Inodetriplets:3Xreplication的數(shù)組,即replication 個組,每組有3個元素,第一個指向DatanodeDescriptor,代表在這個DN上有一個Block,第二個和第三個分別代表DN上的上一個blockInfo和下一個blockInfoDatanodeDescriptor有一個屬性blockList指向一個BlockInfo,因為每個BlockInfo中的triplets中有一組記錄著對應的DN上的上一個,下一個BlockInfo,所以從這個角度來看BlockInfo是一個雙向鏈表.
11、新建文件打開輸入流后,寫入,會在namenode中分配BlockInfo,當Block寫入到分配的DN后,DN在發(fā)送心跳時候會將新接受到的塊報告給NN,此時NN在將triplets可用的組關聯(lián)到DN(DD).(例子前提假設:新建的集群沒有文件,操作是在DN1上,此時很大可能性每次分配塊的時候都會首選本地DN1,bkl_* *實際為隨機數(shù))namenode中分配BlockInfo 并加入Gset中,blk_1,0-64M,此時DN1的blockList為nullDN1向NN報告接收了新的Block blk_1 ,NN從blocksMap中根據(jù)Block blk_1找到BlockInfo bloc
12、kInfo1 將triplets的可用組(=null)的第一位關聯(lián)到DN1(DatanodeDescriptor1),將DN1的blockList指向blockInfo1此時blockList指向的是blockInfo1NN分配blockInfo2,DN1向NN報告接受到了信的Block blk_2,NN找到blockInfo2后1,將triplets的可用組(=null)的第一位關聯(lián)到DN1(DatanodeDescriptor1)2,將第三位指向blockList即blockInfo1,2,將blockInfo1的對應DN1的組的第二位指向blockInfo24,將DN1的blockLis
13、t指向blockInfo1升級LoadBalance磁盤占用,還是分布策略 可能出現(xiàn)一個DN上兩個相同的Block么.MapReduce命令行運行bin/hadoop jarjarFilemainClassargs.設置JVM啟動參數(shù),將lib,conf等加入classpath,啟動JVM運行RunJarRunJar階段:1,設定MainClass如果jar設置了Manifest,則作為MainClass否則取第二個參數(shù)2,在臨時目錄(hadoop.tmp.dir)中建立臨時目錄(File.createTempFile("hadoop-unjar", ""
14、;, tmpDir),并注冊鉤子JVM退出時候刪除.3,將Jar解壓到建立的臨時目錄中4,將目錄hadoop-unjar38923742,目錄hadoop-unjar38923742/class, 目錄hadoop-unjar38923742/lib中的每個文件作為URLClassLoader參數(shù),構造一個classLoader.5,將當前線程的上下文ClassLoader設置為classLoader6,以上5步都是為mainClass啟動做準備,最后應用反射啟動mainClass,將args作為參數(shù)MainClassbin/hadoop jar -libjars testlib.jar -a
15、rchives test.tgz -files file.txt inputjar argsjob.setXXX均將KV設置到了Conf實例中(傳值,例如將-file指定的文件設定到Conf中,在submit中獲取,從本地復制到hdfs)在Job.submit方法中會向hdfs寫入以下信息目錄:hdfs:/$mapreduce.jobtracker.staging.root.dir/$user/.staging/$jobId/hdfs:/tmp/hadoop/mapred/staging/$user/.staging/$jobId/目錄下文件:job.split->(Split信息)由w
16、riteSplits方法寫入job.splitmetainfo->( Split信息元數(shù)據(jù),版本,個數(shù),索引)由writeSplits方法寫入job.xml->conf對象job.jar->inputjarfiles/->參數(shù)-files 逗號分割,交給(DistributedCache管理)archives/->參數(shù)-archives 逗號分割, 交給(DistributedCache管理)libjars/->參數(shù)-libjars 逗號分割, 交給(DistributedCache管理)split,splitmetainfo(FileSplit)設計目的j
17、ob.splitmetainfo中保存有Split的在那幾個機器上有副本,JT讀取這個文件用,用來分配Task使Task能夠讀取本地磁盤文件.job.split保存具體的Split,不保存位置信息,因為TT不需要(hdfs決定)JT調度CapacityTaskScheduler,TTTT啟動時候,啟動線程mapLauncher(用于啟動MapTask),reduceLauncher(用于啟動ReduceTask), taskCleanupThread(用于清理Task或者Job),TT 通過心跳從JT獲得HeartbeatResponse,包含TaskTrackerAction,具體有5種操作
18、LAUNCH_TASK啟動任務,將LaunchTaskAction中包裝的Task與Conf對象和TaskLauncher組合成TaskInProgres,然后添加到mapLauncher或者reduceLauncher中的隊列中.TaskLauncher構造參數(shù)numSlots代表當前TaskTractor能同時執(zhí)行多少個Task,由參數(shù)mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum設定,slot意思為: 槽,位置 將TaskTractor的資源抽象化,一般情況下一個task占用一個s
19、lot,如果有對資源需求大的Task也可以通過參數(shù)來控制(調度器CapacityTaskScheduler設置,未開放給User?)TaskLauncher根據(jù)剩余空閑的槽位(numFreeSlots)和隊列情況,來從隊列中取出Task來運行(synchronized, wait, notify).KILL_TASK殺死任務KILL_JOB殺死和Job相關的任務,放入tasksToCleanup隊列中REINIT_TRACKER重新啟動TTCOMMIT_TASK提交任務(1, speculative execution 2,need commit file?) OutputCommitterR
20、EINIT_TRACKER 重啟TT, startNewTask 新的JVM(不是TT的JVM,錯誤處理,GC)執(zhí)行Child.class,通過main參數(shù)argsMap過程MapTask中會根據(jù)jobConf記錄的hdfs上的job.split文件以及JT分配的splitIndex獲取InputSplit,根據(jù)jobConf的配置新建Map和InputFormat,由InputFormat獲取RecordReader來讀取inputSplit,生成原始original_key, original_value交給Mapper.map方法處理生成gen_key,gen_value,根據(jù)parti
21、tioner生成partition,成對的(gen_key,gen_value, partition)會先放入一個緩沖區(qū),如圖,這個緩沖區(qū)分為3級索引(排序kvoffset,復制效率)等這個緩沖區(qū)到達一定閥值之后,并不是緩沖區(qū)慢之后,SplitThread會標記當前前后界,對界內數(shù)據(jù)進行排序(現(xiàn)根據(jù)partition在根據(jù)kv),并寫入到磁盤文件中(split.x.out)并記錄各個partition段的位置,部分存到內存部分存到磁盤,在這個過程中,map仍然繼續(xù)進行,如果緩沖區(qū)滿之后,map線程暫時wait,到SplitThread完畢.當輸入讀取完畢,隨之的SplitThread也結束后,
22、磁盤中中間文件為split.1.out -> split.n.out ,索引部分存在內存里面,超過1024*1024個,作為索引文件spill.n.out.index(避免內存不夠用).然后通過合并排序將分段的文件(split.x.n)合并排序成一個文件file.out,file.out.index記錄partition信息.(詳細見MergeQueue)這樣在Reduce過程中,通過http請求TT其中需要的partition段(參數(shù)reduce),TT根據(jù)file.out.index記錄的索引信息將file.out的partition段,生成http響應.如果有CombinerSor
23、tAndSpillkvoffset達到臨界點softRecordLimit,例如100個,設定80個為臨界點.Kvbuffer達到臨界點softBufferLimit,例如100M,當80M為臨界點.目的是為了不讓map過程停止浪費時間,但由于IO map可能會慢一點(進一步多磁盤負載).io.sort.mb配置的是圖中kvoffset,kvindices,kvbuffer占用的空間總大小Mb.上述參數(shù)都可以通過conf.setXXX來配置,根據(jù)特定job的特點來設定.來減少Spill次數(shù),同時避免內存溢出.Reduce過程JobInProgress初始化mapred.reduce.tasks
24、個ReduceTask 用參數(shù)partition區(qū)別.然后JT在心跳過程中,將ReduceTask分給TT執(zhí)行.ReduceTask有SHUFFLE, SORT, REDUCE三個階段SHUFFLE這一階段是ReduceTask初始化階段,新建了N(參數(shù)控制)個下載線程,來獲取Map的輸出,TaskTracker中有一個線程會不斷的從JT中獲取在本TT運行的ReudceTask(s)的JOB的Map完成事件. ReduceTask不斷從TT中獲取Job的Map完成事件,然后將事件中的Map輸出位置交給下載線程來獲取.下載的時候,從HTTP響應頭獲取文件的大小,決定是放在內存中還是寫入磁盤.在內
25、存中的數(shù)據(jù),滿足一定條件會在后臺將內存中的數(shù)據(jù)Merger寫入硬盤,在硬盤中的數(shù)據(jù),滿足一定條件(數(shù)目超過了2 * ioSortFactor - 1)會在后臺做Merger.所有的Map輸出下載完畢,并且后臺Merger線程也結束后,進入SORT階段.SORT這個階段還是Merger,將內存和硬盤中的數(shù)據(jù),做合并排序(ioSortFactor),使能夠高效率的輸出key ,values.然后進入REDEUCE階段.REDUCE這一階段只要是將上述產生的key value通過ReduceContext轉化成key values(ValueIterable),傳遞給reduce,最后通過輸出配置,
26、將reduce的輸出寫入HDFS(part-r-00000,1,2),或者其他.MergeQueueMR 優(yōu)化1,控制好每個Map的輸入和輸出,盡量使Map處理存在本地的Block,一個InputSpilt不要太大,最好使產生的輸出能控制在io.sort.mb之內,這樣能夠減少從內存將輸出數(shù)據(jù)寫到磁盤的磁盤的個數(shù). 根據(jù)任務將io.sort.mb設置合理,盡量能容納單個Map全部輸出.2, 多磁盤負載,MR中大量的臨時輸出文件會放在這個下mapred.local.dir = /hd1/mr, /hd2/mr, /hd3/mr hd*為掛載的不同磁盤.LocalDirAllocator從以上多個
27、目錄中分配每次創(chuàng)建文件的目錄,降低IO負載.(其他SSD)HDFS中逗號分隔的目錄(dfs.data.dir, dfs.name.dir, dfs.name.edits.dir)是為了冗余.3,Map輸出壓縮,減少網(wǎng)絡傳輸.4,Reduce階段,將各個map的輸出下載到本地,由于各個Map輸出可能有大有小,合適的可以放到內存中,( mapred.job.reduce.total.mem.bytes, mapred.job.shuffle.input.buffer.percent),減少N個下載線程寫磁盤.5,設置Java運行內存偏大,GC回收算法. UseConcMarkSweepGC.6,頻外心跳mapreduce.tasktracker.outofband.heartbeat,會加重JT負擔.預想優(yōu)化Reduce負載較重(收集N個map輸出,執(zhí)行統(tǒng)計工作),可以通過指定高配置機器,網(wǎng)絡節(jié)離中心交換機近.做Merger的時候k,vvv,k,vv最后源碼基于hadoop-0.20.203.0,粗糙整理,不斷完善中,正誤自辯,如有疑問交流或指正錯誤,可發(fā)郵件nice2mu。
- 溫馨提示:
1: 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
2: 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
3.本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
5. 裝配圖網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。