Skip to content

架构设计

本文档详细介绍图像分割工具的软件架构设计,包括设计模式、模块划分和扩展性考虑。

🏛️ 总体架构

架构层次

┌─────────────────────────────────────┐
│           用户界面层 (UI Layer)        │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  主界面组件  │ │   设置界面组件   │ │
│  └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│         业务逻辑层 (Business Layer)    │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  文件管理器  │ │   图像处理器     │ │
│  └─────────────┘ └─────────────────┘ │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  PDF合并器  │ │   状态管理器     │ │
│  └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│         数据访问层 (Data Layer)       │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  文件系统    │ │   配置管理       │ │
│  └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────┤
│         基础设施层 (Infrastructure)   │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  图像库     │ │   PDF库          │ │
│  └─────────────┘ └─────────────────┘ │
│  ┌─────────────┐ ┌─────────────────┐ │
│  │  GUI框架    │ │   系统接口       │ │
│  └─────────────┘ └─────────────────┘ │
└─────────────────────────────────────┘

核心组件关系

mermaid
graph TB
    A[ImageSplitterApp] --> B[FileSelector]
    A --> C[ParameterController]
    A --> D[ImageProcessor]
    A --> E[PDFMerger]
    A --> F[StatusManager]
    
    B --> G[FileValidator]
    C --> H[PresetManager]
    D --> I[TileGenerator]
    E --> J[PDFBuilder]
    
    G --> K[FileSystem]
    H --> K
    I --> K
    J --> K
    
    L[GUIFramework] --> A
    M[ImageLibrary] --> D
    N[PDFLibrary] --> E

🎯 设计模式应用

1. 单例模式 (Singleton)

python
class ConfigManager:
    """Singleton holder for the application configuration.

    Every call to ``ConfigManager()`` returns the same object. The
    configuration is loaded only the first time ``__init__`` runs on it.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        instance = cls._instance
        if instance is None:
            instance = super().__new__(cls)
            cls._instance = instance
        return instance

    def __init__(self):
        """Load the configuration once; later constructions are no-ops."""
        # __init__ runs on every ConfigManager() call, so the 'initialized'
        # marker is what keeps the config from being reloaded.
        if hasattr(self, 'initialized'):
            return
        self.config = self.load_config()
        self.initialized = True

应用场景:配置管理、状态管理。优势:确保全局唯一实例,避免配置冲突。

2. 观察者模式 (Observer)

python
class StatusManager:
    """Subject of the observer pattern: forwards status objects to observers."""

    def __init__(self):
        """Start with no registered observers."""
        self.observers = list()

    def add_observer(self, observer):
        """Register *observer*; it must provide ``update_status(status)``."""
        self.observers += [observer]

    def notify_observers(self, status):
        """Call ``update_status(status)`` on each observer in registration order."""
        for subscriber in self.observers:
            subscriber.update_status(status)

class ProgressObserver:
    """Observer that mirrors a status object onto a progress bar and a label.

    The ``progress_bar`` and ``status_label`` attributes are not created in
    this snippet; they must be assigned before ``update_status`` is called.
    """

    def update_status(self, status):
        """Show ``status.progress`` on the bar and ``status.message`` on the label."""
        bar = self.progress_bar
        label = self.status_label
        bar['value'] = status.progress
        label.config(text=status.message)

应用场景:进度更新、状态变化通知。优势:松耦合的状态管理机制。

3. 策略模式 (Strategy)

python
class SaveStrategy:
    """Interface for the interchangeable ways of saving image tiles."""

    def save(self, image_tiles, output_path):
        """Save *image_tiles* under *output_path*; subclasses must override.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

class SeparateFolderStrategy(SaveStrategy):
    """Save strategy placeholder: one separate folder per image."""

    def save(self, image_tiles, output_path):
        # Logic for saving each image into its own folder (left unimplemented in this example)
        pass

class DateFolderStrategy(SaveStrategy):
    """Save strategy placeholder: one shared folder grouped by date."""

    def save(self, image_tiles, output_path):
        # Logic for saving into a single date-based folder (left unimplemented in this example)
        pass

class ImageSaver:
    """Context of the strategy pattern: delegates saving to a ``SaveStrategy``."""

    def __init__(self, strategy: SaveStrategy):
        """Store the strategy used by ``save_images``."""
        self.strategy = strategy

    def set_strategy(self, strategy: SaveStrategy):
        """Replace the current strategy with *strategy*."""
        self.strategy = strategy

    def save_images(self, image_tiles, output_path):
        """Hand *image_tiles* and *output_path* to the current strategy's ``save``."""
        active_strategy = self.strategy
        active_strategy.save(image_tiles, output_path)

应用场景:不同的保存模式。优势:算法可替换,易于扩展新的保存方式。

4. 工厂模式 (Factory)

python
class ProcessorFactory:
    """Factory that builds a processor object from a type name."""

    @staticmethod
    def create_processor(processor_type, **kwargs):
        """Create the processor selected by *processor_type*.

        Args:
            processor_type: ``'image'`` or ``'pdf'``.
            **kwargs: forwarded unchanged to the processor's constructor.

        Returns:
            An ``ImageProcessor`` for ``'image'``, a ``PDFProcessor`` for ``'pdf'``.

        Raises:
            ValueError: if *processor_type* is neither of the two names.
        """
        if processor_type == 'image':
            return ImageProcessor(**kwargs)
        if processor_type == 'pdf':
            return PDFProcessor(**kwargs)
        raise ValueError(f"Unknown processor type: {processor_type}")

# 使用示例
image_processor = ProcessorFactory.create_processor(
    'image',
    quality=95,
    # The keyword must be `output_format`: the factory forwards **kwargs to
    # ImageProcessor.__init__(quality, output_format), which has no `format`.
    output_format='PNG',
)

应用场景:创建不同类型的处理器。优势:封装对象创建逻辑,提高代码复用性。

📦 模块设计

1. 文件管理模块

python
class FileManager:
    """Manage the upload and split folders and the files placed in them."""

    def __init__(self, upload_folder='uploads', split_folder='splits'):
        """Remember both folder paths and create them if they are missing."""
        self.upload_folder, self.split_folder = upload_folder, split_folder
        self.ensure_folders_exist()

    def ensure_folders_exist(self):
        """Create the upload and split folders; existing folders are left alone."""
        for folder in (self.upload_folder, self.split_folder):
            os.makedirs(folder, exist_ok=True)

    def validate_file(self, file_path):
        """Check the extension and size of *file_path*.

        Returns:
            True when the file passes both checks.

        Raises:
            ValueError: if the (case-insensitive) extension is not one of the
                supported image extensions, or the file is larger than 100 MB.
        """
        _, raw_extension = os.path.splitext(file_path)
        file_ext = raw_extension.lower()
        supported = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')
        if file_ext not in supported:
            raise ValueError(f"不支持的文件格式: {file_ext}")

        size_limit = 100 * 1024 * 1024  # 100MB
        if os.path.getsize(file_path) > size_limit:
            raise ValueError("文件大小超过限制")

        return True

    def copy_to_upload(self, file_path):
        """Copy *file_path* into the upload folder and return the new path.

        The copy keeps the source file's base name.
        """
        upload_path = os.path.join(self.upload_folder, os.path.basename(file_path))
        shutil.copy2(file_path, upload_path)
        return upload_path

2. 图像处理模块

python
class ImageProcessor:
    """Split an image into a grid of tiles and save individual tiles.

    ``quality`` is only used for JPEG output; ``output_format`` is the format
    name handed to the tile's ``save`` method.
    """

    # Modes Pillow's JPEG writer accepts as-is. Tiles in any other mode
    # (RGBA, LA, P, ...) are converted to RGB before saving as JPEG.
    _JPEG_MODES = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')

    def __init__(self, quality=95, output_format='PNG'):
        """Store the JPEG quality and the output format name."""
        self.quality = quality
        self.output_format = output_format

    def split_into_tiles(self, image_path, cols, rows):
        """Split the image at *image_path* into ``rows * cols`` tiles.

        Tiles are returned row by row, left to right. Each tile is
        ``width // cols`` by ``height // rows`` pixels, so when the image size
        is not an exact multiple, the remaining right/bottom pixels are not
        part of any tile.

        Raises:
            ValueError: if *cols* or *rows* is not positive.
        """
        if cols <= 0 or rows <= 0:
            raise ValueError(f"cols 和 rows 必须为正数: cols={cols}, rows={rows}")

        # The context manager closes the source file once all tiles exist;
        # crop() returns loaded images, so the tiles outlive the source.
        with Image.open(image_path) as image:
            width, height = image.size
            tile_width = width // cols
            tile_height = height // rows

            return [
                self._extract_tile(image, col, row, tile_width, tile_height)
                for row in range(rows)
                for col in range(cols)
            ]

    def _extract_tile(self, image, col, row, tile_width, tile_height):
        """Crop the tile at grid position (*col*, *row*) out of *image*."""
        left = col * tile_width
        upper = row * tile_height
        right = left + tile_width
        lower = upper + tile_height

        return image.crop((left, upper, right, lower))

    def save_tile(self, tile, save_path):
        """Save *tile* to *save_path* in the configured output format.

        ``'JPEG'`` and ``'JPG'`` (any case) are both written as JPEG with
        ``self.quality``. Other formats are passed to ``save`` unchanged.
        """
        if self.output_format.upper() in ('JPEG', 'JPG'):
            # JPEG has no alpha channel or palette; without this conversion
            # saving e.g. an RGBA tile cropped from a PNG fails.
            if tile.mode not in self._JPEG_MODES:
                tile = tile.convert('RGB')
            tile.save(save_path, format='JPEG', quality=self.quality)
        else:
            tile.save(save_path, format=self.output_format)

3. PDF处理模块

python
class PDFProcessor:
    """Merge image files into a single PDF, one page per image."""

    def __init__(self):
        """Create the writer that collects the pages."""
        self.pdf_writer = PdfWriter()

    def merge_images_to_pdf(self, image_paths, output_path):
        """Write a PDF at *output_path* containing one page per image.

        Images are added in ``sorted(image_paths)`` order. That is a plain
        string sort, so for example ``tile_10`` comes before ``tile_2``.
        Images whose page cannot be created are skipped.
        """
        # Start from an empty writer on every call; reusing the one from a
        # previous merge would write that merge's pages into this PDF too.
        self.pdf_writer = PdfWriter()

        for image_path in sorted(image_paths):
            pdf_page = self._create_pdf_page(image_path)
            if pdf_page is not None:
                self.pdf_writer.add_page(pdf_page)

        with open(output_path, 'wb') as output_file:
            self.pdf_writer.write(output_file)

    def _create_pdf_page(self, image_path):
        """Build a single PDF page the size of the image at *image_path*.

        Returns:
            The page object, or None if anything fails. The failure is
            printed and deliberately not raised, so one bad image does not
            abort the whole merge.
        """
        try:
            # Only the pixel size is needed from Pillow: drawImage below reads
            # the file from its path, so converting the opened image has no
            # effect on the output. The handle is closed right away.
            with Image.open(image_path) as image:
                page_width, page_height = image.size

            pdf_buffer = io.BytesIO()
            canvas_obj = canvas.Canvas(pdf_buffer, pagesize=(page_width, page_height))
            canvas_obj.drawImage(image_path, 0, 0, width=page_width, height=page_height)
            canvas_obj.save()

            pdf_buffer.seek(0)
            reader = PdfReader(pdf_buffer)
            return reader.pages[0]
        except Exception as e:
            print(f"创建PDF页面失败: {e}")
            return None

4. 状态管理模块

python
class StatusManager:
    """Push status text and progress values to bound widgets and observers."""

    def __init__(self):
        """Start with no bound variables and no observers."""
        self.observers = []
        self.status_var = self.progress_var = None

    def bind_widgets(self, status_var, progress_var):
        """Attach the variables that receive the status text and the progress."""
        self.status_var, self.progress_var = status_var, progress_var

    def update_status(self, message, progress=None):
        """Publish *message* and, when given, *progress*.

        The bound variables are updated through their ``set`` method. Then
        every observer's ``on_status_update(message, progress)`` is called,
        including when *progress* is None.
        """
        status_target = self.status_var
        if status_target:
            status_target.set(message)

        if not (progress is None or not self.progress_var):
            self.progress_var.set(progress)

        # Observers are notified after the widgets.
        for listener in self.observers:
            listener.on_status_update(message, progress)

    def add_observer(self, observer):
        """Register an observer providing ``on_status_update(message, progress)``."""
        self.observers.append(observer)

🔧 扩展性设计

1. 插件架构

python
class PluginManager:
    """Registry of named plugins, with a helper to run image filters in order."""

    def __init__(self):
        """Start with an empty registry."""
        self.plugins = dict()

    def register_plugin(self, name, plugin):
        """Store *plugin* under *name*, replacing any previous entry."""
        self.plugins[name] = plugin

    def get_plugin(self, name):
        """Return the plugin registered as *name*, or None if there is none."""
        return self.plugins.get(name)

    def apply_filters(self, image, filter_names):
        """Run *image* through the named filter plugins, in the given order.

        Names without a registered plugin are skipped.
        """
        current = image
        for name in filter_names:
            selected = self.get_plugin(name)
            if not selected:
                continue
            current = selected.apply(current)
        return current

# 插件接口
class ImageFilterPlugin:
    """Interface for image filter plugins."""

    def apply(self, image):
        """Return the filtered version of *image*; subclasses must override.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

# 具体插件实现
class GrayscaleFilter(ImageFilterPlugin):
    """Filter plugin that converts the image to mode ``'L'``."""

    def apply(self, image):
        """Return ``image.convert('L')``."""
        grayscale = image.convert('L')
        return grayscale

class BlurFilter(ImageFilterPlugin):
    """Filter plugin that applies PIL's ``ImageFilter.BLUR`` to the image."""

    def apply(self, image):
        """Return *image* filtered with ``ImageFilter.BLUR``."""
        # Imported locally, as in the original, so PIL is only needed when the filter runs.
        from PIL.ImageFilter import BLUR
        return image.filter(BLUR)

2. 配置系统

python
class ConfigSystem:
    """JSON-backed configuration with dotted-path access (``'output.format'``)."""

    def __init__(self, config_file='config.json'):
        """Remember the file path and load the configuration from it."""
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self):
        """Return the parsed config file, or the defaults if the file is missing."""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return self.get_default_config()

    def get_default_config(self):
        """Return a fresh dictionary holding the default configuration."""
        return {
            'output': {
                'format': 'PNG',
                'quality': 95,
                'save_mode': 'separate_folders'
            },
            'ui': {
                'theme': 'litera',
                'window_size': '1103x898',
                'min_size': '900x600'
            },
            'processing': {
                'max_file_size': 100 * 1024 * 1024,  # 100MB
                'max_concurrent': 4
            }
        }

    def save_config(self):
        """Write the current configuration to the config file as UTF-8 JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)

    def get(self, key, default=None):
        """Return the value at dotted path *key*, or *default*.

        *default* is returned when any path segment is missing, when a segment
        is reached through something that is not a dict, or when a value on
        the path is None.
        """
        node = self.config
        for part in key.split('.'):
            # Descend only through dicts. Calling .get() on a leaf value (or on
            # a substituted default) would raise AttributeError.
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
            if node is None:
                return default
        return node

    def set(self, key, value):
        """Store *value* at dotted path *key*, creating missing intermediate dicts.

        Only the in-memory configuration changes; call ``save_config`` to persist it.
        """
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            config = config.setdefault(k, {})
        config[keys[-1]] = value

3. 事件系统

python
class EventBus:
    """Minimal publish/subscribe hub keyed by event type."""

    def __init__(self):
        """Start with no listeners."""
        self.listeners = {}

    def subscribe(self, event_type, listener):
        """Register callable *listener* for events of *event_type*."""
        self.listeners.setdefault(event_type, []).append(listener)

    def unsubscribe(self, event_type, listener):
        """Remove *listener* from *event_type*.

        Does nothing when the event type or the listener is not registered.
        """
        registered = self.listeners.get(event_type)
        if registered is not None and listener in registered:
            registered.remove(listener)

    def publish(self, event):
        """Call every listener registered for ``event.type`` with *event*."""
        # Iterate over a snapshot: a listener may subscribe or unsubscribe
        # while being called, and mutating the live list would make the loop
        # skip the next listener.
        for listener in list(self.listeners.get(event.type, ())):
            listener(event)

class Event:
    """A published event: its type, an optional payload and a creation time."""

    def __init__(self, event_type, data=None):
        """Record the type and payload and stamp the event with the current local time."""
        self.type, self.data = event_type, data
        self.timestamp = datetime.datetime.now()

# 事件类型定义
class EventType:
    """String identifiers for the kinds of events defined in this example."""
    FILE_SELECTED = 'file_selected'
    PROCESSING_STARTED = 'processing_started'
    PROCESSING_COMPLETED = 'processing_completed'
    ERROR_OCCURRED = 'error_occurred'

🔄 数据流设计

处理流程

mermaid
flowchart TD
    A[用户选择文件] --> B[文件验证]
    B --> C[参数设置]
    C --> D[开始处理]
    D --> E[复制文件]
    E --> F[图像分割]
    F --> G[保存图块]
    G --> H[更新进度]
    H --> I{还有文件?}
    I -->|是| E
    I -->|否| J[处理完成]
    J --> K{合并PDF?}
    K -->|是| L[PDF合并]
    K -->|否| M[结束]
    L --> N[保存PDF]
    N --> M

数据传递

python
class ProcessingContext:
    """Mutable record of one processing run: inputs, parameters, outputs and logs."""

    def __init__(self):
        """Start with empty collections and zero progress."""
        self.selected_files = []
        self.processing_params = {}
        self.output_paths = []
        self.current_progress = 0
        self.status_messages = []
        self.errors = []

    def add_file(self, file_path):
        """Queue *file_path* for processing unless it is already queued."""
        if file_path in self.selected_files:
            return
        self.selected_files.append(file_path)

    def set_params(self, cols, rows, mode, **kwargs):
        """Merge the grid size, mode and any extra keyword values into the parameters."""
        incoming = {'cols': cols, 'rows': rows, 'mode': mode}
        incoming.update(kwargs)
        self.processing_params.update(incoming)

    def add_output_path(self, path):
        """Record *path* as a produced output."""
        self.output_paths.append(path)

    def update_progress(self, current, total):
        """Set the progress to ``current / total`` as a percentage (0 when total <= 0)."""
        if total > 0:
            self.current_progress = (current / total) * 100
        else:
            self.current_progress = 0

    def add_status_message(self, message):
        """Append *message* with the current local time to the status log."""
        entry = {'message': message, 'timestamp': datetime.datetime.now()}
        self.status_messages.append(entry)

    def add_error(self, error):
        """Append ``str(error)`` with the current local time to the error log."""
        entry = {'error': str(error), 'timestamp': datetime.datetime.now()}
        self.errors.append(entry)

🛡️ 错误处理架构

异常层次结构

python
class ImageSplitterError(Exception):
    """Base exception class."""
    pass

class FileError(ImageSplitterError):
    """File-related error."""
    pass

class ProcessingError(ImageSplitterError):
    """Processing-related error."""
    pass

class ConfigurationError(ImageSplitterError):
    """Configuration-related error."""
    pass

class ValidationError(ImageSplitterError):
    """Validation-related error."""
    pass

错误处理策略

python
class ErrorHandler:
    """Turn exceptions into ``{"status": "error", "message": ...}`` dictionaries.

    Each handler logs the error through ``self.logger.error`` when a logger
    was supplied, then returns the dictionary with a user-facing message.
    """

    def __init__(self, logger=None):
        """Store the optional logger and map exception classes to handlers."""
        self.logger = logger
        self.error_handlers = {
            FileError: self.handle_file_error,
            ProcessingError: self.handle_processing_error,
            ConfigurationError: self.handle_config_error,
            ValidationError: self.handle_validation_error
        }

    def handle_error(self, error):
        """Dispatch *error* to the handler registered for its class.

        The error's class hierarchy is searched from most to least specific,
        so a subclass of a registered exception uses its parent's handler.
        Errors with no registered class go to ``handle_unknown_error``.
        """
        for error_class in type(error).__mro__:
            handler = self.error_handlers.get(error_class)
            if handler is not None:
                return handler(error)
        return self.handle_unknown_error(error)

    def _report(self, log_label, user_label, error):
        """Log *error* under *log_label* and build the result for *user_label*."""
        if self.logger:
            self.logger.error(f"{log_label}: {error}")
        return {"status": "error", "message": f"{user_label}: {error}"}

    def handle_file_error(self, error):
        """Handle a file error."""
        return self._report("文件错误", "文件操作失败", error)

    def handle_processing_error(self, error):
        """Handle a processing error."""
        return self._report("处理错误", "图像处理失败", error)

    def handle_config_error(self, error):
        """Handle a configuration error."""
        return self._report("配置错误", "配置错误", error)

    def handle_validation_error(self, error):
        """Handle a validation error."""
        return self._report("验证错误", "参数验证失败", error)

    def handle_unknown_error(self, error):
        """Handle an error of an unregistered type."""
        return self._report("未知错误", "发生未知错误", error)

📈 性能优化架构

1. 缓存系统

python
class CacheManager:
    """Size-bounded cache that evicts the least recently used key.

    ``cache`` maps keys to values. ``access_order`` lists each cached key
    exactly once, from least to most recently used.
    """

    def __init__(self, max_size=100):
        """Create an empty cache holding at most *max_size* entries."""
        self.cache = {}
        self.max_size = max_size
        self.access_order = []

    def get(self, key):
        """Return the value cached for *key* and mark it most recently used.

        Returns None when *key* is not cached.
        """
        if key in self.cache:
            self.access_order.remove(key)
            self.access_order.append(key)
            return self.cache[key]
        return None

    def set(self, key, value):
        """Cache *value* under *key* and mark it most recently used.

        Updating an existing key never evicts another entry. Adding a new key
        to a full cache first evicts the least recently used key.
        """
        if key in self.cache:
            # Drop the old position so the key appears only once; a duplicate
            # would later be "evicted" twice and raise KeyError.
            self.access_order.remove(key)
        elif len(self.cache) >= self.max_size and self.access_order:
            # The access_order check keeps max_size <= 0 from popping an empty list.
            oldest_key = self.access_order.pop(0)
            del self.cache[oldest_key]

        self.cache[key] = value
        self.access_order.append(key)

    def clear(self):
        """Remove every entry."""
        self.cache.clear()
        self.access_order.clear()

2. 线程池管理

python
import concurrent.futures
from queue import Queue

class ThreadPoolManager:
    """Thin wrapper around a ``ThreadPoolExecutor`` that tracks submitted tasks."""

    def __init__(self, max_workers=4):
        """Create the executor, an (unused here) task queue and the task list."""
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.task_queue = Queue()
        self.active_tasks = []

    def submit_task(self, func, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` and return its future."""
        pending = self.executor.submit(func, *args, **kwargs)
        self.active_tasks.append(pending)
        return pending

    def wait_for_completion(self):
        """Block until every tracked task has finished, then forget them.

        A task that raised an ``Exception`` is reported with ``print`` and
        does not stop the wait. Task results are not returned; read them from
        the futures handed out by ``submit_task``.
        """
        for finished in concurrent.futures.as_completed(self.active_tasks):
            try:
                finished.result()
            except Exception as e:
                print(f"任务执行失败: {e}")
        self.active_tasks.clear()

    def shutdown(self):
        """Shut the executor down, waiting for running tasks to finish."""
        self.executor.shutdown(wait=True)

这个架构设计确保了系统的可扩展性、可维护性和高性能,为未来的功能扩展提供了坚实的基础。

基于 MIT 许可证发布