spring整合thrift

jopen 12年前

整合thrift主要分客户端和服务端两部分。用spring主要是为了调用和发布服务方便。代码时公司的一个高手写的,这里记录下,以便学习。

先看客户端代码。thrift的服务端暴露服务,客户端则是调用。以平台共通服务为例:

<!-- 平台共通服务 -->     <bean id="standardService"    class="framework.support.thrift.ThriftHttpProxyFactoryBean">    <property name="serviceUrl" value="${cs.url}/standardService" />    <property name="serviceInterface"     value="api.standard.StandardService$Iface" />   </bean>

看配置就知道使用了spring的FactoryBean模式,在ThriftHttpProxyFactoryBean里面使用了httpClient获取thrift暴露的接口。

private HttpClient httpClient; // FactoryBean初始化httpClient   public ThriftHttpProxyFactoryBean() {          SchemeRegistry schemeRegistry = new SchemeRegistry();          schemeRegistry.register(new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));          schemeRegistry.register(new Scheme("https", 443, SSLSocketFactory.getSocketFactory()));          PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager(schemeRegistry);          connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);          connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);          this.httpClient = new DefaultHttpClient(connectionManager);          setReadTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS);      }

然后factoryBean继承的方法:

public T getObject() {          return (T) getServiceProxy(); // 服务对象不在本地,使用代理      }        public Class<?> getObjectType() {          return getServiceInterface(); // 服务对象类型,就是配置里面的serviceInterface      }        public boolean isSingleton() {          return true;      }

ThriftHttpProxyFactoryBean有个父类ThriftClientInterceptor,它继承了UrlBasedRemoteAccessor并实现了MethodInterceptor。先看父类ThriftClientInterceptor的afterPropertiesSet:

public void afterPropertiesSet() {          super.afterPropertiesSet();          if (getServiceInterface() == null) {              throw new IllegalArgumentException("property serviceInterface is required.");          }          .....          ProxyFactory pf = new ProxyFactory(getServiceInterface(), this);//用当前对象包装接口          this.serviceProxy = pf.getProxy(getBeanClassLoader());      }

ThriftHttpProxyFactoryBean实现返回的就是上面的serviceProxy,所以我们接下来看ThriftClientInterceptor的invoke方法。这段是精华。

public Object invoke(MethodInvocation invocation) throws Throwable {          Method method = invocation.getMethod();          Object[] args = invocation.getArguments();          if (serviceRegistry != null) {              ServiceReference sr = serviceRegistry.getService(getServiceInterface());              if (sr != null) {                  try {                      return method.invoke(sr.getService(), args);                  } catch (InvocationTargetException e) {                      throw e.getTargetException();                  }              }          }          String name = method.getName();          if (args.length == 0) {              if ("toString".equals(name)) {                  return "Thrift proxy for service URL [" + getServiceUrl() + "]";              } else if ("hashCode".equals(name)) {                  return getServiceUrl().hashCode();              }          } else if (args.length == 1 && "equals".equals(name)) {              return getServiceUrl().equals(args[0]);          }          Object client = clientConstructor.newInstance(protocolFactory.getProtocol(getTransport()));          Assert.notNull(client, "the Thrift RPC client was not correctly created. Aborting.");          ClassLoader originalClassLoader = overrideThreadContextClassLoader();          try {              return method.invoke(client, args);          } catch (InvocationTargetException e) {              Throwable target = e.getTargetException();              if (target instanceof TApplicationException && ((TApplicationException) target).getType() == TApplicationException.MISSING_RESULT) {                  return null;              }              throw convertException(target);          } catch (Throwable ex) {              throw convertException(ex);          } finally {              resetThreadContextClassLoader(originalClassLoader);          }      }
serviceRegistry这个其实类型于连接池,注册服务的地方在服务端,这种模式必须客户端服务端代码在一个tomcat里面,避免网络请求。不过我觉得如果分开部署其实也可以使用一个连接池缓存对象,这样就只请求一次即可不过服务端代码有改动客户端就得想办法更新连接池了,稍微复杂点。重点看通过网络获取服务对象的内容: protocolFactory.getProtocol(getTransport())

protocolFactory 是thrift的类型 TProtocolFactory,而getTransport()内容如下:

protected TTransport getTransport() {          return new THttpClient(getServiceUrl(), getHttpClient());      }

这里都使用了thrift自带的一些类。

————————————————————分割线————————————————————

然后是服务端代码, 先看服务端暴露服务的配置:

<!-- Thrift export -->      <bean name="/paymentService" class="framework.support.thrift.ThriftHttpServiceExporter">       <property name="service">        <bean class="payment.service.impl.ThriftPaymentServiceImpl"/>       </property>          <property name="serviceInterface" value="api.payment.PaymentService$Iface"/>      </bean>
很明显使用的spring的HttpRequestHandler框架。ThriftHttpServiceExporter实现了 HttpRequestHandler接口并且有个父类ThriftExporter,而ThriftExporter继承了spring的RemoteExporter。

我们先看ThriftHttpServiceExporter的处理请求的方法:

public void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {          if (!"POST".equals(request.getMethod()) && metadataXml != null) {              response.setContentType("text/xml; charset=UTF-8");              FileCopyUtils.copy(metadataXml.openStream(), response.getOutputStream());              return;          }          InputStream in = request.getInputStream();          OutputStream out = response.getOutputStream();          try {              ThriftContextHolder.init();              response.setContentType("application/x-thrift");              TTransport transport = new TIOStreamTransport(in, out);              TProtocol protocol = request.getParameter("_json") != null ? jsonProtocolFactory.getProtocol(transport) : getProtocolFactory().getProtocol(transport);              doInvoke(protocol, protocol);          } catch (Throwable e) {              response.setContentType("text/plain; charset=UTF-8");              response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);              e.printStackTrace(new PrintWriter(out, true));              if (LOG.isErrorEnabled()) {                  LOG.error("Thrift server direct error", e);              }          } finally {              ThriftContextHolder.reset();          }      }
就是拿到请求调用thrift方法处理。doInvoke内容:
protected void doInvoke(TProtocol in, TProtocol out) throws Throwable {          processor.process(in, out);      }

在ThriftExport类里面的afterPropertiesSet里面:

public void afterPropertiesSet() throws Exception {          if (serviceRegistry != null) {              List<MethodInterceptor> il = new ArrayList<MethodInterceptor>();              il.add(new ClassLoaderInterceptor(getBeanClassLoader()));              if (threadResourceManager != null) {                  il.add(new ThreadResourceSupportInterceptor(threadResourceManager));              }              serviceRegistry.exportService(getServiceInterface(), getProxyForService0(il));          }          super.setInterceptors(ArrayUtils.add(interceptors, new AnyExceptionConvertInterceptor()));          this.processor = ThriftHelper.buildProcessor(getServiceInterface(), getProxyForService());      }

可以看到serviceRegistry的使用。

spring整合thrift服务主要内容就这些了。主要还是使用了spring的Remote的框架。