Hadoop是一個分散式程式,分佈在多臺機器上執行,事必會涉及到網路程式設計。那這裡如何讓網路程式設計變得簡單、透明的呢?
超人學院吳超老師為你講解:
網路程式設計中,首先我們要學的就是Socket程式設計,這是網路程式設計中最底層的程式介面,分為伺服器端和客戶端,伺服器負責監聽某個埠,客戶端負責連線伺服器上的某個埠,一旦連線通過後,伺服器和客戶端就可以雙向通訊了。
方法/步驟
1. ServerSocket server = new ServerSocket(8111);
2. Socket socket = server.accept();
3.
4. //由Socket物件得到輸入流,並構造相應的BufferedReader物件
5. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
6. //由Socket物件得到輸出流,並構造PrintWriter物件
7. PrintWriter os = new PrintWriter(socket.getOutputStream());
8.
9. while(true){
10. String inline = is.readLine();
11. System.out.println(" 收到資訊:" + inline);
12. //伺服器反回
13. os.println("serverSend:" + inline);
14. os.flush();
15. if (inline == "bye")
16. break;
17. }
18. os.close();
19. is.close();
20. socket.close();
21. server.close();
22. System.out.println("伺服器退出");
1. Socket socket = new Socket("127.0.0.1",8111);
2.
3. //由Socket物件得到輸入流,並構造相應的BufferedReader物件
4. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
5. //由Socket物件得到輸出流,並構造PrintWriter物件
6. PrintWriter os = new PrintWriter(socket.getOutputStream());
7. BufferedReader sin=new BufferedReader(new InputStreamReader(System.in));
8. while(true){
9. System.out.println("請輸入:");
10. String line = sin.readLine();
11. os.println(line);
12. os.flush();
13. String inline = is.readLine();
14. System.out.println("伺服器獲取值:" + inline);
15. if (line=="bye")
16. break;
17. }
18. os.close();
19. is.close();
20. socket.close();
21. System.out.println("客戶端退出");
這兩段程式碼分別帖入兩個類中,分開執行,先執行伺服器端,再執行客戶端,就可以互發訊息了。
觀察下程式碼,發現程式碼中下面4~20行邏輯是一至的,都是通過流來通訊,所以Socket中不同的是開始地方,伺服器是通過server.accept()來獲取Socket,而客戶端是通過直接建立Socket物件的。
這段程式碼,其本執行是沒問題的,但存在一個問題,就是當客戶端接入時伺服器端的accept函式才走下去,不然的話,會一直處於卡死等待狀態。包括getInputStream函式,也會等待雙方接通後,才往下走。除非等到客戶端接入,或中斷。當然有人會說,可以引入多執行緒啊,沒錯,是可以,但是想一下,是不是每個客戶接入都得有一個執行緒? 否則少一個執行緒,就會有一堆的卡著。所以這種方式不適合在大最客戶端接入的情況。
在JDK1.4引入了非阻塞的通訊方式,這樣使得伺服器端只需要一個執行緒就能處理所有客戶端socket的請求。
下面是幾個需要用到的核心類:
· ServerSocketChannel: ServerSocket 的替代類, 支援阻塞通訊與非阻塞通訊.
· SocketChannel: Socket 的替代類, 支援阻塞通訊與非阻塞通訊.
· Selector: 為ServerSocketChannel 監控接收客戶端連線就緒事件, 為 SocketChannel 監控連線伺服器就緒, 讀就緒和寫就緒事件.
· SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 註冊事件的控制代碼. 當一個 SelectionKey 物件位於Selector 物件的 selected-keys 集合中時, 就表示與這個 SelectionKey 物件相關的事件發生了.在SelectionKey 類中有幾個靜態常量
· SelectionKey.OP_ACCEPT->客戶端連線就緒事件 等於監聽serversocket.accept()返回一個socket
· SelectionKey.OP_CONNECT->準備連線伺服器就緒跟上面類似,只不過是對於socket的相當於監聽了socket.connect()
· SelectionKey.OP_READ->讀就緒事件, 表示輸入流中已經有了可讀資料, 可以執行讀操作了
· SelectionKey.OP_WRITE->寫就緒事件
所以伺服器端程式碼就可以升一下級了,變成如下:
1. public class SocketChannelTest implements Runnable {
2.
3. @Override
4. public void run() {
5. while (true) {
6. try {
7. selector.select();
8. Set
9. Iterator
10. SocketChannel sc;
11. while (iter.hasNext()) {
12. SelectionKey key = iter.next();
13. if (key.isAcceptable())
14. ; // 新的連線
15. else if (key.isReadable())
16. ;// 可讀
17. iter.remove(); // 處理完事件的要從keys中刪去
18. }
19. } catch (Exception e) {
20. e.printStackTrace();
21. }
22. }
23. }
24. static Selector selector;
25.
26. public static void main(String[] args) throws IOException,
27. InterruptedException {
28. selector = Selector.open(); // 靜態方法 例項化selector
29. ServerSocketChannel serverChannel = ServerSocketChannel.open();
30. serverChannel.configureBlocking(false); // 設定為非阻塞方式,如果為true 那麼就為傳統的阻塞方式
31. serverChannel.socket().bind(new InetSocketAddress(8001)); // 繫結IP 及 埠
32. serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 註冊
33. // OP_ACCEPT事件
34. Thread thd = new Thread(new SocketChannelTest());
35. thd.start();// 開啟執行緒 處理請求
36. thd.join();
37. }
38. }
好,這樣通訊程式碼簡化了。但繼續想,我們通訊的目的是什麼?客戶端發一個指令,伺服器執行一些內容,然後把結果返回給客戶端。這不就像呼叫一下函式麼,呼叫函式名、傳入引數、返回值。
這個就稱之為遠端方法呼叫(RPC Remote Procedure Call Protocol),毫無疑問,這個RPC實現肯定是基於上面的這個Socket的。至於具體如何實現呢,我們看下面的分解。
在看實現之前,我們先看一下,這個RPC是如何用的,如何做到呼叫透明的:
我們在src下新建一個RPCTest的包,定義一個功能介面IRPCTestEntity.java:
1. package RPCTest;
2. import org.apache.hadoop.ipc.VersionedProtocol;
3. public interface IRPCTestEntity extends VersionedProtocol {
4. int Calc(int x,int y);
5. }
該介面中有一個Calc的函式。
定義一個實現類RPCTestEntity.java:
1. package RPCTest;
2. import java.io.IOException;
3. public class RPCTestEntity implements IRPCTestEntity{
4. @Override
5. public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
6. return 0;
7. }
8.
9. public int Calc(int x,int y){
10. int z =0 ;
11. z = x + y;
12. return z;
13. }
14.
15. }
這個類中實現了Calc函式,執行內容為將x,y相加,將和返回。
我們再定義一個伺服器類(RPCTestSvr.java),將該實現類註冊成RPC服務:
1. package RPCTest;
2. import java.io.IOException;
3.
4. public class RPCTestSvr {
5. public static void main(String[] args) throws IOException, InterruptedException {
6. RPCTestEntity obj = new RPCTestEntity();
7. Configuration conf = new Configuration();
8. Server server = RPC.getServer(obj, "", 9001, conf);
9. server.start();
10. server.join();
11. }
12. }
程式碼比較簡單,定義了一個RPCTestEntity的實體,然後RPC建立一個Server,傳入實體物件,然後這個服務就呼叫join卡住,用於不斷接收請求。 建立完後,就可把這個"伺服器"啟動起來了。
再建立一個客戶端(RPCTestClient.java):
1. package RPCTest;
2.
3. import java.io.IOException;
4. import java.net.InetSocketAddress;
5.
6. import org.apache.hadoop.conf.Configuration;
7. import org.apache.hadoop.ipc.RPC;
8. import org.apache.hadoop.ipc.VersionedProtocol;
9.
10. public class RPCTestClient {
11. public static void main(String[] args) throws IOException {
12. InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001);
13. Configuration conf = new Configuration();
14. VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf);
15. IRPCTestEntity ent = (IRPCTestEntity)obj;
16. int x = ent.Calc(5, 6);
17. System.out.println(x);
18. }
19. }
這裡,我們通過RPC.getProxy函式獲了一個IRPCTestEntity的介面例項,然後就可以直接呼叫了。
執行後,發現這個值馬上返回了過來,同時在"伺服器"端也會收到一定的請求資訊。說明兩者之間通了。
仔細看,這個客戶端中,整個過程就沒有涉及到RPCTestEntity這個實現的實體,換句話說,客戶端產生的是一個虛擬的實現類,然後呼叫起來了。
OK,示例程式跑起來了,也帶給我們幾個問題,1、這個客戶端中的obj是什麼物件?2、為什麼我們呼叫obj物件中的函式(Calc)會跑到伺服器上執行,如何實現的?
底層的通訊,我們是知道的,肯定用socket,用它能夠傳遞各種資料。如何與函式關聯呢? 我們進入getProxy函式,
我們看到這個getProxy函式中,返回了VersionedProtocol介面的物件,從字面意思,這個Proxy意為代理, 所以我們得到的obj就是一個代理類。同時也看出,要作為RPC處理物件,這個介面必實現VersionedProtocol(簡單地看下里面,只有一個函式,返回版本號,是用於判斷雙方版本所用,只有版本匹配,才能呼叫)。
其建立可以看到,用到了:
Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
然後這個代理類,就自動實現了偉放的protocol這個介面型別。然後當我們呼叫代理類中的函式時,這個傳入的Invoker類,就會收到通知,通知裡包含了呼叫資訊,我們進入Invoker中看一下:
private static class Invoker implements InvocationHandler
這是一個寫在RPC類中的內部類,且是私有的,意思就是隻為這個RPC呼叫,其實現的規定介面InvocationHandler,那麼就要實現規定的函式Invoke咯:
1. public Object invoke(Object proxy, Method method, Object[] args)
2. throws Throwable {
3. final boolean logDebug = LOG.isDebugEnabled();
4. long startTime = 0;
5. if (logDebug) {
6. startTime = System.currentTimeMillis();
7. }
8.
9. ObjectWritable value = (ObjectWritable)
10. client.call(new Invocation(method, args), address,
11. method.getDeclaringClass(), ticket);
12. if (logDebug) {
13. long callTime = System.currentTimeMillis() - startTime;
14. LOG.debug("Call: " + method.getName() + "" + callTime);
15. }
16. return value.get();
17. }
這個invoke函式,就是當我們呼叫代理類中的函式(obj.Calc)時,會收到的請求,看下引數,傳入的有,Method(函式),args(引數),一應俱全,有了這些內容後,就可以呼叫底層的Socket,將這些資訊打包起來(放入的Invocation類)中,一併發向伺服器中。
同時,伺服器端中,就比較容易了,在收到請求後,就可以解析出要呼叫的函式和引數,然後通過反射來呼叫在伺服器一開始註冊上的物件中的函式,再將返回值通過Socket傳回客戶端,再由這個invoke函式將值返回。
OK,這個幾個點想通了,整個過程就容易理解了。總之:
伺服器端——註冊服務:RPC.getServer(obj, "", 9001, conf);
客戶端——取得代理類:obj = RPC.getProxy()
通過這樣的包裝後,網路訪問就非常透明瞭。