作为后端开发,端口号被占用是经常遇到的事情,但是为了查询该问题却并不容易,下面使用一个脚本解决该问题,首先需要安装依赖:pip install psutil
完整的脚本如下所示:
import subprocess
import re
import psutil
import argparse
from typing import Optional, Dict, List
def get_process_by_port(port: int) -> Optional[Dict]:
"""
通过端口号获取进程信息
"""
try:
# 使用netstat命令查找占用端口的进程
cmd = f"netstat -ano | findstr :{port}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
return None
output_lines = result.stdout.strip().split('\n')
processes = []
for line in output_lines:
if not line.strip():
continue
# 解析netstat输出
parts = re.split(r'\s+', line.strip())
if len(parts) >= 5:
protocol = parts[0]
local_address = parts[1]
pid = parts[-1] # 最后一个是PID
# 验证端口匹配
if f":{port}" in local_address:
try:
pid_int = int(pid)
process = psutil.Process(pid_int)
process_info = {
'pid': pid_int,
'name': process.name(),
'exe': process.exe(),
'status': process.status(),
'create_time': process.create_time(),
'cmdline': ' '.join(process.cmdline()),
'username': process.username(),
'protocol': protocol,
'local_address': local_address
}
processes.append(process_info)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# 进程可能已结束或无权限访问
continue
return processes if processes else None
except Exception as e:
print(f"错误: {e}")
return None
def get_all_processes() -> List[Dict]:
"""
获取所有进程的基本信息
"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'exe']):
try:
process_info = proc.info
processes.append(process_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return processes
def search_process_by_keyword(keyword: str) -> List[Dict]:
"""
通过关键字搜索进程
"""
matched = []
for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
try:
info = proc.info
# 在进程名、可执行文件路径和命令行中搜索
if (keyword.lower() in info['name'].lower() or
(info['exe'] and keyword.lower() in info['exe'].lower()) or
(info['cmdline'] and any(keyword.lower() in str(arg).lower()
for arg in info['cmdline']))):
matched.append(info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return matched
def display_process_info(processes: List[Dict], show_details: bool = False):
"""
显示进程信息
"""
if not processes:
print("未找到相关进程")
return
print(f"\n找到 {len(processes)} 个进程:")
print("=" * 80)
for i, proc in enumerate(processes, 1):
print(f"\n[{i}] PID: {proc.get('pid')}")
print(f" 进程名: {proc.get('name', 'N/A')}")
print(f" 可执行文件: {proc.get('exe', 'N/A')}")
if 'protocol' in proc:
print(f" 协议: {proc.get('protocol')}")
print(f" 本地地址: {proc.get('local_address')}")
if show_details:
print(f" 状态: {proc.get('status', 'N/A')}")
print(f" 用户名: {proc.get('username', 'N/A')}")
print(f" 命令行: {proc.get('cmdline', 'N/A')}")
if 'create_time' in proc:
from datetime import datetime
create_time = datetime.fromtimestamp(proc['create_time'])
print(f" 创建时间: {create_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
def main():
parser = argparse.ArgumentParser(description='Windows端口进程查询工具')
parser.add_argument('-p', '--port', type=int, help='指定端口号')
parser.add_argument('-s', '--search', help='搜索进程名或路径中的关键字')
parser.add_argument('-l', '--list', action='store_true', help='列出所有进程')
parser.add_argument('-d', '--details', action='store_true', help='显示详细信息')
parser.add_argument('-k', '--kill', type=int, nargs='+', help='终止指定PID的进程')
args = parser.parse_args()
if args.port:
# 查询指定端口
processes = get_process_by_port(args.port)
if processes:
display_process_info(processes, args.details)
else:
print(f"端口 {args.port} 未被任何进程占用")
elif args.search:
# 搜索进程
processes = search_process_by_keyword(args.search)
display_process_info(processes, args.details)
elif args.list:
# 列出所有进程
processes = get_all_processes()
print(f"系统中共有 {len(processes)} 个进程:")
display_process_info(processes[:50], args.details) # 默认显示前50个
if len(processes) > 50:
print(f"\n(只显示前50个进程,使用搜索功能查看特定进程)")
elif args.kill:
# 终止进程
for pid in args.kill:
try:
process = psutil.Process(pid)
process_name = process.name()
process.terminate() # 尝试正常终止
print(f"已发送终止信号给进程 {pid} ({process_name})")
# 等待进程结束
try:
process.wait(timeout=3)
print(f"进程 {pid} 已成功终止")
except psutil.TimeoutExpired:
process.kill() # 强制终止
print(f"进程 {pid} 被强制终止")
except psutil.NoSuchProcess:
print(f"进程 {pid} 不存在")
except psutil.AccessDenied:
print(f"无权限终止进程 {pid}")
except Exception as e:
print(f"终止进程 {pid} 时出错: {e}")
else:
# 交互模式
while True:
print("\n" + "=" * 50)
print("Windows 端口进程查询工具")
print("=" * 50)
print("1. 查询端口占用")
print("2. 搜索进程")
print("3. 列出所有进程")
print("4. 终止进程")
print("0. 退出")
try:
choice = input("\n请选择操作 (0-4): ").strip()
if choice == '1':
port = input("请输入端口号: ").strip()
if port.isdigit():
processes = get_process_by_port(int(port))
if processes:
display_process_info(processes, True)
else:
print(f"端口 {port} 未被占用")
else:
print("请输入有效的端口号")
elif choice == '2':
keyword = input("请输入搜索关键字: ").strip()
if keyword:
processes = search_process_by_keyword(keyword)
display_process_info(processes, True)
else:
print("请输入搜索关键字")
elif choice == '3':
processes = get_all_processes()
print(f"系统中共有 {len(processes)} 个进程")
display_process_info(processes[:20], True)
elif choice == '4':
pid_input = input("请输入要终止的PID (多个PID用空格分隔): ").strip()
if pid_input:
pids = []
for pid_str in pid_input.split():
if pid_str.isdigit():
pids.append(int(pid_str))
if pids:
for pid in pids:
try:
process = psutil.Process(pid)
process.terminate()
print(f"已发送终止信号给进程 {pid}")
except Exception as e:
print(f"终止进程 {pid} 时出错: {e}")
else:
print("未输入有效的PID")
elif choice == '0':
print("退出程序")
break
else:
print("无效选择,请重新输入")
except KeyboardInterrupt:
print("\n\n程序被用户中断")
break
except Exception as e:
print(f"发生错误: {e}")
if __name__ == "__main__":
# 检查是否安装了psutil
try:
import psutil
except ImportError:
print("错误: 需要安装psutil库")
print("请运行: pip install psutil")
exit(1)
main()
运行效果:
注意:本文归作者所有,未经作者允许,不得转载