[Python] 백그라운드에서 터미널 입력을 TCP 통신으로 연속 입력받는법
PowerShell에서 터미널을 통해 Python으로 RAW 데이터를 TCP 통신으로 전달하는 구현 방법을 설명한다.
Feb 04, 2025
![[Python] 백그라운드에서 터미널 입력을 TCP 통신으로 연속 입력받는법](https://image.inblog.dev?url=https%3A%2F%2Finblog.ai%2Fapi%2Fog%3Ftitle%3D%255BPython%255D%2520%25EB%25B0%25B1%25EA%25B7%25B8%25EB%259D%25BC%25EC%259A%25B4%25EB%2593%259C%25EC%2597%2590%25EC%2584%259C%2520%25ED%2584%25B0%25EB%25AF%25B8%25EB%2584%2590%2520%25EC%259E%2585%25EB%25A0%25A5%25EC%259D%2584%2520TCP%2520%25ED%2586%25B5%25EC%258B%25A0%25EC%259C%25BC%25EB%25A1%259C%2520%25EC%2597%25B0%25EC%2586%258D%2520%25EC%259E%2585%25EB%25A0%25A5%25EB%25B0%259B%25EB%258A%2594%25EB%25B2%2595%26logoUrl%3Dhttps%253A%252F%252Finblog.ai%252Finblog_logo.png%26blogTitle%3D%25F0%259F%2591%25A8%25F0%259F%258F%25BB%25E2%2580%258D%25F0%259F%2592%25BBDriedPollack%27s%2520Blog&w=2048&q=75)
📝개요
- 센서에서 엣지 디바이스로 RAW 데이터를 전달해야 하는데, 터미널을 통해서 Stream으로 파이썬 코드에 데이터를 전달하고 싶었다.
- 테스트한 버전은 Python 3.12에 환경은 PyCharm과 PowerShell로 수행했다.
🔍시도한 방법
- 이벤트 루프 처리 방식에서 PowerShell이 다루는 방식이 다른 터미널과 다르기 때문에 이를 처리하는 코드를 상단에 추가해야 한다.
# 이벤트 루프 정책 설정 (파일 상단에 추가)
import sys
import asyncio
if sys.platform == 'win32':
from asyncio import WindowsSelectorEventLoopPolicy
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
- 이후 Server.py 파일에서 TCP 서버를 시작하고, 터미널에서 실행되는 개별 클라이언트를 처리하는 코드를 작성한다.
# TCP 서버 시작 및 실행
async def tcp_server(queue: asyncio.Queue, host='localhost', port=8888):
# TCP 서버 생성, 지정 호스트와 포트에서 실행
server = await asyncio.start_server(
# 클라이언트 연결 처리
lambda r, w: handle_tcp_client(r, w, queue),
host, port
)
async with server:
# 계속해서 서버 실행
await server.serve_forever()
# 개별 클라이언트 처리
async def handle_tcp_client(reader, writer, queue):
# 클라이언트로부터 최대 1024 바이트의 데이터 읽기
data = await reader.read(1024)
# 받은 데이터를 디코딩 후 공백 제거, 큐에 추가
await queue.put(data.decode().strip())
writer.close()
- 마지막으로 메인 함수에서 데이터를 수신받고, 백그라운드에서 터미널을 실행하는 코드를 작성한다.
async def main():
# 데이터 수신 큐 생성
data_queue = asyncio.Queue()
# 백그라운드에서 터미널 입력 읽기
asyncio.create_task(tcp_server(data_queue)) # TCP 서버 실행
while True:
# 큐에서 RAW 데이터 추출
raw_data = await data_queue.get()
print(f"\n[RAW DATA RECEIVED] {raw_data}")
- 코드가 정상적으로 동작하는지 확인하기 위해 터미널에서 명령어를 수행한다.
# 데이터 변수 정의
$data = "01 03 28 0B AB 41 DA 69 F4 3F 8F 0D 36 40 7F 5B 51 42 E1 00 00 42 28 CE 5C 42"
# TCP 클라이언트 생성
$client = New-Object System.Net.Sockets.TcpClient('localhost', 8888)
# 네트워크 스트림 획득
$stream = $client.GetStream()
# 스트림 라이터 생성
$writer = New-Object System.IO.StreamWriter($stream)
# 데이터 전송
$writer.WriteLine($data)
# 버퍼 플러시
$writer.Flush()
# 연결 종료
$client.Close()
✅실행 결과
D:\실행경로\Server.py
[RAW DATA RECEIVED] 01 03 28 0B AB 41 DA 69 F4 3F 8F 0D 36 40 7F 5B 51 42 E1 00 00 42 28 CE 5C 42
Share article