import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import subprocess
import os
import json
import shutil
from pathlib import Path
import time
import psutil
class PHPPanel:
def edit_nginx_config(self):
"""编辑Nginx配置"""
selected = self.sites_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要配置的网站")
return
item = selected[0]
values = self.sites_tree.item(item)['values']
domain = values[0]
port = values[1]
config_file = os.path.join(self.config['nginx_conf_dir'], f"{domain}_{port}.conf")
if not os.path.exists(config_file):
messagebox.showerror("错误", "找不到该网站的Nginx配置文件")
return
NginxConfigEditor(self.root, domain, port, config_file)
def __init__(self, root):
self.root = root
self.root.title("PHP Panel for Mac M4")
# 配置信息
self.config = {
'php_path': '/opt/homebrew/opt/php',
'mysql_path': '/opt/homebrew/opt/mysql',
'nginx_path': '/opt/homebrew/opt/nginx',
'websites_path': '/opt/homebrew/var/www',
'nginx_conf_dir': '/opt/homebrew/etc/nginx/servers',
'data_file': os.path.expanduser('~/php_panel_data.json') # 数据文件路径
}
# 添加默认索引文件配置
self.default_index_files = ['index.php', 'index.html', 'index.htm']
self.service_status = {}
self.status_labels = {}
# 存储网站和数据库信息的字典
self.websites_data = []
self.databases_data = []
# 加载保存的数据
self.load_data()
self.create_gui()
self.update_service_status()
# 在窗口关闭时保存数据
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def toggle_service(self, service):
"""启动或重启服务"""
if not self.service_status.get(service, False):
# 服务未运行,启动服务
self.start_service(service)
else:
# 服务正在运行,重启服务
self.restart_service(service)
def start_service(self, service):
"""启动指定服务"""
try:
if service == 'PHP':
subprocess.run(['brew', 'services', 'start', 'php'], check=True)
elif service == 'MySQL':
subprocess.run(['brew', 'services', 'start', 'mysql'], check=True)
elif service == 'Nginx':
subprocess.run(['brew', 'services', 'start', 'nginx'], check=True)
# 更新状态
self.service_status[service] = True
self.status_labels[service].config(text="运行中", foreground='green')
messagebox.showinfo("成功", f"{service} 服务已启动")
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"启动 {service} 失败: {str(e)}")
def stop_service(self, service):
"""停止指定服务"""
try:
if service == 'PHP':
subprocess.run(['brew', 'services', 'stop', 'php'], check=True)
elif service == 'MySQL':
subprocess.run(['brew', 'services', 'stop', 'mysql'], check=True)
elif service == 'Nginx':
subprocess.run(['brew', 'services', 'stop', 'nginx'], check=True)
# 更新状态
self.service_status[service] = False
self.status_labels[service].config(text="已停止", foreground='red')
messagebox.showinfo("成功", f"{service} 服务已停止")
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"停止 {service} 失败: {str(e)}")
def restart_service(self, service):
"""重启指定服务"""
try:
if service == 'PHP':
subprocess.run(['brew', 'services', 'restart', 'php'], check=True)
elif service == 'MySQL':
subprocess.run(['brew', 'services', 'restart', 'mysql'], check=True)
elif service == 'Nginx':
subprocess.run(['brew', 'services', 'restart', 'nginx'], check=True)
messagebox.showinfo("成功", f"{service} 服务已重启")
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"重启 {service} 失败: {str(e)}")
def get_service_version(self, service):
"""获取服务版本"""
try:
if service == 'PHP':
result = subprocess.run(['php', '-v'], capture_output=True, text=True)
if result.stdout:
return result.stdout.split()[1]
elif service == 'MySQL':
result = subprocess.run(['mysql', '-V'], capture_output=True, text=True)
if result.stdout:
parts = result.stdout.split()
for part in parts:
if part.startswith('Ver'):
return part.strip(',')
elif service == 'Nginx':
result = subprocess.run(['nginx', '-v'], capture_output=True, text=True)
if result.stderr: # Nginx outputs version to stderr
return result.stderr.split('/')[1].strip()
return "未知"
except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
return "未知"
def create_nginx_config(self, domain, root_path, port, default_index=None):
"""创建Nginx配置文件,支持自定义默认索引文件和上传限制"""
if default_index is None:
index_list = " ".join(self.default_index_files)
else:
index_files = [default_index] + [f for f in self.default_index_files if f != default_index]
index_list = " ".join(index_files)
config_content = f"""server {{
listen {port};
server_name {domain};
root {root_path};
# 文件上传大小限制
client_max_body_size 50M;
location / {{
index {index_list};
try_files $uri $uri/ /index.php?$args;
}}
location ~ \\.php$ {{
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
# PHP超时设置
fastcgi_read_timeout 300;
fastcgi_send_timeout 300;
}}
access_log /opt/homebrew/var/log/nginx/{domain}_access.log;
error_log /opt/homebrew/var/log/nginx/{domain}_error.log;
}}"""
try:
# 确保配置目录存在
os.makedirs(self.config['nginx_conf_dir'], exist_ok=True)
# 写入配置文件
config_file = os.path.join(self.config['nginx_conf_dir'], f"{domain}_{port}.conf")
with open(config_file, 'w') as f:
f.write(config_content)
# 修改主配置文件以包含全局设置
main_config_file = "/opt/homebrew/etc/nginx/nginx.conf"
if os.path.exists(main_config_file):
with open(main_config_file, 'r') as f:
content = f.read()
# 检查是否已经存在 client_max_body_size
if 'client_max_body_size' not in content:
# 在 http { 块中添加配置
content = content.replace('http {', '''http {
client_max_body_size 50M;''')
with open(main_config_file, 'w') as f:
f.write(content)
# 重新加载Nginx配置
subprocess.run(['brew', 'services', 'reload', 'nginx'], check=True)
messagebox.showinfo("成功", f"网站 {domain} 配置已创建")
except Exception as e:
messagebox.showerror("错误", f"创建Nginx配置失败: {str(e)}")
def load_data(self):
"""从JSON文件加载数据"""
try:
if os.path.exists(self.config['data_file']):
with open(self.config['data_file'], 'r', encoding='utf-8') as f:
data = json.load(f)
self.websites_data = data.get('websites', [])
self.databases_data = data.get('databases', [])
except Exception as e:
messagebox.showerror("错误", f"加载数据失败: {str(e)}")
self.websites_data = []
self.databases_data = []
def save_data(self):
"""保存数据到JSON文件"""
try:
data = {
'websites': self.websites_data,
'databases': self.databases_data
}
with open(self.config['data_file'], 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
messagebox.showerror("错误", f"保存数据失败: {str(e)}")
def on_closing(self):
"""窗口关闭时的处理函数"""
self.save_data()
self.root.destroy()
def refresh_website_list(self):
"""刷新网站列表显示"""
# 清空现有项目
for item in self.sites_tree.get_children():
self.sites_tree.delete(item)
# 添加保存的网站数据
for site in self.websites_data:
self.sites_tree.insert('', 'end', values=(
site['domain'],
site['port'],
site['path'],
site.get('default_index', 'index.php'), # 如果没有设置,默认使用 index.php
'运行中', # 状态可以根据实际检测更新
'访问'
))
def refresh_database_list(self):
"""刷新数据库列表显示"""
# 清空现有项目
for item in self.db_tree.get_children():
self.db_tree.delete(item)
# 添加保存的数据库数据
for db in self.databases_data:
self.db_tree.insert('', 'end', values=(
db['name'],
db['size'],
db['tables'],
db['charset'],
db.get('status', '正常'), # 默认状态为"正常"
db.get('password', '') # 显示明文密码
))
def create_websites_panel(self, parent):
frame = ttk.Frame(parent)
frame.pack(fill='both', expand=True, padx=5, pady=5)
# 工具栏
toolbar = ttk.Frame(frame)
toolbar.pack(fill='x', pady=5)
ttk.Button(
toolbar,
text="添加网站",
command=self.add_website
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="删除网站",
command=self.delete_website
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="查看日志",
command=self.view_logs
).pack(side='left', padx=5)
# 添加新的编辑按钮
ttk.Button(
toolbar,
text="编辑网站",
command=self.edit_website
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="编辑Nginx配置",
command=self.edit_nginx_config
).pack(side='left', padx=5)
# 网站列表
self.sites_tree = ttk.Treeview(
frame,
columns=('域名', '端口', '路径', '默认索引', '状态', '操作'),
show='headings'
)
self.sites_tree.heading('域名', text='域名')
self.sites_tree.heading('端口', text='端口')
self.sites_tree.heading('路径', text='路径')
self.sites_tree.heading('默认索引', text='默认索引')
self.sites_tree.heading('状态', text='状态')
self.sites_tree.heading('操作', text='操作')
self.sites_tree.column('域名', width=120)
self.sites_tree.column('端口', width=80)
self.sites_tree.column('路径', width=200)
self.sites_tree.column('默认索引', width=100)
self.sites_tree.column('状态', width=80)
self.sites_tree.column('操作', width=100)
self.sites_tree.bind('<Double-1>', self.on_tree_double_click)
self.sites_tree.pack(fill='both', expand=True)
# 加载保存的网站数据
self.refresh_website_list()
def create_database_panel(self, parent):
frame = ttk.Frame(parent)
frame.pack(fill='both', expand=True, padx=5, pady=5)
# 工具栏
toolbar = ttk.Frame(frame)
toolbar.pack(fill='x', pady=5)
ttk.Button(
toolbar,
text="创建数据库",
command=self.create_database
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="导入数据库",
command=self.import_database
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="删除数据库", # 新增删除按钮
command=self.delete_database
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="修改密码",
command=self.change_db_password
).pack(side='left', padx=5)
ttk.Button(
toolbar,
text="修改状态",
command=self.toggle_db_status
).pack(side='left', padx=5)
# 数据库列表
self.db_tree = ttk.Treeview(
frame,
columns=('数据库名', '大小', '表数量', '字符集', '状态', '密码'),
show='headings'
)
self.db_tree.heading('数据库名', text='数据库名')
self.db_tree.heading('大小', text='大小')
self.db_tree.heading('表数量', text='表数量')
self.db_tree.heading('字符集', text='字符集')
self.db_tree.heading('状态', text='状态')
self.db_tree.heading('密码', text='密码')
self.db_tree.column('数据库名', width=120)
self.db_tree.column('大小', width=80)
self.db_tree.column('表数量', width=80)
self.db_tree.column('字符集', width=80)
self.db_tree.column('状态', width=80)
self.db_tree.column('密码', width=120)
self.db_tree.pack(fill='both', expand=True)
# 右键菜单
self.db_menu = tk.Menu(self.root, tearoff=0)
self.db_menu.add_command(label="修改密码", command=self.change_db_password)
self.db_menu.add_command(label="修改状态", command=self.toggle_db_status)
self.db_menu.add_separator() # 添加分隔线
self.db_menu.add_command(label="删除数据库", command=self.delete_database)
# 绑定右键菜单
self.db_tree.bind('<Button-3>', self.show_db_menu)
# 加载保存的数据库数据
self.refresh_database_list()
def delete_database(self):
"""删除数据库"""
selected = self.db_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要删除的数据库")
return
item = selected[0]
values = self.db_tree.item(item)['values']
db_name = values[0]
# 确认删除
if not messagebox.askyesno("确认删除",
f"确定要删除数据库 {db_name} 吗?\n此操作将永久删除该数据库及其所有数据,且无法恢复!"):
return
try:
# 删除数据库和用户
commands = [
# 删除数据库
f"DROP DATABASE IF EXISTS {db_name};",
# 删除用户
f"DROP USER IF EXISTS '{db_name}'@'localhost';",
# 刷新权限
"FLUSH PRIVILEGES;"
]
for cmd in commands:
subprocess.run([
'mysql',
'-u', 'root',
'-e', cmd
], check=True)
# 从数据列表中移除
self.databases_data = [db for db in self.databases_data if db['name'] != db_name]
self.save_data()
# 从树形视图中移除
self.db_tree.delete(item)
messagebox.showinfo("成功", f"数据库 {db_name} 已成功删除")
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"删除数据库失败: {str(e)}")
def show_db_menu(self, event):
"""显示数据库右键菜单"""
selected = self.db_tree.selection()
if selected:
self.db_menu.post(event.x_root, event.y_root)
def change_db_password(self):
"""修改数据库密码"""
selected = self.db_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要修改的数据库")
return
item = selected[0]
db_name = self.db_tree.item(item)['values'][0]
dialog = tk.Toplevel(self.root)
dialog.title(f"修改数据库 {db_name} 密码")
dialog.geometry("300x200")
ttk.Label(dialog, text="新密码:").pack(pady=5)
pwd_entry = ttk.Entry(dialog, show="*")
pwd_entry.pack(fill='x', padx=5)
ttk.Label(dialog, text="确认密码:").pack(pady=5)
pwd_confirm_entry = ttk.Entry(dialog, show="*")
pwd_confirm_entry.pack(fill='x', padx=5)
def save_password():
new_pwd = pwd_entry.get()
confirm_pwd = pwd_confirm_entry.get()
if not all([new_pwd, confirm_pwd]):
messagebox.showerror("错误", "请填写所有字段")
return
if new_pwd != confirm_pwd:
messagebox.showerror("错误", "两次输入的密码不一致")
return
try:
# 修改MySQL用户密码
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"ALTER USER '{db_name}'@'localhost' IDENTIFIED BY '{new_pwd}';"
], check=True)
subprocess.run([
'mysql',
'-u', 'root',
'-e', "FLUSH PRIVILEGES;"
], check=True)
# 更新数据库信息
for db in self.databases_data:
if db['name'] == db_name:
db['password'] = new_pwd # 保存明文密码
break
self.save_data()
# 刷新显示
self.refresh_database_list()
messagebox.showinfo("成功", "密码修改成功")
dialog.destroy()
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"修改密码失败: {str(e)}")
ttk.Button(
dialog,
text="保存",
command=save_password
).pack(pady=20)
def toggle_db_status(self):
"""切换数据库状态"""
selected = self.db_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要修改的数据库")
return
item = selected[0]
values = self.db_tree.item(item)['values']
db_name = values[0]
current_status = values[4] if len(values) > 4 else "正常"
new_status = "停用" if current_status == "正常" else "正常"
try:
if new_status == "停用":
# 撤销所有权限
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"REVOKE ALL PRIVILEGES ON {db_name}.* FROM '{db_name}'@'localhost';"
], check=True)
else:
# 恢复所有权限
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"GRANT ALL PRIVILEGES ON {db_name}.* TO '{db_name}'@'localhost';"
], check=True)
subprocess.run([
'mysql',
'-u', 'root',
'-e', "FLUSH PRIVILEGES;"
], check=True)
# 更新数据库信息
for db in self.databases_data:
if db['name'] == db_name:
db['status'] = new_status
break
self.save_data()
# 刷新显示
self.refresh_database_list()
messagebox.showinfo("成功", f"数据库状态已更改为: {new_status}")
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"修改状态失败: {str(e)}")
def refresh_database_list(self):
"""刷新数据库列表显示"""
# 清空现有项目
for item in self.db_tree.get_children():
self.db_tree.delete(item)
# 添加保存的数据库数据
for db in self.databases_data:
self.db_tree.insert('', 'end', values=(
db['name'],
db['size'],
db['tables'],
db['charset'],
db.get('status', '正常'), # 默认状态为"正常"
db.get('password', '*' * 8) # 默认显示加密的密码
))
def add_website(self):
"""添加新网站"""
dialog = tk.Toplevel(self.root)
dialog.title("添加网站")
dialog.geometry("400x450")
# 域名输入
ttk.Label(dialog, text="域名:").pack(pady=5)
domain_entry = ttk.Entry(dialog)
domain_entry.pack(fill='x', padx=5)
domain_entry.insert(0, "localhost")
# 端口输入
ttk.Label(dialog, text="端口:").pack(pady=5)
port_entry = ttk.Entry(dialog)
port_entry.pack(fill='x', padx=5)
port_entry.insert(0, "8080")
# 网站根目录输入
ttk.Label(dialog, text="网站根目录:").pack(pady=5)
path_frame = ttk.Frame(dialog)
path_frame.pack(fill='x', padx=5)
path_entry = ttk.Entry(path_frame)
path_entry.pack(side='left', fill='x', expand=True)
path_entry.insert(0, self.config['websites_path'])
def browse_path():
path = filedialog.askdirectory(initialdir=self.config['websites_path'])
if path:
path_entry.delete(0, tk.END)
path_entry.insert(0, path)
ttk.Button(path_frame, text="浏览", command=browse_path).pack(side='right', padx=5)
# 默认索引文件选择
ttk.Label(dialog, text="默认索引文件:").pack(pady=5)
index_var = tk.StringVar(value='index.php')
index_combo = ttk.Combobox(dialog, textvariable=index_var)
index_combo['values'] = self.default_index_files
index_combo.set('index.php') # 设置默认值
index_combo.pack(fill='x', padx=5)
def save_website():
domain = domain_entry.get()
port = port_entry.get()
path = path_entry.get()
default_index = index_var.get()
if not all([domain, port, path, default_index]):
messagebox.showerror("错误", "所有字段都必须填写")
return
try:
port = int(port)
if port < 1 or port > 65535:
raise ValueError()
except ValueError:
messagebox.showerror("错误", "端口必须是1-65535之间的数字")
return
try:
# 创建网站配置
self.create_nginx_config(domain, path, port, default_index)
# 保存网站数据
self.websites_data.append({
'domain': domain,
'port': port,
'path': path,
'default_index': default_index
})
self.save_data()
# 刷新显示
self.refresh_website_list()
messagebox.showinfo("成功", f"网站 {domain} 配置已创建")
dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"创建网站配置失败: {str(e)}")
ttk.Button(
dialog,
text="保存",
command=save_website
).pack(pady=20)
def delete_website(self):
"""删除选中的网站配置"""
selected = self.sites_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要删除的网站")
return
if not messagebox.askyesno("确认", "确定要删除选中的网站配置吗?"):
return
for item in selected:
values = self.sites_tree.item(item)['values']
domain = values[0]
port = values[1]
try:
# 只删除网站配置文件
config_file = os.path.join(self.config['nginx_conf_dir'], f"{domain}_{port}.conf")
if os.path.exists(config_file):
os.remove(config_file)
# 从数据中删除
self.websites_data = [site for site in self.websites_data
if not (site['domain'] == domain and str(site['port']) == str(port))]
self.save_data()
# 从树形视图中删除
self.sites_tree.delete(item)
# 重新加载nginx
subprocess.run(['brew', 'services', 'reload', 'nginx'], check=True)
messagebox.showinfo("成功", f"网站 {domain} 的配置已删除")
except Exception as e:
messagebox.showerror("错误", f"删除网站配置失败: {str(e)}")
def edit_website(self):
"""编辑选中的网站"""
selected = self.sites_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要编辑的网站")
return
item = selected[0]
values = self.sites_tree.item(item)['values']
old_domain = values[0]
old_port = values[1]
old_path = values[2]
old_index = values[3] # 从树形视图获取当前的默认索引文件
dialog = tk.Toplevel(self.root)
dialog.title("编辑网站")
dialog.geometry("400x450")
# 域名输入
ttk.Label(dialog, text="域名:").pack(pady=5)
domain_entry = ttk.Entry(dialog)
domain_entry.pack(fill='x', padx=5)
domain_entry.insert(0, old_domain)
# 端口输入
ttk.Label(dialog, text="端口:").pack(pady=5)
port_entry = ttk.Entry(dialog)
port_entry.pack(fill='x', padx=5)
port_entry.insert(0, old_port)
# 网站根目录输入
ttk.Label(dialog, text="网站根目录:").pack(pady=5)
path_frame = ttk.Frame(dialog)
path_frame.pack(fill='x', padx=5)
path_entry = ttk.Entry(path_frame)
path_entry.pack(side='left', fill='x', expand=True)
path_entry.insert(0, old_path)
def browse_path():
path = filedialog.askdirectory(initialdir=old_path)
if path:
path_entry.delete(0, tk.END)
path_entry.insert(0, path)
ttk.Button(path_frame, text="浏览", command=browse_path).pack(side='right', padx=5)
# 默认索引文件选择
ttk.Label(dialog, text="默认索引文件:").pack(pady=5)
index_var = tk.StringVar(value=old_index) # 使用从树形视图获取的当前索引
index_combo = ttk.Combobox(dialog, textvariable=index_var)
index_combo['values'] = self.default_index_files
index_combo.pack(fill='x', padx=5)
# 设置当前选中的索引文件
if old_index in self.default_index_files:
index_combo.set(old_index)
else:
index_combo.set('index.php') # 默认值
def save_changes():
new_domain = domain_entry.get()
new_port = port_entry.get()
new_path = path_entry.get()
new_index = index_var.get()
if not all([new_domain, new_port, new_path, new_index]):
messagebox.showerror("错误", "所有字段都必须填写")
return
try:
new_port = int(new_port)
if new_port < 1 or new_port > 65535:
raise ValueError()
except ValueError:
messagebox.showerror("错误", "端口必须是1-65535之间的数字")
return
try:
# 1. 删除旧的Nginx配置文件
old_config = os.path.join(self.config['nginx_conf_dir'], f"{old_domain}_{old_port}.conf")
if os.path.exists(old_config):
os.remove(old_config)
# 2. 创建新的Nginx配置
self.create_nginx_config(new_domain, new_path, new_port, new_index)
# 3. 更新网站数据
for site in self.websites_data:
if site['domain'] == old_domain and str(site['port']) == str(old_port):
site.update({
'domain': new_domain,
'port': new_port,
'path': new_path,
'default_index': new_index
})
break
self.save_data()
# 4. 刷新显示
self.refresh_website_list()
messagebox.showinfo("成功", "网站配置已更新")
dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"更新网站配置失败: {str(e)}")
# 添加保存按钮
ttk.Button(
dialog,
text="保存更改",
command=save_changes
).pack(pady=20)
# 添加取消按钮
ttk.Button(
dialog,
text="取消",
command=dialog.destroy
).pack(pady=5)
def create_database(self):
"""创建新数据库"""
dialog = tk.Toplevel(self.root)
dialog.title("创建数据库")
dialog.geometry("300x300")
ttk.Label(dialog, text="数据库名:").pack(pady=5)
db_entry = ttk.Entry(dialog)
db_entry.pack(fill='x', padx=5)
ttk.Label(dialog, text="密码:").pack(pady=5)
pwd_entry = ttk.Entry(dialog, show="*")
pwd_entry.pack(fill='x', padx=5)
ttk.Label(dialog, text="确认密码:").pack(pady=5)
pwd_confirm_entry = ttk.Entry(dialog, show="*")
pwd_confirm_entry.pack(fill='x', padx=5)
def save_database():
db_name = db_entry.get()
pwd = pwd_entry.get()
pwd_confirm = pwd_confirm_entry.get()
if not all([db_name, pwd, pwd_confirm]):
messagebox.showerror("错误", "所有字段都必须填写")
return
if pwd != pwd_confirm:
messagebox.showerror("错误", "两次输入的密码不一致")
return
try:
# 创建数据库
subprocess.run([
'mysql',
'-u', 'root',
'-e', f'CREATE DATABASE {db_name};'
], check=True)
# 创建用户并授权
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"CREATE USER '{db_name}'@'localhost' IDENTIFIED BY '{pwd}';"
], check=True)
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"GRANT ALL PRIVILEGES ON {db_name}.* TO '{db_name}'@'localhost';"
], check=True)
subprocess.run([
'mysql',
'-u', 'root',
'-e', "FLUSH PRIVILEGES;"
], check=True)
# 保存数据库信息,包括明文密码
self.databases_data.append({
'name': db_name,
'size': '0 MB',
'tables': '0',
'charset': 'utf8mb4',
'status': '正常',
'password': pwd # 保存明文密码
})
self.save_data()
# 刷新显示
self.refresh_database_list()
dialog.destroy()
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"创建数据库失败: {str(e)}")
ttk.Button(
dialog,
text="保存",
command=save_database
).pack(pady=20)
def import_database(self):
"""导入数据库"""
filename = filedialog.askopenfilename(
title="选择SQL文件",
filetypes=[("SQL files", "*.sql"), ("All files", "*.*")]
)
if not filename:
return
dialog = tk.Toplevel(self.root)
dialog.title("导入数据库")
dialog.geometry("300x250")
ttk.Label(dialog, text="数据库名:").pack(pady=5)
db_entry = ttk.Entry(dialog)
db_entry.pack(fill='x', padx=5)
ttk.Label(dialog, text="用户名:").pack(pady=5)
user_entry = ttk.Entry(dialog)
user_entry.pack(fill='x', padx=5)
ttk.Label(dialog, text="密码:").pack(pady=5)
pwd_entry = ttk.Entry(dialog, show="*")
pwd_entry.pack(fill='x', padx=5)
def do_import():
db_name = db_entry.get()
user = user_entry.get()
pwd = pwd_entry.get()
if not all([db_name, user, pwd]):
messagebox.showerror("错误", "所有字段都必须填写")
return
try:
# 创建数据库
subprocess.run([
'mysql',
'-u', 'root',
'-e', f'CREATE DATABASE IF NOT EXISTS {db_name};'
], check=True)
# 创建用户并授权
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"CREATE USER IF NOT EXISTS '{user}'@'localhost' IDENTIFIED BY '{pwd}';"
], check=True)
subprocess.run([
'mysql',
'-u', 'root',
'-e', f"GRANT ALL PRIVILEGES ON {db_name}.* TO '{user}'@'localhost';"
], check=True)
# 导入SQL文件
subprocess.run([
'mysql',
'-u', user,
f'-p{pwd}',
db_name,
'-e', f'source {filename}'
], check=True)
# 获取数据库大小和表数量
size_result = subprocess.run([
'mysql',
'-u', 'root',
'-N',
'-e',
f"""
SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
FROM information_schema.tables
WHERE table_schema = '{db_name}'
GROUP BY table_schema;
"""
], capture_output=True, text=True, check=True)
tables_result = subprocess.run([
'mysql',
'-u', 'root',
'-N',
'-e',
f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{db_name}';"
], capture_output=True, text=True, check=True)
size = float(size_result.stdout.strip() or 0)
tables = int(tables_result.stdout.strip() or 0)
# 保存数据库信息
self.databases_data.append({
'name': db_name,
'size': f'{size:.2f} MB',
'tables': str(tables),
'charset': 'utf8mb4'
})
self.save_data()
# 刷新显示
self.refresh_database_list()
messagebox.showinfo("成功", f"数据库 {db_name} 导入成功")
dialog.destroy()
except subprocess.CalledProcessError as e:
messagebox.showerror("错误", f"导入数据库失败: {str(e)}")
ttk.Button(
dialog,
text="导入",
command=do_import
).pack(pady=20)
def view_logs(self):
"""查看日志"""
dialog = tk.Toplevel(self.root)
dialog.title("日志查看器")
dialog.geometry("800x600")
notebook = ttk.Notebook(dialog)
# PHP错误日志
php_log_frame = ttk.Frame(notebook)
php_log = tk.Text(php_log_frame, wrap=tk.WORD)
php_log.pack(fill='both', expand=True)
notebook.add(php_log_frame, text='PHP错误日志')
# Nginx访问日志
nginx_access_frame = ttk.Frame(notebook)
nginx_access = tk.Text(nginx_access_frame, wrap=tk.WORD)
nginx_access.pack(fill='both', expand=True)
notebook.add(nginx_access_frame, text='Nginx访问日志')
# Nginx错误日志
nginx_error_frame = ttk.Frame(notebook)
nginx_error = tk.Text(nginx_error_frame, wrap=tk.WORD)
nginx_error.pack(fill='both', expand=True)
notebook.add(nginx_error_frame, text='Nginx错误日志')
# MySQL错误日志
mysql_error_frame = ttk.Frame(notebook)
mysql_error = tk.Text(mysql_error_frame, wrap=tk.WORD)
mysql_error.pack(fill='both', expand=True)
notebook.add(mysql_error_frame, text='MySQL错误日志')
notebook.pack(fill='both', expand=True, padx=5, pady=5)
def load_logs():
try:
# 加载PHP错误日志
php_log_file = "/opt/homebrew/var/log/php-fpm.log"
if os.path.exists(php_log_file):
with open(php_log_file, 'r') as f:
php_log.delete(1.0, tk.END)
php_log.insert(tk.END, f.read())
# 加载Nginx访问日志
nginx_access_file = "/opt/homebrew/var/log/nginx/access.log"
if os.path.exists(nginx_access_file):
with open(nginx_access_file, 'r') as f:
nginx_access.delete(1.0, tk.END)
nginx_access.insert(tk.END, f.read())
# 加载Nginx错误日志
nginx_error_file = "/opt/homebrew/var/log/nginx/error.log"
if os.path.exists(nginx_error_file):
with open(nginx_error_file, 'r') as f:
nginx_error.delete(1.0, tk.END)
nginx_error.insert(tk.END, f.read())
# 加载MySQL错误日志
mysql_error_file = "/opt/homebrew/var/mysql/*.err"
import glob
mysql_logs = glob.glob(mysql_error_file)
if mysql_logs:
with open(mysql_logs[0], 'r') as f:
mysql_error.delete(1.0, tk.END)
mysql_error.insert(tk.END, f.read())
except Exception as e:
messagebox.showerror("错误", f"加载日志失败: {str(e)}")
# 添加刷新按钮
ttk.Button(
dialog,
text="刷新",
command=load_logs
).pack(pady=5)
# 初始加载日志
load_logs()
def check_service_status(self, service):
"""检查服务状态"""
if service == 'PHP':
return self.check_process('php-fpm')
elif service == 'MySQL':
return self.check_process('mysqld')
elif service == 'Nginx':
return self.check_process('nginx')
return False
def check_process(self, process_name):
"""检查进程是否运行"""
for proc in psutil.process_iter(['name']):
try:
if process_name.lower() in proc.info['name'].lower():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return False
def update_service_status(self):
"""更新所有服务状态"""
for service in ['PHP', 'MySQL', 'Nginx']:
status = self.check_service_status(service)
self.service_status[service] = status
if service in self.status_labels:
label = self.status_labels[service]
label.config(
text="运行中" if status else "已停止",
foreground='green' if status else 'red'
)
self.root.after(5000, self.update_service_status)
def create_gui(self):
notebook = ttk.Notebook(self.root)
# 服务控制面板
services_frame = ttk.Frame(notebook)
self.create_services_panel(services_frame)
notebook.add(services_frame, text='服务控制')
# 网站管理
websites_frame = ttk.Frame(notebook)
self.create_websites_panel(websites_frame)
notebook.add(websites_frame, text='网站管理')
# 数据库管理
database_frame = ttk.Frame(notebook)
self.create_database_panel(database_frame)
notebook.add(database_frame, text='数据库管理')
notebook.pack(expand=True, fill='both', padx=5, pady=5)
def create_services_panel(self, parent):
services = ['PHP', 'MySQL', 'Nginx']
for i, service in enumerate(services):
frame = ttk.Frame(parent)
frame.pack(fill='x', padx=5, pady=5)
version = self.get_service_version(service)
ttk.Label(frame, text=f"{service} ({version}):").pack(side='left')
status_label = ttk.Label(frame, text="检查中...", foreground='gray')
status_label.pack(side='left', padx=5)
self.status_labels[service] = status_label
ttk.Button(
frame,
text="启动",
command=lambda s=service: self.toggle_service(s)
).pack(side='left')
ttk.Button(
frame,
text="停止",
command=lambda s=service: self.stop_service(s)
).pack(side='left', padx=5)
def on_tree_double_click(self, event):
"""处理树形视图的双击事件"""
item = self.sites_tree.selection()[0]
values = self.sites_tree.item(item)['values']
if values:
domain = values[0]
port = values[1]
self.open_website(domain, port)
def open_website(self, domain, port):
"""在默认浏览器中打开网站"""
import webbrowser
url = f"http://{domain}:{port}"
webbrowser.open(url)
class NginxConfigEditor:
def __init__(self, parent, domain, port, config_file):
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"Nginx 配置 - {domain}:{port}")
self.dialog.geometry("600x700")
self.config_file = config_file
# 默认配置
self.default_config = {
'client_max_body_size': '10M',
'fastcgi_read_timeout': '60',
'fastcgi_send_timeout': '60',
'keepalive_timeout': '65',
'gzip': 'on',
'gzip_types': 'text/plain application/xml text/css application/javascript',
'access_log': 'on',
'error_log': 'on',
'autoindex': 'off',
'proxy_connect_timeout': '60',
'proxy_read_timeout': '60',
'proxy_send_timeout': '60',
'sendfile': 'on',
'tcp_nopush': 'on',
'tcp_nodelay': 'on'
}
# 加载当前配置
self.current_config = self.load_current_config()
self.create_gui()
def load_current_config(self):
"""从Nginx配置文件加载当前配置"""
config = self.default_config.copy()
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
content = f.read()
# 解析配置文件中的参数
for key in self.default_config.keys():
if key in content:
# 提取配置值
import re
pattern = f"{key}\s+(.*?);"
match = re.search(pattern, content)
if match:
config[key] = match.group(1)
except Exception as e:
messagebox.showerror("错误", f"加载配置失败: {str(e)}")
return config
def create_gui(self):
"""创建配置界面"""
# 创建滚动框架
canvas = tk.Canvas(self.dialog)
scrollbar = ttk.Scrollbar(self.dialog, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
# 基本设置
basic_frame = ttk.LabelFrame(scrollable_frame, text="基本设置")
basic_frame.pack(fill="x", padx=5, pady=5)
# 上传大小限制
ttk.Label(basic_frame, text="最大上传大小:").grid(row=0, column=0, padx=5, pady=5)
self.body_size = ttk.Entry(basic_frame)
self.body_size.insert(0, self.current_config['client_max_body_size'])
self.body_size.grid(row=0, column=1, padx=5, pady=5)
# FastCGI 超时设置
timeout_frame = ttk.LabelFrame(scrollable_frame, text="FastCGI 超时设置")
timeout_frame.pack(fill="x", padx=5, pady=5)
ttk.Label(timeout_frame, text="读取超时:").grid(row=0, column=0, padx=5, pady=5)
self.read_timeout = ttk.Entry(timeout_frame)
self.read_timeout.insert(0, self.current_config['fastcgi_read_timeout'])
self.read_timeout.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(timeout_frame, text="发送超时:").grid(row=1, column=0, padx=5, pady=5)
self.send_timeout = ttk.Entry(timeout_frame)
self.send_timeout.insert(0, self.current_config['fastcgi_send_timeout'])
self.send_timeout.grid(row=1, column=1, padx=5, pady=5)
# GZIP 设置
gzip_frame = ttk.LabelFrame(scrollable_frame, text="GZIP 压缩设置")
gzip_frame.pack(fill="x", padx=5, pady=5)
self.gzip_var = tk.StringVar(value=self.current_config['gzip'])
ttk.Radiobutton(gzip_frame, text="启用", variable=self.gzip_var, value="on").grid(row=0, column=0, padx=5,
pady=5)
ttk.Radiobutton(gzip_frame, text="禁用", variable=self.gzip_var, value="off").grid(row=0, column=1, padx=5,
pady=5)
ttk.Label(gzip_frame, text="压缩类型:").grid(row=1, column=0, padx=5, pady=5)
self.gzip_types = ttk.Entry(gzip_frame, width=50)
self.gzip_types.insert(0, self.current_config['gzip_types'])
self.gzip_types.grid(row=1, column=1, columnspan=2, padx=5, pady=5)
# 日志设置
log_frame = ttk.LabelFrame(scrollable_frame, text="日志设置")
log_frame.pack(fill="x", padx=5, pady=5)
self.access_log_var = tk.StringVar(value=self.current_config['access_log'])
ttk.Radiobutton(log_frame, text="启用访问日志", variable=self.access_log_var, value="on").grid(row=0, column=0,
padx=5, pady=5)
ttk.Radiobutton(log_frame, text="禁用访问日志", variable=self.access_log_var, value="off").grid(row=0, column=1,
padx=5, pady=5)
self.error_log_var = tk.StringVar(value=self.current_config['error_log'])
ttk.Radiobutton(log_frame, text="启用错误日志", variable=self.error_log_var, value="on").grid(row=1, column=0,
padx=5, pady=5)
ttk.Radiobutton(log_frame, text="禁用错误日志", variable=self.error_log_var, value="off").grid(row=1, column=1,
padx=5, pady=5)
# 性能优化设置
perf_frame = ttk.LabelFrame(scrollable_frame, text="性能优化设置")
perf_frame.pack(fill="x", padx=5, pady=5)
self.sendfile_var = tk.StringVar(value=self.current_config['sendfile'])
ttk.Radiobutton(perf_frame, text="启用 sendfile", variable=self.sendfile_var, value="on").grid(row=0, column=0,
padx=5, pady=5)
ttk.Radiobutton(perf_frame, text="禁用 sendfile", variable=self.sendfile_var, value="off").grid(row=0, column=1,
padx=5, pady=5)
self.tcp_nopush_var = tk.StringVar(value=self.current_config['tcp_nopush'])
ttk.Radiobutton(perf_frame, text="启用 tcp_nopush", variable=self.tcp_nopush_var, value="on").grid(row=1,
column=0,
padx=5,
pady=5)
ttk.Radiobutton(perf_frame, text="禁用 tcp_nopush", variable=self.tcp_nopush_var, value="off").grid(row=1,
column=1,
padx=5,
pady=5)
# 保存按钮
ttk.Button(scrollable_frame, text="保存配置", command=self.save_config).pack(pady=20)
# 配置滚动条
canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
def save_config(self):
"""保存Nginx配置"""
try:
config_values = {
'client_max_body_size': self.body_size.get(),
'fastcgi_read_timeout': self.read_timeout.get(),
'fastcgi_send_timeout': self.send_timeout.get(),
'gzip': self.gzip_var.get(),
'gzip_types': self.gzip_types.get(),
'access_log': self.access_log_var.get(),
'error_log': self.error_log_var.get(),
'sendfile': self.sendfile_var.get(),
'tcp_nopush': self.tcp_nopush_var.get()
}
# 生成配置文件内容
with open(self.config_file, 'r') as f:
content = f.read()
# 更新配置参数
for key, value in config_values.items():
if key in ['client_max_body_size', 'fastcgi_read_timeout', 'fastcgi_send_timeout']:
if not value.endswith('s') and not value.endswith('M'):
value += 's' if key.endswith('timeout') else 'M'
pattern = f"{key}\s+.*?;"
replacement = f"{key} {value};"
import re
content = re.sub(pattern, replacement, content)
# 写入配置文件
with open(self.config_file, 'w') as f:
f.write(content)
# 重新加载Nginx
import subprocess
subprocess.run(['brew', 'services', 'reload', 'nginx'], check=True)
messagebox.showinfo("成功", "Nginx配置已更新并重新加载")
self.dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存配置失败: {str(e)}")
# 在类定义之外添加主函数
def main():
root = tk.Tk()
app = PHPPanel(root)
root.geometry("800x600")
root.mainloop()
if __name__ == '__main__':
main()