import io
import os
import logging
import re
from typing import Any, Union

import requests
import pandas as pd
from docx import Document
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph
from docx.table import Table

from .pdf_parser import PDFParser

logger = logging.getLogger()

_PDF_API_URL = "http://182.209.186.75:10230/docx2pdf"

# Sentinel key assigned to table elements. Its exact value is arbitrary; it only
# needs to be a string that _get_key_from_text can never produce (the function
# strips all non-word characters).
_TABLE_KEY = "<table>"


def _get_key_from_text(text: str) -> str:
    """Reduce text to a comparison key by removing all non-word characters."""
    text = re.sub(r"\W", "", text.strip())
    return text


def sync_word_file_to_pdf(word_object: Any, pdf_object: Any) -> Any:
    raise NotImplementedError


def get_pdf_from_api(file_path: Union[str, os.PathLike]) -> Union[bytes, io.BytesIO]:
    """Convert a Word file to PDF by posting it to the docx2pdf service."""
    with open(file_path, "rb") as f:
        response = requests.post(_PDF_API_URL, files={"file": f})
    response.raise_for_status()
    return response.content


class WordParser:
    @staticmethod
    def parse(file_path: str) -> list:
        """Split a .docx file into page-sized chunks.

        The document is converted to PDF via the conversion API, and the text of
        each PDF page is used to decide where page breaks fall in the sequence of
        Word paragraphs and tables.
        """
        doc = Document(file_path)
        pdf_file = get_pdf_from_api(file_path)
        pdf_parsed_content = PDFParser.parse(pdf_file, file_name=os.path.basename(file_path))
        # Reduce each PDF page to a comparison key of word characters only.
        pdf_parsed_content = [_get_key_from_text(page["text"]) for page in pdf_parsed_content]
        document_name = os.path.basename(file_path)

        parsed_content = [""]
        for element in doc.element.body:
            key, line = None, ""
            tag = element.tag
            if tag.endswith('p'):
                line = Paragraph(element, doc).text
                key = _get_key_from_text(line)
            elif tag.endswith('tbl'):
                table = Table(element, doc)
                df = WordParser.unmerge_table(table)
                line = WordParser.dataframe_to_csv(df)
                key = _TABLE_KEY
" else: continue idx = min(len(parsed_content) - 1, len(pdf_parsed_content) - 1) if key not in pdf_parsed_content[idx] and key != "
": parsed_content.append("") pdf_parsed_content[idx] = pdf_parsed_content[idx].replace(key, "", 1) parsed_content[-1] += (line + "\n") parsed_content = [ { "id": "{}@{}".format(document_name, p + 1), "text": content } for p, content in enumerate(parsed_content) ] return parsed_content @staticmethod def extract_tables(file_path: str) -> list: doc = Document(file_path) tables = [] for table in doc.tables: df = WordParser.unmerge_table(table) tables.append(df) return tables @staticmethod def unmerge_table(table): rows = len(table.rows) cols = max(len(row.cells) for row in table.rows) data = [['' for _ in range(cols)] for _ in range(rows)] for row_idx, row in enumerate(table.rows): for col_idx, cell in enumerate(row.cells): cell_text = cell.text.strip() cell_span = cell._element grid_span = int(cell_span.get(qn('w:gridSpan'), 1)) v_merge = cell_span.get(qn('w:vMerge')) if v_merge == 'restart': for i in range(row_idx, rows): if table.cell(i, col_idx).text.strip() == '': data[i][col_idx] = cell_text else: break else: for j in range(col_idx, col_idx + grid_span): data[row_idx][j] = cell_text # 첫 번째 행을 열 이름으로 설정 df = pd.DataFrame(data) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) return df @staticmethod def dataframe_to_csv(df): csv_content = df.to_csv(index=False) return csv_content if __name__ == "__main__": word_path = "/app/static/uploads/test_docx.docx" test_content = WordParser.parse(word_path) for slide_content in test_content: print(slide_content)