Processor

【라즈베리파이】 wiringPi Python : UART 라즈베리파이 간 채팅

작성자 임베디드코리아 작성일26-02-20 00:48 조회138회 댓글0건
라즈베리파이 2대를 연결하여 채팅을 한다.

--->>> 라즈베리파이 간 채팅 : uart_chat.py    <<<----------------------------
import serial
import sys, time
#파이썬에서 쓰레드 기능 구현을 위해 필요
import threading
import commands
import binascii
import RPi.GPIO as GPIO
 
#중요한 제어 문자
NAK_CNT = 0
STX = 0x02
ETX = 0x03
ACK = 0x06
NCK = 0x15
 
Last_Packet= ""
Rcv_End = False
Debug = False
 
#출력 문자 색 제어
RED =  "\033[31m"
DEFAULT =  "\033[0m"
BLUE = "\033[34m"
GREEN = "\033[32m"
 
# ISO 1155 표준 LRC 계산식
def make_LRC(packet):
    LRC = 0x00
    for ch in packet:
        LRC = ((LRC + ord(ch[0])) & 0xFF)
    LRC = (~LRC + 1) & 0xFF
    return LRC
 
# UART 패킷을 송신
def send(packet):
    global Last_Packet
    bytes = [STX]
    LRC = 0x10
    Last_Packet = "".join(map(chr, bytes))
    Last_Packet += packet
    bytes = [ETX]
    Last_Packet += "".join(map(chr, bytes))
    LRC = make_LRC(Last_Packet[1:len(Last_Packet)])
    bytes = [LRC]
    Last_Packet += "".join(map(chr, bytes))
    ser.write(Last_Packet)
    print "S:", binascii.hexlify(bytearray(Last_Packet))
    print "\033[34mSend:", packet, DEFAULT
    return
 
# LRC 체크가 성공여부 체크
def check_packet(val):
    lrc = make_LRC(val[1:len(val) - 1])
    if(lrc == ord(val[len(val) -1])):
        if(Debug == True):
            print BLUE, "LRC Check Success :", lrc, DEFAULT
            print "CHECK:", binascii.hexlify(bytearray(val))
        ret = val[1: len(val) -2]
    else:
        if(Debug == True):
            print RED, "LRC Check Fail", lrc, "!= ",ord(val[len(val) -1]), DEFAULT
            print "CHECK:", binascii.hexlify(bytearray(val))
        ret = ""
       
    return ret
"""
UART 채널은 완전 이중화(Full Duplex)이기 때문에 별도 쓰레드에서 수신처리 가능
"""
def rs232_receive_thread():
    global  Last_Packet
    data = ""
    while 1:
        val = ser.read(1)
        if(0 == len(val)):
            time.sleep(0.002)
            continue
        ival = ord(val[0])
        if(Debug == True):
            print "R:", binascii.hexlify(bytearray(val))
        if(ival ==  ACK):   
            if(Debug == True):
                print "\033[37mSend:OK ACK rcv \033[0m"
            continue
 
 
        if(ival ==  NCK):    #packet currupted ->resend 3 times
            print "\033[31m", "Packet Currupted ->NAK received", "\033[0m"
            if(++NAK_CNT < 4):
                ser.write(Last_Packet)
            else:    #drop packet
                NAK_CNT = 0
            continue;
 
        if(ival ==  ETX):
            data += val
            while True:
                val = ser.read(1)    #receive Last LRC
                if(len(val) == 1):
                    Rcv_End = True
                    data += val
                    break
 
        else:
            Rcv_End = False
            data += val
 
        if(True == Rcv_End):
            rcv_data = check_packet(data)
            if(0 == len(rcv_data)):
                print RED,  "Invalid Packet Received ->NAK  DATA:", data,  DEFAULT
                bytes = [NCK]
                ser.write("".join(map(chr, bytes)))
            else:
                print GREEN, "RCV:",  rcv_data, DEFAULT
                bytes = [ACK]
                ser.write("".join(map(chr, bytes)))
            data = ""
        time.sleep(0.002)
 
# 수신 전용 쓰레드를 만들어 실행시킴
def rs232_receive_svc():
    print ('rs232 receive svc')
    th = threading.Thread(target=rs232_receive_thread)
    th.start()
 
# 여기에서부터 프로그램 시작!
print "RS232 Chatting Application"
 
#루프백 테스트에 사용할 UART용 가상 파일 ttyAMA0(ttyS0)를 연다.
if(GPIO.RPI_REVISION < 3):
  ser = serial.Serial(port = "/dev/ttyAMA0", baudrate=9600, timeout=2)
else:
  ser = serial.Serial(port = "/dev/ttyS0", baudrate=9600, timeout=2)
 
if (ser.isOpen() == False):
    ser.open()
#만약 ttyAMA0에 데이터가 남아있으면 비우고 새로 시작한다.
ser.flushInput()
ser.flushOutput()
 
#수신 쓰레드 생성
rs232_receive_svc()
 
try:
    while True:
        #송신할 채팅 문장을 입력받아서 송신
        packet = raw_input("")
        send(packet)
 
except KeyboardInterrupt: 
    print "RS232 Chatting Application End"
 
finally:
    ser.close()
    print "Good by!"