TCP编程

Python Socket 入门与实战

TCP 是面向连接、可靠的传输层协议,是构建大多数互联网应用(如 HTTP、SSH、邮件协议)的基础。Python 标准库的 socket 模块,为我们提供了一套简洁抽象的 API,来快速实现 TCP/UDP 网络编程。

本文将从基础概念讲起,逐步演示现代风格的 TCP 客户端/服务器、UDP 示例、以及进阶优化点,最后总结常见问题与最佳实践。


1. Socket 核心三要素

Socket 是网络连接的“端点”抽象,要建立有效通信,必须同时明确三个参数:

  1. 目标主机标识:IP 地址(IPv4/IPv6)或域名
  2. 端口号:1-65535 之间的整数,区分同一主机上的不同网络服务
  3. 传输协议:TCP(SOCK_STREAM,流套接字)或 UDP(SOCK_DGRAM,数据报套接字)

2. TCP 客户端编程

TCP 是“三次握手”建立连接、“四次挥手”断开的可靠协议,数据传输像读写文件一样是有序流。

2.1 最简 HTTP 客户端(不推荐老写法)

import socket

# 老写法:需手动 close(),容易资源泄漏
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('www.example.com', 80))
client.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n')

# 循环读取直到连接关闭(Chunked)
resp = bytearray()
while True:
    chunk = client.recv(4096)  # 单次最大读取4KB
    if not chunk:
        break
    resp.extend(chunk)
client.close()

# 简单解析响应
header, _, body = resp.partition(b'\r\n\r\n')
print("响应头预览:\n", header.decode('utf-8')[:500])

2.2 现代 Python 推荐写法

上下文管理器 (with 语句) 自动接管 socket 资源释放,无需担心异常时未关闭:

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect(('www.example.com', 80))
    client.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n')
    resp = client.recv(4096)  # 仅读取前4KB做演示
    print(resp.decode('utf-8'))

3. TCP 服务器编程

服务器的核心流程是:创建 socket → 绑定地址端口 → 监听 → 接受连接 → 处理请求 → 关闭

3.1 多线程 Echo 服务器(基础版)

import socket
import threading

def handle_client(conn, addr):
    """单线程处理单个客户端的循环"""
    print(f"✅ 新连接来自 {addr}")
    try:
        with conn:  # 处理完自动关闭该客户端的连接
            conn.sendall(b"Welcome to Echo Server!\n")
            while True:
                data = conn.recv(1024)
                # 收到空字节/exit 命令时退出
                if not data or data.strip().lower() == b"exit":
                    break
                conn.sendall(b"Echo: " + data)
    finally:
        print(f"❌ 连接 {addr} 已断开")

def start_server(host="127.0.0.1", port=65432):
    """启动主服务器"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # 核心优化1:允许端口立即重用(避免 TIME_WAIT 状态重启报错)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()  # 默认积压连接数为5
        print(f"🚀 服务器启动,监听于 {host}:{port}")
        
        try:
            while True:
                # 阻塞等待连接
                conn, addr = s.accept()
                # 为每个连接开新线程
                t = threading.Thread(target=handle_client, args=(conn, addr))
                t.start()
                print(f"🔍 当前活跃连接数: {threading.active_count() - 1}")
        except KeyboardInterrupt:
            print("\n🛑 收到停止信号,服务器关闭中...")

if __name__ == "__main__":
    start_server()

3.2 生产级简易优化版

基础版没有并发数限制超时机制,容易被恶意连接打挂,我们用 concurrent.futures.ThreadPoolExecutor 改造:

import socket
import concurrent.futures

def handle_client(conn, addr):
    print(f"✅ 新连接来自 {addr}")
    # 核心优化2:设置30秒超时,避免僵尸连接占用线程
    conn.settimeout(30.0)
    try:
        with conn:
            conn.sendall(b"Welcome to Optimized Server!\n")
            while True:
                try:
                    data = conn.recv(1024)
                    if not data or data.strip().lower() == b"exit":
                        break
                    conn.sendall(b"Processed: " + data.upper())
                except socket.timeout:
                    print(f"⏰ 连接 {addr} 超时")
                    break
    except Exception as e:
        print(f"⚠️ 处理 {addr} 时出错: {e}")
    finally:
        print(f"❌ 连接 {addr} 已断开")

def start_server(host="127.0.0.1", port=65432, max_workers=10):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        print(f"🚀 优化版服务器启动,监听于 {host}:{port},最大并发 {max_workers}")
        
        # 核心优化3:线程池控制并发
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            try:
                while True:
                    conn, addr = s.accept()
                    executor.submit(handle_client, conn, addr)
            except KeyboardInterrupt:
                print("\n🛑 收到停止信号,服务器关闭中...")

if __name__ == "__main__":
    start_server()

4. 快速补充:UDP Socket

UDP 是无连接、不可靠的协议,适合直播、游戏等对实时性要求高于完整性的场景,代码更短:

# UDP 客户端
import socket
def udp_client():
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        msgs = [b"Hello", b"UDP", b"World"]
        for msg in msgs:
            s.sendto(msg, ("127.0.0.1", 65433))
            data, addr = s.recvfrom(1024)
            print(f"📨 收到 {addr} 的回复: {data.decode()}")

# UDP 服务器
def udp_server():
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind(("127.0.0.1", 65433))
        print(f"🚀 UDP 服务器启动,监听于 127.0.0.1:65433")
        try:
            while True:
                data, addr = s.recvfrom(1024)
                print(f"📨 收到 {addr}: {data.decode()}")
                s.sendto(data.upper(), addr)
        except KeyboardInterrupt:
            print("\n🛑 服务器关闭")

if __name__ == "__main__":
    # 分别运行 udp_server() 和 udp_client() 测试
    udp_server()

5. 最佳实践与避坑

避坑速查

问题解决方案
重启服务器提示 Address already in use绑定前加 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
recv() 收到的消息被截断或合并TCP 是流协议,需设计应用层协议(如头部带消息长度)
线程/进程耗尽用线程池/进程池或异步 I/O(asyncio

通用建议

  1. 始终用 withtry/finally 释放资源
  2. 合理设置超时(settimeout()
  3. 生产环境优先用异步或成熟框架(如 asyncioTornadoFastAPI 的底层)
  4. 不要用 send() 代替 sendall()sendall() 会循环发送直到所有数据写完

6. 总结

本文用简洁的代码演示了 Python 标准库 socket 模块的核心用法,包括:

  • TCP 流通信的三次握手、四次挥手逻辑
  • 现代自动资源管理的写法
  • 生产级简易服务器的优化
  • 无连接 UDP 的快速示例
  • 常见避坑点

如果需要处理更复杂的场景,可以进一步学习 asyncio 异步 Socket、SSL/TLS 加密(ssl.create_default_context())等进阶内容。