RAG索引流程详解:如何高效解析文档构建知识库

作者:北辰alk日期:2026/1/9

image.png

引言:为什么文档解析是RAG的基石?

在RAG(检索增强生成)系统中,文档解析是整个知识库构建的第一步,也是最关键的一步。就像建房子需要打好地基一样,良好的文档解析质量直接决定了后续检索和生成的效果。今天,我们就深入探讨RAG索引流程中的文档解析技术。

一、RAG文档解析的整体架构

首先,让我们通过一个流程图了解完整的解析流程:

1┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
2                                                           
3  原始文档集合   │───▶│  文档解析与拆分 │───▶│  文本向量化     
4                                                           
5└─────────────────┘    └─────────────────┘    └────────┬────────┘
6                                                        
7┌─────────────────┐    ┌─────────────────┐             
8                                                   
9  元数据提取    │◀───│  语义分块           ┌─────────────────┐
10                                                           
11└─────────────────┘    └─────────────────┘      向量存储索引  
12                                                                         
13                               └─────────────────┘
14

二、文档解析的核心步骤详解

2.1 支持多种文档格式

实际项目中,文档格式多种多样。我们需要一个能处理各种格式的解析器:

1import os
2from typing import List, Dict, Any
3from langchain.document_loaders import (
4    PyPDFLoader,
5    Docx2txtLoader,
6    UnstructuredHTMLLoader,
7    UnstructuredMarkdownLoader,
8    TextLoader
9)
10from langchain.schema import Document
11
12class MultiFormatDocumentParser:
13    """多格式文档解析器"""
14    
15    def __init__(self):
16        self.format_handlers = {
17            '.pdf': self._parse_pdf,
18            '.docx': self._parse_docx,
19            '.html': self._parse_html,
20            '.htm': self._parse_html,
21            '.md': self._parse_markdown,
22            '.txt': self._parse_text,
23        }
24    
25    def parse_document(self, file_path: str) -> List[Document]:
26        """解析单个文档"""
27        ext = os.path.splitext(file_path)[1].lower()
28        
29        if ext not in self.format_handlers:
30            raise ValueError(f"不支持的文件格式: {ext}")
31        
32        return self.format_handlers[ext](file_path)
33    
34    def _parse_pdf(self, file_path: str) -> List[Document]:
35        """解析PDF文档"""
36        loader = PyPDFLoader(file_path)
37        documents = loader.load()
38        
39        # 提取PDF元数据
40        for doc in documents:
41            doc.metadata.update({
42                'source': file_path,
43                'format': 'pdf',
44                'total_pages': len(documents)
45            })
46        
47        return documents
48    
49    def _parse_docx(self, file_path: str) -> List[Document]:
50        """解析Word文档"""
51        loader = Docx2txtLoader(file_path)
52        documents = loader.load()
53        
54        # 添加文档结构信息
55        for doc in documents:
56            doc.metadata.update({
57                'source': file_path,
58                'format': 'docx'
59            })
60        
61        return documents
62    
63    def _parse_html(self, file_path: str) -> List[Document]:
64        """解析HTML文档"""
65        loader = UnstructuredHTMLLoader(file_path)
66        return loader.load()
67    
68    def _parse_markdown(self, file_path: str) -> List[Document]:
69        """解析Markdown文档"""
70        loader = UnstructuredMarkdownLoader(file_path)
71        return loader.load()
72    
73    def _parse_text(self, file_path: str) -> List[Document]:
74        """解析纯文本文档"""
75        loader = TextLoader(file_path, encoding='utf-8')
76        return loader.load()
77
78# 使用示例
79parser = MultiFormatDocumentParser()
80documents = parser.parse_document("example.pdf")
81

2.2 智能文档分块策略

文档分块是解析的核心环节,直接影响检索质量:

1from langchain.text_splitter import (
2    RecursiveCharacterTextSplitter,
3    TokenTextSplitter,
4    MarkdownHeaderTextSplitter
5)
6import re
7from typing import List
8
9class SmartChunker:
10    """智能文档分块器"""
11    
12    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
13        self.chunk_size = chunk_size
14        self.chunk_overlap = chunk_overlap
15        
16        # 初始化不同的分块器
17        self.recursive_splitter = RecursiveCharacterTextSplitter(
18            chunk_size=chunk_size,
19            chunk_overlap=chunk_overlap,
20            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
21        )
22        
23        self.token_splitter = TokenTextSplitter(
24            chunk_size=chunk_size,
25            chunk_overlap=chunk_overlap
26        )
27    
28    def semantic_chunking(self, text: str, doc_type: str = None) -> List[str]:
29        """语义感知的分块"""
30        
31        # 根据文档类型选择分块策略
32        if doc_type == 'markdown':
33            return self._markdown_chunking(text)
34        elif self._is_code_document(text):
35            return self._code_chunking(text)
36        else:
37            return self._semantic_paragraph_chunking(text)
38    
39    def _markdown_chunking(self, text: str) -> List[str]:
40        """Markdown文档分块"""
41        headers_to_split_on = [
42            ("#", "标题1"),
43            ("##", "标题2"),
44            ("###", "标题3"),
45        ]
46        
47        markdown_splitter = MarkdownHeaderTextSplitter(
48            headers_to_split_on=headers_to_split_on
49        )
50        
51        chunks = markdown_splitter.split_text(text)
52        return [chunk.page_content for chunk in chunks]
53    
54    def _code_chunking(self, text: str) -> List[str]:
55        """代码文档分块"""
56        # 按函数、类、方法进行分块
57        patterns = [
58            r'(def\s+\w+(.*?):.*?(?=\n\s*def|\Z))',  # 函数
59            r'(class\s+\w+(.*?):.*?(?=\n\s*class|\Z))',  # 
60            r'(//\s*===.*?===)',  # 注释区块
61        ]
62        
63        chunks = []
64        for pattern in patterns:
65            chunks.extend(re.findall(pattern, text, re.DOTALL))
66        
67        return chunks if chunks else self.recursive_splitter.split_text(text)
68    
69    def _semantic_paragraph_chunking(self, text: str) -> List[str]:
70        """语义段落分块"""
71        # 先按段落分割
72        paragraphs = re.split(r'\n\s*\n', text)
73        
74        chunks = []
75        current_chunk = ""
76        
77        for para in paragraphs:
78            para = para.strip()
79            if not para:
80                continue
81                
82            # 如果当前块加上新段落不超过限制,就合并
83            if len(current_chunk) + len(para) + 1 <= self.chunk_size:
84                if current_chunk:
85                    current_chunk += "\n\n" + para
86                else:
87                    current_chunk = para
88            else:
89                # 保存当前块,开始新块
90                if current_chunk:
91                    chunks.append(current_chunk)
92                current_chunk = para
93        
94        # 添加最后一个块
95        if current_chunk:
96            chunks.append(current_chunk)
97        
98        return chunks
99    
100    def _is_code_document(self, text: str) -> bool:
101        """判断是否为代码文档"""
102        code_keywords = ['def ', 'class ', 'import ', 'function ', 'var ', 'let ', 'const ']
103        return any(keyword in text[:500] for keyword in code_keywords)
104
105# 使用示例
106chunker = SmartChunker(chunk_size=1000, chunk_overlap=200)
107chunks = chunker.semantic_chunking(large_text_document, doc_type='markdown')
108

2.3 高级元数据提取

元数据能显著提升检索精度:

1import hashlib
2from datetime import datetime
3import pytz
4from langchain.schema import Document
5
6class MetadataExtractor:
7    """元数据提取器"""
8    
9    def __init__(self):
10        self.zh_timezone = pytz.timezone('Asia/Shanghai')
11    
12    def extract_document_metadata(self, 
13                                 content: str, 
14                                 file_path: str, 
15                                 doc_type: str) -> Dict[str, Any]:
16        """提取文档元数据"""
17        
18        metadata = {
19            'source': file_path,
20            'doc_type': doc_type,
21            'file_name': os.path.basename(file_path),
22            'file_size': os.path.getsize(file_path),
23            'last_modified': self._get_file_mtime(file_path),
24            'content_hash': self._calculate_content_hash(content),
25            'chunk_count': 0,
26            'total_length': len(content),
27            'indexing_time': datetime.now(self.zh_timezone).isoformat(),
28        }
29        
30        # 提取内容相关元数据
31        content_metadata = self._extract_content_metadata(content)
32        metadata.update(content_metadata)
33        
34        return metadata
35    
36    def _extract_content_metadata(self, content: str) -> Dict[str, Any]:
37        """从内容中提取元数据"""
38        
39        # 提取标题
40        title = self._extract_title(content)
41        
42        # 提取关键词(简单实现)
43        keywords = self._extract_keywords(content)
44        
45        # 提取文档结构信息
46        structure_info = self._analyze_structure(content)
47        
48        # 提取时间信息
49        time_info = self._extract_time_info(content)
50        
51        return {
52            'title': title,
53            'keywords': keywords,
54            'sections': structure_info.get('sections', []),
55            'paragraph_count': structure_info.get('paragraph_count', 0),
56            'mentioned_dates': time_info,
57            'language': self._detect_language(content),
58            'has_tables': self._has_tables(content),
59            'has_code_blocks': self._has_code_blocks(content),
60        }
61    
62    def _extract_title(self, content: str) -> str:
63        """提取文档标题"""
64        # 尝试从开头或Markdown标题中提取
65        lines = content.strip().split('\n')
66        
67        for line in lines[:10]:  # 检查前10行
68            line = line.strip()
69            # Markdown标题
70            if line.startswith('# '):
71                return line[2:].strip()
72            # HTML标题标签
73            if '<h1>' in line.lower():
74                match = re.search(r'<h1[^>]*>(.*?)</h1>', line, re.IGNORECASE)
75                if match:
76                    return match.group(1).strip()
77        
78        # 如果没有明确标题,使用第一行非空内容
79        for line in lines:
80            if line.strip() and len(line.strip()) > 10:
81                return line.strip()[:100]
82        
83        return "未命名文档"
84    
85    def _extract_keywords(self, content: str, top_n: int = 10) -> List[str]:
86        """提取关键词(简化版)"""
87        # 移除常见停用词
88        stop_words = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个'}
89        
90        # 中文分词(简化处理)
91        words = re.findall(r'[\u4e00-\u9fff]{2,}', content)
92        
93        # 统计词频
94        word_freq = {}
95        for word in words:
96            if word not in stop_words:
97                word_freq[word] = word_freq.get(word, 0) + 1
98        
99        # 返回频率最高的词
100        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
101        return [word for word, freq in sorted_words[:top_n]]
102    
103    def _analyze_structure(self, content: str) -> Dict[str, Any]:
104        """分析文档结构"""
105        sections = []
106        
107        # 提取标题
108        headings = re.findall(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE)
109        
110        for level, title in headings:
111            sections.append({
112                'level': len(level),
113                'title': title.strip(),
114                'type': 'heading'
115            })
116        
117        # 统计段落
118        paragraphs = [p for p in re.split(r'\n\s*\n', content) if p.strip()]
119        
120        return {
121            'sections': sections,
122            'paragraph_count': len(paragraphs),
123            'has_headings': len(headings) > 0
124        }
125    
126    def _extract_time_info(self, content: str) -> List[str]:
127        """提取时间信息"""
128        # 匹配常见日期格式
129        date_patterns = [
130            r'\d{4}年\d{1,2}月\d{1,2}日',
131            r'\d{4}-\d{1,2}-\d{1,2}',
132            r'\d{4}/\d{1,2}/\d{1,2}',
133        ]
134        
135        dates = []
136        for pattern in date_patterns:
137            dates.extend(re.findall(pattern, content))
138        
139        return list(set(dates))  # 去重
140    
141    def _detect_language(self, content: str) -> str:
142        """检测语言"""
143        # 简单基于字符判断
144        zh_chars = len(re.findall(r'[\u4e00-\u9fff]', content))
145        en_chars = len(re.findall(r'[a-zA-Z]', content))
146        
147        if zh_chars > en_chars:
148            return 'zh'
149        elif en_chars > zh_chars:
150            return 'en'
151        else:
152            return 'mixed'
153    
154    def _has_tables(self, content: str) -> bool:
155        """判断是否包含表格"""
156        # Markdown表格
157        if re.search(r'|.*|.*\n|[-:\s|]+|', content):
158            return True
159        # HTML表格
160        if re.search(r'<table[^>]*>', content, re.IGNORECASE):
161            return True
162        return False
163    
164    def _has_code_blocks(self, content: str) -> bool:
165        """判断是否包含代码块"""
166        return bool(re.search(r'``[`[\s\S]*?`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.s.md)``', content))
167    
168    def _get_file_mtime(self, file_path: str) -> str:
169        """获取文件修改时间"""
170        mtime = os.path.getmtime(file_path)
171        return datetime.fromtimestamp(mtime, self.zh_timezone).isoformat()
172    
173    def _calculate_content_hash(self, content: str) -> str:
174        """计算内容哈希值"""
175        return hashlib.md5(content.encode('utf-8')).hexdigest()
176

2.4 完整的文档处理流水线

1class DocumentProcessingPipeline:
2    """文档处理流水线"""
3    
4    def __init__(self, 
5                 chunk_size: int = 1000,
6                 chunk_overlap: int = 200):
7        
8        self.parser = MultiFormatDocumentParser()
9        self.chunker = SmartChunker(chunk_size, chunk_overlap)
10        self.metadata_extractor = MetadataExtractor()
11    
12    def process_document(self, file_path: str) -> Dict[str, Any]:
13        """处理单个文档"""
14        
15        print(f"开始处理文档: {file_path}")
16        
17        # 1. 解析文档
18        raw_documents = self.parser.parse_document(file_path)
19        
20        if not raw_documents:
21            raise ValueError(f"无法解析文档: {file_path}")
22        
23        # 2. 合并所有页面/部分的内容
24        full_content = "\n\n".join([doc.page_content for doc in raw_documents])
25        
26        # 3. 提取文档级元数据
27        doc_type = os.path.splitext(file_path)[1][1:].lower()
28        document_metadata = self.metadata_extractor.extract_document_metadata(
29            full_content, file_path, doc_type
30        )
31        
32        # 4. 智能分块
33        chunks = self.chunker.semantic_chunking(full_content, doc_type)
34        
35        # 5. 为每个块添加元数据
36        processed_chunks = []
37        for i, chunk_content in enumerate(chunks):
38            chunk_metadata = document_metadata.copy()
39            chunk_metadata.update({
40                'chunk_id': i + 1,
41                'chunk_index': i,
42                'chunk_length': len(chunk_content),
43                'is_first_chunk': i == 0,
44                'is_last_chunk': i == len(chunks) - 1,
45            })
46            
47            # 创建Document对象
48            chunk_doc = Document(
49                page_content=chunk_content,
50                metadata=chunk_metadata
51            )
52            processed_chunks.append(chunk_doc)
53        
54        # 更新文档级元数据
55        document_metadata['chunk_count'] = len(processed_chunks)
56        
57        print(f"文档处理完成: {file_path}, 生成 {len(processed_chunks)} 个块")
58        
59        return {
60            'document_metadata': document_metadata,
61            'chunks': processed_chunks,
62            'original_path': file_path
63        }
64    
65    def process_directory(self, 
66                         directory_path: str, 
67                         extensions: List[str] = None) -> List[Dict[str, Any]]:
68        """批量处理目录中的文档"""
69        
70        if extensions is None:
71            extensions = ['.pdf', '.docx', '.txt', '.md', '.html']
72        
73        all_results = []
74        
75        for root, _, files in os.walk(directory_path):
76            for file in files:
77                file_path = os.path.join(root, file)
78                ext = os.path.splitext(file)[1].lower()
79                
80                if extensions and ext not in extensions:
81                    continue
82                
83                try:
84                    result = self.process_document(file_path)
85                    all_results.append(result)
86                except Exception as e:
87                    print(f"处理文件失败 {file_path}: {str(e)}")
88                    continue
89        
90        print(f"目录处理完成,共处理 {len(all_results)} 个文档")
91        return all_results
92
93# 使用示例
94pipeline = DocumentProcessingPipeline(
95    chunk_size=1000,
96    chunk_overlap=200
97)
98
99# 处理单个文档
100result = pipeline.process_document("document.pdf")
101
102# 处理整个目录
103results = pipeline.process_directory(
104    "./knowledge_base",
105    extensions=['.pdf', '.docx', '.md', '.txt']
106)
107
108# 提取所有块
109all_chunks = []
110for result in results:
111    all_chunks.extend(result['chunks'])
112
113print(f"总共生成 {len(all_chunks)} 个文本块")
114

三、性能优化和最佳实践

3.1 并行处理加速

1import concurrent.futures
2from tqdm import tqdm
3
4class ParallelDocumentProcessor:
5    """并行文档处理器"""
6    
7    def __init__(self, max_workers: int = 4):
8        self.pipeline = DocumentProcessingPipeline()
9        self.max_workers = max_workers
10    
11    def process_batch(self, file_paths: List[str]) -> List[Dict[str, Any]]:
12        """批量并行处理文档"""
13        
14        results = []
15        
16        with concurrent.futures.ProcessPoolExecutor(
17            max_workers=self.max_workers
18        ) as executor:
19            # 提交任务
20            future_to_file = {
21                executor.submit(self.pipeline.process_document, fp): fp 
22                for fp in file_paths
23            }
24            
25            # 处理结果
26            for future in tqdm(
27                concurrent.futures.as_completed(future_to_file),
28                total=len(file_paths),
29                desc="处理文档"
30            ):
31                file_path = future_to_file[future]
32                try:
33                    result = future.result()
34                    results.append(result)
35                except Exception as e:
36                    print(f"处理失败 {file_path}: {str(e)}")
37        
38        return results
39

3.2 增量更新处理

1class IncrementalIndexUpdater:
2    """增量索引更新器"""
3    
4    def __init__(self, index_storage_path: str):
5        self.index_storage_path = index_storage_path
6        self.processed_files = self._load_processed_files()
7    
8    def _load_processed_files(self) -> Dict[str, str]:
9        """加载已处理文件记录"""
10        record_file = os.path.join(self.index_storage_path, "processed_files.json")
11        
12        if os.path.exists(record_file):
13            with open(record_file, 'r', encoding='utf-8') as f:
14                return json.load(f)
15        return {}
16    
17    def _save_processed_files(self):
18        """保存已处理文件记录"""
19        record_file = os.path.join(self.index_storage_path, "processed_files.json")
20        with open(record_file, 'w', encoding='utf-8') as f:
21            json.dump(self.processed_files, f, ensure_ascii=False, indent=2)
22    
23    def get_changed_files(self, 
24                         directory_path: str, 
25                         extensions: List[str] = None) -> List[str]:
26        """获取有变化的文件"""
27        
28        changed_files = []
29        
30        for root, _, files in os.walk(directory_path):
31            for file in files:
32                file_path = os.path.join(root, file)
33                ext = os.path.splitext(file)[1].lower()
34                
35                if extensions and ext not in extensions:
36                    continue
37                
38                # 检查文件是否已处理或有更新
39                file_hash = self._calculate_file_hash(file_path)
40                
41                if (file_path not in self.processed_files or 
42                    self.processed_files[file_path] != file_hash):
43                    changed_files.append(file_path)
44                    self.processed_files[file_path] = file_hash
45        
46        return changed_files
47    
48    def _calculate_file_hash(self, file_path: str) -> str:
49        """计算文件哈希"""
50        hasher = hashlib.md5()
51        
52        with open(file_path, 'rb') as f:
53            buf = f.read(65536)
54            while len(buf) > 0:
55                hasher.update(buf)
56                buf = f.read(65536)
57        
58        return hasher.hexdigest()
59

四、常见问题与解决方案

问题1:文档格式兼容性

解决方案:使用多个解析库,并实现降级策略

1def robust_pdf_parsing(file_path: str):
2    """鲁棒的PDF解析"""
3    parsers = [PyPDFLoader, UnstructuredPDFLoader]
4    
5    for parser_class in parsers:
6        try:
7            loader = parser_class(file_path)
8            return loader.load()
9        except Exception as e:
10            continue
11    
12    # 降级到文本提取
13    try:
14        import textract
15        text = textract.process(file_path).decode('utf-8')
16        return [Document(page_content=text, metadata={'source': file_path})]
17    except:
18        raise ValueError(f"无法解析PDF文件: {file_path}")
19

问题2:中文文档分块效果不佳

解决方案:使用专门的中文分块器

1class ChineseTextSplitter:
2    """中文文本分块器"""
3    
4    def __init__(self, chunk_size=500, chunk_overlap=50):
5        self.chunk_size = chunk_size
6        self.chunk_overlap = chunk_overlap
7    
8    def split_text(self, text: str) -> List[str]:
9        # 使用中文标点进行分割
10        separators = ['\n\n', '\n', '。', '!', '?', ';', '……', '…', ',', '、']
11        
12        chunks = []
13        current_chunk = ""
14        
15        # 按句子分割
16        sentences = self._split_into_sentences(text)
17        
18        for sentence in sentences:
19            if len(current_chunk) + len(sentence) <= self.chunk_size:
20                current_chunk += sentence
21            else:
22                if current_chunk:
23                    chunks.append(current_chunk)
24                current_chunk = sentence
25        
26        if current_chunk:
27            chunks.append(current_chunk)
28        
29        return chunks
30    
31    def _split_into_sentences(self, text: str) -> List[str]:
32        """将中文文本分割成句子"""
33        import re
34        # 中文句子分割规则
35        pattern = r'([。!?;…]+\s*)'
36        parts = re.split(pattern, text)
37        
38        sentences = []
39        for i in range(0, len(parts)-1, 2):
40            sentence = parts[i] + (parts[i+1] if i+1 < len(parts) else '')
41            if sentence.strip():
42                sentences.append(sentence)
43        
44        return sentences
45

五、实战:构建完整的RAG索引系统

1class CompleteRAGIndexer:
2    """完整的RAG索引构建系统"""
3    
4    def __init__(self, 
5                 embedding_model=None,
6                 vector_store=None):
7        
8        self.pipeline = DocumentProcessingPipeline()
9        self.parallel_processor = ParallelDocumentProcessor()
10        self.updater = IncrementalIndexUpdater("./vector_store")
11        
12        self.embedding_model = embedding_model
13        self.vector_store = vector_store
14    
15    def build_index(self, 
16                   knowledge_base_path: str,
17                   incremental: bool = True) -> Dict[str, Any]:
18        """构建知识库索引"""
19        
20        print("开始构建RAG索引...")
21        
22        # 1. 获取需要处理的文件
23        if incremental:
24            files_to_process = self.updater.get_changed_files(
25                knowledge_base_path,
26                extensions=['.pdf', '.docx', '.md', '.txt', '.html']
27            )
28            print(f"增量更新: 发现 {len(files_to_process)} 个变更文件")
29        else:
30            # 获取所有文件
31            all_files = []
32            for root, _, files in os.walk(knowledge_base_path):
33                for file in files:
34                    if file.lower().endswith(('.pdf', '.docx', '.md', '.txt', '.html')):
35                        all_files.append(os.path.join(root, file))
36            files_to_process = all_files
37        
38        if not files_to_process:
39            print("没有需要处理的文件")
40            return {'status': 'no_changes', 'chunks': []}
41        
42        # 2. 并行处理文档
43        processing_results = self.parallel_processor.process_batch(files_to_process)
44        
45        # 3. 提取所有文本块
46        all_chunks = []
47        for result in processing_results:
48            all_chunks.extend(result['chunks'])
49        
50        print(f"总共生成 {len(all_chunks)} 个文本块")
51        
52        # 4. 生成向量嵌入
53        if self.embedding_model and self.vector_store:
54            print("开始生成向量嵌入...")
55            
56            texts = [chunk.page_content for chunk in all_chunks]
57            metadatas = [chunk.metadata for chunk in all_chunks]
58            
59            embeddings = self.embedding_model.embed_documents(texts)
60            
61            # 存储到向量数据库
62            self.vector_store.add_embeddings(
63                texts=texts,
64                embeddings=embeddings,
65                metadatas=metadatas
66            )
67            
68            print("向量索引构建完成")
69        
70        # 5. 保存处理记录
71        self.updater._save_processed_files()
72        
73        return {
74            'status': 'success',
75            'total_chunks': len(all_chunks),
76            'processed_files': len(files_to_process),
77            'chunks': all_chunks
78        }
79
80# 使用示例
81if __name__ == "__main__":
82    # 初始化组件
83    from langchain.embeddings import OpenAIEmbeddings
84    from langchain.vectorstores import Chroma
85    
86    embedding_model = OpenAIEmbeddings()
87    vector_store = Chroma(
88        persist_directory="./chroma_db",
89        embedding_function=embedding_model
90    )
91    
92    # 构建索引
93    indexer = CompleteRAGIndexer(
94        embedding_model=embedding_model,
95        vector_store=vector_store
96    )
97    
98    # 首次全量构建
99    result = indexer.build_index(
100        knowledge_base_path="./knowledge_base",
101        incremental=False
102    )
103    
104    # 后续增量更新
105    result = indexer.build_index(
106        knowledge_base_path="./knowledge_base",
107        incremental=True
108    )
109

六、总结与展望

文档解析是RAG系统的基石,一个优秀的解析系统应该具备:

  1. 1. 多格式支持:能处理各种常见文档格式
  2. 2. 智能分块:理解文档结构,保持语义完整性
  3. 3. 丰富元数据:提取有用信息辅助检索
  4. 4. 高效处理:支持并行和增量更新
  5. 5. 鲁棒性:能处理各种异常情况

随着大模型技术的发展,未来的文档解析会更加智能化,比如:

  • • 使用视觉模型解析复杂版式
  • • 利用大模型理解文档语义结构
  • • 自动识别文档类型和领域
  • • 智能提取表格、图表等非文本内容

希望本文对你构建自己的RAG系统有所帮助!记住,好的开始是成功的一半,精心设计的文档解析流程会让你的RAG系统事半功倍。


RAG索引流程详解:如何高效解析文档构建知识库》 是转载文章,点击查看原文


相关推荐


Pico裸机2(汇编基础)
fanged2026/1/1

既然都裸机了,还是简单回顾一下汇编吧。。。 1 概念 来自:https://redfoxsec.com/blog/introduction-to-assembly-language/ 汇编基本上就是机器码。汇编语言是一种直接对应处理器指令集的低级语言,它以人类可读的形式表达机器指令,是软件与硬件之间几乎最底层的一层接口;每一条汇编指令几乎都能映射为一条机器指令,能够精确控制寄存器、内存、指令顺序和硬件状态,因此被广泛用于启动代码、中断处理、上下文切换和性能或时序极端敏感的场景。


Flutter 开发实战:解决华为 HarmonyOS 任务列表不显示 App 名称的终极指南
雨夜寻晴天2025/12/22

问题背景 在 Flutter 应用开发中,我们最近遇到了一个棘手的兼容性问题:在部分 华为手机(HarmonyOS 4.2.0,如 Mate 30 Pro 5G) 上,应用运行时的最近任务列表(Overview Screen)中,只显示应用图标,却不显示应用名称(App Name)。 虽然我们在 AndroidManifest.xml 中正确配置了 android:label,但在 HarmonyOS 系统上依然无效。这不仅影响用户体验,也可能导致应用在审核时被拒(如华为应用市场审核指南第 2.


【鸿蒙开发案例篇】定点出击!鸿蒙6.0视频碰一碰流转+实时进度同步案例
威哥爱编程2025/12/14

兄弟们抄家伙!今天V哥要用鸿蒙6.0的分布式能力撕碎视频跨设备流转的防线!目标:手机碰一下车机/平板,视频秒级切换+进度毫秒级同步,全程零手动干预!以下基于HarmonyOS 6.0(API 21)的ArkTS实战核弹代码已就位👇 联系V哥获取 鸿蒙学习资料 🔥 一、技术架构:分布式视频作战链 核心武器库: 碰一碰触发:NFC+分布式设备管理(@ohos.distributedDeviceManager) 进度同步引擎:AVSession Kit(@kit.AVSessionKit) 数


【Claude Code】长时间运行 agents 的有效控制框架
是魔丸啊2025/12/6

转载 Harness原意是马具,从马具控制马匹,引申为一个控制和管理框架,用于封装、引导和监督AI代理的行为。后文为了保持原意,作为专有名词不翻译了。 文中提到的initializer agent / coding agent,并不是claude code已经内置的机制,而是一个最佳实践,需要自行配置。如果想要prompt,可以去这里寻找: github.com/anthropics/… 随着 AI agents 变得更加强大,开发者越来越多地要求它们承担需要跨越数小时甚至数天工作的复杂任务


《 Linux 修炼全景指南: 七 》 指尖下的利刃:深入理解 Vim 的高效世界
Lenyiin2025/11/28

摘要 本篇《Linux Vim 入门指南》从零开始,系统而全面地介绍了 Vim 的操作理念、基础模式、光标移动、文本编辑、搜索替换、可视化模式、多窗口与多文件协作等核心能力,并深入讲解 Vim 配置、插件体系与高效编辑技巧。文章不仅涵盖新手最容易踩的坑,还通过实战示例带你完成一次完整的编辑任务,使读者不但 “会用 Vim”,更真正理解 Vim 背后的高效思维方式。无论你是 Linux 新手,还是想进一步提升编辑效率的开发者,这篇指南都将成为你学习 Vim 的最佳起点。 1、引言:为什么学 Vi


【SpringBoot】从学会使用maven开始
那我掉的头发算什么2026/1/17

🎬 那我掉的头发算什么:个人主页 🔥 个人专栏: 《javaSE》《数据结构》《数据库》《javaEE》 ⛺️待到苦尽甘来日 引言 当我们在创建一个新的idea项目时,不知道大家注意过没有 在这个页面中除了IntelliJ选项之外,还有一个Maven选项。而这个Maven恰好就是我们今天这篇文章的重头戏! 文章目录 引言创建Maven项目pom文件项目基本信息GAVproperties依赖管理核心:dependencies与depe


CSDN创作变现活动!社区镜像或使用视频教程分别单个最高得 80 元,收益上不封顶!
CSDN官方博客2026/1/26

CSDN AI 社区是聚焦 AI 技术产业落地的开发者服务平台(官方入口),核心为创作者搭建技术价值转化桥梁,AI社区涵盖: 镜像市场(社区镜像)、算力市场等模块。 本次推出镜像创作激励活动,以下是方案活动规则、参与要求及激励政策,保障创作者权益与活动有序开展。 一、活动总则 活动时间: 2026年1月1日 - 2026年2月28日 现金奖励: 1、按照官方指定镜像任务创作,单个社区镜像奖励 30-80元现金

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客