您现在的位置是:亿华云 > 系统运维
Java 从零开始手写 RPC-Netty4 实现客户端和服务端
亿华云2025-10-03 15:46:09【系统运维】7人已围观
简介说明上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。至于 netty 的优点可以参考:为什么选择 netty?[1]http://houbb.github.io/
说明
上一篇代码基于 socket 的从零实现非常简单,但是开始客户对于实际生产,一般使用 netty。手写实现
至于 netty 的端和端优点可以参考:
为什么选择 netty?[1]
http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

代码实现
maven 引入
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${ netty.version}</version> </dependency>引入 netty 对应的 maven 包,此处为 4.1.17.Final。服务
服务端代码实现
netty 的从零服务端启动代码是比较固定的。
package com.github.houbb.rpc.server.core; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.server.constant.RpcServerConst; import com.github.houbb.rpc.server.handler.RpcServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * rpc 服务端 * @author binbin.hou * @since 0.0.1 */ public class RpcServer extends Thread { private static final Log log = LogFactory.getLog(RpcServer.class); /** * 端口号 */ private final int port; public RpcServer() { this.port = RpcServerConst.DEFAULT_PORT; } public RpcServer(int port) { this.port = port; } @Override public void run() { // 启动服务端 log.info("RPC 服务开始启动服务端"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup,开始客户 bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new RpcServerHandler()); } }) // 这个参数影响的是还没有被accept 取出的连接 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,云服务器提供商服务端会发送一个 ack 包,手写实现以判断客户端是端和端否还活着。 .childOption(ChannelOption.SO_KEEPALIVE,服务 true); // 绑定端口,开始接收进来的从零链接 ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("RPC 服务端启动完成,监听【" + port + "】端口"); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("RPC 服务端关闭完成"); } catch (Exception e) { log.error("RPC 服务异常",开始客户 e); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }为了简单,服务端启动端口号固定,手写实现RpcServerConst 常量类内容如下:
public final class RpcServerConst { private RpcServerConst(){ } /** * 默认端口 * @since 0.0.1 */ public static final int DEFAULT_PORT = 9627; }RpcServerHandler
当然,端和端还有一个比较核心的服务类就是 RpcServerHandler
public class RpcServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // do nothing now } }目前是空实现,后续可以添加对应的日志输出及逻辑处理。
测试
启动测试的代码非常简单:
/** * 服务启动代码测试 * @param args 参数 */ public static void main(String[] args) { new RpcServer().start(); }说明
上面我们实现了服务端的云服务器实现,这一节来一起看一下 client 客户端代码实现。
代码实现
RpcClient
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.client.core; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.handler.RpcClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * <p> rpc 客户端 </p> * * <pre> Created: 2019/10/16 11:21 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.2 */ public class RpcClient extends Thread { private static final Log log = LogFactory.getLog(RpcClient.class); /** * 监听端口号 */ private final int port; public RpcClient(int port) { this.port = port; } public RpcClient() { this(9527); } @Override public void run() { // 启动服务端 log.info("RPC 服务开始启动客户端"); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); ChannelFuture channelFuture = bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<Channel>(){ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new LoggingHandler(LogLevel.INFO)) .addLast(new RpcClientHandler()); } }) .connect("localhost", port) .syncUninterruptibly(); log.info("RPC 服务启动客户端完成,监听端口:" + port); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("RPC 服务开始客户端已关闭"); } catch (Exception e) { log.error("RPC 客户端遇到异常", e); } finally { workerGroup.shutdownGracefully(); } } }.connect("localhost", port) 声明了客户端需要连接的服务端,此处和服务端的端口保持一致。
RpcClientHandler
客户端处理类也比较简单,暂时留空。
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.client.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * <p> 客户端处理类 </p> * * <pre> Created: 2019/10/16 11:30 下午 </pre> * <pre> Project: rpc </pre> * * @author houbinbin * @since 0.0.2 */ public class RpcClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // do nothing. } }启动测试
服务端
首先启动服务端。
客户端
然后启动客户端连接服务端,实现如下:
/** * 服务启动代码测试 * @param args 参数 */ public static void main(String[] args) { new RpcClient().start(); }小结
为了便于大家学习,以上源码已经开源:
https://github.com/houbb/rpc
我是老马,期待与你的下次重逢。
References
[1] 为什么选择 netty?: http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty
源码下载很赞哦!(1121)