import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
import os
import subprocess
import platform
from typing import Optional, Dict, Any, List
class EnhancedDataMatcherApp:
    """Tkinter GUI that copies column values from file A into file B by key.

    Workflow: the user picks two Excel/CSV files, confirms the sheet and the
    header row of each one in an analysis dialog, chooses one key column per
    file plus any number of "A column -> B column" mappings, and then runs
    the replacement.  Rows of B whose normalized key exists in A get the
    mapped columns overwritten with A's values; rows of A whose key does not
    occur in B are collected as "unmatched records".

    Keys are compared after ``str()``, ``strip()`` and ``lower()``.  Blank
    keys never match anything, and when a key occurs several times in A the
    last occurrence wins.
    """

    # Encodings tried in order when reading CSV files.  "utf-8-sig" decodes
    # plain UTF-8 as well and strips a leading BOM if present.
    _CSV_ENCODINGS = ("utf-8-sig", "gbk", "gb18030")
    # Number of raw rows shown in the structure-analysis dialogs; also the
    # number of selectable header rows.
    _PREVIEW_ROWS = 10
    # Substring (non-exact) column pairs are only added while the total
    # number of automatic pairs is below this limit.
    _FUZZY_MATCH_LIMIT = 10
    # Row caps for the two preview windows.
    _MATCH_PREVIEW_ROWS = 100
    _UNMATCHED_PREVIEW_ROWS = 1000

    def __init__(self, root):
        """Store the Tk root, initialise all state and build the widgets."""
        self.root = root
        self.root.title("小新增强版数据匹配替换工具 v4.1 - 支持多列并行替换和无匹配记录筛选")
        self.root.geometry("1500x1200")
        self.root.configure(bg='#f0f0f0')

        # Loaded data and how it was loaded.
        self.file_a_data: Optional[pd.DataFrame] = None
        self.file_b_data: Optional[pd.DataFrame] = None
        self.file_a_path = ""
        self.file_b_path = ""
        self.file_a_header_row = 0  # zero-based header row of file A
        self.file_b_header_row = 0  # zero-based header row of file B
        self.file_a_sheet_name = None  # None for CSV input
        self.file_b_sheet_name = None  # None for CSV input
        self.processed_data = None
        self.match_results = []
        self.selected_data_columns = []

        # One dict per mapping row: id, enabled_var, a_combo, b_combo, frame.
        self.column_mappings = []
        # Monotonic id for mapping rows; also used as their grid row so a
        # new row can never land on top of an existing one after removals.
        self._next_mapping_id = 0

        # Rows of A without a partner in B, and summary information on them.
        self.unmatched_records = None
        self.unmatched_stats = {}

        self.setup_ui()

    # ------------------------------------------------------------------
    # Main window construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Build the main window: header, file pickers, settings, results."""
        main_frame = ttk.Frame(self.root, padding="15")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        ttk.Label(main_frame, text="小新增强版数据匹配替换工具",
                  font=('Arial', 16, 'bold')).grid(row=0, column=0, columnspan=2, pady=(0, 15))
        ttk.Label(main_frame,
                  text="💡 智能识别Excel文件结构,支持复杂列名、多工作表、多列并行替换和无匹配记录筛选",
                  font=('Arial', 10), foreground="blue").grid(row=1, column=0, columnspan=2, pady=(0, 15))

        self._build_file_section(main_frame)
        self._build_info_section(main_frame)
        self._build_settings_section(main_frame)
        self._build_action_buttons(main_frame)
        self._build_result_section(main_frame)

        self.status_label = ttk.Label(main_frame, text="请选择Excel文件并分析其结构",
                                      relief=tk.SUNKEN, anchor=tk.W, font=('Arial', 9))
        self.status_label.grid(row=7, column=0, columnspan=2, sticky=(tk.W, tk.E))

        # Only the result table (row 6) absorbs extra vertical space.
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(6, weight=1)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

    def _build_file_section(self, parent):
        """Create the "choose file / analyse file" rows for files A and B."""
        file_frame = ttk.LabelFrame(parent, text="📁 文件加载与分析", padding="15")
        file_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 15))

        rows = (
            ("A", "文件A(标准数据源文件):", "选择文件A", "🔍 分析文件A", self.load_file_a),
            ("B", "文件B(待更新目标文件):", "选择文件B", "🔍 分析文件B", self.load_file_b),
        )
        for row_index, (file_type, caption, choose_text, analyze_text, chooser) in enumerate(rows):
            ttk.Label(file_frame, text=caption,
                      font=('Arial', 10, 'bold')).grid(row=row_index, column=0, sticky=tk.W, pady=5)
            path_label = ttk.Label(file_frame, text="未选择文件", foreground="gray")
            path_label.grid(row=row_index, column=1, sticky=(tk.W, tk.E), padx=(15, 0), pady=5)
            ttk.Button(file_frame, text=choose_text,
                       command=chooser).grid(row=row_index, column=2, padx=(15, 0), pady=5)
            # Bind file_type as a default so each button keeps its own value.
            ttk.Button(file_frame, text=analyze_text,
                       command=lambda kind=file_type: self.analyze_file_structure(kind)
                       ).grid(row=row_index, column=3, padx=(10, 0), pady=5)
            if file_type == "A":
                self.file_a_label = path_label
            else:
                self.file_b_label = path_label
        file_frame.columnconfigure(1, weight=1)

    def _build_info_section(self, parent):
        """Create the read-only text box that summarises the loaded files."""
        info_frame = ttk.LabelFrame(parent, text="📊 文件结构信息", padding="10")
        info_frame.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 15))
        self.file_info_text = tk.Text(info_frame, height=6, wrap=tk.WORD, font=('Consolas', 9))
        info_scrollbar = ttk.Scrollbar(info_frame, orient=tk.VERTICAL,
                                       command=self.file_info_text.yview)
        self.file_info_text.configure(yscrollcommand=info_scrollbar.set)
        self.file_info_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        info_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        info_frame.columnconfigure(0, weight=1)
        info_frame.rowconfigure(0, weight=1)

    def _build_settings_section(self, parent):
        """Create the key-column selectors and the column-mapping area."""
        settings_frame = ttk.LabelFrame(parent, text="⚙️ 匹配设置", padding="15")
        settings_frame.grid(row=4, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 15))

        ttk.Label(settings_frame, text="匹配键设置:",
                  font=('Arial', 11, 'bold')).grid(row=0, column=0, sticky=tk.W, pady=5, columnspan=4)
        ttk.Label(settings_frame, text="文件A匹配键:").grid(row=1, column=0, padx=(20, 5), pady=5)
        self.file_a_key_combo = ttk.Combobox(settings_frame, width=35, state="readonly")
        self.file_a_key_combo.grid(row=1, column=1, padx=5, pady=5)
        ttk.Label(settings_frame, text="文件B匹配键:").grid(row=1, column=2, padx=(20, 5), pady=5)
        self.file_b_key_combo = ttk.Combobox(settings_frame, width=35, state="readonly")
        self.file_b_key_combo.grid(row=1, column=3, padx=5, pady=5)

        multi_frame = ttk.LabelFrame(settings_frame, text="🔄 多列替换配置", padding="10")
        multi_frame.grid(row=2, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=(15, 0))
        ttk.Label(multi_frame, text="配置需要替换的列对应关系(可添加多个)",
                  font=('Arial', 10), foreground="blue").grid(row=0, column=0, columnspan=4, pady=(0, 10))

        multi_btn_frame = ttk.Frame(multi_frame)
        multi_btn_frame.grid(row=1, column=0, columnspan=4, pady=(0, 10))
        for caption, handler in (
            ("➕ 添加列映射", self.add_column_mapping),
            ("🗑️ 清空映射", self.clear_column_mappings),
            ("📋 智能匹配", self.auto_match_columns),
        ):
            ttk.Button(multi_btn_frame, text=caption, command=handler).pack(side=tk.LEFT, padx=5)

        # Mapping rows are gridded into this frame by add_column_mapping().
        self.mapping_frame = ttk.Frame(multi_frame)
        self.mapping_frame.grid(row=2, column=0, columnspan=4, sticky=(tk.W, tk.E))
        multi_frame.columnconfigure(0, weight=1)

    def _build_action_buttons(self, parent):
        """Create the row of action buttons below the settings."""
        button_frame = ttk.Frame(parent)
        button_frame.grid(row=5, column=0, columnspan=2, pady=15)

        self.match_btn = ttk.Button(button_frame, text="🔄 执行多列匹配替换",
                                    command=self.execute_multi_column_replace)
        self.match_btn.pack(side=tk.LEFT, padx=10)
        self.preview_btn = ttk.Button(button_frame, text="👀 预览匹配结果",
                                      command=self.preview_match_results)
        self.preview_btn.pack(side=tk.LEFT, padx=10)
        # The buttons below stay disabled until a run has produced data.
        self.save_btn = ttk.Button(button_frame, text="💾 保存结果",
                                   command=self.save_result, state='disabled')
        self.save_btn.pack(side=tk.LEFT, padx=10)
        self.unmatched_preview_btn = ttk.Button(button_frame, text="🔍 查看无匹配记录",
                                                command=self.preview_unmatched_records,
                                                state='disabled')
        self.unmatched_preview_btn.pack(side=tk.LEFT, padx=10)
        self.unmatched_save_btn = ttk.Button(button_frame, text="📄 保存无匹配记录",
                                             command=self.save_unmatched_records,
                                             state='disabled')
        self.unmatched_save_btn.pack(side=tk.LEFT, padx=10)
        ttk.Button(button_frame, text="🗑️ 清空数据",
                   command=self.clear_data).pack(side=tk.LEFT, padx=10)

    def _build_result_section(self, parent):
        """Create the table that shows the per-row outcome of a run."""
        result_frame = ttk.LabelFrame(parent, text="📊 处理结果", padding="10")
        result_frame.grid(row=6, column=0, columnspan=2,
                          sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
        self.result_tree = ttk.Treeview(result_frame, show="headings", height=10)
        result_scrollbar_v = ttk.Scrollbar(result_frame, orient=tk.VERTICAL,
                                           command=self.result_tree.yview)
        result_scrollbar_h = ttk.Scrollbar(result_frame, orient=tk.HORIZONTAL,
                                           command=self.result_tree.xview)
        self.result_tree.configure(yscrollcommand=result_scrollbar_v.set,
                                   xscrollcommand=result_scrollbar_h.set)
        self.result_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        result_scrollbar_v.grid(row=0, column=1, sticky=(tk.N, tk.S))
        result_scrollbar_h.grid(row=1, column=0, sticky=(tk.W, tk.E))
        result_frame.columnconfigure(0, weight=1)
        result_frame.rowconfigure(0, weight=1)

    # ------------------------------------------------------------------
    # Small reusable widget builders
    # ------------------------------------------------------------------
    @staticmethod
    def _make_scrolled_text(parent):
        """Return ``(container, text)``: a Text with both scrollbars.

        The widgets are gridded inside their own container so that the
        caller can pack/grid the container without the scrollbars fighting
        other widgets for space.
        """
        container = ttk.Frame(parent)
        text_widget = tk.Text(container, wrap=tk.NONE, font=('Consolas', 9))
        v_scrollbar = ttk.Scrollbar(container, orient=tk.VERTICAL, command=text_widget.yview)
        h_scrollbar = ttk.Scrollbar(container, orient=tk.HORIZONTAL, command=text_widget.xview)
        text_widget.configure(yscrollcommand=v_scrollbar.set, xscrollcommand=h_scrollbar.set)
        text_widget.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        v_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        h_scrollbar.grid(row=1, column=0, sticky=(tk.W, tk.E))
        container.columnconfigure(0, weight=1)
        container.rowconfigure(0, weight=1)
        return container, text_widget

    @staticmethod
    def _make_scrolled_tree(parent, headings, height, column_width):
        """Return ``(container, tree)``: a Treeview with both scrollbars.

        Synthetic column ids are used so that duplicate or unusual heading
        texts (taken from user data) cannot clash as Treeview identifiers.
        """
        container = ttk.Frame(parent)
        column_ids = [f"col{i}" for i in range(len(headings))]
        tree = ttk.Treeview(container, columns=column_ids, show="headings", height=height)
        for column_id, heading in zip(column_ids, headings):
            tree.heading(column_id, text=str(heading))
            tree.column(column_id, width=column_width, anchor=tk.CENTER)
        v_scrollbar = ttk.Scrollbar(container, orient=tk.VERTICAL, command=tree.yview)
        h_scrollbar = ttk.Scrollbar(container, orient=tk.HORIZONTAL, command=tree.xview)
        tree.configure(yscrollcommand=v_scrollbar.set, xscrollcommand=h_scrollbar.set)
        tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        v_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        h_scrollbar.grid(row=1, column=0, sticky=(tk.W, tk.E))
        container.columnconfigure(0, weight=1)
        container.rowconfigure(0, weight=1)
        return container, tree

    def _make_header_selector(self, parent, label_padx):
        """Pack a "header row" label + combobox into *parent*; return the combobox."""
        ttk.Label(parent, text="列名所在行:").pack(side=tk.LEFT, padx=label_padx)
        combo = ttk.Combobox(parent,
                             values=[f"第{i + 1}行" for i in range(self._PREVIEW_ROWS)],
                             state="readonly", width=10)
        combo.set("第1行")
        combo.pack(side=tk.LEFT, padx=5)
        return combo

    @staticmethod
    def _add_dialog_buttons(parent, on_confirm, on_cancel):
        """Pack the confirm / cancel buttons of an analysis dialog."""
        ttk.Button(parent, text="✅ 确认选择",
                   command=on_confirm).pack(side=tk.LEFT, padx=(20, 5))
        ttk.Button(parent, text="❌ 取消",
                   command=on_cancel).pack(side=tk.LEFT, padx=5)

    # ------------------------------------------------------------------
    # Column mappings
    # ------------------------------------------------------------------
    def add_column_mapping(self):
        """Append one empty "A column -> B column" row to the mapping area.

        Returns:
            The mapping dict that was added, or ``None`` when the two files
            have not been analysed yet (a warning is shown in that case).
        """
        if self.file_a_data is None or self.file_b_data is None:
            messagebox.showwarning("警告", "请先分析两个文件")
            return None

        mapping_id = self._next_mapping_id
        self._next_mapping_id += 1

        mapping_row = ttk.Frame(self.mapping_frame)
        mapping_row.grid(row=mapping_id, column=0, sticky=(tk.W, tk.E), pady=2)
        self.mapping_frame.columnconfigure(0, weight=1)

        enabled_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(mapping_row, variable=enabled_var).grid(row=0, column=0, padx=5)

        ttk.Label(mapping_row, text=f"映射{mapping_id + 1}:").grid(row=0, column=1, padx=5)
        ttk.Label(mapping_row, text="A文件列:").grid(row=0, column=2, padx=5)
        a_combo = ttk.Combobox(mapping_row, values=list(self.file_a_data.columns),
                               width=25, state="readonly")
        a_combo.grid(row=0, column=3, padx=5)

        ttk.Label(mapping_row, text="→").grid(row=0, column=4, padx=5)

        ttk.Label(mapping_row, text="B文件列:").grid(row=0, column=5, padx=5)
        b_combo = ttk.Combobox(mapping_row, values=list(self.file_b_data.columns),
                               width=25, state="readonly")
        b_combo.grid(row=0, column=6, padx=5)

        mapping_config = {
            "id": mapping_id,
            "enabled_var": enabled_var,
            "a_combo": a_combo,
            "b_combo": b_combo,
            "frame": mapping_row,
        }

        def remove_mapping():
            # Remove by identity: list positions shift whenever another row
            # is deleted, so a stored index would go stale.
            mapping_row.destroy()
            self.column_mappings[:] = [
                m for m in self.column_mappings if m is not mapping_config
            ]
            self.update_column_mappings()

        ttk.Button(mapping_row, text="❌", command=remove_mapping).grid(row=0, column=7, padx=5)

        self.column_mappings.append(mapping_config)
        return mapping_config

    def clear_column_mappings(self):
        """Destroy every mapping row and forget all mappings."""
        for mapping in self.column_mappings:
            if mapping and mapping.get("frame"):
                mapping["frame"].destroy()
        self.column_mappings.clear()
        # All rows are gone, so numbering can safely restart.
        self._next_mapping_id = 0

    @staticmethod
    def _pair_columns(a_columns, b_columns, fuzzy_limit=10):
        """Pair columns of A with columns of B by name.

        Names are compared after ``strip()`` and ``lower()``.  Exact matches
        are resolved first for all columns; only then are the remaining
        columns paired when one name contains the other.  Each B column is
        used at most once, and substring pairs are only added while the
        total number of pairs is below *fuzzy_limit*.

        Returns:
            A list of ``(a_column, b_column)`` tuples in A's column order.
        """
        a_names = [str(col).strip().lower() for col in a_columns]
        b_names = [str(col).strip().lower() for col in b_columns]
        chosen = {}       # index in a_columns -> index in b_columns
        taken_b = set()   # indices of b_columns already paired

        # Pass 1: exact matches always win.
        for a_index, a_name in enumerate(a_names):
            for b_index, b_name in enumerate(b_names):
                if b_index not in taken_b and a_name == b_name:
                    chosen[a_index] = b_index
                    taken_b.add(b_index)
                    break

        # Pass 2: substring matches for whatever is still unpaired.  Empty
        # names are skipped because "" is a substring of every name.
        for a_index, a_name in enumerate(a_names):
            if len(chosen) >= fuzzy_limit:
                break
            if a_index in chosen or not a_name:
                continue
            for b_index, b_name in enumerate(b_names):
                if b_index in taken_b or not b_name:
                    continue
                if a_name in b_name or b_name in a_name:
                    chosen[a_index] = b_index
                    taken_b.add(b_index)
                    break

        return [(a_columns[a_index], b_columns[chosen[a_index]])
                for a_index in sorted(chosen)]

    def auto_match_columns(self):
        """Replace the current mappings with automatically detected pairs."""
        if self.file_a_data is None or self.file_b_data is None:
            messagebox.showwarning("警告", "请先分析两个文件")
            return

        self.clear_column_mappings()
        matched_pairs = self._pair_columns(list(self.file_a_data.columns),
                                           list(self.file_b_data.columns),
                                           self._FUZZY_MATCH_LIMIT)
        for a_col, b_col in matched_pairs:
            mapping = self.add_column_mapping()
            if mapping is None:
                break
            mapping["a_combo"].set(a_col)
            mapping["b_combo"].set(b_col)

        if matched_pairs:
            messagebox.showinfo("智能匹配完成", f"成功匹配 {len(matched_pairs)} 个列对应关系")
        else:
            messagebox.showinfo("智能匹配完成", "未找到明显的列名匹配关系,请手动添加")

    def update_column_mappings(self):
        """Drop ``None`` placeholders from the mapping list."""
        self.column_mappings = [m for m in self.column_mappings if m is not None]

    def get_active_column_mappings(self):
        """Return the enabled, fully specified mappings.

        Returns:
            A list of ``{"a_col": ..., "b_col": ...}`` dicts, one for every
            mapping row whose checkbox is ticked and whose two comboboxes
            both have a selection.
        """
        active_mappings = []
        for mapping in self.column_mappings:
            if not mapping or not mapping["enabled_var"].get():
                continue
            a_col = mapping["a_combo"].get()
            b_col = mapping["b_combo"].get()
            if a_col and b_col:
                active_mappings.append({"a_col": a_col, "b_col": b_col})
        return active_mappings

    # ------------------------------------------------------------------
    # File selection and structure analysis
    # ------------------------------------------------------------------
    def _choose_file(self, file_type):
        """Ask for the path of file *file_type* ("A" or "B") and remember it."""
        role, colour = {
            "A": ("标准数据源文件", "green"),
            "B": ("待更新目标文件", "orange"),
        }[file_type]
        file_path = filedialog.askopenfilename(
            title=f"选择文件{file_type}({role})",
            filetypes=[("Excel files", "*.xlsx *.xls"), ("CSV files", "*.csv")]
        )
        if not file_path:
            return
        base_name = os.path.basename(file_path)
        if file_type == "A":
            self.file_a_path = file_path
            path_label = self.file_a_label
        else:
            self.file_b_path = file_path
            path_label = self.file_b_label
        path_label.config(text=f"✅ {base_name}", foreground=colour)
        self.status_label.config(
            text=f"已选择文件{file_type}: {base_name} - 请点击'分析文件{file_type}'按钮")

    def load_file_a(self):
        """Let the user choose file A (the source of the new values)."""
        self._choose_file("A")

    def load_file_b(self):
        """Let the user choose file B (the file whose columns get updated)."""
        self._choose_file("B")

    def analyze_file_structure(self, file_type):
        """Open the structure-analysis dialog for file "A" or "B".

        CSV files are recognised by their extension; everything else is
        handed to the Excel reader.  Any error is reported in a message box.
        """
        file_path = self.file_a_path if file_type == "A" else self.file_b_path
        if not file_path:
            messagebox.showwarning("警告", f"请先选择文件{file_type}")
            return
        try:
            self.status_label.config(text="正在分析文件结构...")
            # Redraw the status bar before the (blocking) file read starts.
            self.root.update_idletasks()
            if file_path.lower().endswith('.csv'):
                self.analyze_csv_structure(file_path, file_type)
            else:
                self.analyze_excel_structure(file_path, file_type)
            # The dialog is now open and waits for the user's confirmation.
            self.status_label.config(text=f"请在分析窗口中确认文件{file_type}的列名所在行")
        except Exception as e:
            messagebox.showerror("错误", f"分析文件{file_type}失败:\n{str(e)}")
            self.status_label.config(text="分析失败")

    @staticmethod
    def _format_preview(heading, preview):
        """Render the raw preview rows of a sheet/CSV as plain text.

        Args:
            heading: First line of the output.
            preview: DataFrame read with ``header=None``.

        Returns:
            The heading, a dimension line, a separator and one line per row;
            each cell is cut to 30 characters and missing cells show "空".
        """
        lines = [
            heading,
            f"维度: {preview.shape[0]} 行 × {preview.shape[1]} 列",
            "=" * 80,
            "",
        ]
        for number, cells in enumerate(preview.itertuples(index=False, name=None), start=1):
            rendered = " | ".join(str(cell)[:30] if pd.notna(cell) else "空" for cell in cells)
            lines.append(f"第 {number:2d} 行: {rendered}")
        return "\n".join(lines) + "\n"

    @staticmethod
    def _parse_header_choice(choice):
        """Convert a combobox entry such as "第3行" to a zero-based row index."""
        return int(choice.replace("第", "").replace("行", "")) - 1

    def _store_loaded_frame(self, file_type, frame, header_row, sheet_name=None):
        """Record *frame* as the data of file "A" or "B".

        Column labels are converted to stripped strings.  *sheet_name* is
        stored as given, so CSV input (``None``) clears a sheet name left
        over from a previously loaded Excel file.
        """
        frame.columns = frame.columns.astype(str).str.strip()
        if file_type == "A":
            self.file_a_data = frame
            self.file_a_sheet_name = sheet_name
            self.file_a_header_row = header_row
        else:
            self.file_b_data = frame
            self.file_b_sheet_name = sheet_name
            self.file_b_header_row = header_row

    def _finish_analysis(self, file_type, frame, header_row, sheet_name):
        """Store a freshly read frame and refresh every dependent widget."""
        self._store_loaded_frame(file_type, frame, header_row, sheet_name)
        self.update_combo_options()
        self.update_file_info()
        self.status_label.config(
            text=f"✅ 文件{file_type}分析完成,识别到 {len(frame.columns)} 列")

    def analyze_excel_structure(self, file_path, file_type):
        """Show the first rows of every sheet and let the user pick sheet + header row.

        Raises:
            Exception: whatever pandas raises while opening the workbook;
                ``ValueError`` when the workbook has no sheets.
        """
        # Read all previews up front through one handle that is closed again
        # immediately, so no window flashes up if the file is unreadable.
        with pd.ExcelFile(file_path) as workbook:
            sheet_names = list(workbook.sheet_names)
            previews = {
                name: pd.read_excel(workbook, sheet_name=name, header=None,
                                    nrows=self._PREVIEW_ROWS, dtype=str)
                for name in sheet_names
            }
        if not sheet_names:
            raise ValueError("Excel文件中没有可用的工作表")

        analysis_window = tk.Toplevel(self.root)
        try:
            analysis_window.title(f"Excel文件结构分析 - 文件{file_type}")
            analysis_window.geometry("1000x700")
            analysis_window.transient(self.root)
            analysis_window.grab_set()

            main_frame = ttk.Frame(analysis_window, padding="15")
            main_frame.pack(fill=tk.BOTH, expand=True)
            ttk.Label(main_frame, text=f"文件{file_type}结构分析",
                      font=('Arial', 14, 'bold')).pack(pady=(0, 15))

            # Pack the controls at the bottom first so the expanding
            # notebook can never push them out of the window.
            selection_frame = ttk.Frame(main_frame)
            selection_frame.pack(side=tk.BOTTOM, fill=tk.X, pady=10)

            notebook = ttk.Notebook(main_frame)
            notebook.pack(fill=tk.BOTH, expand=True, pady=(0, 15))
            for sheet_name in sheet_names:
                df_preview = previews[sheet_name]
                tab, text_widget = self._make_scrolled_text(notebook)
                notebook.add(tab, text=f"{sheet_name} ({df_preview.shape[1]}列)")
                text_widget.insert(
                    tk.END, self._format_preview(f"工作表: {sheet_name}", df_preview))
                text_widget.config(state=tk.DISABLED)

            ttk.Label(selection_frame, text="选择工作表:").pack(side=tk.LEFT, padx=5)
            sheet_combo = ttk.Combobox(selection_frame, values=sheet_names,
                                       state="readonly", width=20)
            sheet_combo.set(sheet_names[0])
            sheet_combo.pack(side=tk.LEFT, padx=5)
            header_combo = self._make_header_selector(selection_frame, (20, 5))

            def confirm_selection():
                selected_sheet = sheet_combo.get()
                header_row = self._parse_header_choice(header_combo.get())
                try:
                    # na_filter=False keeps empty cells as "" instead of NaN.
                    df = pd.read_excel(file_path, sheet_name=selected_sheet,
                                       header=header_row, dtype=str, na_filter=False)
                    self._finish_analysis(file_type, df, header_row, selected_sheet)
                    analysis_window.destroy()
                    messagebox.showinfo("成功",
                                        f"文件{file_type}分析完成!\n"
                                        f"工作表: {selected_sheet}\n"
                                        f"列名行: 第{header_row + 1}行\n"
                                        f"识别到 {len(df.columns)} 列")
                except Exception as e:
                    messagebox.showerror("错误", f"读取失败:\n{str(e)}")

            self._add_dialog_buttons(selection_frame, confirm_selection,
                                     analysis_window.destroy)
        except Exception:
            analysis_window.destroy()
            raise

    @classmethod
    def _read_csv_with_fallback(cls, file_path, preferred=None, **read_options):
        """Read a CSV file, trying several encodings until one decodes.

        Args:
            file_path: Path of the CSV file.
            preferred: Encoding to try first (for example the one that
                worked for the preview); may be ``None``.
            **read_options: Passed through to ``pandas.read_csv``.

        Returns:
            ``(frame, encoding)`` for the first encoding that succeeded.

        Raises:
            ValueError: if no candidate encoding can decode the file.
        """
        candidates = [preferred] if preferred else []
        candidates.extend(enc for enc in cls._CSV_ENCODINGS if enc != preferred)
        for encoding in candidates:
            try:
                frame = pd.read_csv(file_path, encoding=encoding, **read_options)
            except UnicodeDecodeError:
                continue
            return frame, encoding
        raise ValueError("无法读取CSV文件,请检查文件编码")

    def analyze_csv_structure(self, file_path, file_type):
        """Show the first rows of a CSV file and let the user pick the header row.

        Raises:
            ValueError: if the file cannot be decoded with any known encoding.
            Exception: any other error raised by pandas while reading.
        """
        df_preview, encoding = self._read_csv_with_fallback(
            file_path, header=None, nrows=self._PREVIEW_ROWS, dtype=str)

        analysis_window = tk.Toplevel(self.root)
        try:
            analysis_window.title(f"CSV文件结构分析 - 文件{file_type}")
            analysis_window.geometry("800x600")
            analysis_window.transient(self.root)
            analysis_window.grab_set()

            main_frame = ttk.Frame(analysis_window, padding="15")
            main_frame.pack(fill=tk.BOTH, expand=True)
            ttk.Label(main_frame, text=f"文件{file_type}结构分析",
                      font=('Arial', 14, 'bold')).pack(pady=(0, 15))

            # Controls first (bottom), then the expanding preview above them.
            selection_frame = ttk.Frame(main_frame)
            selection_frame.pack(side=tk.BOTTOM, fill=tk.X, pady=10)

            text_area, text_widget = self._make_scrolled_text(main_frame)
            text_area.pack(fill=tk.BOTH, expand=True)
            text_widget.insert(
                tk.END, self._format_preview(f"CSV文件预览 (编码: {encoding})", df_preview))
            text_widget.config(state=tk.DISABLED)

            header_combo = self._make_header_selector(selection_frame, 5)

            def confirm_csv_selection():
                header_row = self._parse_header_choice(header_combo.get())
                try:
                    # The preview only decoded the first rows; the full read
                    # may need one of the other encodings.
                    df, _ = self._read_csv_with_fallback(
                        file_path, preferred=encoding, header=header_row,
                        dtype=str, na_filter=False)
                    self._finish_analysis(file_type, df, header_row, None)
                    analysis_window.destroy()
                    messagebox.showinfo(
                        "成功", f"CSV文件{file_type}分析完成!\n识别到 {len(df.columns)} 列")
                except Exception as e:
                    messagebox.showerror("错误", f"读取失败:\n{str(e)}")

            self._add_dialog_buttons(selection_frame, confirm_csv_selection,
                                     analysis_window.destroy)
        except Exception:
            analysis_window.destroy()
            raise

    # ------------------------------------------------------------------
    # Widgets that mirror the loaded data
    # ------------------------------------------------------------------
    @staticmethod
    def _describe_file(label, path, sheet_name, header_row, frame):
        """Return the info-panel text for one loaded file.

        The sheet line is omitted when *sheet_name* is empty, and "..." is
        appended to the column list only when more than three columns exist.
        """
        names = [str(col) for col in frame.columns]
        shown = ', '.join(names[:3]) + ("..." if len(names) > 3 else "")
        lines = [f"📄 文件{label}信息:", f" 路径: {path}"]
        if sheet_name:
            lines.append(f" 工作表: {sheet_name}")
        lines.append(f" 列名行: 第{header_row + 1}行")
        lines.append(f" 维度: {frame.shape[0]} 行 × {frame.shape[1]} 列")
        lines.append(f" 列名: {shown}")
        return "\n".join(lines) + "\n\n"

    def update_file_info(self):
        """Rewrite the info panel from the currently loaded files."""
        sections = []
        if self.file_a_data is not None:
            sections.append(self._describe_file(
                "A", self.file_a_path, self.file_a_sheet_name,
                self.file_a_header_row, self.file_a_data))
        if self.file_b_data is not None:
            sections.append(self._describe_file(
                "B", self.file_b_path, self.file_b_sheet_name,
                self.file_b_header_row, self.file_b_data))

        self.file_info_text.config(state=tk.NORMAL)
        self.file_info_text.delete("1.0", tk.END)
        self.file_info_text.insert(tk.END, "".join(sections) or "请选择并分析文件...")
        self.file_info_text.config(state=tk.DISABLED)

    @staticmethod
    def _refresh_combo(combo, columns):
        """Give *combo* a new value list and drop a selection that left it."""
        combo['values'] = columns
        if combo.get() not in columns:
            combo.set('')

    def update_combo_options(self):
        """Sync the key and mapping comboboxes with the loaded columns.

        A selection that no longer exists in the reloaded file is cleared so
        that a later run cannot fail on a missing column.
        """
        if self.file_a_data is not None:
            columns_a = list(self.file_a_data.columns)
            self._refresh_combo(self.file_a_key_combo, columns_a)
            for mapping in self.column_mappings:
                if mapping:
                    self._refresh_combo(mapping["a_combo"], columns_a)
        if self.file_b_data is not None:
            columns_b = list(self.file_b_data.columns)
            self._refresh_combo(self.file_b_key_combo, columns_b)
            for mapping in self.column_mappings:
                if mapping:
                    self._refresh_combo(mapping["b_combo"], columns_b)

    # ------------------------------------------------------------------
    # Matching logic (no widget access, so it can be tested on its own)
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_key(value) -> str:
        """Return the comparison form of a key cell: stripped and lower-cased.

        Missing values (``None``/NaN) normalize to the empty string, which
        the matching code treats as "no key".
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return str(value).strip().lower()

    @classmethod
    def _key_list(cls, frame, key_col) -> List[str]:
        """Return the normalized keys of *frame* in row order."""
        return [cls._normalize_key(value) for value in frame[key_col].tolist()]

    @classmethod
    def _build_lookup(cls, a_frame, a_key_col, mappings) -> Dict[str, Dict[str, Any]]:
        """Map each non-blank normalized key of A to its mapped column values.

        When a key occurs more than once the last row wins.
        """
        source_cols = list(dict.fromkeys(m['a_col'] for m in mappings))
        keys = cls._key_list(a_frame, a_key_col)
        rows = a_frame[source_cols].itertuples(index=False, name=None)
        lookup = {}
        for key, values in zip(keys, rows):
            if key:  # blank keys must never match anything
                lookup[key] = dict(zip(source_cols, values))
        return lookup

    @classmethod
    def _find_unmatched(cls, a_frame, a_key_col, b_frame, b_key_col):
        """Collect the rows of A whose key does not occur in B.

        Rows of A with a blank key count as unmatched.

        Returns:
            ``(records, stats)``.  *records* is a DataFrame (or ``None``
            when every row matched); *stats* always has ``total_count`` (a
            row count) and, when rows were found, ``key_column`` and
            ``sample_keys`` (up to ten distinct normalized keys).
        """
        b_keys = set(cls._key_list(b_frame, b_key_col))
        b_keys.discard("")
        a_keys = cls._key_list(a_frame, a_key_col)
        missing = [key not in b_keys for key in a_keys]
        if not any(missing):
            return None, {'total_count': 0}
        records = a_frame.loc[missing].copy()
        distinct = dict.fromkeys(key for key, miss in zip(a_keys, missing) if miss)
        stats = {
            'total_count': len(records),
            'key_column': a_key_col,
            'sample_keys': list(distinct)[:10],
        }
        return records, stats

    @classmethod
    def _apply_mappings(cls, b_frame, b_key_col, mappings, lookup):
        """Return a copy of B with the mapped columns replaced from *lookup*.

        A cell is replaced only when the string forms of old and new value
        differ.  *b_frame* itself is not modified.

        Returns:
            ``(processed, report, matched_count, total_replacements)`` where
            *report* holds one ``(row_no, key, status, replaced, details)``
            tuple per row of B.
        """
        keys = cls._key_list(b_frame, b_key_col)
        target_cols = list(dict.fromkeys(m['b_col'] for m in mappings))
        # Work on plain lists: comparing/writing cell by cell through .loc
        # is far slower and adds nothing here.
        originals = {col: b_frame[col].tolist() for col in target_cols}
        updated = {col: list(values) for col, values in originals.items()}
        changed_cols = set()

        report = []
        matched_count = 0
        total_replacements = 0
        for position, key_b in enumerate(keys):
            source = lookup.get(key_b) if key_b else None
            if source is None:
                report.append((position + 1, key_b, "❌ 未匹配", 0, "无匹配数据"))
                continue
            matched_count += 1
            replaced_columns = 0
            details = []
            for mapping in mappings:
                a_col = mapping['a_col']
                b_col = mapping['b_col']
                original_value = str(originals[b_col][position])
                new_value = str(source[a_col])
                if original_value != new_value:
                    updated[b_col][position] = new_value
                    changed_cols.add(b_col)
                    replaced_columns += 1
                    details.append(f"{b_col}: {original_value[:20]}→{new_value[:20]}")
                else:
                    details.append(f"{b_col}: 无需替换")
            total_replacements += replaced_columns
            report.append((position + 1, key_b, "✅ 已匹配", replaced_columns,
                           "; ".join(details)))

        processed = b_frame.copy()
        for col in target_cols:
            if col in changed_cols:  # leave untouched columns exactly as read
                processed[col] = updated[col]
        return processed, report, matched_count, total_replacements

    @classmethod
    def _build_preview_rows(cls, b_frame, b_key_col, mappings, lookup):
        """Build the table rows of the match preview.

        Returns:
            ``(rows, matched, not_matched)``; each row is
            ``[no, key, status, A values..., B values...]``.
        """
        a_cols = [m['a_col'] for m in mappings]
        b_cols = [m['b_col'] for m in mappings]
        keys = cls._key_list(b_frame, b_key_col)
        b_rows = b_frame[b_cols].itertuples(index=False, name=None)
        rows = []
        matched = 0
        not_matched = 0
        for number, (key_b, b_values) in enumerate(zip(keys, b_rows), start=1):
            source = lookup.get(key_b) if key_b else None
            if source is not None:
                matched += 1
                status = "✅ 可匹配"
                a_values = [str(source[col]) for col in a_cols]
            else:
                not_matched += 1
                status = "❌ 无匹配"
                a_values = ["未找到"] * len(a_cols)
            rows.append([number, key_b, status, *a_values,
                         *(str(value) for value in b_values)])
        return rows, matched, not_matched

    def _collect_match_settings(self):
        """Validate the current selection.

        Returns:
            ``(a_key_col, b_key_col, active_mappings)``, or ``None`` after
            showing a warning when files, key columns or mappings are missing.
        """
        if self.file_a_data is None or self.file_b_data is None:
            messagebox.showwarning("警告", "请先分析两个文件")
            return None
        a_key_col = self.file_a_key_combo.get()
        b_key_col = self.file_b_key_combo.get()
        if not a_key_col or not b_key_col:
            messagebox.showwarning("警告", "请选择匹配键列")
            return None
        active_mappings = self.get_active_column_mappings()
        if not active_mappings:
            messagebox.showwarning("警告", "请添加至少一个列映射关系")
            return None
        return a_key_col, b_key_col, active_mappings

    # ------------------------------------------------------------------
    # Preview / execute
    # ------------------------------------------------------------------
    def preview_match_results(self):
        """Open a window that previews the match for the first rows of B."""
        settings = self._collect_match_settings()
        if settings is None:
            return
        a_key_col, b_key_col, active_mappings = settings

        preview_window = None
        try:
            # Compute everything first so a data error opens no window.
            lookup = self._build_lookup(self.file_a_data, a_key_col, active_mappings)
            rows, matched, not_matched = self._build_preview_rows(
                self.file_b_data.head(self._MATCH_PREVIEW_ROWS),
                b_key_col, active_mappings, lookup)

            preview_window = tk.Toplevel(self.root)
            preview_window.title("多列匹配结果预览")
            preview_window.geometry("1200x700")
            preview_window.transient(self.root)
            frame = ttk.Frame(preview_window, padding="15")
            frame.pack(fill=tk.BOTH, expand=True)
            ttk.Label(frame, text="多列匹配结果预览",
                      font=('Arial', 14, 'bold')).pack(pady=(0, 15))

            config_info = f"匹配键: {a_key_col} ↔ {b_key_col}\n"
            config_info += f"替换列数: {len(active_mappings)}\n"
            config_info += "列映射关系: " + ", ".join(
                f"{m['a_col']}→{m['b_col']}" for m in active_mappings)
            ttk.Label(frame, text=config_info, font=('Arial', 10),
                      foreground="blue").pack(pady=(0, 10))

            # Bottom line first, then the expanding table above it.
            ttk.Label(frame,
                      text=f"预览统计(前100行):可匹配 {matched} 条,无匹配 {not_matched} 条",
                      font=('Arial', 10, 'bold')).pack(side=tk.BOTTOM, pady=10)

            headings = (["序号", "匹配键", "匹配状态"]
                        + [f"A_{m['a_col']}" for m in active_mappings]
                        + [f"B_{m['b_col']}" for m in active_mappings])
            table, preview_tree = self._make_scrolled_tree(frame, headings, 15, 100)
            table.pack(fill=tk.BOTH, expand=True)
            for values in rows:
                preview_tree.insert('', 'end', values=values)
        except Exception as e:
            if preview_window is not None:
                preview_window.destroy()
            messagebox.showerror("错误", f"预览失败:\n{str(e)}")

    def execute_multi_column_replace(self):
        """Run the replacement and show the per-row outcome.

        On success ``processed_data``, ``unmatched_records`` and
        ``unmatched_stats`` are updated and the save buttons enabled.
        """
        settings = self._collect_match_settings()
        if settings is None:
            return
        a_key_col, b_key_col, active_mappings = settings

        try:
            self.status_label.config(text="正在执行多列匹配替换...")
            self.root.update_idletasks()

            lookup = self._build_lookup(self.file_a_data, a_key_col, active_mappings)
            self.unmatched_records, self.unmatched_stats = self._find_unmatched(
                self.file_a_data, a_key_col, self.file_b_data, b_key_col)
            (self.processed_data, report,
             matched_count, total_replacements) = self._apply_mappings(
                self.file_b_data, b_key_col, active_mappings, lookup)

            # Rebuild the result table from scratch.
            columns = ["序号", "匹配键", "匹配状态", "替换列数", "详细信息"]
            self.result_tree['columns'] = columns
            for col in columns:
                self.result_tree.heading(col, text=col)
                if col == "详细信息":
                    self.result_tree.column(col, width=300, anchor=tk.W)
                else:
                    self.result_tree.column(col, width=100, anchor=tk.CENTER)
            self.result_tree.delete(*self.result_tree.get_children())
            for values in report:
                self.result_tree.insert('', 'end', values=values)

            self.save_btn.config(state='normal')
            has_unmatched = (self.unmatched_records is not None
                             and len(self.unmatched_records) > 0)
            unmatched_state = 'normal' if has_unmatched else 'disabled'
            self.unmatched_preview_btn.config(state=unmatched_state)
            self.unmatched_save_btn.config(state=unmatched_state)
            unmatched_info = (
                f",发现 {self.unmatched_stats['total_count']} 条无匹配记录"
                if has_unmatched else "")

            self.status_label.config(
                text=f"✅ 多列替换完成!匹配 {matched_count} 条,总替换 {total_replacements} 个单元格{unmatched_info}")
            messagebox.showinfo("完成",
                                f"多列匹配替换完成!\n"
                                f"总记录数: {len(self.file_b_data)}\n"
                                f"成功匹配: {matched_count} 条\n"
                                f"配置列数: {len(active_mappings)} 列\n"
                                f"总替换数: {total_replacements} 个单元格\n"
                                f"无匹配记录: {self.unmatched_stats['total_count']} 条")
        except Exception as e:
            messagebox.showerror("错误", f"执行失败:\n{str(e)}")
            self.status_label.config(text="执行失败")

    def preview_unmatched_records(self):
        """Open a window listing the rows of A that have no partner in B."""
        if self.unmatched_records is None or len(self.unmatched_records) == 0:
            messagebox.showinfo("信息", "没有发现无匹配记录")
            return

        preview_window = None
        try:
            preview_window = tk.Toplevel(self.root)
            preview_window.title("无匹配记录预览")
            preview_window.geometry("1200x700")
            preview_window.transient(self.root)
            frame = ttk.Frame(preview_window, padding="15")
            frame.pack(fill=tk.BOTH, expand=True)
            ttk.Label(frame, text="无匹配记录预览",
                      font=('Arial', 14, 'bold')).pack(pady=(0, 15))

            sample_keys = self.unmatched_stats.get('sample_keys', [])
            stats_info = f"总无匹配记录数: {self.unmatched_stats.get('total_count', 0)} 条\n"
            stats_info += f"匹配键列: {self.unmatched_stats.get('key_column', '')}\n"
            if len(sample_keys) > 0:
                stats_info += f"样本键值: {', '.join(sample_keys[:5])}..."
            ttk.Label(frame, text=stats_info, font=('Arial', 10),
                      foreground="blue").pack(pady=(0, 10))

            # Only the first rows are shown to keep the table responsive.
            display_data = self.unmatched_records.head(self._UNMATCHED_PREVIEW_ROWS)

            # Bottom widgets are packed before the expanding table so they
            # always stay visible: buttons lowest, summary line above them.
            btn_frame = ttk.Frame(frame)
            btn_frame.pack(side=tk.BOTTOM, pady=10)
            ttk.Button(btn_frame, text="💾 保存无匹配记录",
                       command=self.save_unmatched_records).pack(side=tk.LEFT, padx=5)
            ttk.Button(btn_frame, text="❌ 关闭",
                       command=preview_window.destroy).pack(side=tk.LEFT, padx=5)
            ttk.Label(frame,
                      text=f"显示前 {len(display_data)} 行 / 总计 {len(self.unmatched_records)} 行无匹配记录",
                      font=('Arial', 10, 'bold')).pack(side=tk.BOTTOM, pady=10)

            table, preview_tree = self._make_scrolled_tree(
                frame, list(self.unmatched_records.columns), 20, 120)
            table.pack(fill=tk.BOTH, expand=True)
            for cells in display_data.itertuples(index=False, name=None):
                # Long cell texts are cut to keep rows readable.
                preview_tree.insert('', 'end', values=[str(cell)[:50] for cell in cells])
        except Exception as e:
            if preview_window is not None:
                preview_window.destroy()
            messagebox.showerror("错误", f"预览无匹配记录失败:\n{str(e)}")

    # ------------------------------------------------------------------
    # Saving and resetting
    # ------------------------------------------------------------------
    @staticmethod
    def _export_frame(frame, file_path):
        """Write *frame* to *file_path* without the index.

        Paths ending in ".xlsx" are written as Excel workbooks; every other
        path is written as CSV in UTF-8 with BOM (so Excel detects the
        encoding).
        """
        if file_path.lower().endswith('.xlsx'):
            frame.to_excel(file_path, index=False)
        else:
            frame.to_csv(file_path, index=False, encoding='utf-8-sig')

    def save_unmatched_records(self):
        """Ask for a target path and save the unmatched rows of A."""
        if self.unmatched_records is None or len(self.unmatched_records) == 0:
            messagebox.showwarning("警告", "没有无匹配记录可保存")
            return
        file_path = filedialog.asksaveasfilename(
            title="保存无匹配记录",
            defaultextension=".xlsx",
            filetypes=[("Excel files", "*.xlsx"), ("CSV files", "*.csv")]
        )
        if not file_path:
            return
        try:
            self._export_frame(self.unmatched_records, file_path)
            self.status_label.config(
                text=f"✅ 无匹配记录已保存: {os.path.basename(file_path)}")
            messagebox.showinfo("成功",
                                f"无匹配记录已保存到:\n{file_path}\n\n"
                                f"保存记录数: {len(self.unmatched_records)} 条")
        except Exception as e:
            messagebox.showerror("错误", f"保存无匹配记录失败:\n{str(e)}")

    def save_result(self):
        """Ask for a target path and save the processed copy of file B."""
        if self.processed_data is None:
            messagebox.showwarning("警告", "没有处理结果可保存")
            return
        file_path = filedialog.asksaveasfilename(
            title="保存处理结果",
            defaultextension=".xlsx",
            filetypes=[("Excel files", "*.xlsx"), ("CSV files", "*.csv")]
        )
        if not file_path:
            return
        try:
            self._export_frame(self.processed_data, file_path)
            self.status_label.config(text=f"✅ 已保存: {os.path.basename(file_path)}")
            messagebox.showinfo("成功", f"文件已保存到:\n{file_path}")
        except Exception as e:
            messagebox.showerror("错误", f"保存失败:\n{str(e)}")

    def clear_data(self):
        """After confirmation, forget all loaded data and reset the widgets."""
        if not messagebox.askyesno("确认", "确定要清空所有数据吗?"):
            return
        self.file_a_data = None
        self.file_b_data = None
        self.processed_data = None
        self.unmatched_records = None
        self.unmatched_stats = {}
        self.file_a_path = ""
        self.file_b_path = ""
        # Also reset how the files were read, so nothing stale is shown
        # for the next file that gets loaded.
        self.file_a_sheet_name = None
        self.file_b_sheet_name = None
        self.file_a_header_row = 0
        self.file_b_header_row = 0

        self.file_a_label.config(text="未选择文件", foreground="gray")
        self.file_b_label.config(text="未选择文件", foreground="gray")
        for combo in (self.file_a_key_combo, self.file_b_key_combo):
            combo['values'] = []
            combo.set('')
        self.clear_column_mappings()
        self.result_tree.delete(*self.result_tree.get_children())
        self.update_file_info()
        self.save_btn.config(state='disabled')
        self.unmatched_preview_btn.config(state='disabled')
        self.unmatched_save_btn.config(state='disabled')
        self.status_label.config(text="已清空所有数据")
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Constructing the app builds all widgets inside the root window.
    application = EnhancedDataMatcherApp(window)
    # Blocks until the window is closed.
    window.mainloop()


if __name__ == "__main__":
    main()