架构设计
本文档详细介绍图像分割工具的软件架构设计,包括设计模式、模块划分和扩展性考虑。
🏛️ 总体架构
架构层次
┌─────────────────────────────────────┐
│ 用户界面层 (UI Layer) │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 主界面组件 │ │ 设置界面组件 │ │
│ └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│ 业务逻辑层 (Business Layer) │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 文件管理器 │ │ 图像处理器 │ │
│ └─────────────┘ └─────────────────┘ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ PDF合并器 │ │ 状态管理器 │ │
│ └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│ 数据访问层 (Data Layer) │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 文件系统 │ │ 配置管理 │ │
│ └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│ 基础设施层 (Infrastructure) │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 图像库 │ │ PDF库 │ │
│ └─────────────┘ └─────────────────┘ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ GUI框架 │ │ 系统接口 │ │
│ └─────────────┘ └─────────────────┘ │
└─────────────────────────────────────┘核心组件关系
mermaid
graph TB
A[ImageSplitterApp] --> B[FileSelector]
A --> C[ParameterController]
A --> D[ImageProcessor]
A --> E[PDFMerger]
A --> F[StatusManager]
B --> G[FileValidator]
C --> H[PresetManager]
D --> I[TileGenerator]
E --> J[PDFBuilder]
G --> K[FileSystem]
H --> K
I --> K
J --> K
L[GUIFramework] --> A
M[ImageLibrary] --> D
N[PDFLibrary] --> E🎯 设计模式应用
1. 单例模式 (Singleton)
python
class ConfigManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.config = self.load_config()
self.initialized = True应用场景:配置管理、状态管理 优势:确保全局唯一实例,避免配置冲突
2. 观察者模式 (Observer)
python
class StatusManager:
def __init__(self):
self.observers = []
def add_observer(self, observer):
self.observers.append(observer)
def notify_observers(self, status):
for observer in self.observers:
observer.update_status(status)
class ProgressObserver:
def update_status(self, status):
# 更新进度条和状态标签
self.progress_bar['value'] = status.progress
self.status_label.config(text=status.message)应用场景:进度更新、状态变化通知 优势:松耦合的状态管理机制
3. 策略模式 (Strategy)
python
class SaveStrategy:
def save(self, image_tiles, output_path):
raise NotImplementedError
class SeparateFolderStrategy(SaveStrategy):
def save(self, image_tiles, output_path):
# 每张图片单独文件夹保存逻辑
pass
class DateFolderStrategy(SaveStrategy):
def save(self, image_tiles, output_path):
# 按日期统一文件夹保存逻辑
pass
class ImageSaver:
def __init__(self, strategy: SaveStrategy):
self.strategy = strategy
def set_strategy(self, strategy: SaveStrategy):
self.strategy = strategy
def save_images(self, image_tiles, output_path):
self.strategy.save(image_tiles, output_path)应用场景:不同的保存模式 优势:算法可替换,易于扩展新的保存方式
4. 工厂模式 (Factory)
python
class ProcessorFactory:
@staticmethod
def create_processor(processor_type, **kwargs):
if processor_type == 'image':
return ImageProcessor(**kwargs)
elif processor_type == 'pdf':
return PDFProcessor(**kwargs)
else:
raise ValueError(f"Unknown processor type: {processor_type}")
# 使用示例
image_processor = ProcessorFactory.create_processor(
'image',
quality=95,
format='PNG'
)应用场景:创建不同类型的处理器 优势:封装对象创建逻辑,提高代码复用性
📦 模块设计
1. 文件管理模块
python
class FileManager:
def __init__(self, upload_folder='uploads', split_folder='splits'):
self.upload_folder = upload_folder
self.split_folder = split_folder
self.ensure_folders_exist()
def ensure_folders_exist(self):
"""确保必要的文件夹存在"""
os.makedirs(self.upload_folder, exist_ok=True)
os.makedirs(self.split_folder, exist_ok=True)
def validate_file(self, file_path):
"""验证文件格式和大小"""
valid_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'}
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext not in valid_extensions:
raise ValueError(f"不支持的文件格式: {file_ext}")
if os.path.getsize(file_path) > 100 * 1024 * 1024: # 100MB
raise ValueError("文件大小超过限制")
return True
def copy_to_upload(self, file_path):
"""复制文件到上传文件夹"""
filename = os.path.basename(file_path)
upload_path = os.path.join(self.upload_folder, filename)
shutil.copy2(file_path, upload_path)
return upload_path2. 图像处理模块
python
class ImageProcessor:
def __init__(self, quality=95, output_format='PNG'):
self.quality = quality
self.output_format = output_format
def split_into_tiles(self, image_path, cols, rows):
"""将图像分割成图块"""
image = Image.open(image_path)
width, height = image.size
tile_width = width // cols
tile_height = height // rows
tiles = []
for row in range(rows):
for col in range(cols):
tile = self._extract_tile(image, col, row, tile_width, tile_height)
tiles.append(tile)
return tiles
def _extract_tile(self, image, col, row, tile_width, tile_height):
"""提取单个图块"""
left = col * tile_width
upper = row * tile_height
right = (col + 1) * tile_width
lower = (row + 1) * tile_height
return image.crop((left, upper, right, lower))
def save_tile(self, tile, save_path):
"""保存图块"""
if self.output_format.upper() == 'JPEG':
tile.save(save_path, format='JPEG', quality=self.quality)
else:
tile.save(save_path, format=self.output_format)3. PDF处理模块
python
class PDFProcessor:
def __init__(self):
self.pdf_writer = PdfWriter()
def merge_images_to_pdf(self, image_paths, output_path):
"""将图像合并为PDF"""
for image_path in sorted(image_paths):
pdf_page = self._create_pdf_page(image_path)
if pdf_page:
self.pdf_writer.add_page(pdf_page)
with open(output_path, 'wb') as output_file:
self.pdf_writer.write(output_file)
def _create_pdf_page(self, image_path):
"""为单个图像创建PDF页面"""
try:
image = Image.open(image_path)
if image.mode != 'RGB':
image = image.convert('RGB')
pdf_buffer = io.BytesIO()
canvas_obj = canvas.Canvas(pdf_buffer, pagesize=image.size)
canvas_obj.drawImage(image_path, 0, 0, width=image.size[0], height=image.size[1])
canvas_obj.save()
pdf_buffer.seek(0)
reader = PdfReader(pdf_buffer)
return reader.pages[0]
except Exception as e:
print(f"创建PDF页面失败: {e}")
return None4. 状态管理模块
python
class StatusManager:
def __init__(self):
self.status_var = None
self.progress_var = None
self.observers = []
def bind_widgets(self, status_var, progress_var):
"""绑定状态和进度控件"""
self.status_var = status_var
self.progress_var = progress_var
def update_status(self, message, progress=None):
"""更新状态信息"""
if self.status_var:
self.status_var.set(message)
if progress is not None and self.progress_var:
self.progress_var.set(progress)
# 通知观察者
for observer in self.observers:
observer.on_status_update(message, progress)
def add_observer(self, observer):
"""添加状态观察者"""
self.observers.append(observer)🔧 扩展性设计
1. 插件架构
python
class PluginManager:
def __init__(self):
self.plugins = {}
def register_plugin(self, name, plugin):
"""注册插件"""
self.plugins[name] = plugin
def get_plugin(self, name):
"""获取插件"""
return self.plugins.get(name)
def apply_filters(self, image, filter_names):
"""应用图像滤镜插件"""
for filter_name in filter_names:
plugin = self.get_plugin(filter_name)
if plugin:
image = plugin.apply(image)
return image
# 插件接口
class ImageFilterPlugin:
def apply(self, image):
raise NotImplementedError
# 具体插件实现
class GrayscaleFilter(ImageFilterPlugin):
def apply(self, image):
return image.convert('L')
class BlurFilter(ImageFilterPlugin):
def apply(self, image):
from PIL import ImageFilter
return image.filter(ImageFilter.BLUR)2. 配置系统
python
class ConfigSystem:
def __init__(self, config_file='config.json'):
self.config_file = config_file
self.config = self.load_config()
def load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
return json.load(f)
return self.get_default_config()
def get_default_config(self):
"""获取默认配置"""
return {
'output': {
'format': 'PNG',
'quality': 95,
'save_mode': 'separate_folders'
},
'ui': {
'theme': 'litera',
'window_size': '1103x898',
'min_size': '900x600'
},
'processing': {
'max_file_size': 100 * 1024 * 1024, # 100MB
'max_concurrent': 4
}
}
def save_config(self):
"""保存配置文件"""
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
def get(self, key, default=None):
"""获取配置值"""
keys = key.split('.')
value = self.config
for k in keys:
value = value.get(k, default)
if value is None:
return default
return value
def set(self, key, value):
"""设置配置值"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
config = config.setdefault(k, {})
config[keys[-1]] = value3. 事件系统
python
class EventBus:
def __init__(self):
self.listeners = {}
def subscribe(self, event_type, listener):
"""订阅事件"""
if event_type not in self.listeners:
self.listeners[event_type] = []
self.listeners[event_type].append(listener)
def unsubscribe(self, event_type, listener):
"""取消订阅"""
if event_type in self.listeners:
self.listeners[event_type].remove(listener)
def publish(self, event):
"""发布事件"""
event_type = event.type
if event_type in self.listeners:
for listener in self.listeners[event_type]:
listener(event)
class Event:
def __init__(self, event_type, data=None):
self.type = event_type
self.data = data
self.timestamp = datetime.datetime.now()
# 事件类型定义
class EventType:
FILE_SELECTED = 'file_selected'
PROCESSING_STARTED = 'processing_started'
PROCESSING_COMPLETED = 'processing_completed'
ERROR_OCCURRED = 'error_occurred'🔄 数据流设计
处理流程
mermaid
flowchart TD
A[用户选择文件] --> B[文件验证]
B --> C[参数设置]
C --> D[开始处理]
D --> E[复制文件]
E --> F[图像分割]
F --> G[保存图块]
G --> H[更新进度]
H --> I{还有文件?}
I -->|是| E
I -->|否| J[处理完成]
J --> K{合并PDF?}
K -->|是| L[PDF合并]
K -->|否| M[结束]
L --> N[保存PDF]
N --> M数据传递
python
class ProcessingContext:
def __init__(self):
self.selected_files = []
self.processing_params = {}
self.output_paths = []
self.current_progress = 0
self.status_messages = []
self.errors = []
def add_file(self, file_path):
"""添加待处理文件"""
if file_path not in self.selected_files:
self.selected_files.append(file_path)
def set_params(self, cols, rows, mode, **kwargs):
"""设置处理参数"""
self.processing_params.update({
'cols': cols,
'rows': rows,
'mode': mode,
**kwargs
})
def add_output_path(self, path):
"""添加输出路径"""
self.output_paths.append(path)
def update_progress(self, current, total):
"""更新进度"""
self.current_progress = (current / total) * 100 if total > 0 else 0
def add_status_message(self, message):
"""添加状态消息"""
self.status_messages.append({
'message': message,
'timestamp': datetime.datetime.now()
})
def add_error(self, error):
"""添加错误信息"""
self.errors.append({
'error': str(error),
'timestamp': datetime.datetime.now()
})🛡️ 错误处理架构
异常层次结构
python
class ImageSplitterError(Exception):
"""基础异常类"""
pass
class FileError(ImageSplitterError):
"""文件相关错误"""
pass
class ProcessingError(ImageSplitterError):
"""处理相关错误"""
pass
class ConfigurationError(ImageSplitterError):
"""配置相关错误"""
pass
class ValidationError(ImageSplitterError):
"""验证相关错误"""
pass错误处理策略
python
class ErrorHandler:
def __init__(self, logger=None):
self.logger = logger
self.error_handlers = {
FileError: self.handle_file_error,
ProcessingError: self.handle_processing_error,
ConfigurationError: self.handle_config_error,
ValidationError: self.handle_validation_error
}
def handle_error(self, error):
"""统一错误处理入口"""
error_type = type(error)
handler = self.error_handlers.get(error_type, self.handle_unknown_error)
return handler(error)
def handle_file_error(self, error):
"""处理文件错误"""
if self.logger:
self.logger.error(f"文件错误: {error}")
return {"status": "error", "message": f"文件操作失败: {error}"}
def handle_processing_error(self, error):
"""处理处理错误"""
if self.logger:
self.logger.error(f"处理错误: {error}")
return {"status": "error", "message": f"图像处理失败: {error}"}
def handle_config_error(self, error):
"""处理配置错误"""
if self.logger:
self.logger.error(f"配置错误: {error}")
return {"status": "error", "message": f"配置错误: {error}"}
def handle_validation_error(self, error):
"""处理验证错误"""
if self.logger:
self.logger.error(f"验证错误: {error}")
return {"status": "error", "message": f"参数验证失败: {error}"}
def handle_unknown_error(self, error):
"""处理未知错误"""
if self.logger:
self.logger.error(f"未知错误: {error}")
return {"status": "error", "message": f"发生未知错误: {error}"}📈 性能优化架构
1. 缓存系统
python
class CacheManager:
def __init__(self, max_size=100):
self.cache = {}
self.max_size = max_size
self.access_order = []
def get(self, key):
"""获取缓存值"""
if key in self.cache:
self.access_order.remove(key)
self.access_order.append(key)
return self.cache[key]
return None
def set(self, key, value):
"""设置缓存值"""
if len(self.cache) >= self.max_size:
oldest_key = self.access_order.pop(0)
del self.cache[oldest_key]
self.cache[key] = value
self.access_order.append(key)
def clear(self):
"""清空缓存"""
self.cache.clear()
self.access_order.clear()2. 线程池管理
python
import concurrent.futures
from queue import Queue
class ThreadPoolManager:
def __init__(self, max_workers=4):
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = Queue()
self.active_tasks = []
def submit_task(self, func, *args, **kwargs):
"""提交任务到线程池"""
future = self.executor.submit(func, *args, **kwargs)
self.active_tasks.append(future)
return future
def wait_for_completion(self):
"""等待所有任务完成"""
for future in concurrent.futures.as_completed(self.active_tasks):
try:
result = future.result()
except Exception as e:
print(f"任务执行失败: {e}")
self.active_tasks.clear()
def shutdown(self):
"""关闭线程池"""
self.executor.shutdown(wait=True)这个架构设计确保了系统的可扩展性、可维护性和高性能,为未来的功能扩展提供了坚实的基础。