"""Parse PowerPoint (.pptx) content into per-slide text entries."""

import asyncio
import io
import logging
import os
from typing import Optional

from PIL import Image, UnidentifiedImageError
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

from modules.ocr import ReaderForEasyOCR

logger = logging.getLogger()


class PPTParser:
    """Extract text, table contents, speaker notes and (optionally) OCR text
    from a PowerPoint presentation, one entry per slide.

    Args:
        use_ocr: Whether pictures should be passed to ``ocr_reader``.
        ocr_reader: Awaitable callable that receives a PIL image and yields
            ``(bbox, text)`` pairs. OCR is skipped when this is ``None``.
    """

    # Pictures narrower or shorter than this many pixels are not sent to OCR.
    _MIN_OCR_SIDE_PX = 150

    def __init__(self, use_ocr: bool, ocr_reader: Optional[ReaderForEasyOCR] = None):
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    async def parse(self, contents: bytes, filename: str) -> list:
        """Parse a presentation given as raw bytes.

        Args:
            contents: Raw bytes of the presentation file.
            filename: Name (or path) of the file; only its basename is used,
                as the prefix of every ``document_id``.

        Returns:
            A list of ``{"document_id": "<basename>@<NNNN>", "text": ...}``
            dicts, one per slide that produced non-blank text. ``NNNN`` is
            the 1-based slide number, zero-padded to four digits.
        """
        parsed_content = []
        # Wrap the bytes in a file-like object so python-pptx can read them.
        with io.BytesIO(contents) as stream:
            prs = Presentation(stream)
            document_name = os.path.basename(filename)

            for slide_number, slide in enumerate(prs.slides, start=1):
                slide_text = await self.process_slide(slide)
                notes_text = self.extract_notes(slide)

                # Merge the slide body and its notes into a single entry.
                full_text = slide_text
                if notes_text.strip():
                    full_text += "\n\n[Notes]\n" + notes_text

                if full_text.strip():
                    parsed_content.append(
                        {
                            "document_id": f"{document_name}@{slide_number:04}",
                            "text": full_text,
                        }
                    )

        return parsed_content

    @staticmethod
    def _reading_order_key(shape):
        """Return a top-to-bottom, left-to-right sort key for *shape*.

        A shape without an explicit offset can report ``None`` for ``top`` or
        ``left``; ``None`` cannot be ordered against ``int``, so it is
        treated as 0 here.
        """
        return (shape.top or 0, shape.left or 0)

    async def _extract_text_from_picture(self, shape):
        """Return the ``(image)`` text block for a picture shape.

        Returns an empty string when the picture cannot be processed: it is
        linked rather than embedded, is a WMF image, or has an unidentifiable
        format.
        """
        try:
            image_blob = shape.image.blob
        except ValueError:
            # python-pptx raises ValueError when the picture has no embedded
            # image (e.g. it links to an external file).
            logger.warning(
                "Skipping picture in shape %s as it has no embedded image.", shape
            )
            return ""

        try:
            image = Image.open(io.BytesIO(image_blob))

            if image.format == "WMF":
                logger.warning(
                    "Skipping WMF image in shape %s as it cannot be processed.",
                    shape,
                )
                return ""

            ocr_text = ""
            width, height = image.size
            large_enough = (
                width >= self._MIN_OCR_SIDE_PX and height >= self._MIN_OCR_SIDE_PX
            )
            if self.use_ocr and self.ocr_reader and large_enough:
                # The reader is fed a grayscale version of the picture.
                grayscale = image.convert('L')
                ocr_results = await self.ocr_reader(grayscale)
                ocr_text = "\n".join(text for _bbox, text in ocr_results)
                ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"

            return f"(image)\n{ocr_text}\n(/image)"
        except UnidentifiedImageError:
            logger.error(
                "Unable to identify image format for shape %s. Skipping this image.",
                shape,
            )
            return ""

    async def extract_text_from_shape(self, shape):
        """Return the text of a single shape.

        Group shapes are expanded recursively, pictures go through the
        optional OCR path, and shapes with a text frame contribute one line
        per paragraph. Any other shape yields an empty string.
        """
        if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
            return await self.extract_text_from_group(shape)

        if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
            return await self._extract_text_from_picture(shape)

        if shape.has_text_frame:
            return '\n'.join(
                "".join(run.text for run in paragraph.runs)
                for paragraph in shape.text_frame.paragraphs
            )

        return ""

    async def extract_text_from_group(self, group):
        """Return the newline-joined text of every shape inside *group*,
        visited top-to-bottom then left-to-right."""
        text_runs = []
        for shape in sorted(group.shapes, key=self._reading_order_key):
            text_runs.append(await self.extract_text_from_shape(shape))
        return '\n'.join(text_runs)

    async def extract_text_from_slide(self, slide):
        """Return the slide's shape text.

        Shapes are visited top-to-bottom then left-to-right; the text of all
        group shapes is emitted first, followed by the text of all other
        shapes.
        """
        grouped_texts = []
        ungrouped_texts = []

        for shape in sorted(slide.shapes, key=self._reading_order_key):
            if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
                grouped_texts.append(await self.extract_text_from_group(shape))
            else:
                ungrouped_texts.append(await self.extract_text_from_shape(shape))

        return "\n".join(grouped_texts + ungrouped_texts)

    def extract_and_split_table(self, slide):
        """Return every table on *slide* as a list of rows of cell text.

        Merged cells are split and the merge origin's text is copied into
        each cell of the former span, so every row has one value per column.

        NOTE: this modifies the in-memory presentation (cells are un-merged
        and their text overwritten).
        """
        tables = []
        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.TABLE:
                continue

            table = shape.table
            row_count = len(table.rows)
            col_count = len(table.columns)
            table_data = []

            for row in range(row_count):
                row_data = []
                for col in range(col_count):
                    cell = table.cell(row, col)
                    if cell.is_merge_origin:
                        # Capture text and span before split() resets them.
                        text = cell.text
                        span_height = cell.span_height
                        span_width = cell.span_width
                        cell.split()  # un-merge
                        for i in range(span_height):
                            for j in range(span_width):
                                table.cell(row + i, col + j).text = text
                    row_data.append(cell.text)
                table_data.append(row_data)

            tables.append(table_data)
        return tables

    def table_to_csv(self, table):
        """Render *table* (a list of rows of strings) as text: cells joined
        by ``", "`` and each row terminated by a newline.

        No quoting or escaping is applied to cell values.
        """
        return "".join(", ".join(row) + "\n" for row in table)

    async def process_slide(self, slide):
        """Return the slide's shape text followed by its tables.

        The shape text is followed by a newline, then each table's
        comma-separated rendering, each followed by a newline.
        """
        slide_text_sections = await self.extract_text_from_slide(slide)
        tables = self.extract_and_split_table(slide)

        table_sections = "".join(self.table_to_csv(table) + "\n" for table in tables)
        return slide_text_sections + "\n" + table_sections

    def extract_notes(self, slide):
        """Return the slide's speaker-notes text, or ``""`` when there is none.

        An empty string is returned both when the slide has no notes slide
        and when the notes slide has no notes text frame.
        """
        if not slide.has_notes_slide:
            return ""

        notes_frame = slide.notes_slide.notes_text_frame
        if notes_frame is None:
            # python-pptx yields None when the notes slide lacks a notes
            # placeholder; there is no text to read in that case.
            return ""
        return notes_frame.text