import time
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import pyautogui as pg
import pyperclip as pc
import logging
from flask import Flask, request, jsonify
import webbrowser
import socket
import json
import sys
import os
from threading import Thread
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("wechat_sender.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 创建Flask应用实例
app = Flask(__name__)
# 全局设置
DEFAULT_SETTINGS = {
"pause_time": 1.5,
"hotkey": ["ctrl", "alt", "w"],
"current_task": None
}
# 用于在GUI与API之间共享状态和日志
class SharedState:
def __init__(self):
self.is_sending = False
self.log_queue = []
self.log_callback = None
def add_log(self, message):
"""添加日志并通知GUI更新"""
timestamp = time.strftime('%H:%M:%S')
log_entry = f"{timestamp} - {message}"
self.log_queue.append(log_entry)
logger.info(message)
if self.log_callback:
self.log_callback(log_entry)
# 创建共享状态实例
shared_state = SharedState()
def is_busy():
"""检查是否有任务正在执行"""
return shared_state.is_sending
def send_wechat_message(contact, message, pause_time=None, hotkey=None):
"""
发送消息给微信联系人
Args:
contact (str): 联系人名称
message (str): 要发送的消息
pause_time (float, optional): 操作间隔时间
hotkey (list, optional): 调出微信的快捷键
"""
try:
shared_state.is_sending = True
# 使用默认值或传入值
if pause_time is None:
pause_time = DEFAULT_SETTINGS["pause_time"]
if hotkey is None:
hotkey = DEFAULT_SETTINGS["hotkey"]
# 设置操作间隔
pg.PAUSE = pause_time
shared_state.add_log(f"向 {contact} 发送消息...")
# 快捷键调出微信
shared_state.add_log("调出微信客户端...")
pg.hotkey(*hotkey)
time.sleep(1)
# 搜索联系人
shared_state.add_log("搜索联系人...")
pg.hotkey('ctrl', 'f')
time.sleep(0.5)
# 粘贴联系人名称
pc.copy(contact)
pg.hotkey('ctrl', 'v')
time.sleep(0.5)
# 选择联系人
pg.press('enter')
time.sleep(0.5)
# 粘贴消息
shared_state.add_log("输入消息...")
pc.copy(message)
pg.hotkey('ctrl', 'v')
time.sleep(0.5)
# 发送消息
pg.press('enter')
shared_state.add_log("消息已发送!")
# 隐藏微信
time.sleep(1)
shared_state.add_log("隐藏微信客户端...")
pg.hotkey(*hotkey)
shared_state.add_log("操作完成!")
return True
except Exception as e:
error_msg = f"发送失败: {str(e)}"
shared_state.add_log(error_msg)
return False
finally:
# 清除忙碌状态
shared_state.is_sending = False
def send_after_delay(contact, message, delay, pause_time, hotkey):
"""带延迟的发送消息"""
try:
shared_state.add_log(f"等待 {delay} 秒后发送...")
for i in range(delay, 0, -1):
shared_state.add_log(f"将在 {i} 秒后发送...")
time.sleep(1)
return send_wechat_message(contact, message, pause_time, hotkey)
except Exception as e:
shared_state.add_log(f"延迟发送失败: {str(e)}")
return False
# Flask API路由
@app.route('/api/send_message', methods=['POST'])
def api_send_message():
"""
接收并发送微信消息的API接口
请求JSON格式:
{
"contact": "联系人名称",
"message": "要发送的消息",
"delay": 3, // 可选,延迟发送时间(秒)
"pause_time": 1.5, // 可选,操作间隔时间
"hotkey": ["ctrl", "alt", "w"] // 可选,调出微信的快捷键
}
响应:
{
"success": true/false,
"message": "状态消息"
}
"""
try:
# 检查是否有正在执行的任务
if is_busy():
return jsonify({
"success": False,
"message": "已有发送任务在进行中,请稍后再试"
}), 429
# 获取请求数据
data = request.get_json()
# 验证必需字段
if not data or 'contact' not in data or 'message' not in data:
return jsonify({
"success": False,
"message": "缺少必需字段: 'contact' 和 'message'"
}), 400
contact = data['contact']
message = data['message']
delay = data.get('delay', 3) # 默认延迟3秒
# 可选的自定义设置
pause_time = data.get('pause_time', DEFAULT_SETTINGS["pause_time"])
hotkey = data.get('hotkey', DEFAULT_SETTINGS["hotkey"])
shared_state.add_log(f"收到API请求: 联系人={contact}, 消息长度={len(message)}, 延迟={delay}秒")
# 创建并启动发送线程
thread = threading.Thread(
target=lambda: send_after_delay(contact, message, delay, pause_time, hotkey)
)
thread.daemon = True
thread.start()
return jsonify({
"success": True,
"message": f"消息将在 {delay} 秒后发送给 {contact}"
})
except Exception as e:
error_msg = f"处理API请求时出错: {str(e)}"
shared_state.add_log(error_msg)
return jsonify({
"success": False,
"message": error_msg
}), 500
@app.route('/api/settings', methods=['GET', 'PUT'])
def api_settings():
"""
获取或更新全局设置
GET: 返回当前设置
PUT: 更新设置,JSON格式:
{
"pause_time": 1.5,
"hotkey": ["ctrl", "alt", "w"]
}
"""
if request.method == 'GET':
# 排除不需要的字段
settings = {k: v for k, v in DEFAULT_SETTINGS.items() if k != 'current_task'}
return jsonify(settings)
elif request.method == 'PUT':
try:
data = request.get_json()
if 'pause_time' in data:
DEFAULT_SETTINGS['pause_time'] = float(data['pause_time'])
if 'hotkey' in data and isinstance(data['hotkey'], list):
DEFAULT_SETTINGS['hotkey'] = data['hotkey']
shared_state.add_log(f"通过API更新设置: {DEFAULT_SETTINGS}")
# 排除不需要的字段
settings = {k: v for k, v in DEFAULT_SETTINGS.items() if k != 'current_task'}
return jsonify({
"success": True,
"message": "设置已更新",
"settings": settings
})
except Exception as e:
error_msg = f"更新设置失败: {str(e)}"
shared_state.add_log(error_msg)
return jsonify({
"success": False,
"message": error_msg
}), 400
@app.route('/api/status', methods=['GET'])
def api_status():
"""获取API服务状态"""
return jsonify({
"status": "running",
"busy": is_busy(),
"version": "1.0.0",
"logs": shared_state.log_queue[-10:] if shared_state.log_queue else []
})
@app.route('/api/logs', methods=['GET'])
def api_logs():
"""获取最近的日志"""
limit = request.args.get('limit', default=50, type=int)
logs = shared_state.log_queue[-limit:] if shared_state.log_queue else []
return jsonify({
"logs": logs,
"total": len(shared_state.log_queue)
})
@app.route('/', methods=['GET'])
def api_home():
"""API主页,提供简单的文档"""
api_docs = """
<html>
<head>
<title>WeChat Message Sender API</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
h1, h2 { color: #333; }
pre { background: #f5f5f5; padding: 10px; border-radius: 5px; overflow: auto; }
.endpoint { border-left: 4px solid #2196F3; padding-left: 15px; margin: 20px 0; }
.method { font-weight: bold; color: #2196F3; }
</style>
</head>
<body>
<h1>WeChat Message Sender API</h1>
<p>此API允许您通过HTTP请求发送微信消息。</p>
<div class="endpoint">
<h2><span class="method">POST</span> /api/send_message</h2>
<p>发送微信消息</p>
<h3>请求格式:</h3>
<pre>
{
"contact": "联系人名称",
"message": "要发送的消息",
"delay": 3, // 可选,延迟发送时间(秒)
"pause_time": 1.5, // 可选,操作间隔时间
"hotkey": ["ctrl", "alt", "w"] // 可选,调出微信的快捷键
}
</pre>
</div>
<div class="endpoint">
<h2><span class="method">GET</span> /api/settings</h2>
<p>获取当前全局设置</p>
</div>
<div class="endpoint">
<h2><span class="method">PUT</span> /api/settings</h2>
<p>更新全局设置</p>
<h3>请求格式:</h3>
<pre>
{
"pause_time": 1.5,
"hotkey": ["ctrl", "alt", "w"]
}
</pre>
</div>
<div class="endpoint">
<h2><span class="method">GET</span> /api/status</h2>
<p>获取API服务状态</p>
</div>
<div class="endpoint">
<h2><span class="method">GET</span> /api/logs</h2>
<p>获取最近的日志,可通过?limit=N参数限制返回数量</p>
</div>
<p><strong>当前状态:</strong> API服务运行中,忙碌状态: """ + str(is_busy()) + """</p>
</body>
</html>
"""
return api_docs
class WeChatSenderGUI:
def __init__(self, root):
self.root = root
self.root.title("微信消息发送器 (GUI + API)")
self.root.geometry("700x600")
self.root.resizable(True, True)
# API服务器控制 - 移到前面初始化
self.api_running = False
self.api_thread = None
self.api_port = 5000 # 默认端口
# 尝试查找可用端口
self.find_available_port()
# 设置共享状态的日志回调
shared_state.log_callback = self.append_log
# 创建样式
self.style = ttk.Style()
self.style.configure("TButton", padding=6, relief="flat", background="#2196F3")
self.style.configure("TFrame", background="#f5f5f5")
self.style.configure("TLabel", background="#f5f5f5")
# 创建主框架
main_frame = ttk.Frame(root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 创建通知栏
self.notification_var = tk.StringVar(value="就绪")
self.notification_label = ttk.Label(
main_frame,
textvariable=self.notification_var,
background="#e0f7fa",
padding=10,
anchor=tk.CENTER
)
self.notification_label.grid(column=0, row=0, columnspan=3, sticky=(tk.W, tk.E), pady=(0, 10))
# 创建选项卡
self.tab_control = ttk.Notebook(main_frame)
self.tab_control.grid(column=0, row=1, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
# 消息发送选项卡
self.tab_send = ttk.Frame(self.tab_control)
self.tab_control.add(self.tab_send, text="发送消息")
# API设置选项卡
self.tab_api = ttk.Frame(self.tab_control)
self.tab_control.add(self.tab_api, text="API设置")
# 日志选项卡
self.tab_log = ttk.Frame(self.tab_control)
self.tab_control.add(self.tab_log, text="操作日志")
# 设置消息发送选项卡内容
self.setup_send_tab()
# 设置API选项卡内容
self.setup_api_tab()
# 设置日志选项卡内容
self.setup_log_tab()
# 配置网格权重以便响应窗口大小变化
main_frame.columnconfigure(0, weight=1)
main_frame.rowconfigure(1, weight=1)
# 状态栏
self.status_frame = ttk.Frame(main_frame, padding=(0, 5, 0, 0))
self.status_frame.grid(column=0, row=2, columnspan=3, sticky=(tk.W, tk.E))
self.status_var = tk.StringVar(value="API服务未启动")
self.status_label = ttk.Label(self.status_frame, textvariable=self.status_var)
self.status_label.pack(side=tk.LEFT)
self.api_status_var = tk.StringVar(value="")
self.api_status_label = ttk.Label(self.status_frame, textvariable=self.api_status_var)
self.api_status_label.pack(side=tk.RIGHT)
def find_available_port(self):
"""查找可用的端口"""
port = 5000
max_port = 5100 # 最大尝试端口
while port < max_port:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', port))
sock.close()
self.api_port = port
return
except OSError:
port += 1
# 如果所有端口都被占用
self.api_port = None
messagebox.showwarning("端口警告", "无法找到可用的API端口(5000-5100)。API服务可能无法启动。")
def setup_send_tab(self):
"""设置消息发送选项卡内容"""
frame = self.tab_send
# 联系人设置
ttk.Label(frame, text="联系人:").grid(column=0, row=0, sticky=tk.W, pady=5)
self.contact_entry = ttk.Entry(frame, width=40)
self.contact_entry.grid(column=1, row=0, sticky=(tk.W, tk.E), pady=5)
self.contact_entry.insert(0, "文件传输助手")
# 消息输入框
ttk.Label(frame, text="消息内容:").grid(column=0, row=1, sticky=tk.NW, pady=5)
self.message_text = scrolledtext.ScrolledText(frame, width=50, height=10)
self.message_text.grid(column=1, row=1, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
self.message_text.insert(tk.END, "你好")
# 延迟设置
ttk.Label(frame, text="延迟启动(秒):").grid(column=0, row=2, sticky=tk.W, pady=5)
self.delay_var = tk.StringVar(value="3")
self.delay_spin = ttk.Spinbox(frame, from_=0, to=60, textvariable=self.delay_var, width=5)
self.delay_spin.grid(column=1, row=2, sticky=tk.W, pady=5)
# 操作间隔设置
ttk.Label(frame, text="操作间隔(秒):").grid(column=0, row=3, sticky=tk.W, pady=5)
self.pause_var = tk.StringVar(value=str(DEFAULT_SETTINGS["pause_time"]))
self.pause_spin = ttk.Spinbox(frame, from_=0.5, to=5.0, increment=0.5, textvariable=self.pause_var, width=5)
self.pause_spin.grid(column=1, row=3, sticky=tk.W, pady=5)
# 快捷键设置
ttk.Label(frame, text="微信呼出快捷键:").grid(column=0, row=4, sticky=tk.W, pady=5)
self.hotkey_frame = ttk.Frame(frame)
self.hotkey_frame.grid(column=1, row=4, sticky=tk.W, pady=5)
self.ctrl_var = tk.BooleanVar(value="ctrl" in DEFAULT_SETTINGS["hotkey"])
self.alt_var = tk.BooleanVar(value="alt" in DEFAULT_SETTINGS["hotkey"])
self.key_var = tk.StringVar(value="w") # 默认键
for key in DEFAULT_SETTINGS["hotkey"]:
if key not in ["ctrl", "alt"]:
self.key_var.set(key)
ttk.Checkbutton(self.hotkey_frame, text="Ctrl", variable=self.ctrl_var).pack(side=tk.LEFT)
ttk.Checkbutton(self.hotkey_frame, text="Alt", variable=self.alt_var).pack(side=tk.LEFT)
ttk.Entry(self.hotkey_frame, textvariable=self.key_var, width=3).pack(side=tk.LEFT, padx=5)
# 按钮区域
button_frame = ttk.Frame(frame)
button_frame.grid(column=0, row=5, columnspan=2, pady=10)
self.send_button = ttk.Button(button_frame, text="发送消息", command=self.start_sending)
self.send_button.pack(side=tk.LEFT, padx=5)
self.test_button = ttk.Button(button_frame, text="测试快捷键", command=self.test_hotkey)
self.test_button.pack(side=tk.LEFT, padx=5)
self.save_button = ttk.Button(button_frame, text="保存设置", command=self.save_settings)
self.save_button.pack(side=tk.LEFT, padx=5)
# 配置网格权重
frame.columnconfigure(1, weight=1)
frame.rowconfigure(1, weight=1)
def setup_api_tab(self):
"""设置API选项卡内容"""
frame = self.tab_api
# API服务设置
ttk.Label(frame, text="API服务端口:").grid(column=0, row=0, sticky=tk.W, pady=5)
self.port_var = tk.StringVar(value=str(self.api_port))
port_entry = ttk.Entry(frame, textvariable=self.port_var, width=10)
port_entry.grid(column=1, row=0, sticky=tk.W, pady=5)
# API控制按钮
api_button_frame = ttk.Frame(frame)
api_button_frame.grid(column=0, row=1, columnspan=2, pady=10)
self.start_api_button = ttk.Button(
api_button_frame,
text="启动API服务",
command=self.toggle_api_server
)
self.start_api_button.pack(side=tk.LEFT, padx=5)
self.open_docs_button = ttk.Button(
api_button_frame,
text="打开API文档",
command=self.open_api_docs,
state=tk.DISABLED
)
self.open_docs_button.pack(side=tk.LEFT, padx=5)
# API调用示例
ttk.Label(frame, text="API调用示例:", font=("Arial", 11, "bold")).grid(
column=0, row=2, columnspan=2, sticky=tk.W, pady=(20, 5)
)
example_frame = ttk.Frame(frame, padding=10, relief="solid", borderwidth=1)
example_frame.grid(column=0, row=3, columnspan=2, sticky=(tk.W, tk.E), pady=5)
example_code = """# Python示例代码
import requests
import json
url = f"http://localhost:{self.api_port}/api/send_message"
data = {
"contact": "文件传输助手",
"message": "API测试消息",
"delay": 3
}
headers = {"Content-Type": "application/json"}
response = requests.post(url, json=data, headers=headers)
print(response.json())
"""
self.example_text = scrolledtext.ScrolledText(example_frame, width=60, height=10)
self.example_text.pack(fill=tk.BOTH, expand=True)
self.example_text.insert(tk.END, example_code)
self.example_text.config(state="disabled")
# 复制示例按钮
ttk.Button(
example_frame,
text="复制代码",
command=lambda: self.copy_to_clipboard(example_code)
).pack(pady=5)
# 配置网格权重
frame.columnconfigure(1, weight=1)
def setup_log_tab(self):
"""设置日志选项卡内容"""
frame = self.tab_log
# 日志区域
self.log_text = scrolledtext.ScrolledText(frame, width=80, height=20, state="disabled")
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 日志控制按钮
log_button_frame = ttk.Frame(frame)
log_button_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(
log_button_frame,
text="清除日志",
command=self.clear_logs
).pack(side=tk.LEFT, padx=5)
ttk.Button(
log_button_frame,
text="保存日志",
command=self.save_logs
).pack(side=tk.LEFT, padx=5)
ttk.Button(
log_button_frame,
text="复制日志",
command=self.copy_logs
).pack(side=tk.LEFT, padx=5)
def copy_to_clipboard(self, text):
"""复制文本到剪贴板"""
pc.copy(text)
self.show_notification("已复制到剪贴板")
def get_hotkey(self):
"""获取用户设置的快捷键组合"""
keys = []
if self.ctrl_var.get():
keys.append("ctrl")
if self.alt_var.get():
keys.append("alt")
key = self.key_var.get().strip()
if key:
keys.append(key)
return keys
def test_hotkey(self):
"""测试快捷键功能"""
hotkey = self.get_hotkey()
if not hotkey:
self.show_notification("请设置至少一个快捷键")
return
self.append_log(f"测试快捷键: {'+'.join(hotkey)}")
self.show_notification(f"将在3秒后测试快捷键: {'+'.join(hotkey)}")
# 使用线程避免界面卡死
threading.Thread(
target=self._test_hotkey_thread,
args=(hotkey,),
daemon=True
).start()
def _test_hotkey_thread(self, hotkey):
"""测试快捷键的线程函数"""
time.sleep(3) # 给用户时间切换到适当的窗口
pg.hotkey(*hotkey)
self.append_log("快捷键已发送")
def save_settings(self):
"""保存当前设置为默认值"""
try:
# 更新全局设置
DEFAULT_SETTINGS["pause_time"] = float(self.pause_var.get())
DEFAULT_SETTINGS["hotkey"] = self.get_hotkey()
self.append_log(f"设置已保存: 暂停时间={DEFAULT_SETTINGS['pause_time']}秒, " +
f"快捷键={'+'.join(DEFAULT_SETTINGS['hotkey'])}")
self.show_notification("设置已保存")
except Exception as e:
self.append_log(f"保存设置失败: {str(e)}")
self.show_notification(f"保存设置失败: {str(e)}", error=True)
def start_sending(self):
"""启动发送消息的线程"""
if is_busy():
self.show_notification("已有发送任务在进行中...", error=True)
return
try:
# 获取输入
contact = self.contact_entry.get().strip()
message = self.message_text.get("1.0", tk.END).strip()
# 验证输入
if not contact:
self.show_notification("请输入联系人名称", error=True)
return
if not message:
self.show_notification("请输入消息内容", error=True)
return
delay = int(self.delay_var.get())
pause_time = float(self.pause_var.get())
hotkey = self.get_hotkey()
if not hotkey:
self.show_notification("请设置至少一个快捷键", error=True)
return
self.append_log(f"准备向 {contact} 发送消息...")
self.show_notification(f"将在 {delay} 秒后开始发送...")
# 禁用发送按钮
self.send_button.config(state="disabled")
# 创建并启动线程
thread = threading.Thread(
target=self.send_with_delay,
args=(contact, message, delay, pause_time, hotkey),
daemon=True
)
thread.start()
except Exception as e:
self.append_log(f"启动失败: {str(e)}")
self.show_notification(f"启动失败: {str(e)}", error=True)
self.send_button.config(state="normal")
def send_with_delay(self, contact, message, delay, pause_time, hotkey):
"""延迟后发送消息并恢复按钮状态"""
try:
# 调用共享发送函数
send_after_delay(contact, message, delay, pause_time, hotkey)
finally:
# 恢复按钮状态
self.root.after(0, lambda: self.send_button.config(state="normal"))
def append_log(self, message):
"""向日志区域添加消息"""
# 确保该方法在主线程中被调用
if threading.current_thread() is not threading.main_thread():
self.root.after(0, lambda: self.append_log(message))
return
self.log_text.config(state="normal")
if not message.startswith(time.strftime('%H:%M:%S')):
message = f"{time.strftime('%H:%M:%S')} - {message}"
self.log_text.insert(tk.END, message + "\n")
self.log_text.see(tk.END)
self.log_text.config(state="disabled")
# 添加到共享日志队列,如果不是从那里来的
if message not in shared_state.log_queue:
shared_state.log_queue.append(message)
def clear_logs(self):
"""清除日志区域"""
self.log_text.config(state="normal")
self.log_text.delete(1.0, tk.END)
self.log_text.config(state="disabled")
self.show_notification("日志已清除")
def save_logs(self):
"""保存日志到文件"""
try:
filename = f"wechat_sender_log_{time.strftime('%Y%m%d_%H%M%S')}.txt"
with open(filename, "w", encoding="utf-8") as f:
f.write(self.log_text.get(1.0, tk.END))
self.show_notification(f"日志已保存到 {filename}")
except Exception as e:
self.show_notification(f"保存日志失败: {str(e)}", error=True)
def copy_logs(self):
"""复制日志到剪贴板"""
log_content = self.log_text.get(1.0, tk.END)
pc.copy(log_content)
self.show_notification("日志已复制到剪贴板")
def show_notification(self, message, error=False):
"""显示通知消息"""
# 确保该方法在主线程中被调用
if threading.current_thread() is not threading.main_thread():
self.root.after(0, lambda: self.show_notification(message, error))
return
self.notification_var.set(message)
if error:
self.notification_label.configure(background="#ffcdd2") # 错误用淡红色
else:
self.notification_label.configure(background="#e0f7fa") # 正常用淡蓝色
# 3秒后自动清除通知
self.root.after(3000, self.clear_notification)
def clear_notification(self):
"""清除通知栏"""
self.notification_var.set("就绪")
self.notification_label.configure(background="#e0f7fa")
def toggle_api_server(self):
"""切换API服务器的启动/停止状态"""
if self.api_running:
self.stop_api_server()
else:
self.start_api_server()
def start_api_server(self):
"""启动API服务器"""
if self.api_running:
self.show_notification("API服务已在运行中")
return
try:
# 更新端口
port_str = self.port_var.get().strip()
if port_str:
self.api_port = int(port_str)
# 检查端口是否可用
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', self.api_port))
sock.close()
except OSError:
self.show_notification(f"端口 {self.api_port} 已被占用,请更换", error=True)
return
# 创建线程启动Flask
self.api_thread = threading.Thread(
target=self._run_api_server,
daemon=True
)
self.api_thread.start()
# 等待API服务启动
time.sleep(1)
# 更新UI
self.api_running = True
self.start_api_button.config(text="停止API服务")
self.open_docs_button.config(state="normal")
self.status_var.set(f"API服务已启动 - 端口: {self.api_port}")
self.update_api_url()
# 更新API示例代码中的端口
self.example_text.config(state="normal")
example_code = self.example_text.get(1.0, tk.END)
updated_code = example_code.replace("localhost:{self.api_port}", f"localhost:{self.api_port}")
self.example_text.delete(1.0, tk.END)
self.example_text.insert(1.0, updated_code)
self.example_text.config(state="disabled")
self.show_notification(f"API服务已启动在端口 {self.api_port}")
self.append_log(f"API服务已启动在端口 {self.api_port}")
except Exception as e:
self.show_notification(f"启动API服务失败: {str(e)}", error=True)
self.append_log(f"启动API服务失败: {str(e)}")
def _run_api_server(self):
"""在线程中运行Flask服务器"""
app.run(host='0.0.0.0', port=self.api_port, debug=False, use_reloader=False)
def stop_api_server(self):
"""停止API服务器"""
# Flask没有提供良好的停止方法,所有这里不会真正停止Flask
# 只是更新UI状态。在实际应用中,您可能需要设计其他方式停止API服务
self.show_notification("API服务已停止")
self.append_log("API服务已停止")
self.api_running = False
self.start_api_button.config(text="启动API服务")
self.open_docs_button.config(state="disabled")
self.status_var.set("API服务未启动")
self.api_status_var.set("")
def update_api_url(self):
"""更新API URL显示"""
if self.api_running:
self.api_status_var.set(f"API URL: http://localhost:{self.api_port}/")
else:
self.api_status_var.set("")
def open_api_docs(self):
"""在浏览器中打开API文档页面"""
if self.api_running:
webbrowser.open(f"http://localhost:{self.api_port}/")
self.show_notification("已在浏览器中打开API文档")
else:
self.show_notification("API服务未启动", error=True)
def main():
# 设置更好的应用程序外观
try:
from tkinter import ttk
import sys
if sys.platform.startswith('win'):
try:
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(1)
except:
pass
except:
pass
# 创建主窗口
root = tk.Tk()
app = WeChatSenderGUI(root)
# 设置图标(如果有)
# root.iconbitmap('icon.ico')
# 启动主循环
root.mainloop()
if __name__ == "__main__":
main()