# integrate_knowledge.py (7.1 KB)
  1. #!/usr/bin/env python3
  2. """
  3. 知识库集成脚本
  4. 演示如何将转换后的JSON文档集成到RAG服务中
  5. """
  6. import json
  7. import asyncio
  8. import os
  9. import sys
  10. from typing import List, Dict, Any
  11. # 添加项目根目录到Python路径
  12. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. try:
  14. from src.services.rag_service import RAGService
  15. except ImportError:
  16. print("无法导入RAG服务,请确保项目结构正确")
  17. sys.exit(1)
  18. class KnowledgeIntegrator:
  19. """知识库集成器"""
  20. def __init__(self, openai_api_key: str):
  21. """
  22. 初始化集成器
  23. Args:
  24. openai_api_key: OpenAI API密钥
  25. """
  26. self.rag_service = RAGService(openai_api_key)
  27. async def load_json_knowledge(self, json_file: str) -> List[Dict[str, Any]]:
  28. """
  29. 从JSON文件加载知识库文档
  30. Args:
  31. json_file: JSON文件路径
  32. Returns:
  33. 文档列表
  34. """
  35. try:
  36. with open(json_file, 'r', encoding='utf-8') as f:
  37. documents = json.load(f)
  38. print(f"从 {json_file} 加载了 {len(documents)} 个文档")
  39. return documents
  40. except Exception as e:
  41. print(f"加载JSON文件失败: {e}")
  42. return []
  43. async def add_documents_by_type(self, documents: List[Dict[str, Any]]):
  44. """
  45. 按类型将文档添加到对应的知识库集合
  46. Args:
  47. documents: 文档列表
  48. """
  49. # 按文档类型分组
  50. type_groups = {}
  51. for doc in documents:
  52. doc_type = doc['metadata']['type']
  53. if doc_type not in type_groups:
  54. type_groups[doc_type] = []
  55. type_groups[doc_type].append(doc)
  56. # 类型到集合的映射
  57. type_to_collection = {
  58. 'login_guide': 'system_guide',
  59. 'operation_guide': 'system_guide',
  60. 'basic_settings': 'system_guide',
  61. 'system_management': 'system_guide',
  62. 'personal_center': 'system_guide',
  63. 'purchase_management': 'production_data',
  64. 'inbound_management': 'production_data',
  65. 'outbound_management': 'production_data',
  66. 'inventory_management': 'production_data',
  67. 'process_management': 'production_data',
  68. 'production_management': 'production_data',
  69. 'assembly_management': 'production_data',
  70. }
  71. # 添加文档到对应集合
  72. for doc_type, docs in type_groups.items():
  73. collection_name = type_to_collection.get(doc_type, 'system_guide')
  74. print(f"正在将 {len(docs)} 个 {doc_type} 类型的文档添加到 {collection_name} 集合...")
  75. # 转换文档格式以适配RAG服务
  76. formatted_docs = []
  77. for doc in docs:
  78. formatted_doc = {
  79. 'content': doc['content'],
  80. 'metadata': {
  81. 'source': doc['metadata']['source'],
  82. 'type': doc['metadata']['type'],
  83. 'section': doc['metadata']['section'],
  84. 'file_path': doc['metadata']['file_path']
  85. }
  86. }
  87. formatted_docs.append(formatted_doc)
  88. # 添加到RAG服务
  89. await self.rag_service.add_documents(formatted_docs, collection_name)
  90. async def test_query(self, query: str, collection_name: str = 'system_guide'):
  91. """
  92. 测试查询功能
  93. Args:
  94. query: 查询文本
  95. collection_name: 集合名称
  96. """
  97. print(f"\n测试查询: '{query}' (集合: {collection_name})")
  98. print("-" * 50)
  99. results = await self.rag_service.query(query, collection_name, top_k=3)
  100. if results:
  101. for i, result in enumerate(results, 1):
  102. print(f"结果 {i}:")
  103. print(f" 相关性分数: {result.relevance_score:.3f}")
  104. print(f" 来源: {result.source}")
  105. print(f" 内容: {result.content[:100]}...")
  106. print()
  107. else:
  108. print("未找到相关结果")
  109. async def main():
  110. """主函数"""
  111. # 配置参数
  112. OPENAI_API_KEY = "your-openai-api-key" # 请替换为实际的API密钥
  113. JSON_FILE = "knowledge_base/sample_mom_knowledge.json"
  114. # 检查文件是否存在
  115. if not os.path.exists(JSON_FILE):
  116. print(f"错误: JSON文件 {JSON_FILE} 不存在")
  117. print("请先运行 md_to_knowledge_converter.py 生成JSON文件")
  118. return
  119. try:
  120. # 创建集成器
  121. integrator = KnowledgeIntegrator(OPENAI_API_KEY)
  122. # 加载JSON文档
  123. documents = await integrator.load_json_knowledge(JSON_FILE)
  124. if not documents:
  125. print("没有找到要集成的文档")
  126. return
  127. # 将文档添加到知识库
  128. await integrator.add_documents_by_type(documents)
  129. print("\n知识库集成完成!")
  130. # 测试查询
  131. test_queries = [
  132. ("如何登录系统", "system_guide"),
  133. ("怎么进行入库操作", "production_data"),
  134. ("生产计划如何制定", "production_data")
  135. ]
  136. for query, collection in test_queries:
  137. await integrator.test_query(query, collection)
  138. except Exception as e:
  139. print(f"集成过程中发生错误: {e}")
  140. import traceback
  141. traceback.print_exc()
  142. def demo_without_rag():
  143. """演示模式 - 不依赖RAG服务"""
  144. print("=== 演示模式:展示JSON文档结构 ===\n")
  145. json_file = "knowledge_base/sample_mom_knowledge.json"
  146. if not os.path.exists(json_file):
  147. print(f"JSON文件 {json_file} 不存在")
  148. return
  149. with open(json_file, 'r', encoding='utf-8') as f:
  150. documents = json.load(f)
  151. print(f"加载了 {len(documents)} 个文档\n")
  152. # 按类型统计
  153. type_count = {}
  154. for doc in documents:
  155. doc_type = doc['metadata']['type']
  156. type_count[doc_type] = type_count.get(doc_type, 0) + 1
  157. print("文档类型统计:")
  158. for doc_type, count in type_count.items():
  159. print(f" {doc_type}: {count} 个文档")
  160. print(f"\n前3个文档示例:")
  161. for i, doc in enumerate(documents[:3]):
  162. print(f"\n文档 {i+1}:")
  163. print(f" 标题: {doc['metadata']['section']}")
  164. print(f" 类型: {doc['metadata']['type']}")
  165. print(f" 来源: {doc['metadata']['source']}")
  166. print(f" 内容: {doc['content'][:80]}...")
  167. print("\n这些文档可以直接集成到RAG知识库中!")
  168. if __name__ == "__main__":
  169. # 检查是否可以导入RAG服务
  170. try:
  171. from src.services.rag_service import RAGService
  172. # 如果能导入,运行完整版本
  173. asyncio.run(main())
  174. except ImportError:
  175. # 如果不能导入,运行演示版本
  176. demo_without_rag()