#!/usr/bin/env python3 """ 知识库集成脚本 演示如何将转换后的JSON文档集成到RAG服务中 """ import json import asyncio import os import sys from typing import List, Dict, Any # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) try: from src.services.rag_service import RAGService except ImportError: print("无法导入RAG服务,请确保项目结构正确") sys.exit(1) class KnowledgeIntegrator: """知识库集成器""" def __init__(self, openai_api_key: str): """ 初始化集成器 Args: openai_api_key: OpenAI API密钥 """ self.rag_service = RAGService(openai_api_key) async def load_json_knowledge(self, json_file: str) -> List[Dict[str, Any]]: """ 从JSON文件加载知识库文档 Args: json_file: JSON文件路径 Returns: 文档列表 """ try: with open(json_file, 'r', encoding='utf-8') as f: documents = json.load(f) print(f"从 {json_file} 加载了 {len(documents)} 个文档") return documents except Exception as e: print(f"加载JSON文件失败: {e}") return [] async def add_documents_by_type(self, documents: List[Dict[str, Any]]): """ 按类型将文档添加到对应的知识库集合 Args: documents: 文档列表 """ # 按文档类型分组 type_groups = {} for doc in documents: doc_type = doc['metadata']['type'] if doc_type not in type_groups: type_groups[doc_type] = [] type_groups[doc_type].append(doc) # 类型到集合的映射 type_to_collection = { 'login_guide': 'system_guide', 'operation_guide': 'system_guide', 'basic_settings': 'system_guide', 'system_management': 'system_guide', 'personal_center': 'system_guide', 'purchase_management': 'production_data', 'inbound_management': 'production_data', 'outbound_management': 'production_data', 'inventory_management': 'production_data', 'process_management': 'production_data', 'production_management': 'production_data', 'assembly_management': 'production_data', } # 添加文档到对应集合 for doc_type, docs in type_groups.items(): collection_name = type_to_collection.get(doc_type, 'system_guide') print(f"正在将 {len(docs)} 个 {doc_type} 类型的文档添加到 {collection_name} 集合...") # 转换文档格式以适配RAG服务 formatted_docs = [] for doc in docs: formatted_doc = { 'content': doc['content'], 'metadata': { 'source': doc['metadata']['source'], 'type': doc['metadata']['type'], 'section': doc['metadata']['section'], 'file_path': doc['metadata']['file_path'] } } formatted_docs.append(formatted_doc) # 添加到RAG服务 await self.rag_service.add_documents(formatted_docs, collection_name) async def test_query(self, query: str, collection_name: str = 'system_guide'): """ 测试查询功能 Args: query: 查询文本 collection_name: 集合名称 """ print(f"\n测试查询: '{query}' (集合: {collection_name})") print("-" * 50) results = await self.rag_service.query(query, collection_name, top_k=3) if results: for i, result in enumerate(results, 1): print(f"结果 {i}:") print(f" 相关性分数: {result.relevance_score:.3f}") print(f" 来源: {result.source}") print(f" 内容: {result.content[:100]}...") print() else: print("未找到相关结果") async def main(): """主函数""" # 配置参数 OPENAI_API_KEY = "your-openai-api-key" # 请替换为实际的API密钥 JSON_FILE = "knowledge_base/sample_mom_knowledge.json" # 检查文件是否存在 if not os.path.exists(JSON_FILE): print(f"错误: JSON文件 {JSON_FILE} 不存在") print("请先运行 md_to_knowledge_converter.py 生成JSON文件") return try: # 创建集成器 integrator = KnowledgeIntegrator(OPENAI_API_KEY) # 加载JSON文档 documents = await integrator.load_json_knowledge(JSON_FILE) if not documents: print("没有找到要集成的文档") return # 将文档添加到知识库 await integrator.add_documents_by_type(documents) print("\n知识库集成完成!") # 测试查询 test_queries = [ ("如何登录系统", "system_guide"), ("怎么进行入库操作", "production_data"), ("生产计划如何制定", "production_data") ] for query, collection in test_queries: await integrator.test_query(query, collection) except Exception as e: print(f"集成过程中发生错误: {e}") import traceback traceback.print_exc() def demo_without_rag(): """演示模式 - 不依赖RAG服务""" print("=== 演示模式:展示JSON文档结构 ===\n") json_file = "knowledge_base/sample_mom_knowledge.json" if not os.path.exists(json_file): print(f"JSON文件 {json_file} 不存在") return with open(json_file, 'r', encoding='utf-8') as f: documents = json.load(f) print(f"加载了 {len(documents)} 个文档\n") # 按类型统计 type_count = {} for doc in documents: doc_type = doc['metadata']['type'] type_count[doc_type] = type_count.get(doc_type, 0) + 1 print("文档类型统计:") for doc_type, count in type_count.items(): print(f" {doc_type}: {count} 个文档") print(f"\n前3个文档示例:") for i, doc in enumerate(documents[:3]): print(f"\n文档 {i+1}:") print(f" 标题: {doc['metadata']['section']}") print(f" 类型: {doc['metadata']['type']}") print(f" 来源: {doc['metadata']['source']}") print(f" 内容: {doc['content'][:80]}...") print("\n这些文档可以直接集成到RAG知识库中!") if __name__ == "__main__": # 检查是否可以导入RAG服务 try: from src.services.rag_service import RAGService # 如果能导入,运行完整版本 asyncio.run(main()) except ImportError: # 如果不能导入,运行演示版本 demo_without_rag()