Python email retrieval tutorial: using the POP3 protocol

1. Overview of email retrieval protocols

In email, sending relies on SMTP, while receiving has two main options:

  • POP3 (Post Office Protocol version 3): a lightweight protocol built around "download, then process locally"; simple and direct.
  • IMAP (Internet Message Access Protocol): a more capable protocol that keeps mail on the server and synchronizes it across clients, which suits reading mail on several devices.

This tutorial focuses on POP3, the simpler of the two, and uses only Python's built-in modules to implement the complete flow of fetching messages from a mailbox.


2. Minimal POP3 workflow

You don't need the full protocol details to follow the code; these six steps are enough:

  1. The client connects to the server: port 110 (plaintext, strongly discouraged) or port 995 (SSL/TLS encrypted, which you should use in production).
  2. Authenticate (log in).
  3. Get the list of message IDs and sizes.
  4. Download the messages you need.
  5. (Optional) Mark downloaded messages for deletion on the server.
  6. Disconnect explicitly.
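To see the six steps as an actual command exchange without touching a real account, here is a sketch that runs a throwaway POP3 server on localhost and walks through the steps with poplib. The server (ToyPOP3Handler), its MAILBOX contents and the run_six_steps helper are hypothetical illustrations: the toy accepts any credentials, speaks plaintext, implements only USER, PASS, STAT, LIST, RETR and QUIT, and skips dot-stuffing, so it is not a model for a real server.

```python
import poplib
import socketserver
import threading

# Hypothetical mailbox for the toy server: message number -> raw message bytes.
MAILBOX = {
    1: b"Subject: first\r\n\r\nHello 1",
    2: b"Subject: second\r\n\r\nHello 2",
}


class ToyPOP3Handler(socketserver.StreamRequestHandler):
    """Toy POP3 server for local experiments only: any login succeeds, no TLS."""

    def send(self, line: bytes) -> None:
        self.wfile.write(line + b"\r\n")

    def handle(self) -> None:
        self.send(b"+OK toy POP3 ready")                  # greeting read by poplib on connect
        for raw in self.rfile:                             # one client command per line
            cmd, _, arg = raw.strip().partition(b" ")
            cmd = cmd.upper()
            if cmd in (b"USER", b"PASS"):
                self.send(b"+OK")
            elif cmd == b"STAT":
                total = sum(len(m) for m in MAILBOX.values())
                self.send(b"+OK %d %d" % (len(MAILBOX), total))
            elif cmd == b"LIST":                           # multi-line reply ends with "."
                self.send(b"+OK scan listing follows")
                for num, body in MAILBOX.items():
                    self.send(b"%d %d" % (num, len(body)))
                self.send(b".")
            elif cmd == b"RETR":
                body = MAILBOX[int(arg)]
                self.send(b"+OK %d octets" % len(body))
                for line in body.split(b"\r\n"):           # sample lines never start with "."
                    self.send(line)
                self.send(b".")
            elif cmd == b"QUIT":
                self.send(b"+OK bye")
                return
            else:
                self.send(b"-ERR unsupported command")


def run_six_steps(host: str, port: int):
    server = poplib.POP3(host, port, timeout=5)            # 1. connect (plaintext: local demo only)
    try:
        server.user("demo@example.com")                    # 2. authenticate
        server.pass_("not-a-real-password")
        count, _ = server.stat()
        _, listing, _ = server.list()                      # 3. IDs and sizes
        subjects = []
        for entry in listing:                              # 4. download each message
            _, lines, _ = server.retr(int(entry.split()[0]))
            subjects.append(lines[0].decode())
        # 5. (optional) server.dele(...) would mark messages for deletion here
    finally:
        server.quit()                                      # 6. disconnect
    return count, subjects


if __name__ == "__main__":
    with socketserver.ThreadingTCPServer(("127.0.0.1", 0), ToyPOP3Handler) as srv:
        threading.Thread(target=srv.serve_forever, daemon=True).start()
        print(run_six_steps(*srv.server_address))          # (2, ['Subject: first', 'Subject: second'])
        srv.shutdown()
```

Against a real provider you would swap poplib.POP3 for poplib.POP3_SSL on port 995, as the connection function below does.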

3. Complete implementation in Python

No third-party libraries are needed: the standard library's poplib and email modules cover everything.

3.1 Importing modules

First, import everything the code needs:

import poplib
from email.parser import BytesParser
from email.header import decode_header
from email.utils import parseaddr

3.2 Secure connection function

Most modern mail providers disable or discourage plaintext POP3, so it is convenient to wrap connection setup in a reusable function that uses SSL by default:

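def connect_to_pop3_server(email, password, pop3_server, use_ssl=True, port=None):
    """
    Open a secure POP3 connection and log in
    :param email: full email address
    :param password: account password or app-specific password (required for accounts with two-factor authentication)
    :param pop3_server: the provider's POP3 host (e.g. pop.qq.com)
    :param use_ssl: whether to use SSL/TLS encryption
    :param port: custom port (leave empty to use the default SSL or plaintext port)
    :return: a logged-in POP3 connection object
    """
    # Pick the default port if none was given
    if not port:
        port = 995 if use_ssl else 110
    
    # Open the connection
    try:
        server = poplib.POP3_SSL(pop3_server, port=port) if use_ssl else poplib.POP3(pop3_server, port=port)
    except Exception as e:
        raise ConnectionError(f"Failed to connect to server: {e}") from e
    
    # Debug switch (optional: enable during development, disable in production)
    # server.set_debuglevel(1)
    
    # Print the greeting to confirm the connection works
    print("✅ Server response:", server.getwelcome().decode('utf-8', errors='replace'))
    
    # Authenticate; close the socket if the server rejects the login
    try:
        server.user(email)
        server.pass_(password)
    except poplib.error_proto:
        server.close()
        raise
    print("✅ Login successful")
    
    return server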
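connect_to_pop3_server relies on poplib's default TLS settings and sets no socket timeout. Depending on the Python version, POP3_SSL without an explicit context may not verify the server's certificate, so passing one can be worthwhile. The sketch below is a hedged variant, not part of the tutorial's code: make_tls_context, open_pop3_ssl and open_pop3_starttls are hypothetical helper names, the 30-second timeout is an arbitrary choice, and the STLS path only works if the server advertises STLS on port 110.

```python
import poplib
import ssl


def make_tls_context() -> ssl.SSLContext:
    """Client context that verifies the certificate chain and the hostname."""
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2   # refuse TLS 1.0/1.1
    return context


def open_pop3_ssl(host: str, port: int = 995, timeout: float = 30.0) -> poplib.POP3_SSL:
    """Implicit TLS (port 995): the connection is encrypted from the first byte."""
    return poplib.POP3_SSL(host, port, timeout=timeout, context=make_tls_context())


def open_pop3_starttls(host: str, port: int = 110, timeout: float = 30.0) -> poplib.POP3:
    """Explicit TLS: connect in plaintext, then upgrade with STLS before logging in."""
    server = poplib.POP3(host, port, timeout=timeout)
    server.stls(context=make_tls_context())            # raises poplib.error_proto if unsupported
    return server
```

Either object can then be logged in with user() and pass_() exactly as in connect_to_pop3_server.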

3.3 Get email metadata list

First, pull the list of message IDs and sizes. This lets you decide which messages to download and avoid pulling very large messages in one go:

def get_email_metadata(server):
    """
    Fetch message metadata (ID + size)
    :param server: a logged-in POP3 connection
    :return: [(message ID as str, size in bytes), ...], usually ordered oldest to newest
    """
    _, raw_mails, _ = server.list()
    email_list = []
    for raw in raw_mails:
        mail_id, mail_size = raw.decode('utf-8').split()
        email_list.append((mail_id, int(mail_size)))
    return email_list
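Because get_email_metadata returns sizes, oversized messages can be skipped before anything is downloaded. A minimal sketch, assuming a hypothetical select_targets helper and an arbitrary 5 MB ceiling; it could stand in for the plain slice email_metadata[-max_fetch:] used in main:

```python
def select_targets(metadata, max_fetch, max_bytes=5 * 1024 * 1024):
    """Pick up to max_fetch of the newest messages whose size is at most max_bytes.

    metadata: [(mail_id, size), ...] ordered oldest to newest, as get_email_metadata returns.
    """
    picked = []
    for mail_id, size in reversed(metadata):      # walk newest -> oldest
        if len(picked) >= max_fetch:
            break
        if size <= max_bytes:                     # oversized messages are skipped entirely
            picked.append((mail_id, size))
    picked.reverse()                              # restore oldest -> newest order
    return picked


metadata = [("1", 1200), ("2", 9_000_000), ("3", 800), ("4", 4500)]
print(select_targets(metadata, 3))                # [('1', 1200), ('3', 800), ('4', 4500)]
```

Skipping rather than truncating keeps every downloaded message complete; a max_fetch of 0 or less simply returns an empty list.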

3.4 Download a single raw email

Once you have an ID, download the message's raw bytes so the email module can parse them:

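def download_single_email(server, mail_id):
    """
    Download the message with the given ID
    :param server: a logged-in POP3 connection
    :param mail_id: an ID from the metadata list
    :return: the raw message as bytes
    """
    # retr() returns the message as a list of lines without line endings; rejoin them
    _, raw_lines, _ = server.retr(mail_id)
    return b'\r\n'.join(raw_lines)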

3.5 Core parsing tools

POP3 only transfers raw bytes; parsing the headers, body and attachments is entirely up to the email module. Two problems need handling here:

  • Mixed encodings: Chinese headers and bodies often use GBK, GB2312, UTF-8 or other encodings.
  • Multipart messages: messages with attachments, or with both plain-text and HTML versions, are nested structures.
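For comparison, the email package can take over much of this work itself. This tutorial parses with the default (compat32) BytesParser and decodes by hand; passing policy=email.policy.default instead returns EmailMessage objects whose headers come back already decoded and whose get_body() picks the preferred text part. The sketch below is that swapped-in approach, not the one the rest of this tutorial uses; the sample message and the summarize helper are made up so it runs without a mailbox, and it has no GBK fallback for mislabeled charsets.

```python
from email import policy
from email.message import EmailMessage
from email.parser import BytesParser


def summarize(raw: bytes):
    """Return (subject, sender display name, preferred text body) using policy.default."""
    msg = BytesParser(policy=policy.default).parsebytes(raw)
    sender = msg["From"].addresses[0]                       # structured address header
    body = msg.get_body(preferencelist=("plain", "html"))   # prefers text/plain over HTML
    return str(msg["Subject"]), sender.display_name, body.get_content().strip()


# Build a plain-text + HTML message in memory to stand in for server.retr() output.
sample = EmailMessage()
sample["From"] = "张三 <zhangsan@example.com>"
sample["To"] = "me@example.com"
sample["Subject"] = "测试邮件"
sample.set_content("纯文本正文")
sample.add_alternative("<p>HTML 正文</p>", subtype="html")

print(summarize(sample.as_bytes()))   # ('测试邮件', '张三', '纯文本正文')
```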

3.5.1 General string decoding function

def safe_decode_str(s):
    """
    Safely decode an encoded header string (handles several encoded words in one header)
    :param s: raw header value
    :return: decoded plain text
    """
    if not s:
        return ""
    
    decoded_parts = []
    for part, charset in decode_header(s):
        if isinstance(part, bytes):
            charset = charset or "utf-8"
            try:
                part = part.decode(charset)
            except (LookupError, UnicodeDecodeError):
                # Unknown or mislabeled charset: fall back to GBK, common in Chinese mail
                part = part.decode("gbk", errors="replace")
        decoded_parts.append(part)
    return "".join(decoded_parts)
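To see what the loop in safe_decode_str iterates over, the snippet below builds a made-up Subject that mixes plain ASCII with a GBK encoded word and feeds it to decode_header. It also shows the standard-library shortcut str(make_header(decode_header(...))); that shortcut has no GBK fallback, so it may raise on a mislabeled or unknown charset, which is the case safe_decode_str's fallback is there for.

```python
from email.header import Header, decode_header, make_header

# Encode a Chinese subject as an RFC 2047 encoded word in GBK, then embed it in plain text.
encoded_word = Header("会议通知", "gbk").encode()
subject = f"Re: {encoded_word} (2024)"

chunks = decode_header(subject)
for data, charset in chunks:
    print(charset, data)          # plain chunks report charset None; the encoded word reports 'gbk'

print(str(make_header(chunks)))  # Re: 会议通知 (2024)
```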

3.5.2 Guess the character encoding of the email part

def guess_part_charset(msg_part):
    """
    Determine the character encoding of a message part
    :param msg_part: a part object produced by the email module
    :return: charset name
    """
    # get_content_charset() reads the charset parameter of Content-Type (lowercased)
    # or returns None. get_charset() is not suitable here: it returns a Charset object
    # attached via set_charset(), which is normally None for parsed mail.
    charset = msg_part.get_content_charset()
    return charset or "utf-8"  # final fallback

3.5.3 Recursively print email summary

For nested multipart messages, print only the key content (the first 200 characters of each text part and the names of attachments) to keep the output short:

def print_email_summary(msg, indent=0):
    """
    Recursively print a message summary
    :param msg: a complete message object parsed by the email module
    :param indent: indentation level (used internally by the recursion)
    """
    prefix = " " * indent
    
    # At the top level, print the basic headers
    if indent == 0:
        print("\n" + "="*50)
        headers_to_show = ["From", "To", "Subject", "Date"]
        for header in headers_to_show:
            value = msg.get(header, "")
            if value:
                if header in ("Subject", "Date"):
                    value = safe_decode_str(value)
                else:
                    # Format sender/recipient as "Name <address>"
                    # (parseaddr handles a single address; getaddresses() handles lists)
                    name, addr = parseaddr(value)
                    name = safe_decode_str(name)
                    value = f"{name} <{addr}>" if name else addr
                print(f"{prefix}{header}: {value}")
        print("="*50 + "\n")
    
    # Multipart vs. single-part
    if msg.is_multipart():
        # Recurse into each subpart
        for i, part in enumerate(msg.get_payload()):
            print(f"{prefix}--- Part {i+1} ---")
            print_email_summary(part, indent + 2)
    else:
        content_type = msg.get_content_type()
        charset = guess_part_charset(msg)
        
        # Plain-text / HTML body (text files sent as attachments go to the branch below)
        if content_type.startswith("text/") and msg.get_content_disposition() != "attachment":
            content_bytes = msg.get_payload(decode=True) or b""
            try:
                content = content_bytes.decode(charset)
            except (LookupError, UnicodeDecodeError):
                content = content_bytes.decode("gbk", errors="replace")
            # Print only the first 200 characters
            print(f"{prefix}[{content_type}, {charset}]")
            print(f"{prefix}{content[:200].strip()}{'...' if len(content) > 200 else ''}")
        # Attachments
        else:
            filename = msg.get_filename()
            filename = safe_decode_str(filename) if filename else "unnamed attachment"
            print(f"{prefix}📎 Attachment: {filename} ({content_type})")
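print_email_summary recurses by hand and prints as it goes. If you would rather collect the text for later processing, Message.walk() already visits every part depth-first. The sketch below is a hypothetical variant (extract_text_and_attachments is not part of the tutorial's code) that returns decoded text bodies and attachment names. Like the summary function it tries the declared charset and falls back to GBK; it returns filenames undecoded, so pass them through safe_decode_str if they contain encoded words.

```python
from email.message import Message


def extract_text_and_attachments(msg: Message):
    """Return (list of decoded text bodies, list of attachment filenames)."""
    texts, attachments = [], []
    for part in msg.walk():                          # visits msg itself and every nested part
        if part.is_multipart():
            continue                                 # containers have no payload of their own
        filename = part.get_filename()
        if filename or part.get_content_disposition() == "attachment":
            attachments.append(filename or "unnamed attachment")
        elif part.get_content_maintype() == "text":
            payload = part.get_payload(decode=True) or b""
            charset = part.get_content_charset() or "utf-8"
            try:
                texts.append(payload.decode(charset))
            except (LookupError, UnicodeDecodeError):
                texts.append(payload.decode("gbk", errors="replace"))
    return texts, attachments
```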

4. Complete runnable example

Chain the functions above together behind a single entry point:

def main(email, password, pop3_server, max_fetch=3):
    """
    Main POP3 retrieval flow
    :param max_fetch: fetch at most the N newest messages
    """
    server = None
    try:
        # 1. Connect and log in
        server = connect_to_pop3_server(email, password, pop3_server)
        
        # 2. Get the message count and metadata
        total_count, _ = server.stat()
        print(f"📬 The mailbox holds {total_count} message(s)")
        # Note: without the max_fetch check, a slice of [-0:] would return every message
        if total_count == 0 or max_fetch <= 0:
            print("Nothing to fetch")
            return
        
        email_metadata = get_email_metadata(server)
        # Take the newest max_fetch messages (the list runs oldest → newest, so slice the end)
        target_mails = email_metadata[-max_fetch:]
        print(f"📥 Fetching the newest {len(target_mails)} message(s)\n")
        
        # 3. Download and parse each message
        for i, (mail_id, size) in enumerate(target_mails, 1):
            print(f"🔍 Processing {i}/{len(target_mails)} (ID: {mail_id}, size: {size/1024:.2f}KB)")
            raw_content = download_single_email(server, mail_id)
            msg = BytesParser().parsebytes(raw_content)
            print_email_summary(msg)
            
            # Optional: mark for deletion (applied only when quit() is called; server.rset() undoes it)
            # server.dele(mail_id)
            # print(f"🗑️ Marked message {mail_id} for deletion")
            
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        # Close the connection whether or not an error occurred
        if server:
            server.quit()
            print("\n🔌 Disconnected from server")


if __name__ == "__main__":
    # --------------------------
    # Replace with your own mailbox settings
    # --------------------------
    MY_EMAIL = "your_email@example.com"
    MY_PWD = "your_app_password_or_main_password"
    MY_POP3 = "pop.example.com"  # e.g. QQ Mail uses pop.qq.com, NetEase 163 uses pop.163.com
    
    # Fetch the 2 newest messages
    main(MY_EMAIL, MY_PWD, MY_POP3, max_fetch=2)

5. Pitfalls to avoid (security and practice)

5.1 Security first

  1. Never hardcode passwords: read them from environment variables (os.getenv("EMAIL_PWD")) or from a local configuration file kept out of version control (for example, python-dotenv with a .env file listed in .gitignore).
  2. Accounts with two-factor authentication need an app-specific password: providers such as QQ Mail, 163 and Gmail require you to generate one separately, and the main account password will not work.
  3. Always enable SSL: the code uses port 995 by default, and many providers now block plaintext port 110.
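Following the first point, here is a minimal sketch of loading the three settings from environment variables instead of hardcoding them in the __main__ block. EMAIL_PWD comes from the text above; EMAIL_ADDR, EMAIL_POP3, MailConfig and load_mail_config are hypothetical names chosen for illustration.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class MailConfig:
    email: str
    password: str
    pop3_server: str


def load_mail_config(prefix: str = "EMAIL_") -> MailConfig:
    """Read the address, password and POP3 host from <prefix>ADDR / PWD / POP3."""
    names = {key: prefix + key for key in ("ADDR", "PWD", "POP3")}
    missing = [var for var in names.values() if not os.getenv(var)]
    if missing:
        # Fail loudly instead of falling back to a hardcoded password.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return MailConfig(
        email=os.environ[names["ADDR"]],
        password=os.environ[names["PWD"]],
        pop3_server=os.environ[names["POP3"]],
    )
```

With the variables exported in your shell (or loaded by python-dotenv), the entry point becomes cfg = load_mail_config() followed by main(cfg.email, cfg.password, cfg.pop3_server).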

5.2 Practical notes

  1. Don't poll too often: repeated connections in a short period may get you rate-limited or even blocked by the provider, so leave a reasonable interval between fetches.
  2. Message IDs are session-scoped: the IDs returned by list() may change each time you log in, so don't store them across sessions.
  3. Delete with care: server.dele(mail_id) only marks a message; it is actually removed when server.quit() ends the session. If you change your mind, call server.rset() before quitting to clear all deletion marks.
  4. Don't panic over encoding problems: Chinese providers commonly use GBK, while most others use UTF-8. The code above tries the declared charset first and falls back to GBK, so most messages display correctly.
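For the session-scoped ID problem in point 2, RFC 1939 defines the optional UIDL command, exposed in poplib as server.uidl(), whose unique IDs persist across sessions on servers that support it. A sketch of using them to pick out only new messages, assuming you store the set of already-seen UIDs yourself (parse_uidl and new_message_numbers are hypothetical helpers; the sample UIDs are the examples from RFC 1939):

```python
def parse_uidl(lines):
    """Map UID -> message number from server.uidl() lines such as b'1 whqtswO00WBw418f9t5JxYwZ'."""
    mapping = {}
    for line in lines:
        num, uid = line.decode("ascii").split(maxsplit=1)
        mapping[uid] = int(num)
    return mapping


def new_message_numbers(uidl_lines, seen_uids):
    """Message numbers (valid only in this session) whose UIDs were not seen before."""
    return sorted(num for uid, num in parse_uidl(uidl_lines).items() if uid not in seen_uids)


# In a live session: _, uidl_lines, _ = server.uidl()
uidl_lines = [b"1 whqtswO00WBw418f9t5JxYwZ", b"2 QhdPYR:00WBw1Ph7x7"]
seen = {"whqtswO00WBw418f9t5JxYwZ"}               # loaded from your own storage
print(new_message_numbers(uidl_lines, seen))      # [2]
```

After processing, add the new UIDs to your stored set; the message numbers themselves are still only valid until quit().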

6. Next steps

If you want to further expand the functionality, you can try:

  1. Attachment download: walk the parts of a message, find those with Content-Disposition: attachment, and save them locally.
  2. Mail filtering: fetch only the headers first (server.top(mail_id, 0)), filter on subject, sender, date and similar fields, and download the full message only when it matches.
  3. Scheduled fetching: combine the code with the third-party schedule library to fetch mail automatically at fixed intervals.
  4. Data persistence: store parsed messages in SQLite, MySQL or Elasticsearch for further analysis.
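Point 1 can be sketched as follows, assuming msg was parsed with BytesParser as in main. save_attachments is a hypothetical helper: it keeps only the base name of each filename so a crafted name such as ../../x cannot write outside the target folder, it overwrites files that share a name, and it uses filenames as get_filename() returns them, so encoded-word names are not decoded here.

```python
from email.message import Message
from pathlib import Path


def save_attachments(msg: Message, out_dir: str) -> list:
    """Write every part marked Content-Disposition: attachment into out_dir; return the paths."""
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)
    saved = []
    for part in msg.walk():
        if part.get_content_disposition() != "attachment":
            continue
        # Keep only the last path component so "../x" style names stay inside out_dir.
        name = Path(part.get_filename() or "").name
        if name in ("", ".."):
            name = f"attachment-{len(saved) + 1}.bin"   # fallback for missing or unusable names
        path = target / name
        path.write_bytes(part.get_payload(decode=True) or b"")
        saved.append(path)
    return saved
```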

7. Summary

This tutorial used Python's built-in modules to implement the core of receiving mail over POP3: establishing a secure connection, pulling metadata, downloading and parsing messages, and avoiding common pitfalls. The code is short and the logic is clear, which makes it a good first exercise in email automation.