Http协议是一个成熟的协议,实现起来比较简单,如果使用TCP的话,我们需要自定义协议内容,实现服务端,客户端,性能更高
@MsReference(uri = "http://localhost:7777/", resultType = Goods.class)
private GoodsService goodsService;
@GetMapping("/find/{id}")
public Goods find(@PathVariable Long id){
return goodsService.findGoods(id);
}@MsReference做的事情:
- 发起网络请求
- 传递参数
- 解析返回值
package com.mszlu.rpc.annontation;
import java.lang.annotation.*;
//可用于构造方法和字段上
@Target({ElementType.CONSTRUCTOR,ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface MsReference {
String uri() default "";
Class resultType();
}注解定义完,需要使之生效。
BeanPostProcessor是Spring IOC容器给我们提供的一个扩展接口。
public interface BeanPostProcessor {
//bean初始化方法调用前被调用
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;
//bean初始化方法调用后被调用
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}使用@MsReference注解发起网络调用,那么一定有一个服务提供方,使用HTTP方式,是采用暴露一个controller接口,使用TCP的方式,需要实现一个Server服务,这里我们使用注解@MsService来标识需要发布的服务
package com.mszlu.rpc.annontation;
import java.lang.annotation.*;
//可用于类上
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface MsService {
String version() default "1.0";
}//在service的实现类加上注解,代表将GoodsService下的所有方法发布为服务
@MsService
public class GoodsServiceImpl implements GoodsService {
public Goods findGoods(Long id) {
return new Goods(id,"服务提供方商品", BigDecimal.valueOf(100));
}
}注意导包
package com.mszlu.rpc.spring;
import com.mszlu.rpc.annontation.MsReference;
import com.mszlu.rpc.annontation.MsService;
import com.mszlu.rpc.proxy.MsRpcClientProxy;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
@Component
public class MsRpcSpringBeanPostProcessor implements BeanPostProcessor {
////bean初始化方法前被调用
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
//bean初始化方法调用后被调用
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//在这里判断bean上有没有加MsService注解
//如果有,将其发布为服务
if (bean.getClass().isAnnotationPresent(MsService.class)){
MsService msService = bean.getClass().getAnnotation(MsService.class);
// serviceProvider.publishService(msService);
}
//在这里判断bean里面的字段有没有加@MsRefrence注解
//如果有 识别并生成代理实现类,发起网络请求
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
MsReference annotation = declaredField.getAnnotation(MsReference.class);
if (annotation != null){
//代理实现类,调用方法的时候 会触发invoke方法,在其中实现网络调用
MsRpcClientProxy msRpcClientProxy = new MsRpcClientProxy(annotation);
Object proxy = msRpcClientProxy.getProxy(declaredField.getType());
//当isAccessible()的结果是false时不允许通过反射访问该字段
declaredField.setAccessible(true);
try {
declaredField.set(bean,proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}package com.mszlu.rpc.proxy;
import com.mszlu.rpc.annontation.MsMapping;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//每一个动态代理类的调用处理程序都必须实现InvocationHandler接口,
// 并且每个代理类的实例都关联到了实现该接口的动态代理类调用处理程序中,
// 当我们通过动态代理对象调用一个方法时候,
// 这个方法的调用就会被转发到实现InvocationHandler接口类的invoke方法来调用
public class MsRpcClientProxy implements InvocationHandler {
public MsRpcClientProxy(){
}
private MsReference msReference;
public MsRpcClientProxy(MsReference msReference) {
this.msReference = msReference;
}
/**
* proxy:代理类代理的真实代理对象com.sun.proxy.$Proxy0
* method:我们所要调用某个对象真实的方法的Method对象
* args:指代代理对象方法传递的参数
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//在这里实现调用
System.out.println("rpc的代理实现类 调用了...");
return null;
}
/**
* get the proxy object
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
}在ms-rpc中导入netty包,使用netty来构建一个服务,并将至发布出去
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>package com.mszlu.rpc.factory;
import com.mszlu.rpc.server.MsServiceProvider;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 获取单例对象的工厂类
*
*/
public final class SingletonFactory {
private static final Map<String, Object> OBJECT_MAP = new ConcurrentHashMap<>();
private SingletonFactory() {
}
public static <T> T getInstance(Class<T> c) {
if (c == null) {
throw new IllegalArgumentException();
}
String key = c.toString();
if (OBJECT_MAP.containsKey(key)) {
return c.cast(OBJECT_MAP.get(key));
} else {
return c.cast(OBJECT_MAP.computeIfAbsent(key, k -> {
try {
return c.getDeclaredConstructor().newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e.getMessage(), e);
}
}));
}
}
public static void main(String[] args) {
//测试并发下 生成的单例是否唯一
ExecutorService executorService = Executors.newFixedThreadPool(100);
for (int i = 0 ; i< 1000; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
MsServiceProvider instance = SingletonFactory.getInstance(MsServiceProvider.class);
System.out.println(instance);
}
});
}
while (true){}
}
}package com.mszlu.rpc.annontation;
import com.mszlu.rpc.bean.MsBeanDefinitionRegistry;
import com.mszlu.rpc.factory.SingletonFactory;
import com.mszlu.rpc.server.MsServiceProvider;
import com.mszlu.rpc.spring.MsRpcSpringBeanPostProcessor;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({MsRpcSpringBeanPostProcessor.class)
public @interface EnableRpc {
}package com.mszlu.rpc.spring;
import com.mszlu.rpc.annontation.MsReference;
import com.mszlu.rpc.annontation.MsService;
import com.mszlu.rpc.factory.SingletonFactory;
import com.mszlu.rpc.netty.MsClient;
import com.mszlu.rpc.proxy.MsRpcClientProxy;
import com.mszlu.rpc.server.MsServiceProvider;
import lombok.SneakyThrows;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
@Component
public class MsRpcSpringBeanPostProcessor implements BeanPostProcessor{
private MsServiceProvider msServiceProvider;
public MsRpcSpringBeanPostProcessor(){
//单例工厂 生产服务提供者类
msServiceProvider = SingletonFactory.getInstance(MsServiceProvider.class);
}
////bean初始化方法前被调用
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
//bean初始化方法调用后被调用
@SneakyThrows
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
MsService msService = bean.getClass().getAnnotation(MsService.class);
if (msService != null){
//发布服务,如果netty服务未启动进行启动
msServiceProvider.publishService(msService,bean);
}
//在这里判断bean里面的字段有没有加@MsRefrence注解
//如果有 识别并生成代理实现类,发起网络请求
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
MsReference msReference = declaredField.getAnnotation(MsReference.class);
if (msReference != null){
//代理实现类,调用方法的时候 会触发invoke方法,在其中实现网络调用
MsRpcClientProxy msRpcClientProxy = new MsRpcClientProxy(msReference);
Object proxy = msRpcClientProxy.getProxy(declaredField.getType());
//当isAccessible()的结果是false时不允许通过反射访问该字段
declaredField.setAccessible(true);
try {
declaredField.set(bean,proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}package com.mszlu.rpc.server;
import com.mszlu.rpc.annontation.MsService;
import com.mszlu.rpc.factory.SingletonFactory;
import com.mszlu.rpc.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class MsServiceProvider {
private final Map<String, Object> serviceMap;
public MsServiceProvider(){
//发布的服务 都在这里
serviceMap = new ConcurrentHashMap<>();
}
public void publishService(MsService msService,Object service) {
registerService(msService,service);
//检测到有服务发布的注解,启动NettyServer
NettyServer nettyServer = SingletonFactory.getInstance(NettyServer.class);
nettyServer.setMsServiceProvider(this);
if (!nettyServer.isRunning()){
nettyServer.run();
}
}
private void registerService(MsService msService, Object service) {
//service要进行注册, 先创建一个map进行存储
String serviceName = service.getClass().getInterfaces()[0].getCanonicalName()+msService.version();
serviceMap.put(serviceName,service);
log.info("发现服务{}并注册",serviceName);
}
public Object getService(String serviceName) {
return serviceMap.get(serviceName);
}
} <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>加个日志依赖,用于打印日志
首先定义接口,抽象其行为,因为以后还会有更多的Server
package com.mszlu.rpc.netty;
public interface MsServer {
void run();
void stop();
}package com.mszlu.rpc.netty;
import com.mszlu.rpc.annontation.MsService;
import com.mszlu.rpc.netty.handler.MsRpcThreadFactory;
import com.mszlu.rpc.netty.handler.server.NettyServerInitiator;
import com.mszlu.rpc.server.MsServiceProvider;
import com.mszlu.rpc.utils.RuntimeUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer implements MsServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private MsServiceProvider msServiceProvider;
private DefaultEventExecutorGroup eventExecutors;
private boolean isRunning;
public NettyServer() {
}
public void run() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
eventExecutors = new DefaultEventExecutorGroup(RuntimeUtil.cpus() * 2,new MsRpcThreadFactory(msServiceProvider));
b.group(workerGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY,true)
//是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE,true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.childOption(ChannelOption.SO_BACKLOG,1024)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new NettyServerInitiator(eventExecutors));
// 绑定端口,同步等待绑定成功
b.bind(13567).sync().channel();
isRunning = true;
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
stopNettyServer();
}
});
}catch (InterruptedException e){
log.error("occur exception when start server:",e);
}
}
public void stop() {
stopNettyServer();
}
private void stopNettyServer() {
if (eventExecutors != null){
eventExecutors.shutdownGracefully();
}
if (bossGroup != null){
bossGroup.shutdownGracefully();
}
if (workerGroup != null){
workerGroup.shutdownGracefully();
}
}
public void setMsServiceProvider(MsServiceProvider msServiceProvider) {
this.msServiceProvider = msServiceProvider;
}
public boolean isRunning() {
return isRunning;
}
public void setRunning(boolean running) {
isRunning = running;
}
}线程工厂
package com.mszlu.rpc.netty.handler;
import com.mszlu.rpc.server.MsServiceProvider;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class MsRpcThreadFactory implements ThreadFactory {
private MsServiceProvider msServiceProvider;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final ThreadGroup threadGroup;
public MsRpcThreadFactory(MsServiceProvider msServiceProvider) {
this.msServiceProvider = msServiceProvider;
SecurityManager securityManager = System.getSecurityManager();
threadGroup = securityManager != null ? securityManager.getThreadGroup() :Thread.currentThread().getThreadGroup();
namePrefix = "ms-rpc-" + poolNumber.getAndIncrement()+"-thread-";
}
//创建的线程以“N-thread-M”命名,N是该工厂的序号,M是线程号
public Thread newThread(Runnable runnable) {
Thread t = new Thread(threadGroup, runnable,
namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(true);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}业务处理器,处理请求
package com.mszlu.rpc.netty.handler.server;
import com.mszlu.rpc.netty.codec.MsRpcDecoder;
import com.mszlu.rpc.netty.codec.MsRpcEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.EventExecutorGroup;
public class NettyServerInitiator extends ChannelInitializer<SocketChannel> {
private EventExecutorGroup eventExecutors;
public NettyServerInitiator(EventExecutorGroup eventExecutors) {
this.eventExecutors = eventExecutors;
}
protected void initChannel(SocketChannel ch) throws Exception {
//解码器
ch.pipeline ().addLast ( "decoder",new MsRpcDecoder() );
//编码器
ch.pipeline ().addLast ( "encoder",new MsRpcEncoder());
//消息处理器,线程池处理
ch.pipeline ().addLast ( eventExecutors,"handler",new MsNettyServerHandler() );
}
}package com.mszlu.rpc.netty.handler.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class MsNettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这里 接收到 请求的信息,然后根据请求,找到对应的服务提供者,调用,获取结果,然后返回
//消费方 会启动一个 客户端,用户接收返回的数据
}
}上方的代码,我们需要定义两个类MsRpcDecoder和MsRpcEncoder,从名字上可以看出是做编解码的,使用TCP的方式,那么当发起一个网络请求的时候,势必要传递数据流,那么这个数据流的格式是什么,我们就需要进行定义了,我们可以称为
自定义报文或者自定义协议
**Decoder:**接收到数据流后,按照自定义的协议进行解析(负责将消息从字节或其他序列形式转成指定的消息对象)
Encoder: 发送数据的时候,按照自定义的协议进行构建并发送(将消息对象转成字节或其他序列形式在网络上传输)
协议说明: 按顺序排
- 4B magic code(魔法数)
- 1B version(版本)
- 4B full length(消息长度)
- 1B messageType(消息类型)
- 1B codec(序列化类型)
- 1B compress(压缩类型)
- 4B requestId(请求的Id)
- body(object类型数据)
我们使用netty提供的LengthFieldBasedFrameDecoder做为自定义(不定长)长度的解码器,可以有效的解决TCP粘包,拆包等问题
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
- 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为
TCP粘包 - 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为
TCP拆包 - 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。
特别要注意的是,如果TCP的接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包完全接收,期间发生多次拆包。
产生原因主要有这3种:滑动窗口、MSS/MTU限制、Nagle算法
TCP流量控制主要使用滑动窗口协议,滑动窗口是接收数据端使用的窗口大小,用来告诉发送端接收端的缓存大小,以此可以控制发送端发送数据的大小,从而达到流量控制的目的。
这个窗口大小就是我们一次传输几个数据
对所有数据帧按顺序赋予编号,发送方在发送过程中始终保持着一个发送窗口,只有落在发送窗口内的帧才允许被发送;
同时接收方也维持着一个接收窗口,只有落在接收窗口内的帧才允许接收。这样通过调整发送方窗口和接收方窗口的大小可以实现流量控制。
现在来看一下滑动窗口是如何造成粘包、拆包的?
粘包:假设发送方的每256 bytes表示一个完整的报文,接收方由于数据处理不及时,这256个字节的数据都会被缓存到SO_RCVBUF(接收缓存区)中。如果接收方的SO_RCVBUF中缓存了多个报文,那么对于接收方而言,这就是粘包。拆包:考虑另外一种情况,假设接收方的窗口只剩了128,意味着发送方最多还可以发送128字节,而由于发送方的数据大小是256字节,因此只能发送前128字节,等到接收方ack 后,才能发送剩余字节。这就造成了拆包。
MSS:是Maximum Segement Size缩写,表示TCP报文中data部分的最大长度,是TCP协议在OSI五层网络模型中传输层对一次可以发送的最大数据的限制。MTU:最大传输单元是Maxitum Transmission Unit的简写,是OSI五层网络模型中链路层(datalink layer)对一次可以发送的最大数据的限制。
当需要传输的数据大于MSS或者MTU时,数据会被拆分成多个包进行传输。由于MSS是根据MTU计算出来的,因此当发送的数据满足MSS时,必然满足MTU。
为了更好的理解,我们先介绍一下在5层网络模型中应用通过TCP发送数据的流程:
-
对于应用层来说,
只关心发送的数据DATA,将数据写入socket在内核中的发送缓冲区SO_SNDBUF即返回,操作系统会将SO_SNDBUF中的数据取出来进行发送。 -
传输层会在
DATA前面加上TCP Header,构成一个完整的TCP报文。 -
当数据到达网络层(network layer)时,
网络层会在TCP报文的基础上再添加一个IP Header,也就是将自己的网络地址加入到报文中。 -
到数据链路层时,
还会加上Datalink Header和CRC。 -
当到达物理层时,
会将SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和Type域加入。
可以发现数据在发送前,每一层都会在上一层的基础上增加一些内容
下图演示了MSS、MTU在这个过程中的作用:
MTU是以太网传输数据方面的限制,每个
以太网帧都有最小的大小64bytes最大不能超过1518bytes。刨去以太网帧的帧头
(DMAC目的MAC地址48bit=6Bytes+(SMAC源MAC地址48bit=6Bytes+Type域2bytes))=14Bytes和
帧尾 CRC校验部分4Bytes(这个部分有时候大家也把它叫做FCS),那么剩下承载上层协议的地方也就是Data域最大就只能有1500Bytes这个值 我们就把它称之为MTU。
由于MTU限制了一次最多可以发送1500个字节,而TCP协议在发送DATA时,还会加上额外的TCP Header和Ip Header,因此刨去这两个部分,就是TCP协议一次可以发送的实际应用数据的最大大小,也就是MSS。
MSS长度=MTU长度-IP Header-TCP Header
TCP Header的长度是20字节,IPv4中IP Header长度是20字节,IPV6中IP Header长度是40字节,因此:在IPV4中,以太网MSS可以达到1460byte;在IPV6中,以太网MSS可以达到1440byte。
需要注意的是MSS表示的一次可以发送的DATA的最大长度,而不是DATA的真实长度。
发送方发送数据时,当SO_SNDBUF中的数据量大于MSS时,操作系统会将数据进行拆分,使得每一部分都小于MSS,这就是拆包,然后每一部分都加上TCP Header,构成多个完整的TCP报文进行发送,当然经过网络层和数据链路层的时候,还会分别加上相应的内容。
需要注意: 默认情况下,与外部通信的网卡的MTU大小是1500个字节。而本地回环地址的MTU大小为65535,这是因为本地测试时数据不需要走网卡,所以不受到1500的限制。
TCP/IP协议中,无论发送多少数据,总是要在数据(DATA)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送ACK表示确认。
即使从键盘输入的一个字符,占用一个字节,可能在传输上造成41字节的包,其中包括1字节的有用信息和40字节的首部数据。这种情况转变成了4000%的消耗,这样的情况对于重负载的网络来是无法接受的。
为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。
一个连接会设置MSS参数,因此,TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据
Nagle算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
Nagle算法的基本定义是任意时刻,
最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。
Nagle算法的规则:
- 如果SO_SNDBUF(发送缓冲区)中的数据长度达到MSS,则允许发送;
- 如果该SO_SNDBUF中含有FIN,表示请求关闭连接,则先将SO_SNDBUF中的剩余数据发送,再关闭;
- 设置了TCP_NODELAY=true选项,则允许发送。TCP_NODELAY是取消TCP的确认延迟机制,相当于禁用了Nagle 算法。
- 未设置TCP_CORK选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
- 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。
TCP粘包拆包问题,在于无法准确的标识接收到的数据,我们使用LengthFieldBasedFrameDecoder和自定义协议的形式来标识我们需要的数据,就可以进行准确的辩识,从而解决粘包拆包问题。
package com.mszlu.rpc.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)
* body(object类型数据)
*/
public class MsRpcDecoder extends LengthFieldBasedFrameDecoder {
public MsRpcDecoder(){
this(8 * 1024 * 1024,5,4,-9,0);
}
/**
*
* @param maxFrameLength 最大帧长度。它决定可以接收的数据的最大长度。如果超过,数据将被丢弃,根据实际环境定义
* @param lengthFieldOffset 数据长度字段开始的偏移量, magic code+version=长度为5
* @param lengthFieldLength 消息长度的大小 full length(消息长度) 长度为4
* @param lengthAdjustment 补偿值 lengthAdjustment+数据长度取值=长度字段之后剩下包的字节数(x + 16=7 so x = -9)
* @param initialBytesToStrip 忽略的字节长度,如果要接收所有的header+body 则为0,如果只接收body 则为header的长度 ,我们这为0
*/
public MsRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//TODO 实现
return super.decode(ctx, in);
}
} @Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//TODO 实现
Object decode = super.decode(ctx, in);
if (decode instanceof ByteBuf){
ByteBuf frame = (ByteBuf) decode;
if (frame.readableBytes() < MsRpcConstants.TOTAL_LENGTH){
throw new MsRpcException("数据长度不符,格式有误");
}
return decodeFrame(frame);
}
return decode;
}
private Object decodeFrame(ByteBuf in) {
//顺序读取
//1. 先读取魔法数,确定是我们自定义的协议
checkMagicNumber(in);
//2. 检查版本
checkVersion(in);
//3.数据长度
int fullLength = in.readInt();
//4.messageType 消息类型
byte messageType = in.readByte();
//5.序列化类型
byte codecType = in.readByte();
//6.压缩类型
byte compressType = in.readByte();
//7. 请求id
int requestId = in.readInt();
//8. 读取数据
int bodyLength = fullLength - MsRpcConstants.TOTAL_LENGTH;
if (bodyLength > 0){
//有数据,读取body的数据
byte[] bodyData = new byte[bodyLength];
in.readBytes(bodyData);
//解压缩 使用gzip
String compressName = CompressTypeEnum.getName(compress);
}
return null;
}
private void checkVersion(ByteBuf in) {
byte b = in.readByte();
if (b != MsRpcConstants.VERSION){
throw new MsRpcException("未知的version");
}
}
private void checkMagicNumber(ByteBuf in) {
byte[] tmp = new byte[MsRpcConstants.MAGIC_NUMBER.length];
in.readBytes(tmp);
for (int i = 0;i< tmp.length;i++) {
if (tmp[i] != MsRpcConstants.MAGIC_NUMBER[i]){
throw new MsRpcException("未知的magic number");
}
}
}package com.mszlu.rpc.constants;
public class MsRpcConstants {
public static final int TOTAL_LENGTH = 16;
public static final byte[] MAGIC_NUMBER = {(byte)'m',(byte)'s',(byte)'n',(byte)'b'};
public static final int VERSION = 1;
}package com.mszlu.rpc.exception;
public class MsRpcException extends RuntimeException {
public MsRpcException(){
super();
}
public MsRpcException(String msg){
super(msg);
}
public MsRpcException(String msg,Exception e){
super(msg,e);
}
}在进行数据解码的时候,我们在协议里面定义了压缩类型,也就是说在进行传输的时候,数据是进行压缩处理的,这样做的目的是为了减少通过网络传输的数据量,提高性能。
在这里,我们使用应用最为广泛的gzip压缩
package com.mszlu.rpc.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum CompressTypeEnum {
//读取协议这的压缩类型,来此枚举进行匹配
GZIP((byte) 0x01, "gzip");
private final byte code;
private final String name;
public static String getName(byte code) {
for (CompressTypeEnum c : CompressTypeEnum.values()) {
if (c.getCode() == code) {
return c.name;
}
}
return null;
}
}package com.mszlu.rpc.compress;
public interface Compress {
/**
* 压缩方法名称
* @return
*/
String name();
/**
* 压缩
* @param bytes
* @return
*/
byte[] compress(byte[] bytes);
/**
* 解压缩
* @param bytes
* @return
*/
byte[] decompress(byte[] bytes);
}package com.mszlu.rpc.compress;
import com.mszlu.rpc.constants.CompressTypeEnum;
import com.mszlu.rpc.exception.MsRpcException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class GzipCompress implements Compress {
@Override
public String name() {
return CompressTypeEnum.GZIP.getName();
}
@Override
public byte[] compress(byte[] bytes) {
if (bytes == null){
throw new NullPointerException("传入的压缩数据为null");
}
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
GZIPOutputStream gzip = new GZIPOutputStream(os);
gzip.write(bytes);
gzip.flush();
gzip.finish();
return os.toByteArray();
} catch (IOException e) {
throw new MsRpcException("压缩数据出错",e);
}
}
@Override
public byte[] decompress(byte[] bytes) {
if (bytes == null){
throw new NullPointerException("传入的解压缩数据为null");
}
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(bytes));
byte[] buffer = new byte[1024 * 4];
int n;
while ((n = gzipInputStream.read(buffer)) > -1){
os.write(buffer,0,n);
}
return os.toByteArray();
} catch (IOException e) {
throw new MsRpcException("解压缩数据出错",e);
}
}
}SPI ,全称为 Service Provider Interface,是一种服务发现机制。它通过在ClassPath路径下的META-INF/services文件夹查找文件,自动加载文件里所定义的类。
private Compress loadCompress(byte compressType) {
String compressName = CompressTypeEnum.getName(compressType);
ServiceLoader<Compress> load = ServiceLoader.load(Compress.class);
for (Compress compress : load) {
if (compress.name().equals(compressName)) {
return compress;
}
}
throw new MsRpcException("无对应的压缩类型");
}//解压缩 使用gzip
Compress compress = loadCompress(compressType);
bodyData = compress.decompress(bodyData);在META-INF/services新建com.mszlu.rpc.compress.Compress 文件,其中的内容为
com.mszlu.rpc.compress.GzipCompress在数据传输的过程中,我们将数据做了序列化与反序列化的处理,在解码的时候,需要进行反序列化。
序列化的好处:
- 跨平台
- 解决数据流传输的问题,便于按照自己的格式进行处理
- 不同的序列化方式,性能会有差距
https://github.com/protostuff/protostuff
java的序列化库,是基于谷歌Protocol Buffer的序列化库,Protocol Buffer是谷歌出品的一种数据交换格式,相比xml,json,它占用小,速度快。适合做数据存储或 RPC 数据交换格式。
Protocol Buffer门槛更高,因为需要编写.proto文件,再把它编译成目标语言,这样使用起来就很麻烦。但是现在有了protostuff之后,就不需要依赖.proto文件了,他可以直接对POJO进行序列化和反序列化,使用起来非常简单。
package com.mszlu.rpc.netty.serialize;
/**
* 序列化接口,所有序列化类都要实现这个接口
*/
public interface Serializer {
/**
* 使用的序列化名称
* @return
*/
String name();
/**
* 序列化
*
* @param obj 要序列化的对象
* @return 字节数组
*/
byte[] serialize(Object obj);
/**
* 反序列化
*
* @param bytes 序列化后的字节数组
* @param clazz 目标类
* @return 反序列化的对象
*/
Object deserialize(byte[] bytes, Class<?> clazz);
}先导入依赖:
<!-- protostuff -->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.7.2</version>
</dependency>实现类:
package com.mszlu.rpc.netty.serialize;
import com.mszlu.rpc.constants.SerializationTypeEnum;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description Protostuff序列化器
*/
public class ProtostuffSerializer implements Serializer{
/**
* 避免每次序列化都重新申请Buffer空间,用来暂时存放对象序列化之后的数据
* 如果你设置的空间不足,会自动扩展的,但这个大小还是要设置一个合适的值,设置大了浪费空间,设置小了会自动扩展浪费时间
*/
private LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
/**
* 缓存类对应的Schema,由于构造schema需要获得对象的类和字段信息,会用到反射机制
* 这是一个很耗时的过程,因此进行缓存很有必要,下次遇到相同的类直接从缓存中get就行了
*/
private Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
@Override
public String name() {
return SerializationTypeEnum.PROTOSTUFF.getName();
}
public byte[] serialize(Object obj) {
Class clazz = obj.getClass();
Schema schema = getSchema(clazz);
byte[] data;
try {
//序列化操作,将对象转换为字节数组
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
//使用完清空buffer
buffer.clear();
}
return data;
}
public Object deserialize(byte[] bytes, Class<?> clazz) {
Schema schema = getSchema(clazz);
Object obj = schema.newMessage();
//反序列化操作,将字节数组转换为对应的对象
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
/**
* @description 获取Schema
* @param clazz
* @return [io.protostuff.Schema]
*/
private Schema getSchema(Class clazz) {
//首先尝试从Map缓存中获取类对应的schema
Schema schema = schemaCache.get(clazz);
if(Objects.isNull(schema)) {
//新创建一个schema,RuntimeSchema就是将schema繁琐的创建过程封装了起来
//它的创建过程是线程安全的,采用懒创建的方式,即当需要schema的时候才创建
schema = RuntimeSchema.getSchema(clazz);
if(Objects.nonNull(schema)) {
//缓存schema,方便下次直接使用
schemaCache.put(clazz, schema);
}
}
return schema;
}
}在META-INF/services新建com.mszlu.rpc.netty.serialize.Serializer 文件,其中的内容为
com.mszlu.rpc.netty.serialize.ProtostuffSerializer将协议中,除了魔法数,版本,数据长度外的其他数据,封装到Message对象中
package com.mszlu.rpc.message;
import lombok.*;
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MsMessage {
//rpc message type
private byte messageType;
//serialization type
private byte codec;
//compress type
private byte compress;
//request id
private int requestId;
//request data
private Object data;
} MsMessage msMessage = MsMessage.builder()
.codec(codecType)
.compress(compressType)
.messageType(messageType)
.requestId(requestId)
.build();package com.mszlu.rpc.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class MsRequest implements Serializable {
private String requestId;
private String interfaceName;
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private String version;
private String group;
}package com.mszlu.rpc.message;
import lombok.*;
import java.io.Serializable;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class MsResponse<T> implements Serializable {
private String requestId;
/**
* response code
*/
private Integer code;
/**
* response message
*/
private String message;
/**
* response body
*/
private T data;
public static <T> MsResponse<T> success(T data, String requestId) {
MsResponse<T> response = new MsResponse<>();
response.setCode(200);
response.setMessage("success");
response.setRequestId(requestId);
if (null != data) {
response.setData(data);
}
return response;
}
public static <T> MsResponse<T> fail(String message) {
MsResponse<T> response = new MsResponse<>();
response.setCode(500);
response.setMessage(message);
return response;
}
}package com.mszlu.rpc.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum MessageTypeEnum {
REQUEST((byte) 0x01, "request"),
RESPONSE((byte) 0x02, "response"),
HEARTBEAT_PING((byte) 0x03, "heart ping"),
HEARTBEAT_PONG((byte) 0x04, "heart pong");
private final byte code;
private final String name;
public static String getName(byte code) {
for (MessageTypeEnum c : MessageTypeEnum.values()) {
if (c.getCode() == code) {
return c.name;
}
}
return null;
}
} //序列化
Serializer serializer = loadSerializer(codecType);
//
if (messageType == MessageTypeEnum.REQUEST.getCode()){
MsRequest msRequest = serializer.deserialize(bodyData, MsRequest.class);
msMessage.setData(msRequest);
return msMessage;
}
if (messageType == MessageTypeEnum.RESPONSE.getCode()){
MsResponse msResponse = serializer.deserialize(bodyData, MsResponse.class);
msMessage.setData(msResponse);
return msMessage;
} private Serializer loadSerializer(byte codecType) {
String serializerName = SerializationTypeEnum.getName(codecType);
ServiceLoader<Serializer> load = ServiceLoader.load(Serializer.class);
for (Serializer serializer : load) {
if (serializer.name().equals(serializerName)) {
return serializer;
}
}
throw new MsRpcException("无对应的序列化类型");
}编码器,将数据转换为我们上方
自定义的协议格式数据
package com.mszlu.rpc.netty.codec;
import com.mszlu.rpc.compress.Compress;
import com.mszlu.rpc.constants.CompressTypeEnum;
import com.mszlu.rpc.constants.MsRpcConstants;
import com.mszlu.rpc.constants.SerializationTypeEnum;
import com.mszlu.rpc.exception.MsRpcException;
import com.mszlu.rpc.message.MsMessage;
import com.mszlu.rpc.netty.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
public class MsRpcEncoder extends MessageToByteEncoder<MsMessage> {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
// 4B magic number(魔法数)
// 1B version(版本)
// 4B full length(消息长度)
// 1B messageType(消息类型)
// 1B codec(序列化类型)
// 1B compress(压缩类型)
// 4B requestId(请求的Id)
@Override
protected void encode(ChannelHandlerContext ctx, MsMessage msg, ByteBuf out) throws Exception {
out.writeBytes(MsRpcConstants.MAGIC_NUMBER);
out.writeByte(MsRpcConstants.VERSION);
// 预留数据长度位置
out.writerIndex(out.writerIndex() + 4);
byte messageType = msg.getMessageType();
out.writeByte(messageType);
out.writeByte(msg.getCodec());
out.writeByte(msg.getCompress());
//请求id 原子操作 线程安全 相对加锁 效率高
out.writeInt(ATOMIC_INTEGER.getAndIncrement());
// build full length
byte[] bodyBytes = null;
//header 长度为 16
int fullLength = MsRpcConstants.HEAD_LENGTH;
// fullLength = head length + body length
// 序列化数据
Serializer serializer = loadSerializer(msg.getCodec());
bodyBytes = serializer.serialize(msg.getData());
// 压缩数据
Compress compress = loadCompress(msg.getCompress());
bodyBytes = compress.compress(bodyBytes);
fullLength += bodyBytes.length;
out.writeBytes(bodyBytes);
int writeIndex = out.writerIndex();
//将fullLength写入之前的预留的位置
out.writerIndex(writeIndex - fullLength + MsRpcConstants.MAGIC_NUMBER.length + 1);
out.writeInt(fullLength);
out.writerIndex(writeIndex);
}
private Serializer loadSerializer(byte codecType) {
String serializerName = SerializationTypeEnum.getName(codecType);
ServiceLoader<Serializer> load = ServiceLoader.load(Serializer.class);
for (Serializer serializer : load) {
if (serializer.name().equals(serializerName)) {
return serializer;
}
}
throw new MsRpcException("无对应的序列化类型");
}
private Compress loadCompress(byte compressType) {
String compressName = CompressTypeEnum.getName(compressType);
ServiceLoader<Compress> load = ServiceLoader.load(Compress.class);
for (Compress compress : load) {
if (compress.name().equals(compressName)) {
return compress;
}
}
throw new MsRpcException("无对应的压缩类型");
}
}在上方 我们定义了Request数据格式,客户端会发送这样的数据到服务端,我们要做的事情是根据其中的相应数据,找到调用的service和方法,发起调用,得到返回数据,并将之返还给客户端
package com.mszlu.rpc.netty.handler.server;
import com.mszlu.rpc.constants.MessageTypeEnum;
import com.mszlu.rpc.exception.MsRpcException;
import com.mszlu.rpc.factory.SingletonFactory;
import com.mszlu.rpc.message.MsMessage;
import com.mszlu.rpc.message.MsRequest;
import com.mszlu.rpc.message.MsResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MsNettyServerHandler extends ChannelInboundHandlerAdapter {
private MsRequestHandler requestHandler;
public MsNettyServerHandler(){
this.requestHandler = SingletonFactory.getInstance(MsRequestHandler.class);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//这里 接收到 请求的信息,然后根据请求,找到对应的服务提供者,调用,获取结果,然后返回
//消费方 会启动一个 客户端,用户接收返回的数据
if (msg instanceof MsMessage){
MsMessage msMessage = (MsMessage) msg;
if (msMessage.getData() instanceof MsRequest){
//客户端请求
MsRequest msRequest = (MsRequest) msMessage.getData();
Object handler = requestHandler.handler(msRequest);
msMessage.setMessageType(MessageTypeEnum.RESPONSE.getCode());
if (ctx.channel().isActive() && ctx.channel().isWritable()){
MsResponse<Object> msResponse = MsResponse.success(handler,msRequest.getRequestId());
msMessage.setData(msResponse);
}else{
MsResponse<Object> msResponse = MsResponse.fail("net fail");
msMessage.setData(msResponse);
}
log.info("服务端收到数据,并处理完成{}:",msMessage);
//写完数据 并关闭通道
ctx.writeAndFlush(msMessage).addListener(ChannelFutureListener.CLOSE);
}
}
}catch (Exception e){
throw new MsRpcException("数据读取异常",e);
}finally {
//释放 以防内存泄露
ReferenceCountUtil.release(msg);
}
}
// @Override
// public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// super.channelReadComplete(ctx);
// }
}之前在服务发布的时候,我们将服务放入了serviceProvider的一个map,通过request中传递的参数,获取到map中的服务,通过反射 调用其方法
package com.mszlu.rpc.netty.handler.server;
import com.mszlu.rpc.exception.MsRpcException;
import com.mszlu.rpc.factory.SingletonFactory;
import com.mszlu.rpc.message.MsRequest;
import com.mszlu.rpc.server.MsServiceProvider;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class MsRequestHandler {
private MsServiceProvider serviceProvider;
public MsRequestHandler(){
this.serviceProvider = SingletonFactory.getInstance(MsServiceProvider.class);
}
public Object handler(MsRequest msRequest){
String interfaceName = msRequest.getInterfaceName();
String version = msRequest.getVersion();
String serviceName = interfaceName + version;
Object service = serviceProvider.getService(serviceName);
try {
Method method = service.getClass().getMethod(msRequest.getMethodName(), msRequest.getParamTypes());
Object invoke = method.invoke(service, msRequest.getParameters());
return invoke;
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new MsRpcException("服务调用出现问题:"+e.getMessage(),e);
}
}
}至此服务提供方的处理就完成了




