ppt_parser.py 5.72 KB
Newer Older
kihoon.lee's avatar
upload  
kihoon.lee committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
import os
from typing import Optional
from modules.ocr import ReaderForEasyOCR
from PIL import Image, UnidentifiedImageError
import io
import asyncio
import logging
# Module-scoped logger: records carry this module's name (so they can be
# filtered per module) and still propagate to handlers set on the root logger.
logger = logging.getLogger(__name__)

class PPTParser:
    """Turn a PowerPoint file into one text entry per slide.

    Each slide contributes the text of its shapes (text frames, grouped
    shapes and, optionally, OCR'd pictures), its tables rendered as
    comma-separated rows, and its speaker notes.
    """

    # Pictures narrower or shorter than this many pixels are never sent to OCR.
    MIN_OCR_IMAGE_SIZE = 150

    def __init__(self, use_ocr: bool, ocr_reader: Optional["ReaderForEasyOCR"] = None):
        """Store the OCR configuration.

        Args:
            use_ocr: Whether picture shapes should be run through OCR.
            ocr_reader: Awaitable callable invoked with a grayscale PIL image;
                its result is iterated as ``(bbox, text)`` pairs. OCR is
                skipped when no reader is supplied, even if ``use_ocr`` is true.
        """
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    async def parse(self, contents: bytes, filename: str) -> list:
        """Parse raw presentation bytes into per-slide text entries.

        Args:
            contents: Raw bytes of the presentation file.
            filename: Name (or path) of the source file; only its base name is
                used, as the prefix of every ``document_id``.

        Returns:
            A list of ``{"document_id": ..., "text": ...}`` dicts, one per
            slide that has any non-whitespace text. ``document_id`` has the
            form ``"<basename>@<NNNN>"`` with a 1-based, zero-padded slide
            number.
        """
        # Wrap the bytes in a file-like object so Presentation can read them.
        with io.BytesIO(contents) as temp_file:
            prs = Presentation(temp_file)

        document_name = os.path.basename(filename)
        parsed_content = []

        for slide_number, slide in enumerate(prs.slides, start=1):
            full_text = await self.process_slide(slide)
            notes_text = self.extract_notes(slide)

            # Slide body and speaker notes are merged into a single entry.
            if notes_text.strip():
                full_text += "\n\n[Notes]\n" + notes_text

            if not full_text.strip():
                continue  # nothing worth indexing on this slide

            parsed_content.append({
                "document_id": f"{document_name}@{slide_number:04}",
                "text": full_text,
            })

        return parsed_content

    @staticmethod
    def _sorted_by_position(shapes):
        """Return *shapes* ordered top-to-bottom, then left-to-right.

        ``top``/``left`` can be ``None`` for a shape without an explicit
        position; those are treated as 0 so the sort never compares ``None``
        with an int (which would raise ``TypeError``).
        """
        return sorted(shapes, key=lambda shape: (shape.top or 0, shape.left or 0))

    async def extract_text_from_shape(self, shape):
        """Return the text carried by a single shape.

        Group shapes are expanded recursively, picture shapes yield an
        ``(image)`` marker block (with OCR text when enabled), and shapes with
        a text frame yield one line per paragraph. Any other shape yields an
        empty string.
        """
        if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
            return await self.extract_text_from_group(shape)
        if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
            return await self._extract_text_from_picture(shape)
        if shape.has_text_frame:
            return "\n".join(
                "".join(run.text for run in paragraph.runs)
                for paragraph in shape.text_frame.paragraphs
            )
        return ""

    async def _extract_text_from_picture(self, shape):
        """Return the ``(image)`` marker block for a picture shape.

        The block contains an ``(ocr)`` section when OCR is enabled, a reader
        is configured and the picture is at least ``MIN_OCR_IMAGE_SIZE`` pixels
        on both sides. Returns an empty string when the picture has no
        embedded image, is a WMF image, or has an unidentifiable format.
        """
        try:
            image_stream = shape.image.blob
        except ValueError:
            # python-pptx raises ValueError when the picture is linked and has
            # no embedded image; skip it instead of aborting the whole parse.
            logger.warning(f"Shape {shape} has no embedded image. Skipping this image.")
            return ""

        try:
            image = Image.open(io.BytesIO(image_stream))

            if image.format == "WMF":
                logger.warning(f"Skipping WMF image in shape {shape} as it cannot be processed.")
                return ""  # WMF images are skipped

            ocr_text = ""
            width, height = image.size

            # OCR only runs on pictures of at least MIN_OCR_IMAGE_SIZE px per side.
            if self.use_ocr and self.ocr_reader and min(width, height) >= self.MIN_OCR_IMAGE_SIZE:
                # Convert to grayscale before handing the image to the reader.
                image = image.convert('L')
                ocr_results = await self.ocr_reader(image)
                ocr_text = "\n".join([text for bbox, text in ocr_results])
                ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"

            return f"(image)\n{ocr_text}\n(/image)"

        except UnidentifiedImageError:
            logger.error(f"Unable to identify image format for shape {shape}. Skipping this image.")
            return ""

    async def extract_text_from_group(self, group):
        """Return the newline-joined text of every shape in a group shape.

        Member shapes are visited top-to-bottom, then left-to-right.
        """
        texts = []
        for shape in self._sorted_by_position(group.shapes):
            texts.append(await self.extract_text_from_shape(shape))
        return "\n".join(texts)

    async def extract_text_from_slide(self, slide):
        """Return the newline-joined text of all shapes on a slide.

        Shapes are visited top-to-bottom, then left-to-right. Text from group
        shapes is emitted first, followed by text from all other shapes.
        """
        grouped_texts = []
        ungrouped_texts = []

        for shape in self._sorted_by_position(slide.shapes):
            if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
                grouped_texts.append(await self.extract_text_from_group(shape))
            else:
                ungrouped_texts.append(await self.extract_text_from_shape(shape))

        return "\n".join(grouped_texts + ungrouped_texts)

    def extract_and_split_table(self, slide):
        """Return the cell text of every table on the slide.

        Merged cells are split and the merge origin's text is copied into each
        cell of the former span, so every row has one value per column.
        Note that this modifies the table held by the in-memory presentation.

        Returns:
            A list of tables; each table is a list of rows and each row a list
            of cell strings.
        """
        tables = []
        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.TABLE:
                continue

            table = shape.table
            row_count = len(table.rows)
            col_count = len(table.columns)
            table_data = []
            for row in range(row_count):
                row_data = []
                for col in range(col_count):
                    cell = table.cell(row, col)
                    if cell.is_merge_origin:
                        # Capture the span before splitting, then replicate
                        # the origin text across the cells it used to cover.
                        text = cell.text
                        span_height = cell.span_height
                        span_width = cell.span_width
                        cell.split()  # undo the merge
                        for i in range(span_height):
                            for j in range(span_width):
                                table.cell(row + i, col + j).text = text
                    row_data.append(cell.text)
                table_data.append(row_data)
            tables.append(table_data)
        return tables

    def table_to_csv(self, table):
        """Render a table (list of rows of strings) as comma-separated lines.

        Cells are joined with ``", "`` and every row ends with a newline; no
        quoting or escaping is applied.
        """
        return "".join(", ".join(row) + "\n" for row in table)

    async def process_slide(self, slide):
        """Return the slide's shape text followed by each of its tables.

        The shape text and every rendered table are each followed by a newline.
        """
        slide_text = await self.extract_text_from_slide(slide)
        tables = self.extract_and_split_table(slide)
        parts = [slide_text + "\n"]
        parts.extend(self.table_to_csv(table) + "\n" for table in tables)
        return "".join(parts)

    def extract_notes(self, slide):
        """Return the slide's speaker-notes text, or ``""`` when there is none.

        An empty string is also returned when the notes slide exists but has
        no notes text frame.
        """
        if not slide.has_notes_slide:
            return ""
        notes_frame = slide.notes_slide.notes_text_frame
        if notes_frame is None:
            # The notes slide has no notes placeholder to read text from.
            return ""
        return notes_frame.text