TCP编程

Python Socket 入门与实战

TCP 是面向连接、可靠的传输层协议,支撑着 HTTP、SSH、邮件等绝大多数互联网应用。Python 标准库中的 socket 模块,提供了一套清晰、跨平台的网络编程接口。本文从基础概念出发,带你一步步写出现代风格的 TCP 客户端与服务器,同时了解 UDP 的简单用法、生产环境中的优化技巧,以及常见的问题与对策。


1. Socket 核心三要素

要想建立有效的网络通信,一个 socket 需要同时确定三个要素:

  1. 目标主机标识:IPv4/IPv6 地址或域名(如 www.example.com)。
  2. 端口号:1~65535 之间的整数,用于区分同一主机上的不同服务(如 HTTP 默认 80,HTTPS 443)。
  3. 传输协议
    • SOCK_STREAM:TCP,面向连接的流协议,保证数据有序、可靠到达。
    • SOCK_DGRAM:UDP,无连接的数据报协议,不保证可靠性,但对实时性要求较高的场景(如直播、游戏)更高效。

2. TCP 客户端编程

TCP 通过“三次握手”建立连接,通过“四次挥手”断开,数据传输就像读写文件一样,是一个有序的字节流。

2.1 最简 HTTP 客户端(传统写法,不推荐)

下面的例子演示了如何使用底层 socket 发送一个最简单的 HTTP 请求。这种写法需要手动调用 close(),容易因异常而忘记释放资源,现在不推荐在生产中使用。

import socket

# 传统写法:需手动 close(),存在资源泄漏风险
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('www.example.com', 80))
client.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n')

# 循环读取,直到服务器关闭连接
resp = bytearray()
while True:
    chunk = client.recv(4096)  # 单次最多读取 4 KB
    if not chunk:
        break
    resp.extend(chunk)
client.close()

# 简单展示响应头
header, _, body = resp.partition(b'\r\n\r\n')
print("响应头预览:\n", header.decode('utf-8')[:500])

2.2 现代 Python 推荐写法

利用上下文管理器(with 语句),Python 会在代码块结束时自动关闭 socket,无论是否发生异常,资源都能得到安全释放。

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect(('www.example.com', 80))
    client.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n')
    resp = client.recv(4096)  # 仅读取前 4 KB 作为演示
    print(resp.decode('utf-8'))

3. TCP 服务器编程

服务器的核心流程为:创建 socket → 绑定地址与端口 → 开始监听 → 接受连接 → 处理请求 → 关闭连接

3.1 多线程 Echo 服务器(基础版)

这个版本为每个客户端创建一个新线程,实现“回声”功能——把收到的数据原样返回,并在遇到空数据或 exit 命令时断开连接。

import socket
import threading

def handle_client(conn, addr):
    """处理单个客户端连接"""
    print(f"✅ 新连接来自 {addr}")
    try:
        with conn:  # with 语句确保连接自动关闭
            conn.sendall(b"Welcome to Echo Server!\n")
            while True:
                data = conn.recv(1024)
                # 收到空数据或客户端输入 exit 时退出循环
                if not data or data.strip().lower() == b"exit":
                    break
                conn.sendall(b"Echo: " + data)
    finally:
        print(f"❌ 连接 {addr} 已断开")

def start_server(host="127.0.0.1", port=65432):
    """启动主服务器"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # 允许端口立即重用,避免 TIME_WAIT 状态导致重启失败
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()  # 默认积压连接数为 5
        print(f"🚀 服务器启动,监听于 {host}:{port}")

        try:
            while True:
                conn, addr = s.accept()        # 阻塞等待新连接
                t = threading.Thread(
                    target=handle_client,
                    args=(conn, addr)
                )
                t.start()
                print(f"🔍 当前活跃连接数: {threading.active_count() - 1}")
        except KeyboardInterrupt:
            print("\n🛑 收到停止信号,服务器关闭中...")

if __name__ == "__main__":
    start_server()

3.2 生产级简易优化版

上面的基础版没有限制并发数,也没有设置超时机制,恶意连接可能会耗尽线程资源。下面用 concurrent.futures.ThreadPoolExecutor 来改进:

import socket
import concurrent.futures

def handle_client(conn, addr):
    print(f"✅ 新连接来自 {addr}")
    # 设置 30 秒超时,防止僵尸连接长期占用线程
    conn.settimeout(30.0)
    try:
        with conn:
            conn.sendall(b"Welcome to Optimized Server!\n")
            while True:
                try:
                    data = conn.recv(1024)
                    if not data or data.strip().lower() == b"exit":
                        break
                    conn.sendall(b"Processed: " + data.upper())
                except socket.timeout:
                    print(f"⏰ 连接 {addr} 超时")
                    break
    except Exception as e:
        print(f"⚠️ 处理 {addr} 时出错: {e}")
    finally:
        print(f"❌ 连接 {addr} 已断开")

def start_server(host="127.0.0.1", port=65432, max_workers=10):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        print(f"🚀 优化版服务器启动,监听于 {host}:{port},最大并发 {max_workers}")

        # 使用线程池控制并发数量
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            try:
                while True:
                    conn, addr = s.accept()
                    executor.submit(handle_client, conn, addr)
            except KeyboardInterrupt:
                print("\n🛑 收到停止信号,服务器关闭中...")

if __name__ == "__main__":
    start_server()

通过线程池,我们限制了同时处理的请求数量,避免资源耗尽;超时设置则防止某些连接长时间无响应却一直占用线程。


4. 快速补充:UDP Socket

UDP 是无连接、不可靠的协议,不需要预先建立连接,延迟更小,但可能丢失数据或乱序。它非常适合视频直播、在线游戏等场景。下面的代码演示了简单的 UDP 服务器和客户端。

import socket

def udp_client():
    """UDP 客户端:发送几条消息"""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        msgs = [b"Hello", b"UDP", b"World"]
        for msg in msgs:
            s.sendto(msg, ("127.0.0.1", 65433))
            data, addr = s.recvfrom(1024)
            print(f"📨 收到 {addr} 的回复: {data.decode()}")

def udp_server():
    """UDP 服务器:接收消息并转为大写后回复"""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind(("127.0.0.1", 65433))
        print("🚀 UDP 服务器启动,监听于 127.0.0.1:65433")
        try:
            while True:
                data, addr = s.recvfrom(1024)
                print(f"📨 收到 {addr}: {data.decode()}")
                s.sendto(data.upper(), addr)
        except KeyboardInterrupt:
            print("\n🛑 服务器关闭")

if __name__ == "__main__":
    # 分别运行 udp_server() 和 udp_client() 进行测试
    udp_server()

5. 最佳实践与避坑

避坑速查

常见问题解决方案
重启服务器时出现 Address already in usebind() 前添加 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
recv() 收到的消息被截断或粘在一起TCP 是流协议,需要设计应用层协议(如在消息头部附带长度)
线程或进程耗尽使用线程池 / 进程池,或切换到异步 I/O(asyncio

通用建议

  1. 资源管理:始终使用 withtry/finally 释放 socket 资源。
  2. 设置超时:通过 settimeout() 避免连接长时间阻塞。
  3. 生产环境:优先选用成熟的异步框架(如 asyncioTornadoFastAPI 的底层设施)。
  4. sendall() 替代 send()sendall() 会持续尝试发送直到所有数据写完,而 send() 可能只发送一部分就返回。

6. 总结

本文通过简洁、可运行的代码,展示了 Python socket 模块的核心用法:

  • TCP 的三次握手、四次挥手与流式通信的本质;
  • 从传统的手动 close() 写法过渡到现代的 with 上下文管理器;
  • 从简单的多线程 Echo 服务器到线程池 + 超时的生产级优化;
  • UDP 的无连接通信示例;
  • 开发中容易踩到的坑及对应的解决办法。

如果业务场景更为复杂,还可以进一步学习 asyncio 异步 Socket、SSL/TLS 加密(ssl.create_default_context())等进阶主题,构建更高性能、更安全的网络应用。