首页 > Thrift源码解析--TBinaryProtocol

Thrift源码解析--TBinaryProtocol

本文为原创:http://www.cnblogs.com/leehfly/p/4958206.html,未经许可禁止转载。

关于Tprotocol层都是一些通信协议,个人感觉内容较大,很难分类描述清楚。故打算以TBinaryProtocol为例,分析客户端发请求以及接收服务端返回数据的整个过程。

先将客户端的测试用例贴上。

复制代码
 1 public class DemoClient {2     public static void main(String[] args) throws Exception{3         String param1 = "haha";4         Map param3 = new HashMap();5         param3.put("1", "2");6         Parameter param2 = new Parameter(10, "kaka");7         8         TSocket socket = new TSocket("127.0.0.1", 7911);9         socket.setTimeout(3000);
10         TTransport transport = socket;
11         transport.open();
12         TProtocol protocol = new TBinaryProtocol(transport);
13         DemoService.Client client = new DemoService.Client.Factory().getClient(protocol);
14         int result = client.demoMethod(param1, param2, param3);
15         System.out.println("result: " + result);
16         transport.close();
17     }
复制代码

首先就是构造transport,这里由于TSocket extens TIOStreamTransport,因此可构造一个TSocket即可,而TSocket包含:host(主机IP),port(端口号),time_out(超时时间)与一个Socket。

复制代码
1  public TSocket(String host, int port, int timeout) {
2     host_ = host;
3     port_ = port;
4     timeout_ = timeout;
5     initSocket();
6   }
复制代码

对于socket.setTimeout(3000);实际操作就是为TSocket中的socket设置timeout

复制代码
1  public void setTimeout(int timeout) {
2     timeout_ = timeout;
3     try {
4       socket_.setSoTimeout(timeout);
5     } catch (SocketException sx) {
6       LOGGER.warn("Could not set socket timeout.", sx);
7     }
8   }
复制代码

 下图是构造的transport直观构造:包含了host,inputStream,outputStream,port,socket,timeout.

transport.open所做的事情就是初始化一些输入输出流并且connect the socket to the InetSocketAddress

复制代码
 1 /**2    * Connects the socket, creating a new socket object if necessary.3    */4   public void open() throws TTransportException {5     if (isOpen()) {6       throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");7     }8 9     if (host_.length() == 0) {
10       throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
11     }
12     if (port_ <= 0) {
13       throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
14     }
15 
16     if (socket_ == null) {
17       initSocket();
18     }
19 
20     try {
21       socket_.connect(new InetSocketAddress(host_, port_), timeout_);
22       inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);//均采用缓冲模式输入输出流
23       outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
24     } catch (IOException iox) {
25       close();
26       throw new TTransportException(TTransportException.NOT_OPEN, iox);
27     }
28   }
复制代码

再看一下open之后的transport:

接下来就是在已有transport也就是TSocket的基础之上,完成Tprotocol的构建,这里选择了TBinaryProtocol。这个工作实际上就是将上一步建好的Ttransport关联到Tprotocol上来。相当于进一步封装。

复制代码
 1 public abstract class TProtocol {2 3   /**4    * Prevent direct instantiation5    */6   @SuppressWarnings("unused")7   private TProtocol() {}8 9   /**
10    * Transport
11    */
12   protected TTransport trans_;
13 
14   /**
15    * Constructor
16    */
17   protected TProtocol(TTransport trans) {
18     trans_ = trans;
19   }
20 
21   /**
22    * Transport accessor
23    */
24   public TTransport getTransport() {
25     return trans_;
26   }
27 /**各种读写方法略去
28 */
29 }
复制代码

从TProtocol的构造方法中可以看出,实际上就是将上一步生成的Transport赋与TProtocol中的trans_变量并将strictRead_与strictWrite_赋值。

复制代码
 1  /**2    * Constructor3    */4   public TBinaryProtocol(TTransport trans) {5     this(trans, false, true);6   }7 8   public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) {9     super(trans);
10     strictRead_ = strictRead;
11     strictWrite_ = strictWrite;
12   }
复制代码

其中还有一些字节数组的初始化工作。

复制代码
 1 private byte [] bout = new byte[1];2 3 4  private byte[] i16out = new byte[2];5   6 7  private byte[] i32out = new byte[4];8   9 
10  private byte[] i64out = new byte[8];
11   
复制代码

 

这时候一切准备就绪。Tprotocol目前状态如下图:

Tprotocol已经准备就绪,接下来的工作就是new 一个client,然后才可以去与服务端进行请求与响应。下面我把一个client的代码全部粘贴出来。

复制代码
 1 public static class Client extends org.apache.thrift.TServiceClient implements Iface {2     public static class Factory implements org.apache.thrift.TServiceClientFactory {3       public Factory() {}4       public Client getClient(org.apache.thrift.protocol.TProtocol prot) {//通过Tprotocol去构造client5         return new Client(prot);6       }7       public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {8         return new Client(iprot, oprot);9       }
10     }
11 
12     public Client(org.apache.thrift.protocol.TProtocol prot)
13     {
14       super(prot, prot);//使用了相同的Tprotocol进行构造
15     }
16 
17     public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
18       super(iprot, oprot);
19     }
20 
21     public int demoMethod(String param1, Parameter param2, Map param3) throws org.apache.thrift.TException
22     {
23       send_demoMethod(param1, param2, param3);
24       return recv_demoMethod();
25     }
26 
27     public void send_demoMethod(String param1, Parameter param2, Map param3) throws org.apache.thrift.TException
28     {
29       demoMethod_args args = new demoMethod_args();
30       args.setParam1(param1);
31       args.setParam2(param2);
32       args.setParam3(param3);
33       sendBase("demoMethod", args);
34     }
35 
36     public int recv_demoMethod() throws org.apache.thrift.TException
37     {
38       demoMethod_result result = new demoMethod_result();
39       receiveBase(result, "demoMethod");
40       if (result.isSetSuccess()) {
41         return result.success;
42       }
43       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
44     }
45 
46   }
复制代码

为了理解客户端构造的具体过程,我把TserviceClient.class的部分源码贴出来:

复制代码
 1  public TServiceClient(TProtocol iprot, TProtocol oprot) {2     iprot_ = iprot;3     oprot_ = oprot;4   }5 6   protected TProtocol iprot_;7   protected TProtocol oprot_;8 9   protected int seqid_;
10 
11   /**
12    * Get the TProtocol being used as the input (read) protocol.
13    * @return the TProtocol being used as the input (read) protocol.
14    */
15   public TProtocol getInputProtocol() {
16     return this.iprot_;
17   }
18 
19   /**
20    * Get the TProtocol being used as the output (write) protocol.
21    * @return the TProtocol being used as the output (write) protocol.
22    */
23   public TProtocol getOutputProtocol() {
24     return this.oprot_;
25   }
复制代码

明显的可以看到,client有三个变量,TProtocol类型的iprot_和oprot_,还有一个顺序号seqid_.由于在构造client的过程中使用了相同的Tprotocol,在这里也就是使用了相同的TBinaryProtocol,因此iprot_与oprot_是相同的,都指向上一步生成的TProtocol,也就是TBinaryProtocol.当DemoService.Client client = new DemoService.Client.Factory().getClient(protocol);执行完毕后,client的状态如下图:

client已经准备完毕,我们调用client的方法就可以向服务端发送请求了。而这个过程的总体代码也就那么一点点,先直接贴出来:

复制代码
 1   public int demoMethod(String param1, Parameter param2, Map param3) throws org.apache.thrift.TException2     {3       send_demoMethod(param1, param2, param3);//发送请求4       return recv_demoMethod();//接收响应5     }6 7     public void send_demoMethod(String param1, Parameter param2, Map param3) throws org.apache.thrift.TException8     {9       demoMethod_args args = new demoMethod_args();//封装请求参数demoMethod_args
10       args.setParam1(param1);
11       args.setParam2(param2);
12       args.setParam3(param3);
13       sendBase("demoMethod", args);//发请求
14     }
15 
16     public int recv_demoMethod() throws org.apache.thrift.TException
17     {
18       demoMethod_result result = new demoMethod_result();//封装接收响应数据demoMethod_result,貌似与demoMethod_args还不一样
19       receiveBase(result, "demoMethod");//接收返回数据
20       if (result.isSetSuccess()) {
21         return result.success;
22       }
23       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
24     }
复制代码

当执行完demoMethod_args args = new demoMethod_args();之后,其实就是对demoMethod_args中的静态变量进行了初始化,STRUCT_DESC,PARAM1_FIELD_DESC,PARAM2_FIELD_DESC,schemes,PARAM3_FIELD_DESC,metaDataMap等都有了初始值。args.setParam之后,demoMethod_args的状态:

接下来就是:

复制代码
1 protected void sendBase(String methodName, TBase args) throws TException {
2     oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//注意这里的++seqid,就是发送请求的序号,递增
3     args.write(oprot_);
4     oprot_.writeMessageEnd();
5     oprot_.getTransport().flush();//这里最终其实就是outputStream进行flush
6   }
复制代码

将methodName: demoMethod, args: demoMethod_args(param1:haha, param2:Parameter(id:10, name:kaka), param3:{1=2})写入Tprotocol,在这里是oprot_。

复制代码
 1  public void writeMessageBegin(TMessage message) throws TException {2     if (strictWrite_) {3       int version = VERSION_1 | message.type;//异或形成版本号4       writeI32(version);//写入版本号5       writeString(message.name);//写方法名6       writeI32(message.seqid);//方法序号7     } else {8       writeString(message.name);9       writeByte(message.type);
10       writeI32(message.seqid);
11     }
12   }
复制代码
复制代码
1  public void writeString(String str) throws TException {
2     try {
3       byte[] dat = str.getBytes("UTF-8");
4       writeI32(dat.length);
5       trans_.write(dat, 0, dat.length);
6     } catch (UnsupportedEncodingException uex) {
7       throw new TException("JVM DOES NOT SUPPORT UTF-8");
8     }
9   }
复制代码
复制代码
1  public void writeI32(int i32) throws TException {
2     i32out[0] = (byte)(0xff & (i32 >> 24));
3     i32out[1] = (byte)(0xff & (i32 >> 16));
4     i32out[2] = (byte)(0xff & (i32 >> 8));
5     i32out[3] = (byte)(0xff & (i32));
6     trans_.write(i32out, 0, 4);
7   }
复制代码
复制代码
 1  /**2    * Writes to the underlying output stream if not null.3    */4   public void write(byte[] buf, int off, int len) throws TTransportException {5     if (outputStream_ == null) {6       throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");7     }8     try {9       outputStream_.write(buf, off, len);
10     } catch (IOException iox) {
11       throw new TTransportException(TTransportException.UNKNOWN, iox);
12     }
13   }
复制代码

从以上代码可以看出来,无论怎么写,都是一层层深入的,TProtocol oprot_ ----->Ttransport trans_ ----->OutputStream outputStream(TODO:这里的outputStream其实也是bufferedOutputStream,也就是刚刚初始化transport的时候那个outputstream.其中比较奇葩的是args_.write,其代码如下,最后还是绕到了oprot.write,只不过这里有Struct,Field.目测这里用  schemes.get(oprot.getScheme()).getScheme().write(oprot, this);就是因为args的一些参数在静态初始化的时候已经放入了schemes

1   public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
2       schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
3     }
复制代码
 1   public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_args struct) throws org.apache.thrift.TException {2         struct.validate();3 4         oprot.writeStructBegin(STRUCT_DESC);5         if (struct.param1 != null) {6           oprot.writeFieldBegin(PARAM1_FIELD_DESC);7           oprot.writeString(struct.param1);8           oprot.writeFieldEnd();9         }
10         if (struct.param2 != null) {
11           oprot.writeFieldBegin(PARAM2_FIELD_DESC);
12           struct.param2.write(oprot);
13           oprot.writeFieldEnd();
14         }
15         if (struct.param3 != null) {
16           oprot.writeFieldBegin(PARAM3_FIELD_DESC);
17           {
18             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.param3.size()));
19             for (Map.Entry _iter4 : struct.param3.entrySet())
20             {
21               oprot.writeString(_iter4.getKey());
22               oprot.writeString(_iter4.getValue());
23             }
24             oprot.writeMapEnd();
25           }
26           oprot.writeFieldEnd();
27         }
28         oprot.writeFieldStop();
29         oprot.writeStructEnd();
30       }
31 
32     }
复制代码

到此为止,send_domoMethod完毕,接下来就是recv_demoMethod()也就是接受服务端返回的数据。

复制代码
1  public int recv_demoMethod() throws org.apache.thrift.TException
2     {
3       demoMethod_result result = new demoMethod_result();//与封装请求参数类似,加入一些内容到schema中
4       receiveBase(result, "demoMethod");//读取数据进行一些组装工作
5       if (result.isSetSuccess()) {
6         return result.success;//返回result中的success值
7       }
8       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result");
9     }
复制代码
复制代码
 1  protected void receiveBase(TBase result, String methodName) throws TException {//读取返回结果,并将返回结果组装好放到result中2     TMessage msg = iprot_.readMessageBegin();3     if (msg.type == TMessageType.EXCEPTION) {4       TApplicationException x = TApplicationException.read(iprot_);5       iprot_.readMessageEnd();6       throw x;7     }8     if (msg.seqid != seqid_) {9       throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
10     }
11     result.read(iprot_);//将所读取的数据封装成需要类型返回
12     iprot_.readMessageEnd();//这一步其实什么也没做,到此为止result其实已经形成
13   }
复制代码

由于写入的时候有写入信息的类型,序号之类的东西,故这里读取和写入保持一致,也要readMessageBegin,只不过这里使用的是iprot_,其实还是Tprotocol。Tprotocol iprot_ ----->Ttransport trans_ ----->InputStream inputstream

复制代码
 1 public TMessage readMessageBegin() throws TException {2     int size = readI32();3     if (size < 0) {4       int version = size & VERSION_MASK;5       if (version != VERSION_1) {6         throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");7       }8       return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());9     } else {
10       if (strictRead_) {
11         throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
12       }
13       return new TMessage(readStringBody(size), readByte(), readI32());
14     }
15   }
复制代码

其中result.read(iprot_)还是对应着写入时候的args.write,代码贴出来:

复制代码
 1 private static class demoMethod_resultStandardScheme extends StandardScheme {2 3       public void read(org.apache.thrift.protocol.TProtocol iprot, demoMethod_result struct) throws org.apache.thrift.TException {4         org.apache.thrift.protocol.TField schemeField;5         iprot.readStructBegin();6         while (true)7         {8           schemeField = iprot.readFieldBegin();9           if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
10             break;
11           }
12           switch (schemeField.id) {
13             case 0: // SUCCESS
14               if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
15                 struct.success = iprot.readI32();//在这里读取返回结果,这些结果的结构都是早已经定义好的,因为我们这里的例子是int类型,故这里只需要读取readI32即可
16                 struct.setSuccessIsSet(true);
17               } else { 
18                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
19               }
20               break;
21             default:
22               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
23           }
24           iprot.readFieldEnd();
25         }
26         iprot.readStructEnd();
27 
28         // check for required fields of primitive type, which can't be checked in the validate method
29         struct.validate();
30       }
31 
32       public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_result struct) throws org.apache.thrift.TException {
33         struct.validate();
34 
35         oprot.writeStructBegin(STRUCT_DESC);
36         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
37         oprot.writeI32(struct.success);
38         oprot.writeFieldEnd();
39         oprot.writeFieldStop();
40         oprot.writeStructEnd();
41       }
42 
43     }
复制代码

综上,整个客户端发请求以及接受返回数据也就是先写后读的一个完整过程也就完毕。整体流程图我就用从网上找到的一个例子来看就好了,除了方法不一样,其他都是一样的道理。

本文为博主原创,未经许可禁止转载。谢谢。

做人第一,做学问第二。

转载于:https://www.cnblogs.com/xumaojun/p/8526522.html

更多相关:

  • 菜鸟一枚,正在学习C++ Gui Qt4,整理很零碎,欢迎批评指正   1.窗口标题: QWidget *window = new QWidget; window->setWindowTitle("Enter Your Age"); **************************************** 关于标题...

  • 将两个有序链表合并为一个新的有序链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例: 输入:1->2->4, 1->3->4 输出:1->1->2->3->4->4 总体思路是: 比较两个链表头节点,较小的插入新链表指针之后,同时较小链表指针向后移动一位 实现如下: ListNode* mergeTwo...

  • 1.直接调用微软socket对象处理 static void Main(string[] args){try{IPAddress ip = new IPAddress(new byte[] { 127, 0, 0, 1 });//在3721端口新建一个TcpListener对象TcpListener listener = new...

  •   现在很多地方都会用到zookeeper, 用到它的地方就是为了实现分布式。用到的场景就是服务注册,比如一个集群服务器,需要知道哪些服务器在线,哪些服务器不在线。   ZK有一个功能,就是创建临时节点,当机器启动应用的时候就会连接到一个ZK节点,然后创建一个临时节点,那么通过获取监听该路径,并且获取该路径下的节点数量就知道有哪些服务...

  • 前台到后台java时data日期类型的转化 在实体类中用@DataTimeFormat,这样设置即使传过来是空的字符串也是可以转的,要和前面传过来的格式一致,如 @XmlElement(name="BeginDate") @DateTimeFormat(pattern="yyyy-MM-dd") private Date begin...

  • 在.Net Framework中,配置文件一般采用的是XML格式的,.NET Framework提供了专门的ConfigurationManager来读取配置文件的内容,.net core中推荐使用json格式的配置文件,那么在.net core中该如何读取json文件呢?1、在Startup类中读取json配置文件1、使用Confi...

  •   1 public class FrameSubject extends JFrame {   2    3   …………..   4    5   //因为无法使用多重继承,这儿就只能使用对象组合的方式来引入一个   6    7   //java.util.Observerable对象了。   8    9   DateSub...

  • 本案例主要说明如何使用NSwag 工具使用桌面工具快速生成c# 客户端代码、快速的访问Web Api。 NSwagStudio 下载地址 比较强大、可以生成TypeScript、WebApi Controller、CSharp Client  1、运行WebApi项目  URL http://yourserver/swagger 然后...

  •   在绑定完Action的所有参数后,WebAPI并不会马上执行该方法,而要对参数进行验证,以保证输入的合法性.   ModelState 在ApiController中一个ModelState属性用来获取参数验证结果.   public abstract class ApiController : IHttpController,...

  • 1# 引用  C:AVEVAMarineOH12.1.SP4Aveva.ApplicationFramework.dll C:AVEVAMarineOH12.1.SP4Aveva.ApplicationFramework.Presentation.dll 2# 引用命名空间, using Aveva.Applicati...