MicroFish/backend/app/utils/file_parser.py

181 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""File parsing utilities.
Supports text extraction from PDF, Markdown, and plain-text files.
"""
import os
from pathlib import Path
from typing import List, Optional
def _read_text_with_fallback(file_path: str) -> str:
"""Read a text file, falling back through encoding detectors when UTF-8 fails.
Multi-stage fallback strategy:
1. Try UTF-8 first.
2. Use ``charset_normalizer`` to detect the encoding.
3. Fall back to ``chardet``.
4. Last resort: decode with UTF-8 + ``errors='replace'``.
Args:
file_path: Path to the file to read.
Returns:
The decoded text content.
"""
data = Path(file_path).read_bytes()
try:
return data.decode('utf-8')
except UnicodeDecodeError:
pass
encoding = None
try:
from charset_normalizer import from_bytes
best = from_bytes(data).best()
if best and best.encoding:
encoding = best.encoding
except Exception:
pass
if not encoding:
try:
import chardet
result = chardet.detect(data)
encoding = result.get('encoding') if result else None
except Exception:
pass
if not encoding:
encoding = 'utf-8'
return data.decode(encoding, errors='replace')
class FileParser:
"""Parser for the supported document formats."""
SUPPORTED_EXTENSIONS = {'.pdf', '.md', '.markdown', '.txt'}
@classmethod
def extract_text(cls, file_path: str) -> str:
"""Extract plain text from a single supported file.
Args:
file_path: Path to the file.
Returns:
The extracted text content.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
suffix = path.suffix.lower()
if suffix not in cls.SUPPORTED_EXTENSIONS:
raise ValueError(f"不支持的文件格式: {suffix}")
if suffix == '.pdf':
return cls._extract_from_pdf(file_path)
elif suffix in {'.md', '.markdown'}:
return cls._extract_from_md(file_path)
elif suffix == '.txt':
return cls._extract_from_txt(file_path)
raise ValueError(f"无法处理的文件格式: {suffix}")
@staticmethod
def _extract_from_pdf(file_path: str) -> str:
"""Extract text from a PDF file using PyMuPDF."""
try:
import fitz # PyMuPDF
except ImportError:
raise ImportError("需要安装PyMuPDF: pip install PyMuPDF")
text_parts = []
with fitz.open(file_path) as doc:
for page in doc:
text = page.get_text()
if text.strip():
text_parts.append(text)
return "\n\n".join(text_parts)
@staticmethod
def _extract_from_md(file_path: str) -> str:
"""Extract text from a Markdown file with automatic encoding detection."""
return _read_text_with_fallback(file_path)
@staticmethod
def _extract_from_txt(file_path: str) -> str:
"""Extract text from a plain-text file with automatic encoding detection."""
return _read_text_with_fallback(file_path)
@classmethod
def extract_from_multiple(cls, file_paths: List[str]) -> str:
"""Extract and concatenate text from multiple files.
Args:
file_paths: Paths of files to read.
Returns:
The merged text, with per-file headers separating each section.
"""
all_texts = []
for i, file_path in enumerate(file_paths, 1):
try:
text = cls.extract_text(file_path)
filename = Path(file_path).name
all_texts.append(f"=== 文档 {i}: {filename} ===\n{text}")
except Exception as e:
all_texts.append(f"=== 文档 {i}: {file_path} (提取失败: {str(e)}) ===")
return "\n\n".join(all_texts)
def split_text_into_chunks(
text: str,
chunk_size: int = 500,
overlap: int = 50
) -> List[str]:
"""Split text into overlapping chunks.
Args:
text: The source text to split.
chunk_size: Target characters per chunk.
overlap: Number of characters overlapping between consecutive chunks.
Returns:
A list of chunk strings.
"""
if len(text) <= chunk_size:
return [text] if text.strip() else []
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
# Prefer splitting on a sentence boundary near the chunk end
if end < len(text):
for sep in ['', '', '', '.\n', '!\n', '?\n', '\n\n', '. ', '! ', '? ']:
last_sep = text[start:end].rfind(sep)
if last_sep != -1 and last_sep > chunk_size * 0.3:
end = start + last_sep + len(sep)
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Next chunk starts at the overlap point
start = end - overlap if end < len(text) else len(text)
return chunks