excel_parser.py 5.49 KB
Newer Older
kihoon.lee's avatar
upload  
kihoon.lee committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import pandas as pd
import logging
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as OpenPyXLImage
from typing import Optional
from PIL import Image, UnidentifiedImageError
import io
import xlrd
from modules.ocr import ReaderForEasyOCR
import asyncio
import os
logger = logging.getLogger()


class ExcelParser:
    """Convert CSV / XLS / XLSX file contents into per-sheet CSV text entries.

    Every sheet becomes one dict of the form
    ``{"document_id": "<filename>@<sheet name>", "text": "<sheet as CSV>"}``.
    For ``.xlsx`` workbooks, embedded images are replaced by an
    ``(image)...(/image)`` marker in the cell they are anchored to; when OCR is
    enabled the recognised text is placed inside that marker.
    """

    def __init__(self, use_ocr: bool, ocr_reader: Optional["ReaderForEasyOCR"] = None):
        """Store the OCR configuration.

        Args:
            use_ocr: Whether images embedded in ``.xlsx`` sheets should be OCR'd.
            ocr_reader: Awaitable callable applied to a PIL image. Its result is
                iterated as ``(bbox, text)`` pairs. OCR is skipped when it is
                ``None`` even if ``use_ocr`` is true.
        """
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    async def parse(self, contents: bytes, filename: str) -> list:
        """Parse raw file bytes into a list of per-sheet entries.

        Args:
            contents: Raw bytes of the uploaded file.
            filename: Original file name. Its extension (case-insensitive)
                selects the parser and the full name prefixes ``document_id``.

        Returns:
            A list of ``{"document_id": ..., "text": ...}`` dicts, one per sheet
            (a CSV file yields a single entry named ``Sheet1``).

        Raises:
            ValueError: If the extension is not ``.csv``, ``.xls`` or ``.xlsx``.
        """
        # Only the extension is lower-cased; the file name itself is kept as-is.
        file_extension = os.path.splitext(filename)[1].lower()

        if file_extension == '.csv':
            return self._parse_csv(contents, filename)
        if file_extension == '.xls':
            return self._parse_xls(contents, filename)
        if file_extension == '.xlsx':
            return await self._parse_xlsx(contents, filename)
        raise ValueError("Unsupported file format")

    @staticmethod
    def _make_entry(document_name: str, sheet_name: str, df: pd.DataFrame) -> dict:
        """Build the output dict for one sheet, serialising ``df`` as headerless CSV."""
        return {
            "document_id": f"{document_name}@{sheet_name}",
            "text": df.to_csv(index=False, header=False),
        }

    @staticmethod
    def _is_blank(value) -> bool:
        """Return True for cell values treated as empty (None/NaN or '')."""
        return bool(pd.isna(value) or value == '')

    @staticmethod
    def _image_anchor_cell(img):
        """Return the 0-based ``(row, col)`` an image is anchored to, or None.

        openpyxl's anchor markers are already 0-based, so no offset is applied.
        Anchors that have no ``_from`` marker (e.g. absolute anchors) have no
        cell position and yield ``None``.
        """
        marker = getattr(img.anchor, "_from", None)
        if marker is None:
            return None
        return marker.row, marker.col

    def _parse_csv(self, contents: bytes, document_name: str) -> list:
        """Parse CSV bytes into a single ``Sheet1`` entry."""
        # header=None keeps the first line as data; otherwise it would be
        # consumed as column names and dropped by the headerless to_csv().
        df = pd.read_csv(io.BytesIO(contents), header=None)
        return [self._make_entry(document_name, 'Sheet1', df)]

    def _parse_xls(self, contents: bytes, document_name: str) -> list:
        """Parse legacy ``.xls`` bytes into one entry per sheet."""
        # xlrd takes in-memory data through file_contents; the first positional
        # argument is a file path. formatting_info=True is needed for xlrd to
        # populate sheet.merged_cells.
        workbook = xlrd.open_workbook(file_contents=contents, formatting_info=True)

        entries = []
        for sheet in workbook.sheets():
            rows = [sheet.row_values(row_idx) for row_idx in range(sheet.nrows)]
            df = self.fill_merged_cells_xls(pd.DataFrame(rows), sheet)
            entries.append(self._make_entry(document_name, sheet.name, df))
        return entries

    async def _parse_xlsx(self, contents: bytes, document_name: str) -> list:
        """Parse ``.xlsx`` bytes into one entry per sheet, including image markers."""
        # data_only=True returns cached formula results instead of formulas.
        workbook = load_workbook(filename=io.BytesIO(contents), data_only=True)

        entries = []
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            df = pd.DataFrame(sheet.values)

            # Image handling and optional OCR.
            for img in sheet._images:
                img_cell = self._image_anchor_cell(img)
                if img_cell is None:
                    logger.warning(f"Skipping image without a cell anchor in sheet {sheet_name}.")
                    continue
                img_row, img_col = img_cell

                # Grow the DataFrame when the image sits outside the data area.
                if img_row >= len(df):
                    df = df.reindex(range(img_row + 1), fill_value='')
                if img_col >= len(df.columns):
                    df = df.reindex(columns=range(img_col + 1), fill_value='')

                img_data = img._data()  # raw image bytes
                try:
                    img_obj = Image.open(io.BytesIO(img_data))
                except UnidentifiedImageError:
                    logger.error(f"Unable to identify image format in sheet {sheet_name}. Skipping this image.")
                    continue

                # WMF images are skipped rather than processed.
                if img_obj.format == "WMF":
                    logger.warning(f"Skipping WMF image in sheet {sheet_name} as it cannot be processed.")
                    continue

                ocr_text = ""
                width, height = img_obj.size
                # OCR only runs on images of at least 150x150 pixels.
                if self.use_ocr and self.ocr_reader and (width >= 150 and height >= 150):
                    # Convert to greyscale before OCR.
                    img_obj = img_obj.convert('L')

                    ocr_results = await self.ocr_reader(img_obj)
                    ocr_text = "\n".join([text for bbox, text in ocr_results])
                    ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"

                # Insert the marker at the anchor cell; this overwrites whatever
                # value that cell held.
                df.iat[img_row, img_col] = f"(image)\n{ocr_text}\n(/image)"

            df = self.fill_merged_cells_xlsx(df, sheet)
            entries.append(self._make_entry(document_name, sheet_name, df))
        return entries

    def fill_merged_cells_xlsx(self, df, sheet):
        """Copy each merged range's top-left value into its blank cells (openpyxl sheet).

        Args:
            df: DataFrame whose row/column positions mirror the sheet (0-based).
            sheet: Object exposing ``merged_cells.ranges`` (each with 1-based,
                inclusive ``bounds`` as ``(min_col, min_row, max_col, max_row)``)
                and ``cell(row=, column=)``.

        Returns:
            The same DataFrame, modified in place. Cells outside the frame are
            ignored and non-blank cells are left untouched.
        """
        n_rows, n_cols = df.shape
        for merged_range in sheet.merged_cells.ranges:
            min_col, min_row, max_col, max_row = merged_range.bounds
            top_left_value = sheet.cell(row=min_row, column=min_col).value
            # Convert the 1-based inclusive bounds to 0-based half-open ranges,
            # clamped to the DataFrame's shape.
            for row in range(min_row - 1, min(max_row, n_rows)):
                for col in range(min_col - 1, min(max_col, n_cols)):
                    if self._is_blank(df.iat[row, col]):  # fill only empty cells
                        df.iat[row, col] = top_left_value
        return df

    def fill_merged_cells_xls(self, df, sheet):
        """Copy each merged range's top-left value into its blank cells (xlrd sheet).

        Args:
            df: DataFrame whose row/column positions mirror the sheet (0-based).
            sheet: Object exposing ``merged_cells`` as 0-based
                ``(min_row, max_row, min_col, max_col)`` tuples with exclusive
                upper bounds, and ``cell_value(row, col)``.

        Returns:
            The same DataFrame, modified in place. Cells outside the frame are
            ignored and non-blank cells are left untouched.
        """
        n_rows, n_cols = df.shape
        for min_row, max_row, min_col, max_col in sheet.merged_cells:
            top_left_value = sheet.cell_value(min_row, min_col)
            for row in range(min_row, min(max_row, n_rows)):
                for col in range(min_col, min(max_col, n_cols)):
                    if self._is_blank(df.iat[row, col]):  # fill only empty cells
                        df.iat[row, col] = top_left_value
        return df