[Python] 백그라운드에서 터미널 입력을 TCP 통신으로 연속 입력받는법
PowerShell에서 터미널을 통해 Python으로 RAW 데이터를 TCP 통신으로 전달하는 구현 방법을 설명한다.
Feb 04, 2025
📝개요
- 센서에서 엣지 디바이스로 RAW 데이터를 전달해야 하는데, 터미널을 통해서 Stream으로 파이썬 코드에 데이터를 전달하고 싶었다.
- 테스트한 버전은 Python 3.12에 환경은 PyCharm과 PowerShell로 수행했다.
🔍시도한 방법
- 이벤트 루프 처리 방식에서 PowerShell이 다루는 방식이 다른 터미널과 다르기 때문에 이를 처리하는 코드를 상단에 추가해야 한다.
# 이벤트 루프 정책 설정 (파일 상단에 추가) import sys import asyncio if sys.platform == 'win32': from asyncio import WindowsSelectorEventLoopPolicy asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
- 이후 Server.py 파일에서 TCP 서버를 시작하고, 터미널에서 실행되는 개별 클라이언트를 처리하는 코드를 작성한다.
# TCP 서버 시작 및 실행 async def tcp_server(queue: asyncio.Queue, host='localhost', port=8888): # TCP 서버 생성, 지정 호스트와 포트에서 실행 server = await asyncio.start_server( # 클라이언트 연결 처리 lambda r, w: handle_tcp_client(r, w, queue), host, port ) async with server: # 계속해서 서버 실행 await server.serve_forever() # 개별 클라이언트 처리 async def handle_tcp_client(reader, writer, queue): # 클라이언트로부터 최대 1024 바이트의 데이터 읽기 data = await reader.read(1024) # 받은 데이터를 디코딩 후 공백 제거, 큐에 추가 await queue.put(data.decode().strip()) writer.close()
- 마지막으로 메인 함수에서 데이터를 수신받고, 백그라운드에서 터미널을 실행하는 코드를 작성한다.
async def main(): # 데이터 수신 큐 생성 data_queue = asyncio.Queue() # 백그라운드에서 터미널 입력 읽기 asyncio.create_task(tcp_server(data_queue)) # TCP 서버 실행 while True: # 큐에서 RAW 데이터 추출 raw_data = await data_queue.get() print(f"\n[RAW DATA RECEIVED] {raw_data}")
- 코드가 정상적으로 동작하는지 확인하기 위해 터미널에서 명령어를 수행한다.
# 데이터 변수 정의 $data = "01 03 28 0B AB 41 DA 69 F4 3F 8F 0D 36 40 7F 5B 51 42 E1 00 00 42 28 CE 5C 42" # TCP 클라이언트 생성 $client = New-Object System.Net.Sockets.TcpClient('localhost', 8888) # 네트워크 스트림 획득 $stream = $client.GetStream() # 스트림 라이터 생성 $writer = New-Object System.IO.StreamWriter($stream) # 데이터 전송 $writer.WriteLine($data) # 버퍼 플러시 $writer.Flush() # 연결 종료 $client.Close()
✅실행 결과
D:\실행경로\Server.py [RAW DATA RECEIVED] 01 03 28 0B AB 41 DA 69 F4 3F 8F 0D 36 40 7F 5B 51 42 E1 00 00 42 28 CE 5C 42
Share article