word_parser.py 5.79 KB
Newer Older
kihoon.lee's avatar
upload  
kihoon.lee committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from docx import Document
import docx
import pandas as pd
import os
from docx.oxml.ns import qn
from PIL import Image, UnidentifiedImageError
import io
from typing import Optional
from modules.ocr import ReaderForEasyOCR
import asyncio
import logging
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger rather than a logger scoped to this module.
logger = logging.getLogger()

class WordParser:
    """Convert a .docx document into text, optionally OCR-ing embedded images.

    Paragraphs are emitted as plain text, tables as CSV, and embedded pictures
    as ``(image)...(/image)`` markers that wrap an ``(ocr)...(/ocr)`` section
    when OCR is enabled and the picture is large enough.
    """

    # XPath selecting the relationship ids of pictures embedded under an element.
    _BLIP_EMBED_XPATH = ".//a:blip/@r:embed"
    # Pictures narrower or shorter than this many pixels are never sent to OCR.
    _MIN_OCR_SIDE = 150

    # The annotation is quoted so it is not evaluated when the class is created.
    def __init__(self, use_ocr: bool, ocr_reader: Optional["ReaderForEasyOCR"] = None):
        """Store the OCR configuration.

        Args:
            use_ocr: Whether embedded images should be run through OCR.
            ocr_reader: Awaitable callable taking a PIL image and returning an
                iterable of ``(bbox, text)`` pairs. OCR is skipped when it is
                ``None`` even if ``use_ocr`` is true.
        """
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    @staticmethod
    def _is_element(element, local_name: str) -> bool:
        """Return True when ``element`` is an XML element named ``local_name``."""
        tag = element.tag
        # Match on the "}name" suffix of the Clark-notation tag so that a tag
        # which merely ends with the same letters is not mistaken for it.
        return isinstance(tag, str) and tag.endswith("}" + local_name)

    async def parse(self, contents: bytes, filename: str) -> list:
        """Parse the raw bytes of a .docx file into a list of text entries.

        Args:
            contents: The .docx file content.
            filename: Name or path of the file; only its basename is used.

        Returns:
            An empty list when the document yields no text, otherwise a list
            with a single dict holding ``"document_id"``
            (``"<basename>@0001"``) and ``"text"``.
        """
        doc = Document(io.BytesIO(contents))
        document_name = os.path.basename(filename)
        parsed_content = []
        # The whole document is emitted as one entry, so the page is always 1.
        page_number = 1

        pieces = []
        # Walk the body children directly so paragraphs and tables keep their
        # document order.
        for element in doc.element.body:
            if self._is_element(element, 'p'):
                para = docx.text.paragraph.Paragraph(element, doc)
                pieces.append(await self.extract_paragraph_content(para))
            elif self._is_element(element, 'tbl'):
                table = docx.table.Table(element, doc)
                df = await self.unmerge_table(table)
                pieces.append(await self.dataframe_to_csv(df))

        text = ''.join(piece + '\n' for piece in pieces)
        if text:
            parsed_content.append({
                "document_id": f"{document_name}@{page_number:04}",
                "text": text,
            })
        return parsed_content

    def extract_tables(self, file_path: str) -> list:
        """Return every top-level table of a .docx file as a DataFrame.

        ``unmerge_table`` is a coroutine, so it is driven to completion with
        ``asyncio.run``. This method must therefore be called from synchronous
        code; ``asyncio.run`` raises ``RuntimeError`` inside a running event
        loop.

        Args:
            file_path: Path of the .docx file to read.

        Returns:
            A list with one DataFrame per table, in document order.
        """
        doc = Document(file_path)

        async def _collect():
            return [await self.unmerge_table(table) for table in doc.tables]

        return asyncio.run(_collect())

    async def _image_markup(self, element, part) -> Optional[str]:
        """Build the ``(image)...(/image)`` marker for a picture under ``element``.

        Args:
            element: XML element exposing ``xpath``; searched for an embedded
                picture.
            part: Document part whose ``related_parts`` resolves the picture's
                relationship id.

        Returns:
            ``None`` when ``element`` carries no resolvable embedded picture
            (for example a shape or chart), ``""`` when the picture is
            deliberately skipped (WMF or unidentifiable format), otherwise the
            marker text without a trailing newline.
        """
        embed_ids = element.xpath(self._BLIP_EMBED_XPATH)
        if not embed_ids:
            # A graphic without an embedded blip has no image bytes to read.
            return None

        try:
            image_stream = part.related_parts[embed_ids[0]].blob
        except KeyError:
            logger.warning(
                "Image relationship %s not found. Skipping this image.",
                embed_ids[0],
            )
            return None

        try:
            image = Image.open(io.BytesIO(image_stream))

            # Skip the WMF format.
            if image.format == "WMF":
                return ""

            ocr_text = ""
            width, height = image.size

            # OCR only applies to images of at least 150x150 pixels.
            if (
                self.use_ocr
                and self.ocr_reader
                and width >= self._MIN_OCR_SIDE
                and height >= self._MIN_OCR_SIDE
            ):
                # Convert the image to grayscale before OCR.
                image = image.convert('L')

                ocr_results = await self.ocr_reader(image)
                ocr_text = "\n".join(text for _bbox, text in ocr_results)
                ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"

            return f"(image)\n{ocr_text}\n(/image)"

        except UnidentifiedImageError:
            logger.error("Unable to identify image format. Skipping this image.")
            return ""

    async def extract_paragraph_content(self, para):
        """Return the text of a paragraph, with image markers for picture runs.

        A run whose XML mentions a graphic contributes its image marker instead
        of its text. A run whose graphic has no resolvable embedded picture
        falls back to its text.
        """
        content = ''
        for run in para.runs:
            if 'graphic' in run._element.xml:
                markup = await self._image_markup(run._element, run.part)
                if markup is not None:
                    content += markup
                    continue
            content += run.text
        return content

    async def extract_cell_content(self, cell):
        """Return the stripped text of a table cell.

        Paragraphs contribute their text, nested tables their CSV form, and
        other child elements containing an embedded picture an image marker.
        Each contribution is followed by a newline before the final strip.
        """
        content = ''
        for element in cell._element:
            if self._is_element(element, 'p'):
                para = docx.text.paragraph.Paragraph(element, cell)
                content += await self.extract_paragraph_content(para) + '\n'
            elif self._is_element(element, 'tbl'):
                nested_table = docx.table.Table(element, cell._parent)
                nested_df = await self.unmerge_table(nested_table)
                content += await self.dataframe_to_csv(nested_df) + '\n'
            # Child elements that expose no ``xml`` attribute are treated as
            # containing no graphic.
            elif 'graphic' in getattr(element, 'xml', ''):
                # The raw XML element has no ``part``; resolve relationships
                # through the cell that owns it.
                markup = await self._image_markup(element, cell.part)
                if markup:
                    content += markup + '\n'
        return content.strip()

    async def unmerge_table(self, table):
        """Turn a table into a DataFrame whose first row becomes the header.

        Every cell reported by ``row.cells`` is written to its grid position.
        python-docx repeats a merged cell once for each grid position it
        covers, so no separate span handling is required here; ``w:gridSpan``
        and ``w:vMerge`` are child elements of ``w:tcPr``, not attributes of
        the cell element. Rows shorter than the widest row are padded with
        empty strings.

        Returns:
            An empty DataFrame for a table without rows, otherwise a DataFrame
            with the first table row as column labels and the remaining rows
            as data.
        """
        # ``row.cells`` is read once per row and reused below.
        row_cells = [row.cells for row in table.rows]
        if not row_cells:
            return pd.DataFrame()

        cols = max(len(cells) for cells in row_cells)
        data = [['' for _ in range(cols)] for _ in row_cells]

        # A merged cell shows up at several grid positions; extract it (and
        # OCR its images) only once. The element is stored next to its content
        # so it stays alive and its id() cannot be reused by another element.
        extracted = {}
        for row_idx, cells in enumerate(row_cells):
            for col_idx, cell in enumerate(cells):
                cell_element = cell._element
                entry = extracted.get(id(cell_element))
                if entry is None:
                    entry = (cell_element, await self.extract_cell_content(cell))
                    extracted[id(cell_element)] = entry
                data[row_idx][col_idx] = entry[1]

        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.drop(0).reset_index(drop=True)
        return df

    async def dataframe_to_csv(self, df):
        """Serialize ``df`` to CSV text without the index column."""
        return df.to_csv(index=False)