@clawhub-gaoshuping99-0442151f59
Natural language search for local files (100G-1T). Supports xlsx, pptx, pdf, docx formats with location info. Triggered when user asks to search local/comput...
---
name: local-ai-search
description: Natural language search for local files (100G-1T). Supports xlsx, pptx, pdf, docx formats with location info. Triggered when user asks to search local/computer/folder content.
---
# Local AI Search
## 触发条件
**当用户说以下内容时,调用此 Skill:**
- "帮我在本地搜索..."
- "帮我在本电脑搜索..."
- "帮我在某个文件夹中搜索..."
- "搜索本地文件..."
- "搜索我的文档..."
- "在本机查找..."
- "从我的文件中查找..."
- 或任何涉及**本地/本机/文件夹内容检索**的请求
## 功能说明
本 Skill 提供本地文件的 AI 智能搜索功能:
- ✅ 支持 xlsx, pptx, pdf, docx, md 等格式
- ✅ 自然语言查询(用日常语言描述要找的内容)
- ✅ 指定文件夹范围进行搜索
- ✅ 返回文件位置信息(工作表名、幻灯片页码)
- ✅ 无需本地大模型,使用云端 API
## 使用方式
### 方式一:直接搜索(推荐)
```
用户: 帮我在本地搜索关于销售数据的内容
用户: 在 ~/Documents/Projects 文件夹中搜索 API 相关的文档
用户: 搜索本电脑中包含"关键词"的文件
```
### 方式二:指定目录搜索
```
用户: 帮我在 ~/Documents/Projects 文件夹中搜索技术文档
```
### 方式三:自然语言查询
```
用户: 帮我找一下第三季度的销售报告
用户: 搜索一下关于数字化转型的内容
用户: 找找看有没有关于项目计划的 PPT
```
## 调用流程
1. **检查服务状态**:确认 Khoj 服务是否运行
2. **确定搜索范围**:用户指定的文件夹,或默认已索引的知识库
3. **执行搜索**:使用自然语言查询本地文件
4. **返回结果**:显示匹配的文件名、位置信息、内容片段
---
## 快速验证(已测试)
```bash
# 1. 启动 Khoj 服务(嵌入式 PostgreSQL 模式)
export USE_EMBEDDED_DB="true"
khoj --anonymous-mode
# 2. 转换文档
~/.agents/skills/local-ai-search/scripts/convert.py ~/Documents/source -o ~/Documents/converted
# 3. 索引文件(API 方式)
curl -X PATCH "http://localhost:42110/api/content" \
-F "files=@~/Documents/converted/example.xlsx.md"
# 4. 搜索查询
~/.agents/skills/local-ai-search/scripts/query.py "搜索内容"
```
### 验证结果示例
```
[1] 文件: test_data.xlsx.md
工作表: Sales Data
内容: | Month | Sales | | January | $10,000 |...
[2] 文件: test_slides.pptx.md
幻灯片: 第 1 页
内容: # Project Overview This is a test presentation...
```
---
## 概述
基于 Khoj 的本地 RAG 知识库解决方案,支持大规模文件(100G到1T)的全文检索和自然语言查询。通过 MarkItDown 转换 Office 文档,结合云端 LLM API 实现轻量级部署,适合资源受限环境。
---
## 需求背景
### 核心需求
| 需求项 | 具体要求 |
|---|---|
| **数据规模** | 建议小于1T的数据量,例如200GB 本地文件 |
| **文件格式** | xlsx, pptx, pdf, docx, md 等 |
| **检索方式** | 自然语言查询 |
| **大模型** | 云端 API(OpenAI/DeepSeek/Claude/Qwen/Kimi/Minmax) |
| **定位精度** | 来源文件 + 大致位置(工作表/幻灯片) |
| **集成方式** | 封装为 OpenCode Skill |
### 硬件约束
| 约束项 | 配置 |
|---|---|
| **设备** | 常规个人PC,例如MacBook Air M2 |
| **内存** | 8GB+ 可用内存 |
| **剩余空间** | 足够的磁盘空间(文档大小的 25-40%)。例如200G的文件,需要有80GB空闲空间,支持本地向量数据库存储RAG结果。 |
| **本地 LLM** | 无法部署(资源不足) |
---
## 技术架构
### 架构图
```
┌─────────────────────────────────────────────────────────────────┐
│ OpenCode Skill │
│ rag query "搜索内容" --top-k 10 │
│ rag index /path/to/files │
│ rag status │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Khoj API Server │
│ localhost:42110 │
│ • 向量检索 │
│ • 对话生成 │
│ • 文件管理 │
└─────────────────────────┬───────────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ PostgreSQL 数据库 │ │ 云端 LLM API │
│ (嵌入式 pgserver) │ │ 多模型支持 │
│ • 向量存储 │ │ • Chat Model │
│ • 文档索引 │ │ • 对话生成 │
│ • ~50-80GB │ │ • 无本地占用 │
└─────────────────────┘ └─────────────────────┘
```
### 数据流
```
xlsx/pptx → MarkItDown 转换 → Markdown → Khoj 索引 → 向量数据库
↓
用户查询 → 向量检索 → 匹配片段 → 云端 LLM → 自然语言回答
↓
显示来源文件 + 位置
```
### 组件说明
| 组件 | 选择 | 理由 |
|---|---|---|
| **RAG 服务** | Khoj | 成熟(33k stars)、API 友好、内存占用低 |
| **文档转换** | MarkItDown | 微软开源、支持 xlsx/pptx、保留位置信息 |
| **向量数据库** | PostgreSQL(嵌入式) | 成熟稳定、pgvector 向量索引、8GB+ RAM 友好 |
| **Embedding** | 本地模型(sentence-transformers) | 免费、快速、隐私保护 |
| **LLM** | 云端 API | 解放内存压力、性能更好 |
---
## 安装部署
### 环境要求
- Python 3.10+
- macOS / Linux / Windows
- 建议 8GB+ 可用内存
- 足够的磁盘空间(文档大小的 25-40%)
### 平台支持
| 平台 | 支持状态 | 说明 |
|------|----------|------|
| macOS | ✅ 完全支持 | 原生支持,直接使用 |
| Linux | ✅ 完全支持 | 原生支持,直接使用 |
| Windows | ⚠️ 需要 WSL2 | 使用 WSL2 运行 Linux 环境 |
#### Windows 用户:安装 WSL2
**WSL2**(Windows Subsystem for Linux 2)让 Windows 可以直接运行 Linux,无需虚拟机或双系统。
```powershell
# 1. 在 Windows PowerShell(管理员模式)中运行
wsl --install
# 2. 重启电脑后,打开 "Ubuntu" 应用
# 3. 在 Ubuntu 终端中继续以下安装步骤
```
安装 WSL2 后,在 Ubuntu 终端中执行所有后续命令。
### 安装步骤
#### 1. 安装依赖
```bash
# 安装 Khoj
pip install khoj
# 安装 MarkItDown(含 Office 文档支持)
pip install "markitdown[xlsx,pptx]"
```
#### 2. 配置云端 LLM API
```bash
# OpenAI
export OPENAI_API_KEY="sk-xxx"
# DeepSeek(推荐,性价比高)
export OPENAI_API_KEY="sk-xxx"
export OPENAI_BASE_URL="https://api.deepseek.com/v1"
# Anthropic Claude
export ANTHROPIC_API_KEY="sk-xxx"
```
#### 3. 启动 Khoj 服务
```bash
# 嵌入式 PostgreSQL 模式(推荐个人使用)
export USE_EMBEDDED_DB="true"
khoj --anonymous-mode
# 访问 Web UI
open http://localhost:42110
```
---
## 使用指南
### 命令列表
| 命令 | 说明 | 示例 |
|---|---|---|
| `rag start` | 启动 Khoj 服务 | `rag start` |
| `rag stop` | 停止服务 | `rag stop` |
| `rag status` | 查看服务状态 | `rag status` |
| `rag convert <dir>` | 转换 xlsx/pptx 为 Markdown | `rag convert ~/Documents` |
| `rag index <dir>` | 索引文件到知识库 | `rag index ~/Documents/converted` |
| `rag query "<问题>"` | 查询知识库 | `rag query "第三季度销售数据"` |
| `rag clean` | 清理转换后的临时文件 | `rag clean` |
| `rag sync` | 增量同步目录到知识库 | `rag sync ~/Documents` |
| `rag schedule` | 管理定时同步任务 | `rag schedule ~/Documents --enable` |
### 文档转换
```bash
# 转换指定目录下的 xlsx/pptx 文件
markitdown convert ~/Documents/source -o ~/Documents/converted
# 转换单个文件
markitdown convert report.xlsx -o report.md
```
转换结果示例:
**Excel (xlsx)**:
```markdown
## Sheet1
| Name | Age | City |
|---|---|---|
| Alice | 30 | NYC |
| Bob | 25 | LA |
## Sheet2
| Product | Price |
|---|---|
| Apple | $1 |
```
**PowerPoint (pptx)**:
```markdown
<!-- Slide number: 1 -->
# Project Overview
This is the introduction...
<!-- Slide number: 2 -->
## Key Features
- Feature 1
- Feature 2
```
### 知识库索引
#### 方式一:Web UI
1. 打开 http://localhost:42110/config
2. 点击 "Add Content Source"
3. 选择文件夹路径
4. 等待索引完成
#### 方式二:API
```bash
curl -X PATCH "http://localhost:42110/api/content" \
-H "Authorization: Bearer YOUR_API_KEY" \
-F "files=@/path/to/document.md"
```
### 查询示例
```bash
# CLI 查询
khoj query "第三季度的销售数据在哪?"
# API 查询
curl "http://localhost:42110/api/search?q=第三季度销售数据&n=5" \
-H "Authorization: Bearer YOUR_API_KEY"
```
**返回结果示例**:
```
来源文件: Q3_report.xlsx
位置: Sheet1
匹配内容:
| Month | Sales | Growth |
|---|---|---|
| July | $50,000 | +12% |
| August | $55,000 | +10% |
| September | $60,000 | +9% |
来源文件: sales.pptx
位置: Slide 5
匹配内容:
Q3 Sales Summary: Total $165,000
```
---
## 文件格式支持
| 格式 | 支持方式 | 位置信息 | 说明 |
|---|---|---|---|
| **xlsx/xls** | MarkItDown 转换 | ✅ 工作表名称 | 表格数据完整保留 |
| **pptx** | MarkItDown 转换 | ✅ 幻灯片编号 | 文本、表格提取 |
| **pdf** | Khoj 原生支持 | ✅ 页码 | 自动 OCR 扫描版 |
| **docx** | Khoj 原生支持 | ✅ 段落标题 | 完整文档结构 |
| **md/txt** | Khoj 原生支持 | ✅ 文件名 | 最佳支持 |
| **org** | Khoj 原生支持 | ✅ 文件名 | Emacs 用户友好 |
### 不支持的格式
| 格式 | 解决方案 |
|---|---|
| .epub | `pandoc book.epub -o book.md` |
| .html | `pandoc page.html -o page.md` |
| .rtf | `pandoc doc.rtf -o doc.md` |
---
## 空间与性能
### 空间估算
| 项目 | 空间占用 | 说明 |
|---|---|---|
| **原始文件** | 200GB | 保留不变 |
| **转换后 Markdown** | ~20-40GB | 索引后可删除 |
| **Khoj 安装** | ~0.5GB | Python 包 + 本地模型 |
| **向量数据库** | ~50-80GB | 索引文件 |
| **临时占用(最大)** | ~70-120GB | 索引过程中 |
| **最终占用** | ~250-280GB | 删除 Markdown 后 |
### 空间时间线
```
初始状态: 100GB 可用
安装后: 99.5GB 可用(-0.5GB)
转换后: 60-80GB 可用(-20-40GB)
索引完成: 10-30GB 可用(-50-80GB)← 最紧张时刻
删除 Markdown: 50-70GB 可用(+20-40GB)
```
### 性能指标
| 指标 | 数值 | 说明 |
|---|---|---|
| **索引速度** | ~1-2GB/小时 | 常规个人PC |
| **查询响应** | 50-200ms | 向量检索 |
| **对话生成** | 1-3秒 | 取决于云端 API |
| **内存占用** | ~200-500MB | 空闲时 |
| **索引时内存** | ~2-4GB | 取决于文件大小 |
---
## 云端 API 配置
### 支持的 LLM 提供商
| 提供商 | API Key 环境变量 | 模型示例 |
|---|---|---|
| **OpenAI** | `OPENAI_API_KEY` | gpt-4o, gpt-4o-mini |
| **DeepSeek** | `OPENAI_API_KEY` + `OPENAI_BASE_URL` | deepseek-chat, deepseek-reasoner |
| **Anthropic** | `ANTHROPIC_API_KEY` | claude-3-5-sonnet |
| **Google** | `GEMINI_API_KEY` | gemini-2.0-flash |
| **Qwen/通义** | `OPENAI_API_KEY` + `OPENAI_BASE_URL` | qwen-turbo, qwen-plus |
| **Kimi/月之暗面** | `OPENAI_API_KEY` + `OPENAI_BASE_URL` | moonshot-v1-8k |
| **Minimax** | `OPENAI_API_KEY` + `OPENAI_BASE_URL` | abab6.5-chat |
| **本地 Ollama** | - | llama3, qwen2.5 |
### 配置文件示例
```yaml
# ~/.khoj/config.yml
# LLM 配置
chat-model:
provider: openai # 或 deepseek, anthropic
model: gpt-4o-mini
api-key: OPENAI_API_KEY
# Embedding 配置(使用本地模型)
embedding-model:
provider: local
model: BAAI/bge-small-zh-v1.5
# 数据库配置
database:
type: sqlite
path: ~/.khoj/khoj.db
```
---
## 最佳实践
### 增量索引建议
```bash
# 1. 先小规模测试
rag convert ~/Documents/test_folder
rag index ~/Documents/test_folder_converted
# 2. 观察实际空间占用
du -sh ~/.khoj/
# 3. 根据测试结果推算全量需求
# 4. 分批索引核心文件
rag convert ~/Documents/important_folder
rag index ~/Documents/important_folder_converted
```
### 定期维护
```bash
# 查看索引状态
rag status
# 清理过期文件
rag clean
# 备份知识库
cp -r ~/.khoj ~/.khoj_backup_$(date +%Y%m%d)
# 重新索引(更换 Embedding 模型后)
khoj --regenerate
```
### 故障排查
| 问题 | 解决方案 |
|---|---|
| 服务启动失败 | 检查端口 42110 是否被占用 |
| 索引速度慢 | 减少并发、关闭其他应用 |
| 内存不足 | 使用云端 Embedding API |
| 查询无结果 | 检查文件格式、重新索引 |
---
## 安全与隐私
### 数据安全
| 项目 | 说明 |
|---|---|
| **本地数据** | 所有向量存储在本地 PostgreSQL |
| **Embedding** | 默认使用本地模型,数据不上传 |
| **LLM 对话** | 仅查询内容发送到云端 API |
| **API Key** | 存储在本地环境变量或配置文件 |
### 隐私建议
- 敏感文档可使用本地 Embedding 模型
- 对话内容会被发送到云端 LLM,注意脱敏
- API Key 不要提交到版本控制
- 定期备份 ~/.khoj/ 目录
---
## 扩展与集成
### OpenCode Skill 集成
```bash
# Skill 目录结构
~/.agents/skills/khoj-rag/
├── SKILL.md # 本文档
├── khoj_cli.py # CLI 封装脚本
├── config.yaml # 默认配置
└── scripts/
├── start_server.sh # 启动服务
├── convert.py # 批量转换
└── query.py # 查询封装
```
### API 集成
```python
import requests
KHOJ_URL = "http://localhost:42110"
API_KEY = "your-api-key"
# 搜索
response = requests.get(
f"{KHOJ_URL}/api/search",
params={"q": "查询内容", "n": 5},
headers={"Authorization": f"Bearer {API_KEY}"}
)
# 对话
response = requests.post(
f"{KHOJ_URL}/api/chat",
json={"q": "问题内容"},
headers={"Authorization": f"Bearer {API_KEY}"}
)
```
### 客户端支持
| 客户端 | 说明 |
|---|---|
| **Web UI** | http://localhost:42110 |
| **Obsidian 插件** | 自动同步 Markdown 笔记 |
| **Emacs** | `M-x khoj` 命令 |
| **Desktop App** | 跨平台桌面客户端 |
| **API** | RESTful API 接口 |
---
## 增量同步
### 手动触发同步
当文件有更新时,可以手动触发增量同步:
```bash
# 增量同步(只处理变化的文件)
~/.agents/skills/local-ai-search/scripts/sync.py ~/Documents
# 全量同步(强制重新索引所有文件)
~/.agents/skills/local-ai-search/scripts/sync.py ~/Documents --full
# 详细输出
~/.agents/skills/local-ai-search/scripts/sync.py ~/Documents --verbose
```
或使用 CLI:
```bash
# 增量同步
local-ai-search sync ~/Documents
# 全量同步
local-ai-search sync ~/Documents --full
```
### 定时自动同步
设置每小时自动同步:
```bash
# 启用定时同步(每小时)
~/.agents/skills/local-ai-search/scripts/schedule_sync.sh ~/Documents --enable
# 启用定时同步(每2小时)
~/.agents/skills/local-ai-search/scripts/schedule_sync.sh ~/Documents --enable --interval 2
# 查看定时同步状态
~/.agents/skills/local-ai-search/scripts/schedule_sync.sh --status
# 禁用定时同步
~/.agents/skills/local-ai-search/scripts/schedule_sync.sh --disable
# 立即执行一次同步
~/.agents/skills/local-ai-search/scripts/schedule_sync.sh ~/Documents --run
```
或使用 CLI:
```bash
# 启用定时同步
local-ai-search schedule ~/Documents --enable
# 设置每2小时同步
local-ai-search schedule ~/Documents --enable --interval 2
# 查看状态
local-ai-search schedule --status
# 禁用定时同步
local-ai-search schedule --disable
```
### 进度显示
在大规模索引时会显示进度:
```
扫描目录: ~/Documents
找到文件: 150 个
已索引: 120 个
需要同步: 30 个
[=============> ] 60.0% (18/30) report.xlsx
✓ 成功: 28
✗ 失败: 2
同步完成!
```
### 同步原理
增量同步通过以下方式判断文件变化:
| 检查项 | 说明 |
|--------|------|
| 文件修改时间 | 文件被修改时时间会变化 |
| 文件大小 | 内容变化时大小会变化 |
| 已索引文件列表 | 对比 Khoj 已索引的文件 |
同步状态保存在 `~/.khoj/sync_state.json`,记录每个文件的同步状态。
---
## 常见问题
### Q1: 索引完成后可以删除 Markdown 文件吗?
**可以**。Khoj 已将内容存入向量数据库,Markdown 文件仅作为临时转换产物,索引完成后可安全删除,节省 20-40GB 空间。
### Q2: 如何处理内存限制?
- 使用嵌入式 PostgreSQL 模式(USE_EMBEDDED_DB=true)
- Embedding 使用本地模型(而非云端 API)
- LLM 使用云端 API(而非本地部署)
- 分批索引,避免一次性处理大量文件
### Q3: 查询结果能定位到具体单元格吗?
**默认不支持**。MarkItDown 保留工作表名称和幻灯片编号,但不保留单元格位置。如需精确定位,需自定义转换脚本。
### Q4: 支持实时文件监控吗?
**支持**。在 Khoj 配置中启用文件监控,文档变更会自动触发重新索引。
### Q5: 如何迁移到其他机器?
```bash
# 备份
tar -czf khoj_backup.tar.gz ~/.khoj/
# 恢复
tar -xzf khoj_backup.tar.gz -C ~/
```
---
## 参考资源
- [Khoj 官方文档](https://docs.khoj.dev/)
- [Khoj GitHub](https://github.com/khoj-ai/khoj)
- [MarkItDown GitHub](https://github.com/microsoft/markitdown)
- [DeepSeek API](https://platform.deepseek.com/)
- [OpenAI API](https://platform.openai.com/)
---
## 更新日志
| 版本 | 日期 | 说明 |
|---|---|---|
| 1.1.0 | 2026-03-20 | 新增增量同步、定时同步、进度显示功能 |
| 1.0.4 | 2026-03-20 | 添加 Windows 平台说明(需要 WSL2) |
| 1.0.3 | 2026-03-20 | 澄清数据库类型:Khoj 使用嵌入式 PostgreSQL(非 SQLite) |
| 1.0.1 | 2026-03-20 | 更新文档:数据规模调整,新增 LLM 支持 |
| 1.0.0 | 2026-03-20 | 初始版本 |
---
## 许可证
本 Skill 基于 MIT 许可证开源。Khoj 和 MarkItDown 分别遵循各自的许可证。
FILE:README.md
# Local AI Search
> 本地知识库 AI 搜索,支持 100G-1T 文件的全文检索和自然语言查询
[](https://github.com/khoj-ai/khoj)
[](LICENSE)
## 触发条件
**当用户说以下内容时,调用此 Skill:**
- "帮我在本地搜索..."
- "帮我在本电脑搜索..."
- "帮我在某个文件夹中搜索..."
- "搜索本地文件..."
- "搜索我的文档..."
- 或任何涉及**本地/本机/文件夹内容检索**的请求
## 快速开始
### 安装
```bash
# 安装依赖
pip install khoj "markitdown[xlsx,pptx]"
# 配置 API Key
export OPENAI_API_KEY="your-api-key"
# 或 DeepSeek
export OPENAI_API_KEY="your-api-key"
export OPENAI_BASE_URL="https://api.deepseek.com/v1"
```
### 使用
```bash
# 启动服务
local-ai-search start
# 转换文档
local-ai-search convert ~/Documents/source -o ~/Documents/converted
# 索引到知识库
local-ai-search index ~/Documents/converted
# 查询
local-ai-search query "第三季度销售数据"
```
## 特性
- ✅ 支持 100G-1T 大规模文件
- ✅ 支持 xlsx, xls, pptx, ppt, docx, doc, pdf, md, txt, csv 等格式
- ✅ 本地 OCR 支持扫描版 PDF(使用 Tesseract,无需云 API)
- ✅ 自然语言查询
- ✅ 云端 LLM API(无需本地大模型)
- ✅ 精确定位到源文件位置
- ✅ 轻量级部署(8GB+ RAM 友好)
- ✅ 内存占用仅 ~70MB
## 支持的文件格式
| 格式 | 说明 | 支持状态 |
|------|------|----------|
| `.xlsx`, `.xls` | Excel 表格 | ✅ 支持 |
| `.pptx`, `.ppt` | PowerPoint 演示文稿 | ✅ 支持 |
| `.docx` | Word 文档 (2007+) | ✅ 支持 |
| `.doc` | Word 文档 (97-2003) | ✅ 支持 (需 LibreOffice) |
| `.pdf` | PDF 文档 | ✅ 支持 (含扫描版 OCR) |
| `.md` | Markdown 文档 | ✅ 支持 |
| `.txt` | 纯文本 | ✅ 支持 |
| `.csv` | CSV 数据 | ✅ 支持 |
### 不支持的文件格式
以下格式会被自动跳过:
| 类别 | 格式 | 原因 |
|------|------|------|
| 图片 | `.png`, `.jpg`, `.jpeg`, `.gif`, `.bmp`, `.svg`, `.webp` | 二进制图片,无文本内容 |
| 音频 | `.mp3`, `.wav`, `.aac`, `.flac`, `.pcm` | 音频文件,需语音识别 |
| 视频 | `.mp4`, `.mov`, `.avi`, `.mkv`, `.webm` | 视频文件,需视频处理 |
| 代码 | `.js`, `.py`, `.ts`, `.java`, `.go`, `.html`, `.css` | 代码文件,通常不需要索引 |
| 压缩包 | `.zip`, `.rar`, `.7z`, `.tar`, `.gz` | 压缩文件,需先解压 |
| 其他 | `.rtf`, `.mat`, `.db`, `.sqlite`, `.dll`, `.so` | 不常见或二进制格式 |
## 依赖安装
```bash
# 基础依赖
pip install khoj "markitdown[xlsx,pptx]" requests
# PDF 转图片(用于 OCR)
pip install pdf2image
# 本地 OCR(扫描版 PDF 必需)
pip install pytesseract
# macOS 安装 Tesseract
brew install tesseract tesseract-lang
# Ubuntu 安装 Tesseract
sudo apt install tesseract-ocr tesseract-ocr-chi-sim
# .doc 文件支持(可选)
# macOS
brew install libreoffice
# Ubuntu
sudo apt install libreoffice
```
## 文件结构
```
~/.agents/skills/local-ai-search/
├── SKILL.md # 完整文档
├── khoj_cli.py # CLI 工具
├── config.yaml # 配置文件
├── requirements.txt # 依赖
└── scripts/
├── start_server.sh # 启动脚本
├── convert.py # 转换脚本
└── query.py # 查询脚本
```
## 系统要求
- Python 3.10+
- 8GB+ 可用内存
- 足够的磁盘空间(文档大小的 25-40%)
## 文档
完整文档请参阅 [SKILL.md](SKILL.md)
## 许可证
MIT License
FILE:config.yaml
# Khoj RAG 默认配置
# 复制此文件到 ~/.khoj/config.yml 并根据需要修改
version: "1.4.0"
# LLM 配置
# 支持的提供商: openai, deepseek, anthropic, gemini, ollama
llm:
provider: deepseek
model: deepseek-chat
api_key: OPENAI_API_KEY # 从环境变量读取
base_url: -https://api.deepseek.com/v1
# 或使用 OpenAI
# provider: openai
# model: gpt-4o-mini
# api_key: OPENAI_API_KEY
# Embedding 配置
# 推荐使用本地模型,无需 API 费用
embedding:
provider: local
model: BAAI/bge-small-zh-v1.5 # 中文优化,仅 ~100MB
# 或使用云端 Embedding API
# provider: openai
# model: text-embedding-3-small
# api_key: OPENAI_API_KEY
# 数据库配置(Khoj 使用嵌入式 PostgreSQL)
# 注意:Khoj 只支持 PostgreSQL,不支持 SQLite
# 启动时设置: export USE_EMBEDDED_DB="true"
database:
type: postgresql # 嵌入式 PostgreSQL (pgserver)
# 文档处理配置
content:
# 支持的文件格式
supported_formats:
- ".md"
- ".txt"
- ".pdf"
- ".docx"
- ".org"
# 分块配置
chunk_size: 500
chunk_overlap: 100
# 服务配置
server:
host: 0.0.0.0
port: 42110
# 索引配置
indexing:
# 文件监控
watch_for_changes: false
# 并发数
concurrency: 4
# 批处理大小
batch_size: 100
FILE:khoj_cli.py
#!/opt/anaconda3/bin/python3
"""
Khoj RAG CLI 封装
提供简洁的命令行接口操作 Khoj 知识库
v1.1.0 - 新增增量同步、进度显示功能
"""
import os
import sys
import json
import subprocess
import signal
import time
from pathlib import Path
from typing import Optional, Callable
import requests
import click
# 配置
KHOJ_URL = os.environ.get("KHOJ_URL", "http://localhost:42110")
KHOJ_API_KEY = os.environ.get("KHOJ_API_KEY", "")
DEFAULT_CONVERTED_DIR = os.environ.get("KHOJ_CONVERTED_DIR", "~/.khoj/converted")
class KhojClient:
"""Khoj API 客户端"""
def __init__(self, base_url: str = KHOJ_URL, api_key: str = KHOJ_API_KEY):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
def is_running(self) -> bool:
"""检查服务是否运行"""
try:
response = requests.get(f"{self.base_url}/api/health", timeout=5)
return response.status_code == 200
except requests.exceptions.RequestException:
return False
def search(self, query: str, top_k: int = 5) -> dict:
"""搜索知识库"""
response = requests.get(
f"{self.base_url}/api/search",
params={"q": query, "n": top_k},
headers=self.headers,
timeout=30
)
response.raise_for_status()
return response.json()
def chat(self, query: str) -> dict:
"""对话查询"""
response = requests.post(
f"{self.base_url}/api/chat",
json={"q": query},
headers=self.headers,
timeout=60
)
response.raise_for_status()
return response.json()
def index_files(self, directory: str, progress_callback: Callable = None) -> dict:
"""索引文件(支持进度回调)"""
dir_path = Path(directory).expanduser()
if not dir_path.exists():
raise FileNotFoundError(f"目录不存在: {directory}")
files = []
for ext in ["*.md", "*.txt", "*.pdf", "*.docx"]:
files.extend(dir_path.rglob(ext))
if not files:
return {"status": "no_files", "count": 0, "success": 0, "failed": 0}
success_count = 0
failed_count = 0
total = len(files)
# 通过 API 上传文件
for i, file_path in enumerate(files):
try:
with open(file_path, "rb") as f:
response = requests.patch(
f"{self.base_url}/api/content",
headers=self.headers,
files={"file": (file_path.name, f)},
timeout=60
)
response.raise_for_status()
success_count += 1
except Exception as e:
failed_count += 1
# 进度回调
if progress_callback:
progress_callback(i + 1, total, file_path.name, success_count, failed_count)
return {
"status": "success",
"count": len(files),
"success": success_count,
"failed": failed_count
}
# CLI 命令
@click.group()
def cli():
"""Khoj RAG - 本地知识库检索工具"""
pass
@cli.command()
@click.option("--port", default=42110, help="服务端口")
@click.option("--anonymous", is_flag=True, help="匿名模式")
def start(port: int, anonymous: bool):
"""启动 Khoj 服务"""
client = KhojClient()
if client.is_running():
click.echo("✓ Khoj 服务已在运行")
return
env = os.environ.copy()
env["USE_EMBEDDED_DB"] = "true"
cmd = ["khoj"]
if anonymous:
cmd.append("--anonymous-mode")
click.echo(f"启动 Khoj 服务 (端口: {port})...")
# 后台启动
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True
)
# 等待服务启动
import time
for _ in range(30):
time.sleep(1)
if client.is_running():
click.echo(f"✓ Khoj 服务已启动: http://localhost:{port}")
return
click.echo("✗ 服务启动超时", err=True)
sys.exit(1)
@cli.command()
def stop():
"""停止 Khoj 服务"""
try:
result = subprocess.run(
["pkill", "-f", "khoj"],
capture_output=True
)
if result.returncode == 0:
click.echo("✓ Khoj 服务已停止")
else:
click.echo("Khoj 服务未运行")
except Exception as e:
click.echo(f"✗ 停止失败: {e}", err=True)
@cli.command()
def status():
"""查看服务状态"""
client = KhojClient()
if not client.is_running():
click.echo("状态: 未运行")
return
click.echo("状态: 运行中")
click.echo(f"地址: {KHOJ_URL}")
# 获取统计信息
try:
response = requests.get(f"{KHOJ_URL}/api/content/stats", timeout=5)
if response.status_code == 200:
stats = response.json()
click.echo(f"文档数: {stats.get('document_count', 'N/A')}")
click.echo(f"索引大小: {stats.get('index_size', 'N/A')}")
except:
pass
@cli.command()
@click.argument("input_dir")
@click.option("-o", "--output", help="输出目录")
def convert(input_dir: str, output: Optional[str]):
"""转换 xlsx/pptx 文件为 Markdown"""
input_path = Path(input_dir).expanduser()
output_path = Path(output).expanduser() if output else Path(DEFAULT_CONVERTED_DIR).expanduser()
if not input_path.exists():
click.echo(f"✗ 目录不存在: {input_dir}", err=True)
sys.exit(1)
output_path.mkdir(parents=True, exist_ok=True)
click.echo(f"转换目录: {input_path}")
click.echo(f"输出目录: {output_path}")
try:
result = subprocess.run(
["markitdown", "convert", str(input_path), "-o", str(output_path)],
capture_output=True,
text=True
)
if result.returncode == 0:
click.echo("✓ 转换完成")
else:
click.echo(f"✗ 转换失败: {result.stderr}", err=True)
sys.exit(1)
except FileNotFoundError:
click.echo("✗ markitdown 未安装,请运行: pip install 'markitdown[xlsx,pptx]'", err=True)
sys.exit(1)
@cli.command()
@click.argument("directory")
@click.option("--progress", is_flag=True, help="显示进度")
def index(directory: str, progress: bool):
"""索引文件到知识库(支持进度显示)"""
client = KhojClient()
if not client.is_running():
click.echo("✗ Khoj 服务未运行,请先执行: rag start", err=True)
sys.exit(1)
def show_progress(current: int, total: int, filename: str, success: int, failed: int):
"""进度显示回调"""
percent = current / total * 100 if total > 0 else 0
bar_width = 30
filled = int(bar_width * current / total) if total > 0 else 0
bar = '=' * filled + '>' + ' ' * (bar_width - filled - 1)
click.echo(f"\r[{bar}] {percent:.1f}% ({current}/{total}) {filename[:25]}", nl=False)
try:
callback = show_progress if progress else None
result = client.index_files(directory, callback)
if progress:
click.echo() # 换行
click.echo(f"\n✓ 成功: {result.get('success', result['count'])}")
if result.get('failed', 0) > 0:
click.echo(f"✗ 失败: {result['failed']}")
click.echo(f"总计: {result['count']} 个文件")
except Exception as e:
click.echo(f"\n✗ 索引失败: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("query")
@click.option("-n", "--top-k", default=5, help="返回结果数量")
@click.option("--chat", is_flag=True, help="对话模式")
def query(query: str, top_k: int, chat: bool):
"""查询知识库"""
client = KhojClient()
if not client.is_running():
click.echo("✗ Khoj 服务未运行,请先执行: rag start", err=True)
sys.exit(1)
try:
if chat:
result = client.chat(query)
click.echo("\n" + result.get("response", "无响应"))
else:
results = client.search(query, top_k)
if not results:
click.echo("未找到相关内容")
return
for i, item in enumerate(results, 1):
click.echo(f"\n[{i}] 来源: {item.get('file', '未知')}")
if 'additional' in item:
loc = item['additional'].get('file', '')
if loc:
click.echo(f" 位置: {loc}")
content = item.get('entry', '')[:300]
click.echo(f" 内容: {content}...")
except Exception as e:
click.echo(f"✗ 查询失败: {e}", err=True)
sys.exit(1)
@cli.command()
@click.option("--converted", is_flag=True, help="清理转换后的 Markdown 文件")
def clean(converted: bool):
"""清理临时文件"""
if converted:
converted_path = Path(DEFAULT_CONVERTED_DIR).expanduser()
if converted_path.exists():
import shutil
shutil.rmtree(converted_path)
click.echo(f"✓ 已清理: {converted_path}")
else:
click.echo("转换目录不存在")
@cli.command()
@click.argument("directory")
@click.option("--full", is_flag=True, help="强制全量同步")
@click.option("-v", "--verbose", is_flag=True, help="详细输出")
def sync(directory: str, full: bool, verbose: bool):
"""增量同步目录到知识库(带进度显示)"""
script_path = Path(__file__).parent / "scripts" / "sync.py"
if not script_path.exists():
click.echo(f"✗ 同步脚本不存在: {script_path}", err=True)
sys.exit(1)
cmd = ["python3", str(script_path), directory]
if full:
cmd.append("--full")
if verbose:
cmd.append("--verbose")
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
click.echo(f"✗ 同步失败: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("directory", required=False)
@click.option("--enable", is_flag=True, help="启用定时同步")
@click.option("--disable", is_flag=True, help="禁用定时同步")
@click.option("--status", is_flag=True, help="查看定时同步状态")
@click.option("--interval", default=1, help="同步间隔(小时)")
@click.option("--run", is_flag=True, help="立即执行一次同步")
def schedule(directory: str, enable: bool, disable: bool, status: bool, interval: int, run: bool):
"""管理定时同步任务"""
script_path = Path(__file__).parent / "scripts" / "schedule_sync.sh"
if not script_path.exists():
click.echo(f"✗ 调度脚本不存在: {script_path}", err=True)
sys.exit(1)
cmd = [str(script_path)]
if status:
cmd.append("--status")
elif enable:
if not directory:
click.echo("✗ 请指定要同步的目录", err=True)
sys.exit(1)
cmd.extend([directory, "--enable", "--interval", str(interval)])
elif disable:
cmd.append("--disable")
elif run:
if not directory:
click.echo("✗ 请指定要同步的目录", err=True)
sys.exit(1)
cmd.extend([directory, "--run"])
elif directory:
cmd.extend([directory, "--run"])
else:
cmd.append("--status")
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
click.echo(f"✗ 执行失败: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
cli()
FILE:requirements.txt
# Khoj RAG Skill 依赖
# 核心
khoj>=0.1.0
markitdown>=0.1.0
# Office 文档支持
openpyxl>=3.1.0
python-pptx>=0.6.21
python-docx>=1.1.0
# CLI
click>=8.0.0
requests>=2.28.0
# 可选:PDF 支持
# PyMuPDF>=1.23.0
# pdfplumber>=0.10.0
FILE:scripts/convert.py
#!/opt/anaconda3/bin/python3
"""
批量转换文档为 Markdown
支持 xlsx, xls, pptx, ppt, docx, pdf 等格式
"""
import argparse
import sys
from pathlib import Path
try:
from markitdown import MarkItDown
except ImportError:
print("错误: markitdown 未安装")
print("请运行: pip install 'markitdown[xlsx,pptx]'")
sys.exit(1)
def convert_directory(input_dir: str, output_dir: str, formats: list = None):
"""批量转换目录下的文档"""
input_path = Path(input_dir).expanduser().resolve()
output_path = Path(output_dir).expanduser().resolve()
if not input_path.exists():
print(f"错误: 输入目录不存在 - {input_dir}")
sys.exit(1)
output_path.mkdir(parents=True, exist_ok=True)
# 默认支持的格式
if formats is None:
formats = ['.xlsx', '.xls', '.pptx', '.ppt', '.docx', '.pdf', '.html']
target_formats = set(formats)
md = MarkItDown()
converted = 0
failed = 0
print(f"输入目录: {input_path}")
print(f"输出目录: {output_path}")
print(f"目标格式: {', '.join(formats)}")
print("-" * 50)
for file_path in input_path.rglob("*"):
if file_path.suffix.lower() in target_formats:
try:
result = md.convert(file_path)
# 保持目录结构
rel_path = file_path.relative_to(input_path)
output_file = output_path / f"{rel_path}.md"
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text(result.text_content, encoding="utf-8")
print(f"✓ {file_path.name} -> {output_file.name}")
converted += 1
except Exception as e:
print(f"✗ {file_path.name}: {e}")
failed += 1
print("-" * 50)
print(f"完成: 转换 {converted} 个文件, 失败 {failed} 个")
def main():
parser = argparse.ArgumentParser(
description="批量转换文档为 Markdown"
)
parser.add_argument(
"input_dir",
help="输入目录路径"
)
parser.add_argument(
"-o", "--output",
default="~/Documents/converted_md",
help="输出目录路径 (默认: ~/Documents/converted_md)"
)
parser.add_argument(
"-f", "--formats",
nargs="+",
default=['.xlsx', '.xls', '.pptx', '.ppt', '.docx', '.pdf'],
help="要转换的文件格式 (默认: xlsx xls pptx ppt docx pdf)"
)
args = parser.parse_args()
convert_directory(args.input_dir, args.output, args.formats)
if __name__ == "__main__":
main()
FILE:scripts/query.py
#!/opt/anaconda3/bin/python3
"""
Khoj 知识库查询工具
"""
import argparse
import json
import sys
import os
try:
import requests
except ImportError:
print("错误: requests 未安装")
print("请运行: pip install requests")
sys.exit(1)
KHOJ_URL = os.environ.get("KHOJ_URL", "http://localhost:42110")
KHOJ_API_KEY = os.environ.get("KHOJ_API_KEY", "")
def search(query: str, top_k: int = 5):
"""搜索知识库"""
headers = {}
if KHOJ_API_KEY:
headers["Authorization"] = f"Bearer {KHOJ_API_KEY}"
try:
response = requests.get(
f"{KHOJ_URL}/api/search",
params={"q": query, "n": top_k},
headers=headers,
timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.ConnectionError:
print("错误: 无法连接到 Khoj 服务")
print(f"请确认服务正在运行: {KHOJ_URL}")
sys.exit(1)
except requests.exceptions.HTTPError as e:
print(f"错误: {e}")
sys.exit(1)
def chat(query: str):
"""对话查询"""
headers = {}
if KHOJ_API_KEY:
headers["Authorization"] = f"Bearer {KHOJ_API_KEY}"
try:
response = requests.post(
f"{KHOJ_URL}/api/chat",
json={"q": query},
headers=headers,
timeout=60
)
response.raise_for_status()
return response.json()
except requests.exceptions.ConnectionError:
print("错误: 无法连接到 Khoj 服务")
print(f"请确认服务正在运行: {KHOJ_URL}")
sys.exit(1)
except requests.exceptions.HTTPError as e:
print(f"错误: {e}")
sys.exit(1)
def format_search_results(results: list) -> str:
"""格式化搜索结果"""
if not results:
return "未找到相关内容"
output = []
for i, item in enumerate(results, 1):
# 文件名在 additional.file 或 entry 第一行
additional = item.get('additional', {})
file_name = additional.get('file', '')
if not file_name:
# 从 entry 第一行提取
content = item.get('entry', '')
file_name = content.split('\n')[0] if content else '未知'
output.append(f"\n[{i}] 文件: {file_name}")
# 尝试提取位置信息(优先检测幻灯片标记)
content = item.get('entry', '')
import re
# 先检查 PPT 幻灯片(更具体的标记)
if '<!-- Slide number:' in content:
match = re.search(r'<!-- Slide number: (\d+) -->', content)
if match:
output.append(f" 📍 幻灯片: 第 {match.group(1)} 页")
# 再检查 Excel 工作表(确保是真正的表格内容)
elif '## Sheet' in content and '|' in content:
match = re.search(r'## (Sheet\d+)', content)
if match:
output.append(f" 📍 工作表: {match.group(1)}")
# 显示内容片段
snippet = content[:200].replace('\n', ' ').strip()
output.append(f" 内容: {snippet}...")
return '\n'.join(output)
def main():
parser = argparse.ArgumentParser(
description="Khoj 知识库查询工具"
)
parser.add_argument(
"query",
help="查询内容"
)
parser.add_argument(
"-n", "--top-k",
type=int,
default=5,
help="返回结果数量 (默认: 5)"
)
parser.add_argument(
"--chat",
action="store_true",
help="对话模式"
)
parser.add_argument(
"--json",
action="store_true",
help="输出 JSON 格式"
)
args = parser.parse_args()
if args.chat:
result = chat(args.query)
if args.json:
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print(result.get("response", "无响应"))
else:
results = search(args.query, args.top_k)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
print(format_search_results(results))
if __name__ == "__main__":
main()
FILE:scripts/schedule_sync.sh
#!/bin/bash
# 定时同步脚本 - 设置 cron 定时任务,每小时自动同步
# 使用方法: ./schedule_sync.sh <目录> [小时间隔]
set -e
SCRIPT_DIR="$(cd "$(dirname "BASH_SOURCE[0]")" && pwd)"
SYNC_SCRIPT="$SCRIPT_DIR/sync.py"
LOG_DIR="$HOME/.khoj/logs"
LOG_FILE="$LOG_DIR/sync.log"
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
usage() {
echo "使用方法: $0 <目录> [选项]"
echo ""
echo "选项:"
echo " --interval <小时> 同步间隔(默认: 1 小时)"
echo " --enable 启用定时同步"
echo " --disable 禁用定时同步"
echo " --status 查看定时同步状态"
echo " --run 立即执行一次同步"
echo ""
echo "示例:"
echo " $0 ~/Documents --enable # 启用每小时同步"
echo " $0 ~/Documents --interval 2 --enable # 每2小时同步一次"
echo " $0 ~/Documents --disable # 禁用定时同步"
echo " $0 --status # 查看状态"
exit 1
}
setup_cron() {
local directory="$1"
local interval="$2"
# 验证目录
if [ ! -d "$directory" ]; then
echo -e "RED错误: 目录不存在 - $directoryNC"
exit 1
fi
# 创建日志目录
mkdir -p "$LOG_DIR"
# 创建 cron 任务
local cron_job="0 */$interval * * * $SYNC_SCRIPT \"$directory\" >> \"$LOG_FILE\" 2>&1"
# 检查是否已存在
if crontab -l 2>/dev/null | grep -q "$SYNC_SCRIPT"; then
echo -e "YELLOW定时任务已存在,正在更新...NC"
# 移除旧任务
crontab -l 2>/dev/null | grep -v "$SYNC_SCRIPT" | crontab -
fi
# 添加新任务
(crontab -l 2>/dev/null; echo "$cron_job") | crontab -
echo -e "GREEN✓ 已设置定时同步NC"
echo " 目录: $directory"
echo " 间隔: 每 $interval 小时"
echo " 日志: $LOG_FILE"
echo ""
echo "查看日志: tail -f $LOG_FILE"
}
disable_cron() {
if crontab -l 2>/dev/null | grep -q "$SYNC_SCRIPT"; then
crontab -l 2>/dev/null | grep -v "$SYNC_SCRIPT" | crontab -
echo -e "GREEN✓ 已禁用定时同步NC"
else
echo -e "YELLOW定时同步未启用NC"
fi
}
show_status() {
echo "=== 定时同步状态 ==="
if crontab -l 2>/dev/null | grep -q "$SYNC_SCRIPT"; then
echo -e "状态: GREEN已启用NC"
echo ""
echo "Cron 任务:"
crontab -l 2>/dev/null | grep "$SYNC_SCRIPT"
echo ""
if [ -f "$LOG_FILE" ]; then
echo "最近日志:"
tail -10 "$LOG_FILE"
fi
else
echo -e "状态: YELLOW未启用NC"
fi
}
run_sync() {
local directory="$1"
if [ -z "$directory" ]; then
echo -e "RED错误: 请指定要同步的目录NC"
usage
fi
echo -e "YELLOW开始同步...NC"
python3 "$SYNC_SCRIPT" "$directory" --verbose
}
# 解析参数
DIRECTORY=""
INTERVAL=1
ACTION=""
while [[ $# -gt 0 ]]; do
case $1 in
--interval)
INTERVAL="$2"
shift 2
;;
--enable)
ACTION="enable"
shift
;;
--disable)
ACTION="disable"
shift
;;
--status)
ACTION="status"
shift
;;
--run)
ACTION="run"
shift
;;
-h|--help)
usage
;;
*)
if [ -z "$DIRECTORY" ]; then
DIRECTORY="$1"
fi
shift
;;
esac
done
# 执行操作
case $ACTION in
enable)
if [ -z "$DIRECTORY" ]; then
echo -e "RED错误: 请指定要同步的目录NC"
usage
fi
setup_cron "$DIRECTORY" "$INTERVAL"
;;
disable)
disable_cron
;;
status)
show_status
;;
run)
run_sync "$DIRECTORY"
;;
*)
if [ -n "$DIRECTORY" ]; then
run_sync "$DIRECTORY"
else
usage
fi
;;
esac
FILE:scripts/start_server.sh
#!/bin/bash
# 启动 Khoj 服务
set -e
# 配置
KHOJ_PORT="-42110"
KHOJ_URL="http://localhost:KHOJ_PORT"
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
echo -e "YELLOW启动 Khoj RAG 服务...NC"
# 检查是否已运行
if curl -s "KHOJ_URL/api/health" > /dev/null 2>&1; then
echo -e "GREEN✓ Khoj 服务已在运行NC"
echo " 地址: KHOJ_URL"
exit 0
fi
# 检查依赖
if ! command -v khoj &> /dev/null; then
echo -e "RED✗ khoj 未安装NC"
echo " 请运行: pip install khoj"
exit 1
fi
# 启动服务(嵌入式 PostgreSQL 模式)
export USE_EMBEDDED_DB="true"
nohup khoj --anonymous-mode > /tmp/khoj.log 2>&1 &
# 等待启动
echo -n " 等待服务启动"
for i in {1..30}; do
if curl -s "KHOJ_URL/api/health" > /dev/null 2>&1; then
echo ""
echo -e "GREEN✓ Khoj 服务已启动NC"
echo " 地址: KHOJ_URL"
echo " 日志: /tmp/khoj.log"
exit 0
fi
echo -n "."
sleep 1
done
echo ""
echo -e "RED✗ 服务启动超时NC"
echo " 查看日志: cat /tmp/khoj.log"
exit 1
FILE:scripts/sync.py
#!/opt/anaconda3/bin/python3
"""
增量同步脚本 - 扫描文件变化并更新 Khoj 知识库
支持进度显示、增量更新、多编码支持和错误详情记录
"""
import argparse
import json
import os
import sys
import time
import signal
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional
try:
import requests
except ImportError:
print("错误: requests 未安装")
print("请运行: pip install requests")
sys.exit(1)
try:
from markitdown import MarkItDown
except ImportError:
print("错误: markitdown 未安装")
print("请运行: pip install 'markitdown[xlsx,pptx]'")
sys.exit(1)
try:
import base64
HAS_BASE64 = True
except ImportError:
HAS_BASE64 = False
KHOJ_URL = os.environ.get("KHOJ_URL", "http://localhost:42110")
KHOJ_API_KEY = os.environ.get("KHOJ_API_KEY", "")
SUPPORTED_FORMATS = {'.xlsx', '.xls', '.pptx', '.ppt', '.docx', '.doc', '.pdf', '.md', '.txt', '.csv'}
SYNC_STATE_FILE = Path.home() / ".khoj" / "sync_state.json"
LOG_FILE = Path.home() / ".khoj" / "sync.log"
# 配置
MAX_FILE_SIZE_MB = 1000 # 最大文件大小限制(MB)
CONVERSION_TIMEOUT = 600 # 转换超时时间(秒)
API_TIMEOUT = 600 # API 超时时间(秒)
# 本地 OCR 配置(使用 Tesseract,无需云 API)
OCR_LANGUAGES = os.environ.get("OCR_LANGUAGES", "chi_sim+eng") # 中文简体 + 英文
OCR_DPI = int(os.environ.get("OCR_DPI", "200")) # OCR 分辨率
OCR_MAX_PAGES = int(os.environ.get("OCR_MAX_PAGES", "20")) # 最大处理页数
# MIME 类型映射
MIME_TYPES = {
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'.xls': 'application/vnd.ms-excel',
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'.ppt': 'application/vnd.ms-powerpoint',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.doc': 'application/msword',
'.pdf': 'application/pdf',
'.md': 'text/markdown',
'.txt': 'text/plain',
'.csv': 'text/csv',
}
class TimeoutError(Exception):
"""超时错误"""
pass
def timeout_handler(signum, frame):
"""超时信号处理器"""
raise TimeoutError("操作超时")
def with_timeout(seconds):
"""超时装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
# 设置信号处理器
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return result
return wrapper
return decorator
class ProgressBar:
"""进度条显示"""
def __init__(self, total: int, width: int = 40):
self.total = total
self.width = width
self.current = 0
def update(self, current: int, message: str = ""):
self.current = current
percent = current / self.total if self.total > 0 else 0
filled = int(self.width * percent)
bar = '=' * filled + '>' + ' ' * (self.width - filled - 1)
line = f"\r[{bar}] {percent*100:.1f}% ({current}/{self.total})"
if message:
line += f" {message[:30]}"
print(line, end='', flush=True)
def finish(self):
print()
class SyncState:
"""同步状态管理"""
def __init__(self, state_file: Path = SYNC_STATE_FILE):
self.state_file = state_file
self.state: Dict[str, dict] = {}
self.load()
def load(self):
if self.state_file.exists():
try:
with open(self.state_file, 'r', encoding='utf-8') as f:
self.state = json.load(f)
except:
self.state = {}
def save(self):
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, 'w', encoding='utf-8') as f:
json.dump(self.state, f, indent=2, ensure_ascii=False)
def get_file_hash(self, file_path: Path) -> str:
"""计算文件哈希(修改时间 + 大小)"""
stat = file_path.stat()
return f"{stat.st_mtime}_{stat.st_size}"
def needs_sync(self, file_path: Path) -> bool:
"""检查文件是否需要同步"""
key = str(file_path)
current_hash = self.get_file_hash(file_path)
if key not in self.state:
return True
# 如果之前失败,应该重试
if not self.state[key].get('success', True):
return True
return self.state[key].get('hash') != current_hash
def mark_synced(self, file_path: Path, success: bool = True, error: str = ""):
"""标记文件已同步"""
key = str(file_path)
self.state[key] = {
'hash': self.get_file_hash(file_path),
'last_sync': datetime.now().isoformat(),
'success': success,
'error': error if error else None
}
def remove_file(self, file_path: Path):
"""移除文件记录"""
key = str(file_path)
if key in self.state:
del self.state[key]
class KhojSyncClient:
"""Khoj 同步客户端"""
def __init__(self, base_url: str = KHOJ_URL):
self.base_url = base_url.rstrip('/')
self.headers = {}
if KHOJ_API_KEY:
self.headers["Authorization"] = f"Bearer {KHOJ_API_KEY}"
def is_running(self) -> bool:
"""检查服务是否运行"""
try:
response = requests.get(f"{self.base_url}/api/health", timeout=5)
return response.status_code == 200
except:
return False
def get_indexed_files(self) -> Set[str]:
"""获取已索引的文件列表"""
try:
response = requests.get(
f"{self.base_url}/api/content",
headers=self.headers,
timeout=30
)
if response.status_code == 200:
data = response.json()
return {item.get('file', '') for item in data if isinstance(data, list)}
except:
pass
return set()
def index_file(self, file_path: Path, converted_content: Optional[str] = None, verbose: bool = False) -> Tuple[bool, str]:
"""索引单个文件
Returns:
(success, error_message)
"""
try:
# 获取 MIME 类型
ext = file_path.suffix.lower()
mime_type = MIME_TYPES.get(ext, 'application/octet-stream')
if converted_content:
files = {'files': (file_path.name, converted_content, mime_type)}
else:
with open(file_path, 'rb') as f:
files = {'files': (file_path.name, f.read(), mime_type)}
response = requests.patch(
f"{self.base_url}/api/content",
headers=self.headers,
files=files,
timeout=API_TIMEOUT
)
if response.status_code == 200:
return True, ""
else:
error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
if verbose:
print(f"\n API 错误: {error_msg}")
return False, error_msg
except requests.exceptions.Timeout:
return False, f"请求超时({API_TIMEOUT}秒)"
except Exception as e:
return False, str(e)
def scan_files(directory: Path) -> List[Path]:
"""扫描目录下的支持文件"""
files = []
for ext in SUPPORTED_FORMATS:
files.extend(directory.rglob(f'*{ext}'))
return sorted(files)
def read_text_with_fallback(file_path: Path) -> Tuple[bool, str]:
"""使用多种编码尝试读取文本文件
Returns:
(success, content_or_error)
"""
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
for encoding in encodings:
try:
content = file_path.read_text(encoding=encoding)
return True, content
except (UnicodeDecodeError, UnicodeError):
continue
except Exception as e:
return False, f"读取错误: {e}"
return False, "无法识别文件编码(尝试了 utf-8, gbk, gb2312, latin-1)"
def convert_file_with_timeout(file_path: Path, md: MarkItDown, timeout: int = CONVERSION_TIMEOUT) -> Tuple[bool, str]:
"""带超时的文件转换
Returns:
(success, content_or_error)
"""
try:
# 设置超时
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
result = md.convert(file_path)
content = result.text_content
# 验证内容
if not content or len(content.strip()) == 0:
# 如果是PDF且内容为空,可能是扫描版PDF,尝试本地OCR处理
if file_path.suffix.lower() == '.pdf':
return process_scanned_pdf_with_ocr(file_path)
return False, "转换结果为空"
return True, content
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
except TimeoutError:
return False, f"转换超时({timeout}秒)- 文件可能太大或太复杂"
except Exception as e:
error_str = str(e)
# 检查是否是 xlsx 样式错误,尝试用 pandas 回退
if 'Fill' in error_str or 'TypeError' in error_str:
return try_xlsx_with_pandas(file_path)
return False, f"转换错误: {error_str[:200]}"
def process_scanned_pdf_with_ocr(file_path: Path) -> Tuple[bool, str]:
"""使用本地 OCR (Tesseract) 处理扫描版 PDF
将 PDF 页面转换为图片,然后使用 Tesseract OCR 提取文本内容
无需云 API,完全本地运行
Returns:
(success, content_or_error)
"""
try:
# 使用 pdf2image 将 PDF 转换为图片
try:
from pdf2image import convert_from_path
except ImportError:
return False, "需要安装 pdf2image: pip install pdf2image"
# 使用 pytesseract 进行 OCR
try:
import pytesseract
except ImportError:
return False, "需要安装 pytesseract: pip install pytesseract"
# 转换 PDF 为图片
pages = convert_from_path(
file_path,
first_page=1,
last_page=OCR_MAX_PAGES,
dpi=OCR_DPI
)
if not pages:
return False, "PDF 无法转换为图片"
content_parts = []
for i, page_image in enumerate(pages, 1):
# 使用 Tesseract 进行 OCR
try:
text = pytesseract.image_to_string(
page_image,
lang=OCR_LANGUAGES
)
if text and text.strip():
content_parts.append(f"## Page {i}\n\n{text.strip()}\n\n")
else:
content_parts.append(f"## Page {i}\n\n[OCR 未识别到文字]\n\n")
except Exception as e:
content_parts.append(f"## Page {i}\n\n[OCR 处理错误: {str(e)[:50]}]\n\n")
if not content_parts:
return False, "OCR 未能提取任何内容"
content = "".join(content_parts)
return True, content
except Exception as e:
return False, f"OCR 处理错误: {str(e)[:100]}"
def convert_doc_with_libreoffice(file_path: Path) -> Tuple[bool, str]:
"""使用 LibreOffice 将 .doc 文件转换为文本
Returns:
(success, content_or_error)
"""
import subprocess
import tempfile
import shutil
try:
# 检查 LibreOffice 是否可用
soffice_path = shutil.which('soffice')
if not soffice_path:
return False, "需要安装 LibreOffice: brew install libreoffice"
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
# 使用 LibreOffice 转换为 txt
result = subprocess.run(
[soffice_path, '--headless', '--convert-to', 'txt:Text',
'--outdir', temp_dir, str(file_path)],
capture_output=True,
text=True,
timeout=60
)
if result.returncode != 0:
return False, f"LibreOffice 转换失败: {result.stderr[:100]}"
# 读取转换后的文件
txt_file = Path(temp_dir) / (file_path.stem + '.txt')
if txt_file.exists():
content = txt_file.read_text(encoding='utf-8', errors='replace')
if content.strip():
return True, content
return False, "转换结果为空"
else:
return False, "转换后的文件不存在"
except subprocess.TimeoutExpired:
return False, "LibreOffice 转换超时"
except Exception as e:
return False, f"转换错误: {str(e)[:100]}"
def try_xlsx_with_pandas(file_path: Path) -> Tuple[bool, str]:
"""使用直接 XML 提取读取 xlsx 文件(作为 markitdown 的回退方案)
用于处理样式损坏的 xlsx 文件,直接从 ZIP 中提取数据
Returns:
(success, content_or_error)
"""
try:
import zipfile
import xml.etree.ElementTree as ET
ns = {'ns': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
with zipfile.ZipFile(file_path, 'r') as z:
# 读取共享字符串
shared_strings = []
shared_strings_file = 'xl/sharedStrings.xml'
if shared_strings_file in z.namelist():
with z.open(shared_strings_file) as f:
for event, elem in ET.iterparse(f):
if elem.tag == '{http://schemas.openxmlformats.org/spreadsheetml/2006/main}t':
shared_strings.append(elem.text or '')
elem.clear()
# 读取工作表
content_parts = []
max_rows = 500 # 限制行数
max_cols = 50 # 限制列数
for name in sorted(z.namelist()):
if name.startswith('xl/worksheets/') and name.endswith('.xml'):
sheet_name = name.split('/')[-1].replace('.xml', '')
content_parts.append(f"## {sheet_name}\n")
with z.open(name) as f:
tree = ET.parse(f)
root = tree.getroot()
rows = root.findall('.//ns:row', ns)
rows_content = []
for i, row in enumerate(rows):
if i >= max_rows:
rows_content.append(f"... (truncated at {max_rows} rows)")
break
cells = row.findall('ns:c', ns)
row_values = []
for j, cell in enumerate(cells):
if j >= max_cols:
break
cell_type = cell.get('t', '')
value_elem = cell.find('ns:v', ns)
if value_elem is not None:
value = value_elem.text or ''
if cell_type == 's' and value.isdigit():
idx = int(value)
if idx < len(shared_strings):
value = shared_strings[idx]
row_values.append(value[:100]) # 限制单元格内容长度
else:
row_values.append('')
if row_values:
rows_content.append(' | '.join(row_values))
if rows_content:
content_parts.append('\n'.join(rows_content))
content_parts.append("\n")
if not content_parts:
return False, "工作表为空"
content = "\n".join(content_parts)
return True, content
except Exception as e:
return False, f"XML 提取失败: {str(e)[:100]}"
def log_message(message: str, level: str = "INFO"):
"""记录日志"""
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] [{level}] {message}\n"
try:
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(log_entry)
except:
pass
def sync_directory(
input_dir: str,
output_dir: str = None,
full_sync: bool = False,
verbose: bool = False
) -> dict:
"""
同步目录到 Khoj 知识库
返回:
{
'total': 总文件数,
'indexed': 已索引数,
'synced': 本次同步数,
'success': 成功数,
'failed': 失败数,
'skipped': 跳过数,
'errors': [错误列表]
}
"""
input_path = Path(input_dir).expanduser().resolve()
output_path = Path(output_dir).expanduser() if output_dir else Path("/tmp/khoj_sync")
if not input_path.exists():
print(f"错误: 目录不存在 - {input_dir}")
sys.exit(1)
# 检查服务
client = KhojSyncClient()
if not client.is_running():
print("错误: Khoj 服务未运行")
print(f"请先启动服务: khoj --anonymous-mode")
sys.exit(1)
# 初始化
output_path.mkdir(parents=True, exist_ok=True)
sync_state = SyncState()
md = MarkItDown()
# 扫描文件
print(f"扫描目录: {input_path}")
all_files = scan_files(input_path)
print(f"找到文件: {len(all_files)} 个")
# 获取已索引文件
indexed_files = client.get_indexed_files()
print(f"已索引: {len(indexed_files)} 个")
# 确定需要同步的文件
if full_sync:
files_to_sync = all_files
else:
files_to_sync = [f for f in all_files if sync_state.needs_sync(f)]
print(f"需要同步: {len(files_to_sync)} 个\n")
if not files_to_sync:
print("所有文件已是最新,无需同步")
return {
'total': len(all_files),
'indexed': len(indexed_files),
'synced': 0,
'success': 0,
'failed': 0,
'skipped': 0,
'errors': []
}
# 开始同步
progress = ProgressBar(len(files_to_sync))
success_count = 0
failed_count = 0
skipped_count = 0
errors = []
for i, file_path in enumerate(files_to_sync):
progress.update(i + 1, file_path.name)
# 检查文件大小
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > MAX_FILE_SIZE_MB:
skip_msg = f"文件过大 ({file_size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB)"
skipped_count += 1
errors.append(f"{file_path.name}: {skip_msg}")
sync_state.mark_synced(file_path, success=False, error=skip_msg)
sync_state.save()
log_message(f"跳过: {file_path.name} - {skip_msg}", "WARN")
continue
# 根据文件类型处理
ext = file_path.suffix.lower()
if ext in {'.xlsx', '.xls', '.pptx', '.ppt', '.docx', '.pdf'}:
# Office/PDF 文件需要转换
ok, content_or_error = convert_file_with_timeout(file_path, md)
if not ok:
failed_count += 1
errors.append(f"{file_path.name}: {content_or_error}")
sync_state.mark_synced(file_path, success=False, error=content_or_error)
sync_state.save()
log_message(f"转换失败: {file_path.name} - {content_or_error}", "ERROR")
continue
content = content_or_error
elif ext == '.doc':
# .doc 文件使用 LibreOffice 转换
ok, content_or_error = convert_doc_with_libreoffice(file_path)
if not ok:
failed_count += 1
errors.append(f"{file_path.name}: {content_or_error}")
sync_state.mark_synced(file_path, success=False, error=content_or_error)
sync_state.save()
log_message(f"转换失败: {file_path.name} - {content_or_error}", "ERROR")
continue
content = content_or_error
elif ext in {'.md', '.txt', '.csv'}:
# 文本文件直接读取(支持多种编码)
ok, content_or_error = read_text_with_fallback(file_path)
if not ok:
failed_count += 1
errors.append(f"{file_path.name}: {content_or_error}")
sync_state.mark_synced(file_path, success=False, error=content_or_error)
sync_state.save()
log_message(f"读取失败: {file_path.name} - {content_or_error}", "ERROR")
continue
content = content_or_error
else:
# 不支持的格式
skip_msg = f"不支持的格式: {ext}"
skipped_count += 1
errors.append(f"{file_path.name}: {skip_msg}")
sync_state.mark_synced(file_path, success=False, error=skip_msg)
sync_state.save()
continue
# 索引文件
success, error_msg = client.index_file(
file_path,
content if ext in {'.xlsx', '.xls', '.pptx', '.ppt', '.docx', '.doc', '.pdf'} else None,
verbose=verbose
)
if success:
success_count += 1
sync_state.mark_synced(file_path, success=True)
if verbose:
print(f"\n ✓ {file_path.name}")
log_message(f"成功: {file_path.name}", "INFO")
else:
failed_count += 1
errors.append(f"{file_path.name}: {error_msg}")
sync_state.mark_synced(file_path, success=False, error=error_msg)
if verbose:
print(f"\n ✗ {file_path.name}: {error_msg}")
log_message(f"索引失败: {file_path.name} - {error_msg}", "ERROR")
# 每处理完一个文件就保存状态,防止中断丢失
sync_state.save()
progress.finish()
# 输出结果
print(f"\n{'='*50}")
print(f"同步结果:")
print(f" ✓ 成功: {success_count}")
print(f" ✗ 失败: {failed_count}")
print(f" ⊘ 跳过: {skipped_count}")
print(f" 总计: {success_count + failed_count + skipped_count}")
if errors:
print(f"\n错误/跳过列表 ({len(errors)} 个):")
for err in errors[:20]:
print(f" - {err}")
if len(errors) > 20:
print(f" ... 还有 {len(errors) - 20} 个")
# 计算成功率
total_processed = success_count + failed_count + skipped_count
if total_processed > 0:
success_rate = success_count / total_processed * 100
print(f"\n成功率: {success_rate:.1f}%")
print("\n同步完成!")
log_message(f"同步完成: 成功 {success_count}, 失败 {failed_count}, 跳过 {skipped_count}", "INFO")
return {
'total': len(all_files),
'indexed': len(indexed_files),
'synced': len(files_to_sync),
'success': success_count,
'failed': failed_count,
'skipped': skipped_count,
'errors': errors
}
def main():
parser = argparse.ArgumentParser(
description="增量同步文件到 Khoj 知识库",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 增量同步(只同步有变化的文件)
python sync.py ~/Documents
# 全量同步(强制重新索引所有文件)
python sync.py ~/Documents --full
# 详细输出(显示每个文件的处理结果)
python sync.py ~/Documents --verbose
# 组合使用
python sync.py ~/Documents --full --verbose
"""
)
parser.add_argument(
"directory",
help="要同步的目录路径"
)
parser.add_argument(
"-o", "--output",
default="/tmp/khoj_sync",
help="转换后 Markdown 输出目录 (默认: /tmp/khoj_sync)"
)
parser.add_argument(
"--full",
action="store_true",
help="强制全量同步(忽略增量判断)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="显示详细输出"
)
args = parser.parse_args()
sync_directory(
args.directory,
args.output,
args.full,
args.verbose
)
if __name__ == "__main__":
main()