如何用python写个串口通信的程序

 我来答
rickleo
2016-09-21 · TA获得超过164个赞
知道小有建树答主
回答量:100
采纳率:0%
帮助的人:117万
展开全部

使用 pyserial 就可以处理串口通信,这个包是跨平台的。

http://pyserial.sourceforge.net/


示例程序在这里:

https://pyserial.readthedocs.io/en/latest/examples.html#wxpython-examples


import serial

# 创建serial实例
serialport = serial.Serial()
serialport.port = 'COM1'
serialport.baudrate = 9600
serialport.parity = 'N'
serialport.bytesize = 8
serialport.stopbits = 1
serialport.timeout = 0.6
try:
    serialport.open()
    serialport.setDTR(True)
    serialport.setRTS(True)
except Exception, ex:
    print ex
    
# 发送数据
serialport.write(raw_data)

# 根据项目要求,可以开一个线程扫描接收数据
百度网友59ca271
2018-04-08 · TA获得超过7252个赞
知道小有建树答主
回答量:46
采纳率:0%
帮助的人:9730
展开全部

打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。

用python写串口通信程序的示例:

#coding=gb18030 


import sys,threading,time; 


import serial; 


import binascii,encodings; 


import re; 


import socket; 



class ReadThread:


def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):


self.l_serial = None;


self.alive = False;


self.waitEnd = None;


self.bFirstMethod = i_FirstMethod;


self.sendport = '';


self.log = Log;


self.output = Output;


self.port = Port;


self.re_num = None; 



def waiting(self):


if not self.waitEnd is None:


self.waitEnd.wait(); 



def SetStopEvent(self):


if not self.waitEnd is None:


self.waitEnd.set();


self.alive = False;


self.stop();

def start(self):


self.l_serial = serial.Serial();


self.l_serial.port = self.port;


self.l_serial.baudrate = 9600;


self.l_serial.timeout = 2; 


self.re_num = re.compile('\d'); 



try:
if not self.output is None:


self.output.WriteTex;


if not self.log is None:


self.log.info;


self.l_serial.open();


except Exception, ex:


if self.l_serial.isOpen():


self.l_serial.close();

self.l_serial = None;

if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.error(u'%s' % ex);


return False;

if self.l_serial.isOpen():


if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.info;


self.waitEnd = threading.Event();


self.alive = True;


self.thread_read = None;


self.thread_read = threading.Thread(target=self.FirstReader);


self.thread_read.setDaemon(1);


self.thread_read.start();


return True;


else:


if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.info;


return False;

def InitHead(self):

try:


time.sleep(3);


if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.info;


self.l_serial.flushInput();


self.l_serial.write('\x11');


data1 = self.l_serial.read(1024);


except ValueError,ex:


if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.error(u'%s' % ex);


self.SetStopEvent();


return; 



if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.info;


self.output.WriteText(u'===================================\r\n'); 



def SendData(self, i_msg):


lmsg = '';


isOK = False;


if isinstance(i_msg, unicode):


lmsg = i_msg.encode('gb18030');


lmsg = i_msg;



pass


except Exception, ex:


pass;


return isOK; 



def FirstReader(self):


data1 = '';


isQuanJiao = True;


isFirstMethod = True;


isEnd = True;


readCount = 0;


saveCount = 0;


RepPos = 0;


#read Head Infor content


self.InitHead(); 



while self.alive:


try:


data = '';


n = self.l_serial.inWaiting();


if n:


data = data + self.l_serial.read(n);


#print binascii.b2a_hex(data), 



for l in xrange(len(data)):


if ord(data[l])==0x8E:


isQuanJiao = True;


continue;


if ord(data[l])==0x8F:


isQuanJiao = False;


continue;


if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:


if len(data1)>10:


if not self.re_num.search(data1,1) is None:


saveCount = saveCount + 1;


if RepPos==0:


RepPos = self.output.GetInsertionPoint();


self.output.Remove(RepPos,self.output.GetLastPosition()); 



self.SendData(data1);


data1 = '';


continue;


except Exception, ex:


if not self.log is None:


self.log.error(u'%s' % ex); 



self.waitEnd.set();


self.alive = False; 


def stop(self):


self.alive = False;


self.thread_read.join();


if self.l_serial.isOpen():


self.l_serial.close();


if not self.output is None:


self.output.WriteText;


if not self.log is None:


self.log.info; 



def printHex(self, s):


s1 = binascii.b2a_hex(s);


print s1; 




if __name__ == '__main__':


rt = ReadThread();


f = open("sendport.cfg", "r")


rt.sendport = f.read()


f.close()


try:


if rt.start():


rt.waiting();


rt.stop();


else:


pass;


except Exception,se:


print str(se); 



if rt.alive:


rt.stop(); 



print 'End OK .';


del rt;

本回答被网友采纳
已赞过 已踩过<
你对这个回答的评价是?
评论 收起
匿名用户
2017-07-06
展开全部
用split()分割即可a=input('inputaandb')lis
已赞过 已踩过<
你对这个回答的评价是?
评论 收起
收起 更多回答(1)
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式