netty 真的有那么高并发吗

 我来答
就烦条0o
2016-07-03 · 知道合伙人软件行家
就烦条0o
知道合伙人软件行家
采纳数:33315 获赞数:46492
从事多年系统运维,喜欢编写各种小程序和脚本。

向TA提问 私信TA
展开全部
我就是用demo那种方式写的,netty自带线程池的,我就没有用自己的线程处理了,而且占cpu资源太猛了,4个线程就能占CPU100%了,客户端测试基本不占CPU,CPU资源基都被netty框架占满了,这个是怎么回事呢?
代码如下:
public class NettyServer {

public static final ThreadLocal<PacketCrypt> session = new ThreadLocal<PacketCrypt>();
// MemoryAwareThreadPoolExecutor
// OrderedMemoryAwareThreadPoolExecutor
// ExecutionHandler executionHandler = new ExecutionHandler(
// new MemoryAwareThreadPoolExecutor(16, 1048576, 1048576))

private static final ImmediateEventExecutor iee = ImmediateEventExecutor.INSTANCE;

public static void startServer(int port,int workerCount) throws InterruptedException{

//取默认

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
PacketCrypt pc = null;
pc = session.get();
if (pc == null){
try {
pc = new PacketCrypt();
pc.getCryptAES().importKey(PacketCrypt.INITKEY,PacketCrypt.INITIV);
session.set(pc);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
ConnectionContext connectionContext = null;
connectionContext = new ConnectionContext(pc);
connectionContext.setChannel(ch);
// ch.pipeline().addLast(new WriteTimeoutHandler(60));
// // 注册两个OutboundHandler,执行顺序为注册顺序的逆序,所以应该是OutboundHandler2 OutboundHandler1
// ch.pipeline().addLast(new MessageEncoder());
// 注册两个InboundHandler,执行顺序为注册顺序,所以应该是InboundHandler1 InboundHandler2
// ch.pipeline().addLast(new ReadTimeoutHandler(60));
ch.pipeline().addLast(ImmediateEventExecutor.INSTANCE,new MessageDecoder(connectionContext));
// ch.pipeline().addLast(new DepacketAdapter(connectionContext));
ch.pipeline().addLast(ImmediateEventExecutor.INSTANCE,new BusinessAdapter());
}

});
b.option(ChannelOption.SO_BACKLOG, 1024) ;
// b.childOption("child.reuseAddress", true);
b.childOption(ChannelOption.TCP_NODELAY,true);
b.childOption(ChannelOption.SO_KEEPALIVE,true);
// b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(4096));
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync();
TraceLog.info(" 系统启动完成.");
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

}

}

public class MessageDecoder extends ChannelInboundHandlerAdapter {
private ConnectionContext connectionContext;

public MessageDecoder(ConnectionContext connectionContext){

this.connectionContext = connectionContext;

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {
// SimpleChannelInboundHandler

ByteBuf buf = (ByteBuf) msg;
// System.out.println(buf.readableBytes());

// Make sure if the length field was received.

if (buf.readableBytes() < 4) {

// The length field was not received yet - return.

// This method will be invoked again when more packets are

// received and appended to the buffer.

return ;

}

// The length field is in the buffer.

// Mark the current buffer position before reading the length field

// because the whole frame might not be in the buffer yet.

// We will reset the buffer position to the marked position if

// there's not enough bytes in the buffer.

buf.markReaderIndex(); // Read the length field.

byte[] lenbytes = new byte[4];

buf.readBytes(lenbytes);

int length = ( (lenbytes[3] & 0xFF) << 24 ) | ( (lenbytes[2] & 0xFF) << 16 )

| ( (lenbytes[1] & 0xFF) << 8 ) | ( (lenbytes[0] & 0xFF) );
// System.out.println(buf.readableBytes());

if (length < MINLEN){

buf.clear();
ctx.close();
return;

}

// Make sure if there's enough bytes in the buffer.

if (buf.readableBytes() < length + 9) {// The whole bytes were not received yet - return null.

buf.resetReaderIndex();

return;

}

try{

// There's enough bytes in the buffer. Read it.

byte[] body = new byte[length];

byte[] sign = new byte[9];

// Successfully decoded a frame. Return the decoded frame.

buf.readBytes(body);

buf.readBytes(sign);

MessagePacket mp = new MessagePacket(connectionContext, body, sign);

connectionContext.incPackNum();

ctx.fireChannelRead(mp);

}finally{

buf.release();

}

}

private final static int MINLEN = "{\"ver\":\"1\",\"av\":\"1\",\"cf\":10001,\"tc\":\"tc\",\"md\":\"md\"}".length();
}

public class BusinessAdapter extends ChannelInboundHandlerAdapter {

private MessagePacket mp;

private PacketCrypt pc = null;

private TradeContextImp tradeContextImp = null;

private SocketChannel sc = null;

public BusinessAdapter() throws Exception{

pc = new PacketCrypt();

}

private void doExecuteMessage(){

sc = mp.getConnectionContext().getChannel();

//如果连接已关闭则不处理该包

if (!sc.isActive()) return;

//解析包体

try {

tradeContextImp = new TradeContextImp(pc,mp);

mp.getConnectionContext().setKeepAlive(false);

//处理业务

byte[] resp = doExecute();

if (!sc.isActive()) return;

ChannelHandlerContext chc = mp.getConnectionContext().getChannelHandlerContext();

ByteBuf bb = chc.alloc().directBuffer(resp.length);

bb.writeBytes(resp);

//如果连接已关闭则不处理该包

chc.writeAndFlush(bb);
// chc.fireChannelWritabilityChanged();

} catch (PacketException e) {

sc.close();

} catch (IOException e) {

sc.close();

}

}

/**

* 交易接口参数类型数组定义

*/

private static Class<?>[] invokeParamsClass = new Class<?>[]{TradeContext.class};

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {
// System.out.println("enter business adapter time: " + new Date());

mp = (MessagePacket)msg;

mp.getConnectionContext().setChannelHandlerContext(ctx);

doExecuteMessage();

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

ctx.close();

}

}
晓网科技
2024-10-17 广告
ZigBee协议适应无线传感器的低花费、低能量、高容错性等的要求。ZigBee是基于IEEE802.15.4标准的低功耗局域网协议。但IEEE仅处理低级MAC层和物理层协议,因此Zigbee联盟扩展了IEEE,对其网络层协议和API进行了标... 点击进入详情页
本回答由晓网科技提供
joyu的帐号
2020-06-19
知道答主
回答量:1
采纳率:0%
帮助的人:610
展开全部
你这个代码没有一点条理性,太乱了,我的服务端代码并发连接几万个,服务器资源使用率还不到60%。
已赞过 已踩过<
你对这个回答的评价是?
评论 收起
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式