123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- #!/usr/bin/env python3
- """
- 知识库集成脚本
- 演示如何将转换后的JSON文档集成到RAG服务中
- """
- import json
- import asyncio
- import os
- import sys
- from typing import List, Dict, Any
- # 添加项目根目录到Python路径
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
    from src.services.rag_service import RAGService
except ImportError:
    # Warn but do not exit: the __main__ section re-probes this import and
    # falls back to demo_without_rag(), which an early sys.exit(1) here would
    # make unreachable. RAGService is bound to None so the name still exists.
    print("无法导入RAG服务,请确保项目结构正确")
    RAGService = None
class KnowledgeIntegrator:
    """Load converted JSON knowledge documents and push them into the RAG service.

    Each document is routed to a RAG collection chosen from its
    ``metadata['type']`` value via ``TYPE_TO_COLLECTION``; unknown types go to
    ``DEFAULT_COLLECTION``.
    """

    # Collection used for any document type that is missing from TYPE_TO_COLLECTION.
    DEFAULT_COLLECTION = 'system_guide'

    # Maps a document's metadata 'type' to the collection that stores it.
    TYPE_TO_COLLECTION = {
        'login_guide': 'system_guide',
        'operation_guide': 'system_guide',
        'basic_settings': 'system_guide',
        'system_management': 'system_guide',
        'personal_center': 'system_guide',

        'purchase_management': 'production_data',
        'inbound_management': 'production_data',
        'outbound_management': 'production_data',
        'inventory_management': 'production_data',
        'process_management': 'production_data',
        'production_management': 'production_data',
        'assembly_management': 'production_data',
    }

    # Metadata keys forwarded to the RAG service; each document must carry all of them.
    _METADATA_KEYS = ('source', 'type', 'section', 'file_path')

    def __init__(self, openai_api_key: str):
        """Create the integrator and its underlying RAG service.

        Args:
            openai_api_key: OpenAI API key handed to ``RAGService``.
        """
        self.rag_service = RAGService(openai_api_key)

    async def load_json_knowledge(self, json_file: str) -> List[Dict[str, Any]]:
        """Load the knowledge-base documents from a JSON file.

        Args:
            json_file: Path of the JSON file to read (decoded as UTF-8).

        Returns:
            The list of documents, or an empty list when the file cannot be
            read, is not valid JSON, or its top-level value is not a list.
        """
        # Only the read/parse step is guarded. OSError covers missing or
        # unreadable files; ValueError covers JSON and UTF-8 decoding errors.
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                documents = json.load(f)
        except (OSError, ValueError) as e:
            print(f"加载JSON文件失败: {e}")
            return []

        # A non-list top level (e.g. a single object) would otherwise fail
        # later, while grouping, with a much less helpful error.
        if not isinstance(documents, list):
            print(f"加载JSON文件失败: 顶层结构应为文档列表, 实际为 {type(documents).__name__}")
            return []

        print(f"从 {json_file} 加载了 {len(documents)} 个文档")
        return documents

    async def add_documents_by_type(self, documents: List[Dict[str, Any]]):
        """Add documents to the collection that matches their type.

        Args:
            documents: Documents with a ``content`` entry and a ``metadata``
                dict holding ``source``, ``type``, ``section`` and ``file_path``.

        Raises:
            KeyError: If any document lacks one of the required entries. The
                check runs for every document before the first write, so no
                collection is partially updated when this is raised.
        """
        # Format and group in one pass, before touching the RAG service.
        grouped = {}
        for doc in documents:
            formatted = self._format_document(doc)
            grouped.setdefault(formatted['metadata']['type'], []).append(formatted)

        for doc_type, docs in grouped.items():
            collection_name = self.TYPE_TO_COLLECTION.get(doc_type, self.DEFAULT_COLLECTION)

            print(f"正在将 {len(docs)} 个 {doc_type} 类型的文档添加到 {collection_name} 集合...")

            await self.rag_service.add_documents(docs, collection_name)

    @classmethod
    def _format_document(cls, doc):
        """Return the document reduced to the fields passed to the RAG service."""
        metadata = doc['metadata']
        return {
            'content': doc['content'],
            'metadata': {key: metadata[key] for key in cls._METADATA_KEYS},
        }

    async def test_query(self, query: str, collection_name: str = 'system_guide'):
        """Run one query against a collection and print the top three results.

        Args:
            query: Query text.
            collection_name: Name of the collection to search.
        """
        print(f"\n测试查询: '{query}' (集合: {collection_name})")
        print("-" * 50)

        results = await self.rag_service.query(query, collection_name, top_k=3)

        if not results:
            print("未找到相关结果")
            return

        for i, result in enumerate(results, 1):
            print(f"结果 {i}:")
            print(f" 相关性分数: {result.relevance_score:.3f}")
            print(f" 来源: {result.source}")
            # Only the first 100 characters of the content are shown.
            print(f" 内容: {result.content[:100]}...")
            print()
async def main(json_file: str = "knowledge_base/sample_mom_knowledge.json"):
    """Run the full integration: load the JSON documents, index them, run sample queries.

    The OpenAI API key is read from the ``OPENAI_API_KEY`` environment
    variable; when it is unset or empty the placeholder key is used.

    Args:
        json_file: Path of the converted knowledge-base JSON file. Defaults to
            the sample file that md_to_knowledge_converter.py is expected to
            generate (per the hint printed when the file is missing).
    """
    # Prefer a real key from the environment over editing the source file.
    openai_api_key = os.environ.get("OPENAI_API_KEY") or "your-openai-api-key"

    # Bail out early when there is nothing to integrate.
    if not os.path.exists(json_file):
        print(f"错误: JSON文件 {json_file} 不存在")
        print("请先运行 md_to_knowledge_converter.py 生成JSON文件")
        return

    try:
        integrator = KnowledgeIntegrator(openai_api_key)

        documents = await integrator.load_json_knowledge(json_file)
        if not documents:
            print("没有找到要集成的文档")
            return

        await integrator.add_documents_by_type(documents)
        print("\n知识库集成完成!")

        # Sample queries as (query text, collection name) pairs.
        test_queries = [
            ("如何登录系统", "system_guide"),
            ("怎么进行入库操作", "production_data"),
            ("生产计划如何制定", "production_data"),
        ]
        for query, collection in test_queries:
            await integrator.test_query(query, collection)

    except Exception as e:
        # Top-level boundary of the script: report the failure with a
        # traceback instead of letting it escape.
        print(f"集成过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
def demo_without_rag():
    """Demo mode: print a summary of the sample JSON knowledge file.

    Does not use the RAG service. Prints the document count, a per-type
    tally and a preview of the first three documents. Returns early with a
    message when the sample file does not exist.
    """
    sample_path = "knowledge_base/sample_mom_knowledge.json"

    print("=== 演示模式:展示JSON文档结构 ===\n")

    if not os.path.exists(sample_path):
        print(f"JSON文件 {sample_path} 不存在")
        return

    with open(sample_path, 'r', encoding='utf-8') as handle:
        loaded = json.loads(handle.read())

    print(f"加载了 {len(loaded)} 个文档\n")

    # Count documents per type, keeping first-seen order.
    tally = {}
    for entry in loaded:
        kind = entry['metadata']['type']
        if kind in tally:
            tally[kind] += 1
        else:
            tally[kind] = 1

    print("文档类型统计:")
    for kind in tally:
        print(f" {kind}: {tally[kind]} 个文档")

    print("\n前3个文档示例:")
    for position, entry in enumerate(loaded[:3], start=1):
        print(f"\n文档 {position}:")
        meta = entry['metadata']
        print(f" 标题: {meta['section']}")
        print(f" 类型: {meta['type']}")
        print(f" 来源: {meta['source']}")
        # Preview is limited to the first 80 characters of the content.
        print(f" 内容: {entry['content'][:80]}...")

    print("\n这些文档可以直接集成到RAG知识库中!")
if __name__ == "__main__":
    # Probe for the RAG service. Only the import itself is guarded, so an
    # ImportError raised while the full run executes is not mistaken for
    # "RAG service unavailable".
    try:
        from src.services.rag_service import RAGService
    except ImportError:
        # RAG service not importable: show the JSON structure demo instead.
        demo_without_rag()
    else:
        # RAG service available: run the full integration.
        asyncio.run(main())
|