Commit 9a983009 authored by kihoon.lee

upload

parent d65bdf42

import chardet


class TextParser:
    @staticmethod
    def parse(file_path: str) -> str:
        # First, detect the file's encoding from the raw bytes
        with open(file_path, 'rb') as file:
            raw_data = file.read()
        result = chardet.detect(raw_data)
        encoding = result['encoding']

        # Read the file using the detected encoding
        if encoding:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    text = file.read()
                return text
            except UnicodeDecodeError:
                raise ValueError(f"Could not decode the file with the detected encoding: {encoding}")
        else:
            raise ValueError("Could not detect the encoding of the file.")
from docx import Document
import docx
import pandas as pd
import os
from docx.oxml.ns import qn
from PIL import Image, UnidentifiedImageError
import io
from typing import Optional
from modules.ocr import ReaderForEasyOCR
import asyncio
import logging
logger = logging.getLogger()

class WordParser:
    def __init__(self, use_ocr: bool, ocr_reader: Optional[ReaderForEasyOCR] = None):
        self.use_ocr = use_ocr
        self.ocr_reader = ocr_reader

    async def parse(self, contents: bytes, filename: str) -> list:
        doc = Document(io.BytesIO(contents))
        document_name = os.path.basename(filename)
        parsed_content = []
        page_number = 1
        text = ''
        for element in doc.element.body:
            if element.tag.endswith('p'):
                para = docx.text.paragraph.Paragraph(element, doc)
                text += await self.extract_paragraph_content(para) + '\n'
            elif element.tag.endswith('tbl'):
                table = docx.table.Table(element, doc)
                df = await self.unmerge_table(table)
                text += await self.dataframe_to_csv(df) + '\n'
        if text:
            entry = {
                "document_id": f"{document_name}@{page_number:04}",
                "text": text
            }
            parsed_content.append(entry)
        return parsed_content

    async def extract_tables(self, file_path: str) -> list:
        doc = Document(file_path)
        tables = []
        for table in doc.tables:
            # unmerge_table is a coroutine here, so it must be awaited
            df = await self.unmerge_table(table)
            tables.append(df)
        return tables

    async def extract_paragraph_content(self, para):
        content = ''
        for run in para.runs:
            if 'graphic' in run._element.xml:
                image_stream = run.part.related_parts[run._element.xpath(".//a:blip/@r:embed")[0]].blob
                try:
                    image = Image.open(io.BytesIO(image_stream))
                    # Skip WMF images, which Pillow cannot rasterize
                    if image.format == "WMF":
                        continue
                    ocr_text = ""
                    width, height = image.size
                    # Only OCR images of at least 150x150 pixels
                    if self.use_ocr and self.ocr_reader and (width >= 150 and height >= 150):
                        # Convert the image to grayscale first
                        image = image.convert('L')
                        ocr_results = await self.ocr_reader(image)
                        ocr_text = "\n".join([text for bbox, text in ocr_results])
                        ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"
                    content += f"(image)\n{ocr_text}\n(/image)"
                except UnidentifiedImageError:
                    logger.error("Unable to identify image format. Skipping this image.")
                    continue
            else:
                content += run.text
        return content

    async def extract_cell_content(self, cell):
        content = ''
        for element in cell._element:
            if element.tag.endswith('p'):
                para = docx.text.paragraph.Paragraph(element, cell)
                content += await self.extract_paragraph_content(para) + '\n'
            elif element.tag.endswith('tbl'):
                nested_table = docx.table.Table(element, cell._parent)
                nested_df = await self.unmerge_table(nested_table)
                content += await self.dataframe_to_csv(nested_df) + '\n'
            elif 'graphic' in element.xml:
                # Raw lxml elements carry no part reference, so resolve the
                # image relationship through the owning cell's part instead
                image_stream = cell.part.related_parts[element.xpath(".//a:blip/@r:embed")[0]].blob
                try:
                    image = Image.open(io.BytesIO(image_stream))
                    # Skip WMF images, which Pillow cannot rasterize
                    if image.format == "WMF":
                        continue
                    ocr_text = ""
                    width, height = image.size
                    # Only OCR images of at least 150x150 pixels
                    if self.use_ocr and self.ocr_reader and (width >= 150 and height >= 150):
                        # Convert the image to grayscale first
                        image = image.convert('L')
                        ocr_results = await self.ocr_reader(image)
                        ocr_text = "\n".join([text for bbox, text in ocr_results])
                        ocr_text = f"(ocr)\n{ocr_text}\n(/ocr)"
                    content += f"(image)\n{ocr_text}\n(/image)\n"
                except UnidentifiedImageError:
                    logger.error("Unable to identify image format. Skipping this image.")
                    continue
        return content.strip()

    async def unmerge_table(self, table):
        rows = len(table.rows)
        cols = max(len(row.cells) for row in table.rows)
        data = [['' for _ in range(cols)] for _ in range(rows)]
        for row_idx, row in enumerate(table.rows):
            for col_idx, cell in enumerate(row.cells):
                cell_content = await self.extract_cell_content(cell)
                tc = cell._element
                # gridSpan and vMerge are children of w:tcPr, not attributes
                # of w:tc, so read them through python-docx's CT_Tc properties
                grid_span = tc.grid_span
                v_merge = tc.vMerge
                if v_merge == 'restart':
                    # Copy a vertically merged cell's content down its span
                    for i in range(row_idx, rows):
                        if table.cell(i, col_idx).text.strip() == '':
                            data[i][col_idx] = cell_content
                        else:
                            break
                else:
                    # Copy a horizontally merged cell's content across its span
                    for j in range(col_idx, col_idx + grid_span):
                        data[row_idx][j] = cell_content
        # Use the first row as the column names
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.drop(0).reset_index(drop=True)
        return df

    async def dataframe_to_csv(self, df):
        csv_content = df.to_csv(index=False)
        return csv_content
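
A minimal driver sketch for the asynchronous parser (OCR disabled; the file path is hypothetical):

import asyncio

async def main():
    # Illustrative usage: read a .docx as bytes and parse it without OCR
    with open("sample.docx", "rb") as f:
        contents = f.read()
    parser = WordParser(use_ocr=False)
    pages = await parser.parse(contents, "sample.docx")
    for page in pages:
        print(page["document_id"], len(page["text"]))

asyncio.run(main())
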
import io
import os
import logging
import requests
from typing import Any, Union
import pandas as pd
import docx
from docx import Document
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph
from docx.table import Table
import re
from .pdf_parser import PDFParser
logger = logging.getLogger()
_PDF_API_URL = "http://182.209.186.75:10230/docx2pdf"

def _get_key_from_text(text: str) -> str:
    # Strip all non-word characters so text can be matched across sources
    text = re.sub(r"\W", "", text.strip())
    return text


def sync_word_file_to_pdf(word_object: Any, pdf_object: Any) -> Any:
    raise NotImplementedError


def get_pdf_from_api(file_path: Union[str, os.PathLike]) -> bytes:
    with open(file_path, "rb") as f:
        response = requests.post(_PDF_API_URL, files={"file": f})
    response.raise_for_status()
    return response.content


class WordParser:
    @staticmethod
    def parse(file_path: str) -> list:
        doc = Document(file_path)
        # Convert to PDF via the external API to recover page boundaries,
        # which the docx format itself does not store
        pdf_file = get_pdf_from_api(file_path)
        pdf_parsed_content = PDFParser.parse(pdf_file, file_name=os.path.basename(file_path))
        pdf_parsed_content = [_get_key_from_text(page["text"]) for page in pdf_parsed_content]
        document_name = os.path.basename(file_path)
        parsed_content = [""]
        for element in doc.element.body:
            key, line = None, ""
            tag = element.tag
            if tag.endswith('p'):
                line = Paragraph(element, doc).text
                key = _get_key_from_text(line)
            elif tag.endswith('tbl'):
                table = Table(element, doc)
                df = WordParser.unmerge_table(table)
                line = WordParser.dataframe_to_csv(df)
                key = "<table></table>"
            else:
                continue
            # Start a new page whenever the current paragraph's key is no
            # longer found in the text of the current PDF page
            idx = min(len(parsed_content) - 1, len(pdf_parsed_content) - 1)
            if key not in pdf_parsed_content[idx] and key != "<table></table>":
                parsed_content.append("")
            pdf_parsed_content[idx] = pdf_parsed_content[idx].replace(key, "", 1)
            parsed_content[-1] += (line + "\n")
        parsed_content = [
            {
                "id": "{}@{}".format(document_name, p + 1),
                "text": content
            }
            for p, content in enumerate(parsed_content)
        ]
        return parsed_content

    @staticmethod
    def extract_tables(file_path: str) -> list:
        doc = Document(file_path)
        tables = []
        for table in doc.tables:
            df = WordParser.unmerge_table(table)
            tables.append(df)
        return tables

    @staticmethod
    def unmerge_table(table):
        rows = len(table.rows)
        cols = max(len(row.cells) for row in table.rows)
        data = [['' for _ in range(cols)] for _ in range(rows)]
        for row_idx, row in enumerate(table.rows):
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                tc = cell._element
                # gridSpan and vMerge are children of w:tcPr, not attributes
                # of w:tc, so read them through python-docx's CT_Tc properties
                grid_span = tc.grid_span
                v_merge = tc.vMerge
                if v_merge == 'restart':
                    # Copy a vertically merged cell's text down its span
                    for i in range(row_idx, rows):
                        if table.cell(i, col_idx).text.strip() == '':
                            data[i][col_idx] = cell_text
                        else:
                            break
                else:
                    # Copy a horizontally merged cell's text across its span
                    for j in range(col_idx, col_idx + grid_span):
                        data[row_idx][j] = cell_text
        # Use the first row as the column names
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.drop(0).reset_index(drop=True)
        return df

    @staticmethod
    def dataframe_to_csv(df):
        csv_content = df.to_csv(index=False)
        return csv_content


if __name__ == "__main__":
    word_path = "/app/static/uploads/test_docx.docx"
    test_content = WordParser.parse(word_path)
    for page_content in test_content:
        print(page_content)