word_parser_pg.py 3.95 KB
Newer Older
kihoon.lee's avatar
upload  
kihoon.lee committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import io
import os
import logging
import requests
from typing import Any, Union

import pandas as pd
import docx
from docx import Document
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph
from docx.table import Table
import re

from .pdf_parser import PDFParser


# Root logger: getLogger() is called without a name, so this is not a
# module-specific logger.
logger = logging.getLogger()

# Endpoint that get_pdf_from_api() POSTs the Word file to in order to obtain
# the PDF bytes. NOTE(review): hard-coded plain-HTTP address - presumably an
# internal conversion service; confirm whether it should come from configuration.
_PDF_API_URL = "http://182.209.186.75:10230/docx2pdf"


def _get_key_from_text(text: str) -> str:
    """Normalise text into a comparison key by removing non-word characters.

    Whitespace, punctuation and symbols are dropped; letters, digits and
    underscores are kept, so the same sentence extracted from two different
    sources compares equal regardless of spacing or line breaks.

    Args:
        text: Arbitrary text.

    Returns:
        ``text`` with every character that is not a word character removed.
    """
    # Raw string: "\W" inside a plain literal is an invalid escape sequence and
    # triggers a warning on current Python versions. The pattern already
    # removes leading/trailing whitespace, so no separate strip() is needed.
    return re.sub(r"\W", "", text)

def sync_word_file_to_pdf(word_object: Any, pdf_object: Any) -> Any:
    """Placeholder for Word/PDF synchronisation; not implemented yet.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

def get_pdf_from_api(
    file_path: Union[str, os.PathLike],
    *,
    timeout: Union[float, tuple, None] = None,
) -> Union[bytes, io.BytesIO]:
    """Upload a file to the conversion endpoint and return the response body.

    The file is sent as the multipart field ``file`` in a POST request to
    ``_PDF_API_URL``.

    Args:
        file_path: Path of the file to upload.
        timeout: Forwarded unchanged to ``requests.post``: a number of
            seconds, or a ``(connect, read)`` tuple. The default ``None``
            waits indefinitely, which keeps the behaviour of callers that do
            not pass it; pass a value to avoid blocking forever on an
            unresponsive service.

    Returns:
        The raw response body (``response.content``).

    Raises:
        OSError: If ``file_path`` cannot be opened.
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    with open(file_path, "rb") as f:
        response = requests.post(_PDF_API_URL, files={"file": f}, timeout=timeout)
    # The upload is complete once post() returns, so the status check can run
    # after the file handle has been released.
    response.raise_for_status()
    return response.content


class WordParser:
    """Turn ``.docx`` files into per-page text records and table DataFrames.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def parse(file_path: str) -> list:
        """Split the body of a Word document into pages aligned with its PDF.

        The document is converted to PDF via ``get_pdf_from_api`` and parsed
        with ``PDFParser``. Body paragraphs and tables are then walked in
        document order; whenever a paragraph's normalised text cannot be found
        in what is left of the current PDF page, a new output page is started.

        Args:
            file_path: Path to the ``.docx`` file.

        Returns:
            A list of ``{"id": "<file name>@<page number>", "text": ...}``
            dicts with pages numbered from 1. Each paragraph contributes one
            line of text; each table contributes its CSV rendering.
        """
        document_name = os.path.basename(file_path)
        doc = Document(file_path)
        pdf_file = get_pdf_from_api(file_path)
        pdf_pages = PDFParser.parse(pdf_file, file_name=document_name)
        # Normalised text of every PDF page; matched paragraphs are removed
        # from it as they are consumed.
        remaining = [_get_key_from_text(page["text"]) for page in pdf_pages]

        # Compare against the exact qualified names instead of a suffix, so
        # only real paragraph / table elements are picked up.
        paragraph_tag = qn('w:p')
        table_tag = qn('w:tbl')

        pages = [""]
        for element in doc.element.body:
            if element.tag == paragraph_tag:
                line = Paragraph(element, doc).text
                key = _get_key_from_text(line)
            elif element.tag == table_tag:
                df = WordParser.unmerge_table(Table(element, doc))
                line = WordParser.dataframe_to_csv(df)
                # Tables never take part in page matching.
                key = None
            else:
                continue

            # Without any PDF page there is nothing to align against; keep
            # everything on a single page instead of indexing an empty list.
            if key is not None and remaining:
                idx = min(len(pages) - 1, len(remaining) - 1)
                if key not in remaining[idx]:
                    pages.append("")
                    # Consume the text from the page the paragraph now
                    # belongs to, not from the page that was just left.
                    idx = min(len(pages) - 1, len(remaining) - 1)
                remaining[idx] = remaining[idx].replace(key, "", 1)

            pages[-1] += line + "\n"

        return [
            {
                "id": "{}@{}".format(document_name, number),
                "text": content
            }
            for number, content in enumerate(pages, start=1)
        ]

    @staticmethod
    def extract_tables(file_path: str) -> list:
        """Return one DataFrame per table in ``doc.tables`` of the document.

        Args:
            file_path: Path to the ``.docx`` file.

        Returns:
            A list of DataFrames produced by ``unmerge_table``.
        """
        doc = Document(file_path)
        return [WordParser.unmerge_table(table) for table in doc.tables]

    @staticmethod
    def unmerge_table(table) -> pd.DataFrame:
        """Flatten a Word table into a DataFrame whose header is the first row.

        Args:
            table: Table object exposing ``rows`` (each with ``cells``) and
                ``cell(row, col)``.

        Returns:
            A DataFrame with the stripped text of every cell. The first table
            row becomes the column labels and is removed from the data. A
            table without rows yields an empty DataFrame.
        """
        # Read each row's cells once; they are needed for both sizing and
        # filling the grid.
        cells_by_row = [row.cells for row in table.rows]
        if not cells_by_row:
            # No rows: there is no header row to promote.
            return pd.DataFrame()

        rows = len(cells_by_row)
        cols = max(len(cells) for cells in cells_by_row)
        data = [[''] * cols for _ in range(rows)]

        for row_idx, cells in enumerate(cells_by_row):
            for col_idx, cell in enumerate(cells):
                cell_text = cell.text.strip()
                tc = cell._element
                # NOTE(review): in WordprocessingML, gridSpan and vMerge are
                # normally child elements of w:tcPr rather than attributes of
                # w:tc, so these attribute lookups presumably always return
                # the defaults. python-docx's row.cells is understood to
                # already repeat a merged cell for every grid position it
                # covers, which would explain why the output is still filled
                # in - TODO confirm before changing this logic.
                grid_span = int(tc.get(qn('w:gridSpan'), 1))
                v_merge = tc.get(qn('w:vMerge'))
                if v_merge == 'restart':
                    # Copy the text downwards for as long as the cells in
                    # this column are empty.
                    for i in range(row_idx, rows):
                        if table.cell(i, col_idx).text.strip() == '':
                            data[i][col_idx] = cell_text
                        else:
                            break
                else:
                    # Copy the text across every column the cell spans.
                    for j in range(col_idx, col_idx + grid_span):
                        data[row_idx][j] = cell_text

        # Use the first row as the column names.
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.drop(0).reset_index(drop=True)
        return df

    @staticmethod
    def dataframe_to_csv(df: pd.DataFrame) -> str:
        """Render a DataFrame as CSV text without the index column."""
        return df.to_csv(index=False)


if __name__ == "__main__":
    # Manual smoke test: parse a sample document and print every page record.
    sample_docx = "/app/static/uploads/test_docx.docx"

    for page_record in WordParser.parse(sample_docx):
        print(page_record)