Does Netty really deliver such high concurrency?
I wrote it the same way as the demo, relying on Netty's built-in thread pool instead of handing work off to my own threads, but it burns CPU aggressively: with only 4 threads the CPU hits 100%. The test client uses almost no CPU, and nearly all of the CPU is consumed inside the Netty framework itself. What could be causing this?
The code is as follows:
public class NettyServer {
    // NOTE: initChannel() runs on the event loop thread, so this ThreadLocal
    // shares one PacketCrypt among every channel served by the same loop.
    public static final ThreadLocal<PacketCrypt> session = new ThreadLocal<PacketCrypt>();
    // Leftover from Netty 3.x ExecutionHandler / (Ordered)MemoryAwareThreadPoolExecutor experiments:
    // ExecutionHandler executionHandler = new ExecutionHandler(
    //         new MemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
    private static final ImmediateEventExecutor iee = ImmediateEventExecutor.INSTANCE; // unused

    public static void startServer(int port, int workerCount) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Use the default worker count (2 * cores); the workerCount parameter is ignored.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     PacketCrypt pc = session.get();
                     if (pc == null) {
                         try {
                             pc = new PacketCrypt();
                             pc.getCryptAES().importKey(PacketCrypt.INITKEY, PacketCrypt.INITIV);
                             session.set(pc);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
                     }
                     ConnectionContext connectionContext = new ConnectionContext(pc);
                     connectionContext.setChannel(ch);
                     // ch.pipeline().addLast(new WriteTimeoutHandler(60));
                     // Outbound handlers run in the reverse of registration order;
                     // inbound handlers run in registration order.
                     // ch.pipeline().addLast(new MessageEncoder());
                     // ch.pipeline().addLast(new ReadTimeoutHandler(60));
                     // ImmediateEventExecutor.inEventLoop() is always true, so both
                     // handlers below still run inline on the I/O thread; passing it
                     // here offloads nothing.
                     ch.pipeline().addLast(ImmediateEventExecutor.INSTANCE, new MessageDecoder(connectionContext));
                     // ch.pipeline().addLast(new DepacketAdapter(connectionContext));
                     ch.pipeline().addLast(ImmediateEventExecutor.INSTANCE, new BusinessAdapter());
                 }
             });
            b.option(ChannelOption.SO_BACKLOG, 1024);
            // b.childOption("child.reuseAddress", true); // Netty 3.x string key, not valid in 4.x
            b.childOption(ChannelOption.TCP_NODELAY, true);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);
            // b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(4096));
            // Bind and start accepting incoming connections.
            ChannelFuture f = b.bind(port).sync();
            TraceLog.info("Server startup complete.");
            // Wait until the server socket is closed. In this example that never
            // happens, but this is where a graceful shutdown would hang off.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
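If doExecute() ever blocks (AES work, database calls, remote trade dispatch), it currently runs on the NIO event loop and can pin the worker threads while other connections starve. Below is a minimal sketch of offloading the business handler onto a separate executor group, reusing the classes above; the pool size of 16 is an arbitrary starting point, not a tuned value:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class OffloadingInitializer extends ChannelInitializer<SocketChannel> {
    // Dedicated pool for potentially blocking business work (size is a guess).
    private static final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // One PacketCrypt per connection instead of the thread-shared ThreadLocal.
        PacketCrypt pc = new PacketCrypt();
        pc.getCryptAES().importKey(PacketCrypt.INITKEY, PacketCrypt.INITIV);
        ConnectionContext connectionContext = new ConnectionContext(pc);
        connectionContext.setChannel(ch);
        // Cheap, non-blocking framing stays on the I/O thread.
        ch.pipeline().addLast(new MessageDecoder(connectionContext));
        // Business logic is dispatched to businessGroup, so a slow doExecute()
        // can no longer monopolize the NIO event loop.
        ch.pipeline().addLast(businessGroup, new BusinessAdapter());
    }
}

With this split the event loops only do framing and socket I/O, and a saturated CPU points at the business pool rather than at Netty itself.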
public class MessageDecoder extends ByteToMessageDecoder {
    // Shortest legal body: the minimal JSON header the protocol allows.
    private static final int MINLEN =
            "{\"ver\":\"1\",\"av\":\"1\",\"cf\":10001,\"tc\":\"tc\",\"md\":\"md\"}".length();
    private final ConnectionContext connectionContext;

    public MessageDecoder(ConnectionContext connectionContext) {
        this.connectionContext = connectionContext;
    }

    // ByteToMessageDecoder keeps a cumulative buffer across reads and releases it
    // automatically, so partial frames are neither dropped nor leaked when this
    // method returns early.
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
            throws Exception {
        // Wait until the 4-byte length field has been received; this method is
        // invoked again when more bytes arrive.
        if (buf.readableBytes() < 4) {
            return;
        }
        // Mark the reader index before reading the length field so we can rewind
        // if the whole frame has not arrived yet.
        buf.markReaderIndex();
        byte[] lenbytes = new byte[4];
        buf.readBytes(lenbytes);
        // The length field is little-endian.
        int length = ((lenbytes[3] & 0xFF) << 24) | ((lenbytes[2] & 0xFF) << 16)
                   | ((lenbytes[1] & 0xFF) << 8) | (lenbytes[0] & 0xFF);
        if (length < MINLEN) {
            // Malformed frame: discard the buffer and drop the connection.
            buf.clear();
            ctx.close();
            return;
        }
        // Wait for the full body plus the 9-byte signature.
        if (buf.readableBytes() < length + 9) {
            buf.resetReaderIndex();
            return;
        }
        // The whole frame is in the buffer. Read it.
        byte[] body = new byte[length];
        byte[] sign = new byte[9];
        buf.readBytes(body);
        buf.readBytes(sign);
        MessagePacket mp = new MessagePacket(connectionContext, body, sign);
        connectionContext.incPackNum();
        out.add(mp); // the base class fires channelRead and manages buffer release
    }
}
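Netty already ships a decoder for exactly this kind of length-prefixed framing, which avoids hand-rolling the mark/reset logic. A sketch, under the assumption that the wire format stays 4-byte little-endian length + body + 9-byte trailing signature; the 1 MiB frame cap is an assumed safety limit, not part of the original protocol:

import java.nio.ByteOrder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public final class Framing {
    /** Frames: 4-byte little-endian length + body + 9-byte signature. */
    public static LengthFieldBasedFrameDecoder newFrameDecoder() {
        return new LengthFieldBasedFrameDecoder(
                ByteOrder.LITTLE_ENDIAN, // byte order of the length field
                1048576,                 // maxFrameLength: assumed 1 MiB cap
                0, 4,                    // length field at offset 0, 4 bytes wide
                9,                       // lengthAdjustment: trailing signature bytes
                4,                       // initialBytesToStrip: drop the length field
                true);                   // failFast on oversized frames
    }
}

The decoder emits one ByteBuf per complete frame (body plus signature), so a slimmed-down MessageDecoder behind it would only need to split the two arrays and build the MessagePacket.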
public class BusinessAdapter extends ChannelInboundHandlerAdapter {
    // Per-channel state: initChannel() creates one BusinessAdapter per connection,
    // and reads on a channel are serialized, so these fields are not shared.
    private MessagePacket mp;
    private PacketCrypt pc;
    private TradeContextImp tradeContextImp;
    private SocketChannel sc;

    public BusinessAdapter() throws Exception {
        pc = new PacketCrypt();
    }

    private void doExecuteMessage() {
        sc = mp.getConnectionContext().getChannel();
        // Skip the packet if the connection has already been closed.
        if (!sc.isActive()) return;
        try {
            // Parse the packet body.
            tradeContextImp = new TradeContextImp(pc, mp);
            mp.getConnectionContext().setKeepAlive(false);
            // Run the business logic (doExecute() is not shown here).
            byte[] resp = doExecute();
            if (!sc.isActive()) return;
            ChannelHandlerContext chc = mp.getConnectionContext().getChannelHandlerContext();
            ByteBuf bb = chc.alloc().directBuffer(resp.length);
            bb.writeBytes(resp);
            chc.writeAndFlush(bb);
        } catch (PacketException e) {
            sc.close();
        } catch (IOException e) {
            sc.close();
        }
    }

    /**
     * Parameter types for reflective invocation of the trade interface.
     */
    private static Class<?>[] invokeParamsClass = new Class<?>[]{TradeContext.class};

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        mp = (MessagePacket) msg;
        mp.getConnectionContext().setChannelHandlerContext(ctx);
        doExecuteMessage();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
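Two smaller costs hide in BusinessAdapter: every response is copied into a freshly allocated direct buffer, and per-message state is parked in handler fields. A sketch of a leaner variant, reusing MessagePacket from above; process() is a hypothetical stand-in for the trade dispatch, not an existing method:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class LeanBusinessAdapter extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessagePacket mp = (MessagePacket) msg; // per-message state stays local
        if (!ctx.channel().isActive()) {
            return;
        }
        byte[] resp = process(mp); // hypothetical stand-in for doExecute()
        // wrappedBuffer skips the explicit allocate-and-copy into a direct buffer.
        ctx.writeAndFlush(Unpooled.wrappedBuffer(resp));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    private byte[] process(MessagePacket mp) throws Exception {
        // Placeholder: decrypt, dispatch the trade, and serialize the response.
        return new byte[0];
    }
}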