Python脚本:查询占用端口号进程

Published on 2026-03-31 17:12 in 分类: 随笔 with 狂盗一枝梅
分类: 随笔

作为后端开发,端口号被占用是经常遇到的事情,但是为了查询该问题却并不容易,下面使用一个脚本解决该问题,首先需要安装依赖:pip install psutil

完整的脚本如下所示:

import subprocess
import re
import psutil
import argparse
from typing import Optional, Dict, List


def get_process_by_port(port: int) -> Optional[Dict]:
    """
    通过端口号获取进程信息
    """
    try:
        # 使用netstat命令查找占用端口的进程
        cmd = f"netstat -ano | findstr :{port}"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if result.returncode != 0:
            return None

        output_lines = result.stdout.strip().split('\n')
        processes = []

        for line in output_lines:
            if not line.strip():
                continue

            # 解析netstat输出
            parts = re.split(r'\s+', line.strip())

            if len(parts) >= 5:
                protocol = parts[0]
                local_address = parts[1]
                pid = parts[-1]  # 最后一个是PID

                # 验证端口匹配
                if f":{port}" in local_address:
                    try:
                        pid_int = int(pid)
                        process = psutil.Process(pid_int)

                        process_info = {
                            'pid': pid_int,
                            'name': process.name(),
                            'exe': process.exe(),
                            'status': process.status(),
                            'create_time': process.create_time(),
                            'cmdline': ' '.join(process.cmdline()),
                            'username': process.username(),
                            'protocol': protocol,
                            'local_address': local_address
                        }
                        processes.append(process_info)

                    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                        # 进程可能已结束或无权限访问
                        continue

        return processes if processes else None

    except Exception as e:
        print(f"错误: {e}")
        return None


def get_all_processes() -> List[Dict]:
    """
    获取所有进程的基本信息
    """
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'exe']):
        try:
            process_info = proc.info
            processes.append(process_info)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return processes


def search_process_by_keyword(keyword: str) -> List[Dict]:
    """
    通过关键字搜索进程
    """
    matched = []
    for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
        try:
            info = proc.info
            # 在进程名、可执行文件路径和命令行中搜索
            if (keyword.lower() in info['name'].lower() or
                    (info['exe'] and keyword.lower() in info['exe'].lower()) or
                    (info['cmdline'] and any(keyword.lower() in str(arg).lower()
                                             for arg in info['cmdline']))):
                matched.append(info)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return matched


def display_process_info(processes: List[Dict], show_details: bool = False):
    """
    显示进程信息
    """
    if not processes:
        print("未找到相关进程")
        return

    print(f"\n找到 {len(processes)} 个进程:")
    print("=" * 80)

    for i, proc in enumerate(processes, 1):
        print(f"\n[{i}] PID: {proc.get('pid')}")
        print(f"    进程名: {proc.get('name', 'N/A')}")
        print(f"    可执行文件: {proc.get('exe', 'N/A')}")

        if 'protocol' in proc:
            print(f"    协议: {proc.get('protocol')}")
            print(f"    本地地址: {proc.get('local_address')}")

        if show_details:
            print(f"    状态: {proc.get('status', 'N/A')}")
            print(f"    用户名: {proc.get('username', 'N/A')}")
            print(f"    命令行: {proc.get('cmdline', 'N/A')}")
            if 'create_time' in proc:
                from datetime import datetime
                create_time = datetime.fromtimestamp(proc['create_time'])
                print(f"    创建时间: {create_time.strftime('%Y-%m-%d %H:%M:%S')}")

    print("=" * 80)


def main():
    parser = argparse.ArgumentParser(description='Windows端口进程查询工具')
    parser.add_argument('-p', '--port', type=int, help='指定端口号')
    parser.add_argument('-s', '--search', help='搜索进程名或路径中的关键字')
    parser.add_argument('-l', '--list', action='store_true', help='列出所有进程')
    parser.add_argument('-d', '--details', action='store_true', help='显示详细信息')
    parser.add_argument('-k', '--kill', type=int, nargs='+', help='终止指定PID的进程')

    args = parser.parse_args()

    if args.port:
        # 查询指定端口
        processes = get_process_by_port(args.port)
        if processes:
            display_process_info(processes, args.details)
        else:
            print(f"端口 {args.port} 未被任何进程占用")

    elif args.search:
        # 搜索进程
        processes = search_process_by_keyword(args.search)
        display_process_info(processes, args.details)

    elif args.list:
        # 列出所有进程
        processes = get_all_processes()
        print(f"系统中共有 {len(processes)} 个进程:")
        display_process_info(processes[:50], args.details)  # 默认显示前50个
        if len(processes) > 50:
            print(f"\n(只显示前50个进程,使用搜索功能查看特定进程)")

    elif args.kill:
        # 终止进程
        for pid in args.kill:
            try:
                process = psutil.Process(pid)
                process_name = process.name()
                process.terminate()  # 尝试正常终止
                print(f"已发送终止信号给进程 {pid} ({process_name})")

                # 等待进程结束
                try:
                    process.wait(timeout=3)
                    print(f"进程 {pid} 已成功终止")
                except psutil.TimeoutExpired:
                    process.kill()  # 强制终止
                    print(f"进程 {pid} 被强制终止")

            except psutil.NoSuchProcess:
                print(f"进程 {pid} 不存在")
            except psutil.AccessDenied:
                print(f"无权限终止进程 {pid}")
            except Exception as e:
                print(f"终止进程 {pid} 时出错: {e}")

    else:
        # 交互模式
        while True:
            print("\n" + "=" * 50)
            print("Windows 端口进程查询工具")
            print("=" * 50)
            print("1. 查询端口占用")
            print("2. 搜索进程")
            print("3. 列出所有进程")
            print("4. 终止进程")
            print("0. 退出")

            try:
                choice = input("\n请选择操作 (0-4): ").strip()

                if choice == '1':
                    port = input("请输入端口号: ").strip()
                    if port.isdigit():
                        processes = get_process_by_port(int(port))
                        if processes:
                            display_process_info(processes, True)
                        else:
                            print(f"端口 {port} 未被占用")
                    else:
                        print("请输入有效的端口号")

                elif choice == '2':
                    keyword = input("请输入搜索关键字: ").strip()
                    if keyword:
                        processes = search_process_by_keyword(keyword)
                        display_process_info(processes, True)
                    else:
                        print("请输入搜索关键字")

                elif choice == '3':
                    processes = get_all_processes()
                    print(f"系统中共有 {len(processes)} 个进程")
                    display_process_info(processes[:20], True)

                elif choice == '4':
                    pid_input = input("请输入要终止的PID (多个PID用空格分隔): ").strip()
                    if pid_input:
                        pids = []
                        for pid_str in pid_input.split():
                            if pid_str.isdigit():
                                pids.append(int(pid_str))

                        if pids:
                            for pid in pids:
                                try:
                                    process = psutil.Process(pid)
                                    process.terminate()
                                    print(f"已发送终止信号给进程 {pid}")
                                except Exception as e:
                                    print(f"终止进程 {pid} 时出错: {e}")
                        else:
                            print("未输入有效的PID")

                elif choice == '0':
                    print("退出程序")
                    break
                else:
                    print("无效选择,请重新输入")

            except KeyboardInterrupt:
                print("\n\n程序被用户中断")
                break
            except Exception as e:
                print(f"发生错误: {e}")


if __name__ == "__main__":
    # 检查是否安装了psutil
    try:
        import psutil
    except ImportError:
        print("错误: 需要安装psutil库")
        print("请运行: pip install psutil")
        exit(1)

    main()

运行效果:

image-20260331171104102
#python
复制 复制成功