import io
import os
import logging
import requests
from typing import Any, Union
import pandas as pd
import docx
from docx import Document
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph
from docx.table import Table
import re
from .pdf_parser import PDFParser
logger = logging.getLogger()
_PDF_API_URL = "http://182.209.186.75:10230/docx2pdf"
def _get_key_from_text(text: str) -> str:
    """Normalize text for matching by stripping every non-word character."""
    return re.sub(r"\W", "", text.strip())
def sync_word_file_to_pdf(word_object: Any, pdf_object: Any) -> Any:
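    """Synchronize a Word file object with its PDF rendering (not implemented yet)."""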
raise NotImplementedError
def get_pdf_from_api(file_path: Union[str, os.PathLike]) -> Union[bytes, io.BytesIO]:
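    """POST the file at ``file_path`` to the docx2pdf conversion API and return the resulting PDF bytes."""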
with open(file_path, "rb") as f:
response = requests.post(_PDF_API_URL, files={"file": f})
response.raise_for_status()
return response.content
class WordParser:
@staticmethod
def parse(file_path: str) -> list:
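        """Split a .docx file into page-aligned chunks.

        The file is converted to PDF through the docx2pdf API, the PDF text is
        parsed to recover page boundaries, and the Word body (paragraphs and
        tables) is then grouped into one chunk per detected page.
        """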
doc = Document(file_path)
pdf_file = get_pdf_from_api(file_path)
pdf_parsed_content = PDFParser.parse(pdf_file, file_name=os.path.basename(file_path))
pdf_parsed_content = [_get_key_from_text(page["text"]) for page in pdf_parsed_content]
document_name = os.path.basename(file_path)
parsed_content = [""]
for element in doc.element.body:
key, line = None, ""
            if element.tag == qn('w:p'):
                line = Paragraph(element, doc).text
                key = _get_key_from_text(line)
            elif element.tag == qn('w:tbl'):
table = Table(element, doc)
df = WordParser.unmerge_table(table)
                line = WordParser.dataframe_to_csv(df)
                # A table's CSV rendering never matches the normalized PDF text, so
                # use a key that cannot appear in it; every table opens a new chunk.
                key = "\n"
            else:
                continue
            # Index of the PDF page the current chunk is being matched against.
            idx = min(len(parsed_content) - 1, len(pdf_parsed_content) - 1)
            if key not in pdf_parsed_content[idx] and key != "":
                # The element does not appear on the current PDF page, so it belongs
                # to the next page: start a new chunk and re-point idx at that page.
                parsed_content.append("")
                idx = min(len(parsed_content) - 1, len(pdf_parsed_content) - 1)
            # Consume the matched text so a repeated paragraph maps to its next occurrence.
            pdf_parsed_content[idx] = pdf_parsed_content[idx].replace(key, "", 1)
            parsed_content[-1] += (line + "\n")
parsed_content = [
{
"id": "{}@{}".format(document_name, p + 1),
"text": content
}
for p, content in enumerate(parsed_content)
]
return parsed_content
@staticmethod
def extract_tables(file_path: str) -> list:
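        """Return every table in the document as a pandas DataFrame with merged cells expanded."""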
doc = Document(file_path)
tables = []
for table in doc.tables:
df = WordParser.unmerge_table(table)
tables.append(df)
return tables
@staticmethod
def unmerge_table(table):
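        """Convert a python-docx table into a DataFrame, expanding merged cells.

        Every grid position receives the text of its merge-origin cell; the first
        row is used as the column header.
        """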
rows = len(table.rows)
cols = max(len(row.cells) for row in table.rows)
data = [['' for _ in range(cols)] for _ in range(rows)]
for row_idx, row in enumerate(table.rows):
for col_idx, cell in enumerate(row.cells):
cell_text = cell.text.strip()
                tc = cell._element
                tc_pr = tc.find(qn('w:tcPr'))
                # gridSpan and vMerge are children of w:tcPr (carrying a w:val
                # attribute), not attributes of the w:tc element itself.
                grid_span_el = tc_pr.find(qn('w:gridSpan')) if tc_pr is not None else None
                grid_span = int(grid_span_el.get(qn('w:val'), 1)) if grid_span_el is not None else 1
                v_merge_el = tc_pr.find(qn('w:vMerge')) if tc_pr is not None else None
                # <w:vMerge/> without w:val means "continue"; "restart" marks the merge origin.
                v_merge = v_merge_el.get(qn('w:val'), 'continue') if v_merge_el is not None else None
                if v_merge == 'restart':
                    # Fill the vertically merged range with the origin cell's text.
                    # python-docx resolves continuation rows to the origin cell, so
                    # follow the merge by element identity instead of by empty text.
                    data[row_idx][col_idx] = cell_text
                    for i in range(row_idx + 1, rows):
                        if table.cell(i, col_idx)._element is tc:
                            data[i][col_idx] = cell_text
                        else:
                            break
                else:
                    # Repeat the text across every grid column the cell spans.
                    for j in range(col_idx, min(col_idx + grid_span, cols)):
                        data[row_idx][j] = cell_text
        # Use the first row as the column names.
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)
return df
@staticmethod
def dataframe_to_csv(df):
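        """Serialize the DataFrame to CSV text without the index column."""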
csv_content = df.to_csv(index=False)
return csv_content
if __name__ == "__main__":
word_path = "/app/static/uploads/test_docx.docx"
    test_content = WordParser.parse(word_path)
    for page_content in test_content:
        print(page_content)