RAG索引流程详解:如何高效解析文档构建知识库

作者:北辰alk日期:2026/1/9

image.png

引言:为什么文档解析是RAG的基石?

在RAG(检索增强生成)系统中,文档解析是整个知识库构建的第一步,也是最关键的一步。就像建房子需要打好地基一样,良好的文档解析质量直接决定了后续检索和生成的效果。今天,我们就深入探讨RAG索引流程中的文档解析技术。

一、RAG文档解析的整体架构

首先,让我们通过一个流程图了解完整的解析流程:

1┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
2                                                           
3  原始文档集合   │───▶│  文档解析与拆分 │───▶│  文本向量化     
4                                                           
5└─────────────────┘    └─────────────────┘    └────────┬────────┘
6                                                        
7┌─────────────────┐    ┌─────────────────┐             
8                                                   
9  元数据提取    │◀───│  语义分块           ┌─────────────────┐
10                                                           
11└─────────────────┘    └─────────────────┘      向量存储索引  
12                                                                         
13                               └─────────────────┘
14

二、文档解析的核心步骤详解

2.1 支持多种文档格式

实际项目中,文档格式多种多样。我们需要一个能处理各种格式的解析器:

1import os
2from typing import List, Dict, Any
3from langchain.document_loaders import (
4    PyPDFLoader,
5    Docx2txtLoader,
6    UnstructuredHTMLLoader,
7    UnstructuredMarkdownLoader,
8    TextLoader
9)
10from langchain.schema import Document
11
12class MultiFormatDocumentParser:
13    """多格式文档解析器"""
14    
15    def __init__(self):
16        self.format_handlers = {
17            '.pdf': self._parse_pdf,
18            '.docx': self._parse_docx,
19            '.html': self._parse_html,
20            '.htm': self._parse_html,
21            '.md': self._parse_markdown,
22            '.txt': self._parse_text,
23        }
24    
25    def parse_document(self, file_path: str) -> List[Document]:
26        """解析单个文档"""
27        ext = os.path.splitext(file_path)[1].lower()
28        
29        if ext not in self.format_handlers:
30            raise ValueError(f"不支持的文件格式: {ext}")
31        
32        return self.format_handlers[ext](file_path)
33    
34    def _parse_pdf(self, file_path: str) -> List[Document]:
35        """解析PDF文档"""
36        loader = PyPDFLoader(file_path)
37        documents = loader.load()
38        
39        # 提取PDF元数据
40        for doc in documents:
41            doc.metadata.update({
42                'source': file_path,
43                'format': 'pdf',
44                'total_pages': len(documents)
45            })
46        
47        return documents
48    
49    def _parse_docx(self, file_path: str) -> List[Document]:
50        """解析Word文档"""
51        loader = Docx2txtLoader(file_path)
52        documents = loader.load()
53        
54        # 添加文档结构信息
55        for doc in documents:
56            doc.metadata.update({
57                'source': file_path,
58                'format': 'docx'
59            })
60        
61        return documents
62    
63    def _parse_html(self, file_path: str) -> List[Document]:
64        """解析HTML文档"""
65        loader = UnstructuredHTMLLoader(file_path)
66        return loader.load()
67    
68    def _parse_markdown(self, file_path: str) -> List[Document]:
69        """解析Markdown文档"""
70        loader = UnstructuredMarkdownLoader(file_path)
71        return loader.load()
72    
73    def _parse_text(self, file_path: str) -> List[Document]:
74        """解析纯文本文档"""
75        loader = TextLoader(file_path, encoding='utf-8')
76        return loader.load()
77
78# 使用示例
79parser = MultiFormatDocumentParser()
80documents = parser.parse_document("example.pdf")
81

2.2 智能文档分块策略

文档分块是解析的核心环节,直接影响检索质量:

1from langchain.text_splitter import (
2    RecursiveCharacterTextSplitter,
3    TokenTextSplitter,
4    MarkdownHeaderTextSplitter
5)
6import re
7from typing import List
8
9class SmartChunker:
10    """智能文档分块器"""
11    
12    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
13        self.chunk_size = chunk_size
14        self.chunk_overlap = chunk_overlap
15        
16        # 初始化不同的分块器
17        self.recursive_splitter = RecursiveCharacterTextSplitter(
18            chunk_size=chunk_size,
19            chunk_overlap=chunk_overlap,
20            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
21        )
22        
23        self.token_splitter = TokenTextSplitter(
24            chunk_size=chunk_size,
25            chunk_overlap=chunk_overlap
26        )
27    
28    def semantic_chunking(self, text: str, doc_type: str = None) -> List[str]:
29        """语义感知的分块"""
30        
31        # 根据文档类型选择分块策略
32        if doc_type == 'markdown':
33            return self._markdown_chunking(text)
34        elif self._is_code_document(text):
35            return self._code_chunking(text)
36        else:
37            return self._semantic_paragraph_chunking(text)
38    
39    def _markdown_chunking(self, text: str) -> List[str]:
40        """Markdown文档分块"""
41        headers_to_split_on = [
42            ("#", "标题1"),
43            ("##", "标题2"),
44            ("###", "标题3"),
45        ]
46        
47        markdown_splitter = MarkdownHeaderTextSplitter(
48            headers_to_split_on=headers_to_split_on
49        )
50        
51        chunks = markdown_splitter.split_text(text)
52        return [chunk.page_content for chunk in chunks]
53    
54    def _code_chunking(self, text: str) -> List[str]:
55        """代码文档分块"""
56        # 按函数、类、方法进行分块
57        patterns = [
58            r'(def\s+\w+(.*?):.*?(?=\n\s*def|\Z))',  # 函数
59            r'(class\s+\w+(.*?):.*?(?=\n\s*class|\Z))',  # 
60            r'(//\s*===.*?===)',  # 注释区块
61        ]
62        
63        chunks = []
64        for pattern in patterns:
65            chunks.extend(re.findall(pattern, text, re.DOTALL))
66        
67        return chunks if chunks else self.recursive_splitter.split_text(text)
68    
69    def _semantic_paragraph_chunking(self, text: str) -> List[str]:
70        """语义段落分块"""
71        # 先按段落分割
72        paragraphs = re.split(r'\n\s*\n', text)
73        
74        chunks = []
75        current_chunk = ""
76        
77        for para in paragraphs:
78            para = para.strip()
79            if not para:
80                continue
81                
82            # 如果当前块加上新段落不超过限制,就合并
83            if len(current_chunk) + len(para) + 1 <= self.chunk_size:
84                if current_chunk:
85                    current_chunk += "\n\n" + para
86                else:
87                    current_chunk = para
88            else:
89                # 保存当前块,开始新块
90                if current_chunk:
91                    chunks.append(current_chunk)
92                current_chunk = para
93        
94        # 添加最后一个块
95        if current_chunk:
96            chunks.append(current_chunk)
97        
98        return chunks
99    
100    def _is_code_document(self, text: str) -> bool:
101        """判断是否为代码文档"""
102        code_keywords = ['def ', 'class ', 'import ', 'function ', 'var ', 'let ', 'const ']
103        return any(keyword in text[:500] for keyword in code_keywords)
104
105# 使用示例
106chunker = SmartChunker(chunk_size=1000, chunk_overlap=200)
107chunks = chunker.semantic_chunking(large_text_document, doc_type='markdown')
108

2.3 高级元数据提取

元数据能显著提升检索精度:

1import hashlib
2from datetime import datetime
3import pytz
4from langchain.schema import Document
5
6class MetadataExtractor:
7    """元数据提取器"""
8    
9    def __init__(self):
10        self.zh_timezone = pytz.timezone('Asia/Shanghai')
11    
12    def extract_document_metadata(self, 
13                                 content: str, 
14                                 file_path: str, 
15                                 doc_type: str) -> Dict[str, Any]:
16        """提取文档元数据"""
17        
18        metadata = {
19            'source': file_path,
20            'doc_type': doc_type,
21            'file_name': os.path.basename(file_path),
22            'file_size': os.path.getsize(file_path),
23            'last_modified': self._get_file_mtime(file_path),
24            'content_hash': self._calculate_content_hash(content),
25            'chunk_count': 0,
26            'total_length': len(content),
27            'indexing_time': datetime.now(self.zh_timezone).isoformat(),
28        }
29        
30        # 提取内容相关元数据
31        content_metadata = self._extract_content_metadata(content)
32        metadata.update(content_metadata)
33        
34        return metadata
35    
36    def _extract_content_metadata(self, content: str) -> Dict[str, Any]:
37        """从内容中提取元数据"""
38        
39        # 提取标题
40        title = self._extract_title(content)
41        
42        # 提取关键词(简单实现)
43        keywords = self._extract_keywords(content)
44        
45        # 提取文档结构信息
46        structure_info = self._analyze_structure(content)
47        
48        # 提取时间信息
49        time_info = self._extract_time_info(content)
50        
51        return {
52            'title': title,
53            'keywords': keywords,
54            'sections': structure_info.get('sections', []),
55            'paragraph_count': structure_info.get('paragraph_count', 0),
56            'mentioned_dates': time_info,
57            'language': self._detect_language(content),
58            'has_tables': self._has_tables(content),
59            'has_code_blocks': self._has_code_blocks(content),
60        }
61    
62    def _extract_title(self, content: str) -> str:
63        """提取文档标题"""
64        # 尝试从开头或Markdown标题中提取
65        lines = content.strip().split('\n')
66        
67        for line in lines[:10]:  # 检查前10行
68            line = line.strip()
69            # Markdown标题
70            if line.startswith('# '):
71                return line[2:].strip()
72            # HTML标题标签
73            if '<h1>' in line.lower():
74                match = re.search(r'<h1[^>]*>(.*?)</h1>', line, re.IGNORECASE)
75                if match:
76                    return match.group(1).strip()
77        
78        # 如果没有明确标题,使用第一行非空内容
79        for line in lines:
80            if line.strip() and len(line.strip()) > 10:
81                return line.strip()[:100]
82        
83        return "未命名文档"
84    
85    def _extract_keywords(self, content: str, top_n: int = 10) -> List[str]:
86        """提取关键词(简化版)"""
87        # 移除常见停用词
88        stop_words = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个'}
89        
90        # 中文分词(简化处理)
91        words = re.findall(r'[\u4e00-\u9fff]{2,}', content)
92        
93        # 统计词频
94        word_freq = {}
95        for word in words:
96            if word not in stop_words:
97                word_freq[word] = word_freq.get(word, 0) + 1
98        
99        # 返回频率最高的词
100        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
101        return [word for word, freq in sorted_words[:top_n]]
102    
103    def _analyze_structure(self, content: str) -> Dict[str, Any]:
104        """分析文档结构"""
105        sections = []
106        
107        # 提取标题
108        headings = re.findall(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE)
109        
110        for level, title in headings:
111            sections.append({
112                'level': len(level),
113                'title': title.strip(),
114                'type': 'heading'
115            })
116        
117        # 统计段落
118        paragraphs = [p for p in re.split(r'\n\s*\n', content) if p.strip()]
119        
120        return {
121            'sections': sections,
122            'paragraph_count': len(paragraphs),
123            'has_headings': len(headings) > 0
124        }
125    
126    def _extract_time_info(self, content: str) -> List[str]:
127        """提取时间信息"""
128        # 匹配常见日期格式
129        date_patterns = [
130            r'\d{4}年\d{1,2}月\d{1,2}日',
131            r'\d{4}-\d{1,2}-\d{1,2}',
132            r'\d{4}/\d{1,2}/\d{1,2}',
133        ]
134        
135        dates = []
136        for pattern in date_patterns:
137            dates.extend(re.findall(pattern, content))
138        
139        return list(set(dates))  # 去重
140    
141    def _detect_language(self, content: str) -> str:
142        """检测语言"""
143        # 简单基于字符判断
144        zh_chars = len(re.findall(r'[\u4e00-\u9fff]', content))
145        en_chars = len(re.findall(r'[a-zA-Z]', content))
146        
147        if zh_chars > en_chars:
148            return 'zh'
149        elif en_chars > zh_chars:
150            return 'en'
151        else:
152            return 'mixed'
153    
154    def _has_tables(self, content: str) -> bool:
155        """判断是否包含表格"""
156        # Markdown表格
157        if re.search(r'|.*|.*\n|[-:\s|]+|', content):
158            return True
159        # HTML表格
160        if re.search(r'<table[^>]*>', content, re.IGNORECASE):
161            return True
162        return False
163    
164    def _has_code_blocks(self, content: str) -> bool:
165        """判断是否包含代码块"""
166        return bool(re.search(r'``[`[\s\S]*?`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.s.md)``', content))
167    
168    def _get_file_mtime(self, file_path: str) -> str:
169        """获取文件修改时间"""
170        mtime = os.path.getmtime(file_path)
171        return datetime.fromtimestamp(mtime, self.zh_timezone).isoformat()
172    
173    def _calculate_content_hash(self, content: str) -> str:
174        """计算内容哈希值"""
175        return hashlib.md5(content.encode('utf-8')).hexdigest()
176

2.4 完整的文档处理流水线

1class DocumentProcessingPipeline:
2    """文档处理流水线"""
3    
4    def __init__(self, 
5                 chunk_size: int = 1000,
6                 chunk_overlap: int = 200):
7        
8        self.parser = MultiFormatDocumentParser()
9        self.chunker = SmartChunker(chunk_size, chunk_overlap)
10        self.metadata_extractor = MetadataExtractor()
11    
12    def process_document(self, file_path: str) -> Dict[str, Any]:
13        """处理单个文档"""
14        
15        print(f"开始处理文档: {file_path}")
16        
17        # 1. 解析文档
18        raw_documents = self.parser.parse_document(file_path)
19        
20        if not raw_documents:
21            raise ValueError(f"无法解析文档: {file_path}")
22        
23        # 2. 合并所有页面/部分的内容
24        full_content = "\n\n".join([doc.page_content for doc in raw_documents])
25        
26        # 3. 提取文档级元数据
27        doc_type = os.path.splitext(file_path)[1][1:].lower()
28        document_metadata = self.metadata_extractor.extract_document_metadata(
29            full_content, file_path, doc_type
30        )
31        
32        # 4. 智能分块
33        chunks = self.chunker.semantic_chunking(full_content, doc_type)
34        
35        # 5. 为每个块添加元数据
36        processed_chunks = []
37        for i, chunk_content in enumerate(chunks):
38            chunk_metadata = document_metadata.copy()
39            chunk_metadata.update({
40                'chunk_id': i + 1,
41                'chunk_index': i,
42                'chunk_length': len(chunk_content),
43                'is_first_chunk': i == 0,
44                'is_last_chunk': i == len(chunks) - 1,
45            })
46            
47            # 创建Document对象
48            chunk_doc = Document(
49                page_content=chunk_content,
50                metadata=chunk_metadata
51            )
52            processed_chunks.append(chunk_doc)
53        
54        # 更新文档级元数据
55        document_metadata['chunk_count'] = len(processed_chunks)
56        
57        print(f"文档处理完成: {file_path}, 生成 {len(processed_chunks)} 个块")
58        
59        return {
60            'document_metadata': document_metadata,
61            'chunks': processed_chunks,
62            'original_path': file_path
63        }
64    
65    def process_directory(self, 
66                         directory_path: str, 
67                         extensions: List[str] = None) -> List[Dict[str, Any]]:
68        """批量处理目录中的文档"""
69        
70        if extensions is None:
71            extensions = ['.pdf', '.docx', '.txt', '.md', '.html']
72        
73        all_results = []
74        
75        for root, _, files in os.walk(directory_path):
76            for file in files:
77                file_path = os.path.join(root, file)
78                ext = os.path.splitext(file)[1].lower()
79                
80                if extensions and ext not in extensions:
81                    continue
82                
83                try:
84                    result = self.process_document(file_path)
85                    all_results.append(result)
86                except Exception as e:
87                    print(f"处理文件失败 {file_path}: {str(e)}")
88                    continue
89        
90        print(f"目录处理完成,共处理 {len(all_results)} 个文档")
91        return all_results
92
93# 使用示例
94pipeline = DocumentProcessingPipeline(
95    chunk_size=1000,
96    chunk_overlap=200
97)
98
99# 处理单个文档
100result = pipeline.process_document("document.pdf")
101
102# 处理整个目录
103results = pipeline.process_directory(
104    "./knowledge_base",
105    extensions=['.pdf', '.docx', '.md', '.txt']
106)
107
108# 提取所有块
109all_chunks = []
110for result in results:
111    all_chunks.extend(result['chunks'])
112
113print(f"总共生成 {len(all_chunks)} 个文本块")
114

三、性能优化和最佳实践

3.1 并行处理加速

1import concurrent.futures
2from tqdm import tqdm
3
4class ParallelDocumentProcessor:
5    """并行文档处理器"""
6    
7    def __init__(self, max_workers: int = 4):
8        self.pipeline = DocumentProcessingPipeline()
9        self.max_workers = max_workers
10    
11    def process_batch(self, file_paths: List[str]) -> List[Dict[str, Any]]:
12        """批量并行处理文档"""
13        
14        results = []
15        
16        with concurrent.futures.ProcessPoolExecutor(
17            max_workers=self.max_workers
18        ) as executor:
19            # 提交任务
20            future_to_file = {
21                executor.submit(self.pipeline.process_document, fp): fp 
22                for fp in file_paths
23            }
24            
25            # 处理结果
26            for future in tqdm(
27                concurrent.futures.as_completed(future_to_file),
28                total=len(file_paths),
29                desc="处理文档"
30            ):
31                file_path = future_to_file[future]
32                try:
33                    result = future.result()
34                    results.append(result)
35                except Exception as e:
36                    print(f"处理失败 {file_path}: {str(e)}")
37        
38        return results
39

3.2 增量更新处理

1class IncrementalIndexUpdater:
2    """增量索引更新器"""
3    
4    def __init__(self, index_storage_path: str):
5        self.index_storage_path = index_storage_path
6        self.processed_files = self._load_processed_files()
7    
8    def _load_processed_files(self) -> Dict[str, str]:
9        """加载已处理文件记录"""
10        record_file = os.path.join(self.index_storage_path, "processed_files.json")
11        
12        if os.path.exists(record_file):
13            with open(record_file, 'r', encoding='utf-8') as f:
14                return json.load(f)
15        return {}
16    
17    def _save_processed_files(self):
18        """保存已处理文件记录"""
19        record_file = os.path.join(self.index_storage_path, "processed_files.json")
20        with open(record_file, 'w', encoding='utf-8') as f:
21            json.dump(self.processed_files, f, ensure_ascii=False, indent=2)
22    
23    def get_changed_files(self, 
24                         directory_path: str, 
25                         extensions: List[str] = None) -> List[str]:
26        """获取有变化的文件"""
27        
28        changed_files = []
29        
30        for root, _, files in os.walk(directory_path):
31            for file in files:
32                file_path = os.path.join(root, file)
33                ext = os.path.splitext(file)[1].lower()
34                
35                if extensions and ext not in extensions:
36                    continue
37                
38                # 检查文件是否已处理或有更新
39                file_hash = self._calculate_file_hash(file_path)
40                
41                if (file_path not in self.processed_files or 
42                    self.processed_files[file_path] != file_hash):
43                    changed_files.append(file_path)
44                    self.processed_files[file_path] = file_hash
45        
46        return changed_files
47    
48    def _calculate_file_hash(self, file_path: str) -> str:
49        """计算文件哈希"""
50        hasher = hashlib.md5()
51        
52        with open(file_path, 'rb') as f:
53            buf = f.read(65536)
54            while len(buf) > 0:
55                hasher.update(buf)
56                buf = f.read(65536)
57        
58        return hasher.hexdigest()
59

四、常见问题与解决方案

问题1:文档格式兼容性

解决方案:使用多个解析库,并实现降级策略

1def robust_pdf_parsing(file_path: str):
2    """鲁棒的PDF解析"""
3    parsers = [PyPDFLoader, UnstructuredPDFLoader]
4    
5    for parser_class in parsers:
6        try:
7            loader = parser_class(file_path)
8            return loader.load()
9        except Exception as e:
10            continue
11    
12    # 降级到文本提取
13    try:
14        import textract
15        text = textract.process(file_path).decode('utf-8')
16        return [Document(page_content=text, metadata={'source': file_path})]
17    except:
18        raise ValueError(f"无法解析PDF文件: {file_path}")
19

问题2:中文文档分块效果不佳

解决方案:使用专门的中文分块器

1class ChineseTextSplitter:
2    """中文文本分块器"""
3    
4    def __init__(self, chunk_size=500, chunk_overlap=50):
5        self.chunk_size = chunk_size
6        self.chunk_overlap = chunk_overlap
7    
8    def split_text(self, text: str) -> List[str]:
9        # 使用中文标点进行分割
10        separators = ['\n\n', '\n', '。', '!', '?', ';', '……', '…', ',', '、']
11        
12        chunks = []
13        current_chunk = ""
14        
15        # 按句子分割
16        sentences = self._split_into_sentences(text)
17        
18        for sentence in sentences:
19            if len(current_chunk) + len(sentence) <= self.chunk_size:
20                current_chunk += sentence
21            else:
22                if current_chunk:
23                    chunks.append(current_chunk)
24                current_chunk = sentence
25        
26        if current_chunk:
27            chunks.append(current_chunk)
28        
29        return chunks
30    
31    def _split_into_sentences(self, text: str) -> List[str]:
32        """将中文文本分割成句子"""
33        import re
34        # 中文句子分割规则
35        pattern = r'([。!?;…]+\s*)'
36        parts = re.split(pattern, text)
37        
38        sentences = []
39        for i in range(0, len(parts)-1, 2):
40            sentence = parts[i] + (parts[i+1] if i+1 < len(parts) else '')
41            if sentence.strip():
42                sentences.append(sentence)
43        
44        return sentences
45

五、实战:构建完整的RAG索引系统

1class CompleteRAGIndexer:
2    """完整的RAG索引构建系统"""
3    
4    def __init__(self, 
5                 embedding_model=None,
6                 vector_store=None):
7        
8        self.pipeline = DocumentProcessingPipeline()
9        self.parallel_processor = ParallelDocumentProcessor()
10        self.updater = IncrementalIndexUpdater("./vector_store")
11        
12        self.embedding_model = embedding_model
13        self.vector_store = vector_store
14    
15    def build_index(self, 
16                   knowledge_base_path: str,
17                   incremental: bool = True) -> Dict[str, Any]:
18        """构建知识库索引"""
19        
20        print("开始构建RAG索引...")
21        
22        # 1. 获取需要处理的文件
23        if incremental:
24            files_to_process = self.updater.get_changed_files(
25                knowledge_base_path,
26                extensions=['.pdf', '.docx', '.md', '.txt', '.html']
27            )
28            print(f"增量更新: 发现 {len(files_to_process)} 个变更文件")
29        else:
30            # 获取所有文件
31            all_files = []
32            for root, _, files in os.walk(knowledge_base_path):
33                for file in files:
34                    if file.lower().endswith(('.pdf', '.docx', '.md', '.txt', '.html')):
35                        all_files.append(os.path.join(root, file))
36            files_to_process = all_files
37        
38        if not files_to_process:
39            print("没有需要处理的文件")
40            return {'status': 'no_changes', 'chunks': []}
41        
42        # 2. 并行处理文档
43        processing_results = self.parallel_processor.process_batch(files_to_process)
44        
45        # 3. 提取所有文本块
46        all_chunks = []
47        for result in processing_results:
48            all_chunks.extend(result['chunks'])
49        
50        print(f"总共生成 {len(all_chunks)} 个文本块")
51        
52        # 4. 生成向量嵌入
53        if self.embedding_model and self.vector_store:
54            print("开始生成向量嵌入...")
55            
56            texts = [chunk.page_content for chunk in all_chunks]
57            metadatas = [chunk.metadata for chunk in all_chunks]
58            
59            embeddings = self.embedding_model.embed_documents(texts)
60            
61            # 存储到向量数据库
62            self.vector_store.add_embeddings(
63                texts=texts,
64                embeddings=embeddings,
65                metadatas=metadatas
66            )
67            
68            print("向量索引构建完成")
69        
70        # 5. 保存处理记录
71        self.updater._save_processed_files()
72        
73        return {
74            'status': 'success',
75            'total_chunks': len(all_chunks),
76            'processed_files': len(files_to_process),
77            'chunks': all_chunks
78        }
79
80# 使用示例
81if __name__ == "__main__":
82    # 初始化组件
83    from langchain.embeddings import OpenAIEmbeddings
84    from langchain.vectorstores import Chroma
85    
86    embedding_model = OpenAIEmbeddings()
87    vector_store = Chroma(
88        persist_directory="./chroma_db",
89        embedding_function=embedding_model
90    )
91    
92    # 构建索引
93    indexer = CompleteRAGIndexer(
94        embedding_model=embedding_model,
95        vector_store=vector_store
96    )
97    
98    # 首次全量构建
99    result = indexer.build_index(
100        knowledge_base_path="./knowledge_base",
101        incremental=False
102    )
103    
104    # 后续增量更新
105    result = indexer.build_index(
106        knowledge_base_path="./knowledge_base",
107        incremental=True
108    )
109

六、总结与展望

文档解析是RAG系统的基石,一个优秀的解析系统应该具备:

  1. 1. 多格式支持:能处理各种常见文档格式
  2. 2. 智能分块:理解文档结构,保持语义完整性
  3. 3. 丰富元数据:提取有用信息辅助检索
  4. 4. 高效处理:支持并行和增量更新
  5. 5. 鲁棒性:能处理各种异常情况

随着大模型技术的发展,未来的文档解析会更加智能化,比如:

  • • 使用视觉模型解析复杂版式
  • • 利用大模型理解文档语义结构
  • • 自动识别文档类型和领域
  • • 智能提取表格、图表等非文本内容

希望本文对你构建自己的RAG系统有所帮助!记住,好的开始是成功的一半,精心设计的文档解析流程会让你的RAG系统事半功倍。


RAG索引流程详解:如何高效解析文档构建知识库》 是转载文章,点击查看原文


相关推荐


Pico裸机2(汇编基础)
fanged2026/1/1

既然都裸机了,还是简单回顾一下汇编吧。。。 1 概念 来自:https://redfoxsec.com/blog/introduction-to-assembly-language/ 汇编基本上就是机器码。汇编语言是一种直接对应处理器指令集的低级语言,它以人类可读的形式表达机器指令,是软件与硬件之间几乎最底层的一层接口;每一条汇编指令几乎都能映射为一条机器指令,能够精确控制寄存器、内存、指令顺序和硬件状态,因此被广泛用于启动代码、中断处理、上下文切换和性能或时序极端敏感的场景。


Flutter 开发实战:解决华为 HarmonyOS 任务列表不显示 App 名称的终极指南
雨夜寻晴天2025/12/22

问题背景 在 Flutter 应用开发中,我们最近遇到了一个棘手的兼容性问题:在部分 华为手机(HarmonyOS 4.2.0,如 Mate 30 Pro 5G) 上,应用运行时的最近任务列表(Overview Screen)中,只显示应用图标,却不显示应用名称(App Name)。 虽然我们在 AndroidManifest.xml 中正确配置了 android:label,但在 HarmonyOS 系统上依然无效。这不仅影响用户体验,也可能导致应用在审核时被拒(如华为应用市场审核指南第 2.


【鸿蒙开发案例篇】定点出击!鸿蒙6.0视频碰一碰流转+实时进度同步案例
威哥爱编程2025/12/14

兄弟们抄家伙!今天V哥要用鸿蒙6.0的分布式能力撕碎视频跨设备流转的防线!目标:手机碰一下车机/平板,视频秒级切换+进度毫秒级同步,全程零手动干预!以下基于HarmonyOS 6.0(API 21)的ArkTS实战核弹代码已就位👇 联系V哥获取 鸿蒙学习资料 🔥 一、技术架构:分布式视频作战链 核心武器库: 碰一碰触发:NFC+分布式设备管理(@ohos.distributedDeviceManager) 进度同步引擎:AVSession Kit(@kit.AVSessionKit) 数


程序员从大厂回重庆工作一年
uzong2025/12/6

从大厂裸辞回重庆工作,整整一年了。 时间快得让人心惊。停下回望,从裸辞、归乡、求职到适应,再到角色转换,种种心绪,感慨颇多。 一、离开时,那句话成了种子 最后一个工作日的下午,领导把我叫到楼道,做了一次临别交谈。 他有一句话,我至今记得清清楚楚:“以后出去,一定要想办法走向管理岗位,那是完全不同的竞争力。” 当时只是记下。一年后的今天,当我开始带领一个小团队时,这句话突然在心里发了芽。 它像一颗提前埋下的种子,在合适的时节悄然生长。 二、裸辞回渝:一场恰如其分的“任性” 回重庆是裸辞的。所有


Python微服务架构在分布式电商系统中的高性能设计与实战经验总结分享
2501_941810832025/11/28

在大型电商系统中,用户请求量巨大、数据访问密集、服务链路复杂,要求系统具备高响应速度、高并发吞吐能力与稳定扩展性。Python 凭借开发效率高、生态完善与易维护特性,越来越多被用于电商系统的接口层、交易逻辑层、库存管理、推荐系统以及风控服务。本文结合实战电商系统落地经验,分享 Python 在分布式微服务架构中的模块划分、性能调优、服务治理与高并发优化,为开发者提供可落地的架构经验参考。 一、Python 架构选型思路 在传统单体架构中,全站服务聚合在同一进程中,随着并发量增长,性能和可维


【SpringBoot】从学会使用maven开始
那我掉的头发算什么2026/1/17

🎬 那我掉的头发算什么:个人主页 🔥 个人专栏: 《javaSE》《数据结构》《数据库》《javaEE》 ⛺️待到苦尽甘来日 引言 当我们在创建一个新的idea项目时,不知道大家注意过没有 在这个页面中除了IntelliJ选项之外,还有一个Maven选项。而这个Maven恰好就是我们今天这篇文章的重头戏! 文章目录 引言创建Maven项目pom文件项目基本信息GAVproperties依赖管理核心:dependencies与depe


OoderAgent V0.6.5 Nexus 重磅发布:开启超级智能体开发框架新纪元
OneCodeCN2026/1/26

前言: v0.6.5 使用了一个特别的代号,Nexus(枢纽)她不再是一次简单的技术升级。而是一次重生。cong 从0.6.2到0.6.5我们在AI的驱动先快速的迭代,从从基础架构到核心升级,再到技能统一提升,直到0.6.5 一次质的跃迁。本次版本以“构建个人超级终端、赋能全场景智能开发”为核心,重构技术架构、强化能力体系、拓展生态边界,为开发者提供一套从设备协同到AI能力编排的全链路智能体开发解决方案,标志着SuperAgent向“去中心化超级智能体底座”迈出关键一步。 一、Nexu


Rust多线程编程学习笔记
sayang_shao2026/2/4

目录 Rust 多线程基础同步线程编程 基本线程创建线程间通信共享状态线程返回值线程池 异步线程编程 Tokio 异步运行时异步任务异步通道异步共享状态 线程安全 所有权与借用同步原语Send 和 Sync trait 性能优化 线程数量避免竞争异步 vs 同步 最佳实践完整代码示例总结 Rust 多线程基础 Rust 的多线程编程建立在标准库的 std::thread 模块之上。与其他语言不同,Rust 通过其所有权系统和类型系统来保证线程安全,避免了常见的并发问


2025 年客户端技术盘点与 2026 年技术展望
陆业聪2026/2/13

摘要:2025 年客户端技术围绕三条主线展开:Apple Liquid Glass 与 Android Material 3 Expressive 引领设计革新,端侧 AI 通过 Apple Foundation Models 框架和 Google Gemini 走向开发者可编程化,Flutter、React Native、KMP 等跨平台框架在性能上全面向原生看齐。2026 年的核心看点在于端侧 AI 生态建设、新设计语言落地及鸿蒙全球化验证。 本文基于 2025 年各平台官方发布的公开信


【大模型面试突击】03_大模型架构演进与对比
香芋Yu2026/2/21

2026大模型面试:大模型架构演进与对比必考28题(含答案) 精选自176道采集题目,保留最高频最核心的28题 | 难度:⭐基础 ⭐⭐进阶 ⭐⭐⭐深入 一、GPT与LLaMA系列演进(7题) 1. ⭐⭐ [字节/高频] GPT系列从GPT-1到GPT-4的架构演进主要脉络是什么? 一句话秒答: 四代GPT走的是一条"预训练范式→暴力出奇迹→多模态融合"的进化路线,每一步都在重新定义规模的上限。 展开来说: GPT-1其实干了一件很简单但当时很大胆的事——把Transformer Decod

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客