import asyncio
import io
import logging
import os
from typing import Optional

import pandas as pd
import xlrd
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as OpenPyXLImage
from PIL import Image, UnidentifiedImageError

from modules.ocr import ReaderForEasyOCR

logger = logging.getLogger()


def _assign_cell(df, row, col, value):
    """Write *value* into ``df`` at 0-based position (row, col), in place.

    The target column is first converted to ``object`` dtype when needed so
    that text can be stored in a column pandas inferred as numeric without
    relying on implicit upcasting (deprecated in recent pandas releases).
    """
    label = df.columns[col]
    if df[label].dtype != object:
        df[label] = df[label].astype(object)
    df.iat[row, col] = value


class ExcelParser:
    """Convert CSV / XLS / XLSX file contents into per-sheet CSV text.

    Each sheet becomes one ``{"document_id": ..., "text": ...}`` entry where
    ``document_id`` is ``"<filename>@<sheet name>"`` and ``text`` is the sheet
    rendered as header-less, index-less CSV. For XLSX files, embedded images
    are replaced by an ``(image)...(/image)`` marker that optionally contains
    OCR output.
    """

    # Images narrower or shorter than this many pixels are not sent to OCR.
    MIN_OCR_SIDE_PX = 150

    def __init__(self, use_ocr: bool, ocr_reader: Optional[ReaderForEasyOCR] = None):
        """Store the OCR settings.

        Args:
            use_ocr: Whether embedded images should be run through OCR.
            ocr_reader: Awaitable callable that takes a PIL image and returns
                an iterable of ``(bbox, text)`` pairs. OCR is skipped when it
                is ``None`` even if ``use_ocr`` is true.
        """
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    async def parse(self, contents: bytes, filename: str) -> list:
        """Parse a spreadsheet held in memory.

        Args:
            contents: Raw bytes of the file.
            filename: Original file name; its extension (case-insensitive)
                selects the parser and it is used as the document name.

        Returns:
            A list with one ``{"document_id", "text"}`` dict per sheet.

        Raises:
            ValueError: If the extension is not ``.csv``, ``.xls`` or ``.xlsx``.
        """
        # Only the extension is lower-cased; the document name keeps its case.
        file_extension = os.path.splitext(filename)[1].lower()

        if file_extension == '.csv':
            return self._parse_csv(contents, filename)
        if file_extension == '.xls':
            return self._parse_xls(contents, filename)
        if file_extension == '.xlsx':
            return await self._parse_xlsx(contents, filename)
        raise ValueError("Unsupported file format")

    @staticmethod
    def _make_entry(document_name, sheet_name, df):
        """Build the output dict for one sheet from its DataFrame."""
        return {
            "document_id": f"{document_name}@{sheet_name}",
            "text": df.to_csv(index=False, header=False),
        }

    def _parse_csv(self, contents, document_name):
        """Parse CSV bytes into a single-entry list (sheet name 'Sheet1')."""
        # header=None keeps the first row as data. Without it pandas consumes
        # that row as column names and to_csv(header=False) then drops it,
        # unlike the xls/xlsx paths which keep every row.
        df = pd.read_csv(io.BytesIO(contents), header=None)
        return [self._make_entry(document_name, 'Sheet1', df)]

    def _parse_xls(self, contents, document_name):
        """Parse legacy .xls bytes into one entry per sheet."""
        # xlrd's first positional parameter is a file *path*; in-memory data
        # must be passed via file_contents. formatting_info=True is kept so
        # that sheet.merged_cells is populated.
        workbook = xlrd.open_workbook(file_contents=contents, formatting_info=True)

        entries = []
        for sheet in workbook.sheets():
            rows = [sheet.row_values(row) for row in range(sheet.nrows)]
            df = pd.DataFrame(rows)
            df = self.fill_merged_cells_xls(df, sheet)
            entries.append(self._make_entry(document_name, sheet.name, df))
        return entries

    async def _parse_xlsx(self, contents, document_name):
        """Parse .xlsx bytes into one entry per sheet, embedding image markers."""
        workbook = load_workbook(filename=io.BytesIO(contents), data_only=True)

        entries = []
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            df = pd.DataFrame(sheet.values)

            # Image and OCR handling.
            for img in sheet._images:
                df = await self._embed_image(df, img, sheet_name)

            df = self.fill_merged_cells_xlsx(df, sheet)
            entries.append(self._make_entry(document_name, sheet_name, df))
        return entries

    async def _embed_image(self, df, img, sheet_name):
        """Insert an ``(image)`` marker (with optional OCR text) for *img*.

        Returns the DataFrame, which may be a new, enlarged frame when the
        image is anchored outside the current data area.
        """
        marker = getattr(img.anchor, "_from", None)
        if marker is None:
            # Absolute anchors are not tied to a cell, so there is no
            # position to write the marker to.
            logger.warning(
                "Skipping image without a cell anchor in sheet %s.", sheet_name
            )
            return df

        # Anchor markers are already 0-based, matching DataFrame positions.
        row, col = marker.row, marker.col

        # Grow the DataFrame if the image sits outside the current data area.
        if row >= len(df):
            df = df.reindex(range(row + 1), fill_value='')
        if col >= len(df.columns):
            df = df.reindex(columns=range(col + 1), fill_value='')

        img_data = img._data()  # raw image bytes
        try:
            img_obj = Image.open(io.BytesIO(img_data))
        except UnidentifiedImageError:
            logger.error(
                "Unable to identify image format in sheet %s. Skipping this image.",
                sheet_name,
            )
            return df

        # WMF images cannot be processed, so they are skipped.
        if img_obj.format == "WMF":
            logger.warning(
                "Skipping WMF image in sheet %s as it cannot be processed.",
                sheet_name,
            )
            return df

        ocr_text = ""
        width, height = img_obj.size
        large_enough = (
            width >= self.MIN_OCR_SIDE_PX and height >= self.MIN_OCR_SIDE_PX
        )
        if self.use_ocr and self.ocr_reader and large_enough:
            # Convert to grayscale before OCR.
            grayscale = img_obj.convert('L')
            ocr_results = await self.ocr_reader(grayscale)
            ocr_text = "\n".join(text for _bbox, text in ocr_results)
            ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"

        # Place the marker (and any OCR text) in the anchored cell.
        _assign_cell(df, row, col, f"(image)\n{ocr_text}\n(/image)")
        return df

    def fill_merged_cells_xlsx(self, df, sheet):
        """Copy each merged range's top-left value into its empty cells.

        Args:
            df: DataFrame built from ``sheet.values`` (modified in place).
            sheet: The openpyxl worksheet the DataFrame came from.

        Returns:
            The same DataFrame, for chaining.
        """
        n_rows, n_cols = df.shape
        for merged_range in sheet.merged_cells.ranges:
            # bounds are 1-based and inclusive.
            min_col, min_row, max_col, max_row = merged_range.bounds
            top_left_cell_value = sheet.cell(row=min_row, column=min_col).value
            if top_left_cell_value is None:
                continue  # nothing to propagate

            # Clamp to the frame so an oversized range cannot raise IndexError.
            for row in range(min_row - 1, min(max_row, n_rows)):
                for col in range(min_col - 1, min(max_col, n_cols)):
                    # Fill only cells that are still empty.
                    if pd.isna(df.iat[row, col]):
                        _assign_cell(df, row, col, top_left_cell_value)
        return df

    def fill_merged_cells_xls(self, df, sheet):
        """Copy each merged range's top-left value into its empty cells.

        Args:
            df: DataFrame built from the sheet's row values (modified in place).
            sheet: The xlrd sheet the DataFrame came from.

        Returns:
            The same DataFrame, for chaining.
        """
        n_rows, n_cols = df.shape
        for merged_cell in sheet.merged_cells:
            # Each entry is unpacked as (row_lo, row_hi, col_lo, col_hi);
            # the upper bounds are used as exclusive range ends.
            min_row, max_row, min_col, max_col = merged_cell
            top_left_cell_value = sheet.cell_value(min_row, min_col)

            # Clamp to the frame so an oversized range cannot raise IndexError.
            for row in range(min_row, min(max_row, n_rows)):
                for col in range(min_col, min(max_col, n_cols)):
                    current = df.iat[row, col]
                    # Fill only cells that are still empty (missing or '').
                    if pd.isna(current) or current == '':
                        _assign_cell(df, row, col, top_left_cell_value)
        return df