#!/usr/bin/env python3
"""
Markdown-to-knowledge-base format converter.

Converts Markdown documents into the JSON format required by a RAG knowledge
base, producing entries with content, metadata, source, and type fields.
"""

import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional


class MarkdownToKnowledgeConverter:
    """Converter from Markdown to knowledge-base format."""

    def __init__(self):
        self.documents = []

    def parse_markdown(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Parse a Markdown file and convert it to knowledge-base format.

        Args:
            file_path: Path to the Markdown file.

        Returns:
            List of converted documents.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Extract the document title (first level-1 heading)
        title_match = re.search(r'^#\s+(.+?)$', content, re.MULTILINE)
        document_title = title_match.group(1) if title_match else "Unknown document"

        # Split the document into sections
        sections = self._split_by_sections(content)

        documents = []
        for section in sections:
            if section['content'].strip():
                doc = self._create_document(section, document_title, file_path)
                documents.append(doc)

        return documents

    def _split_by_sections(self, content: str) -> List[Dict[str, str]]:
        """Split the document content by section headings."""
        sections = []

        # Match level-1 to level-3 headings (#, ##, ###)
        pattern = r'^(#{1,3})\s+(.+?)$'
        matches = list(re.finditer(pattern, content, re.MULTILINE))

        for i, match in enumerate(matches):
            start_pos = match.start()
            end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(content)

            section_content = content[start_pos:end_pos].strip()
            level = len(match.group(1))  # Heading level
            title = match.group(2).strip()

            # Extract the section body (drop the heading line)
            lines = section_content.split('\n')
            content_lines = lines[1:]  # Skip the heading line

            # Filter out image references, figure captions ("图" = "Figure"), and empty lines
            filtered_lines = []
            for line in content_lines:
                line = line.strip()
                if line and not line.startswith('![') and not line.startswith('图'):
                    filtered_lines.append(line)

            if filtered_lines:
                sections.append({
                    'title': title,
                    'content': '\n'.join(filtered_lines),
                    'level': level
                })

        return sections

    def _create_document(self, section: Dict[str, str], document_title: str,
                         file_path: str) -> Dict[str, Any]:
        """Build a knowledge-base document entry."""
        # Determine the document type
        doc_type = self._determine_type(section['title'])

        # Build the content; the "标题" ("Title") label is kept in Chinese to
        # match the language of the source documents
        content = f"标题: {section['title']}\n\n{section['content']}"

        # Clean the content (remove extra blank lines and special characters)
        content = self._clean_content(content)

        return {
            "content": content,
            "metadata": {
                "source": document_title,
                "type": doc_type,
                "section": section['title'],
                "level": section['level'],
                "file_path": file_path
            }
        }

    def _determine_type(self, title: str) -> str:
        """Determine the document type from the section title."""
        title_lower = title.lower()

        # Keyword-to-type mapping; keys stay in Chinese because they are
        # matched against Chinese section titles in the source documents
        type_mapping = {
            "登录": "login_guide",
            "采购": "purchase_management",
            "入库": "inbound_management",
            "出库": "outbound_management",
            "库存": "inventory_management",
            "工艺": "process_management",
            "生产": "production_management",
            "基础设置": "basic_settings",
            "系统管理": "system_management",
            "个人中心": "personal_center",
            "装配": "assembly_management"
        }

        for keyword, doc_type in type_mapping.items():
            if keyword in title_lower:
                return doc_type

        # Default type
        return "operation_guide"

    def _clean_content(self, content: str) -> str:
        """Clean document content."""
        lines = content.split('\n')
        cleaned_lines = []

        for line in lines:
            line = line.strip()
            if line:
                # Strip Markdown links but keep the link text
                line = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', line)
                # Strip Markdown image references
                line = re.sub(r'!\[.*?\]\([^\)]+\)', '', line)
                # Strip special formatting characters
                line = re.sub(r'[{}]+', '', line)
                # Collapse runs of whitespace
                line = re.sub(r'\s+', ' ', line)
                cleaned_lines.append(line)

        # Re-join and collapse consecutive blank lines
        result = '\n'.join(cleaned_lines)
        result = re.sub(r'\n\s*\n', '\n\n', result)

        return result.strip()

    def convert_file(self, input_file: str, output_file: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Convert a single Markdown file.

        Args:
            input_file: Path to the input Markdown file.
            output_file: Path to the output JSON file (optional).

        Returns:
            List of converted documents.
        """
        print(f"Converting file: {input_file}")
        documents = self.parse_markdown(input_file)

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(documents, f, ensure_ascii=False, indent=2)
            print(f"Conversion finished, saved to: {output_file}")
            print(f"Generated {len(documents)} knowledge-base entries")

        return documents

    def convert_directory(self, input_dir: str, output_dir: str):
        """
        Convert all Markdown files under a directory.

        Args:
            input_dir: Input directory path.
            output_dir: Output directory path.
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        for md_file in input_path.glob("**/*.md"):
            relative_path = md_file.relative_to(input_path)
            output_file = output_path / relative_path.with_suffix('.json')
            output_file.parent.mkdir(parents=True, exist_ok=True)
            self.convert_file(str(md_file), str(output_file))


def main():
    """Main function -- example usage."""
    converter = MarkdownToKnowledgeConverter()

    # Convert the mom.md file
    input_file = "assets/docs/mom.md"
    output_file = "knowledge_base/mom_knowledge.json"

    # Check that the input file exists
    if not os.path.exists(input_file):
        print(f"Error: input file {input_file} does not exist")
        print("Please make sure the file path is correct")
        return

    try:
        # Make sure the output directory exists
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

        # Run the conversion
        documents = converter.convert_file(input_file, output_file)

        # Print some statistics
        print("\nConversion statistics:")
        print(f"Total documents: {len(documents)}")

        # Count documents per type
        type_count = {}
        for doc in documents:
            doc_type = doc['metadata']['type']
            type_count[doc_type] = type_count.get(doc_type, 0) + 1

        print("Documents per type:")
        for doc_type, count in type_count.items():
            print(f"  {doc_type}: {count}")

        # Show the first few documents as examples
        print("\nFirst 3 document examples:")
        for i, doc in enumerate(documents[:3]):
            print(f"\nDocument {i + 1}:")
            print(f"  Section: {doc['metadata']['section']}")
            print(f"  Type: {doc['metadata']['type']}")
            print(f"  Content length: {len(doc['content'])} characters")
            print(f"  Content preview: {doc['content'][:100]}...")

    except Exception as e:
        print(f"Error during conversion: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
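
# Usage sketch (an assumption, not exercised by main() above): besides the
# single-file conversion demonstrated in main(), the converter can walk an
# entire documentation tree with convert_directory(). The directory names
# below are hypothetical placeholders, not paths confirmed by this project.
#
#     converter = MarkdownToKnowledgeConverter()
#     converter.convert_directory("assets/docs", "knowledge_base")
#     # writes one .json file per .md file, mirroring the input directory layout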