@clawhub-dvnghiem-b7963ac585
---
name: accounting-skill
description: >
Process accounting documents — invoices (hóa đơn GTGT), purchase orders, and bank statements.
Extract structured data from PDF (digital and scanned), JPG, and PNG files using OCR.
Output Excel tracking sheets and JSON backups. Classify unknown documents and route to the
correct extractor. Supports Vietnamese and international formats. Use when asked to extract,
process, classify, or track invoices, POs, bank statements, or accounting documents.
---
# Accounting Skill
Extract structured data from accounting documents (invoices, POs, bank statements) into Excel tracking sheets with JSON backups. Handles digital PDFs, scanned PDFs, and images via automatic OCR.
## Prerequisites
Install the system OCR dependencies before first use. See `{baseDir}/references/ocr-setup.md` for the full guide.
```bash
# Ubuntu / Debian
sudo apt install tesseract-ocr tesseract-ocr-vie poppler-utils
# Verify
uv run {baseDir}/scripts/ocr_utils.py check
```
## Quick Start
### 1. Classify an unknown document
```bash
uv run {baseDir}/scripts/classify_document.py /path/to/document.pdf
```
Returns JSON with `type` (invoice / po / statement / other), `confidence`, and a ready-to-run extraction `command`.
### 2. Extract an invoice
```bash
uv run {baseDir}/scripts/extract_invoice.py /path/to/invoice.pdf -o invoice_tracking.xlsx
```
Appends to the Excel tracking sheet. Use `--dry-run` to preview parsed data without writing.
### 3. Extract a bank statement
```bash
uv run {baseDir}/scripts/extract_statement.py /path/to/statement.pdf
```
Creates `statement_{bank}_{date}.xlsx` with transactions. Use `-o` to specify output path.
### 4. Extract a purchase order
```bash
uv run {baseDir}/scripts/extract_po.py /path/to/po.pdf -o po_tracking.xlsx
```
Tracks delivery dates and flags overdue/urgent POs.
### 5. Generate empty Excel templates
```bash
uv run {baseDir}/scripts/generate_templates.py all -o ~/accounting/
```
Creates blank tracking sheets: `invoice_tracking.xlsx`, `po_tracking.xlsx`, `statement_template.xlsx`.
## Common Options (all extractors)
| Flag | Description |
|------|-------------|
| `--format excel\|json\|both` | Output format (default: `both`) |
| `--dry-run` | Parse and validate only, print JSON to stdout |
| `--json-dir DIR` | Directory for JSON backup files |
| `-o FILE` | Output Excel file path |
## Workflow
### Single Document
```
File → classify_document.py → route → extract_*.py → Excel + JSON
```
### Batch Processing
For a folder of mixed documents, classify first, then route:
```bash
for f in /path/to/docs/*; do
uv run {baseDir}/scripts/classify_document.py "$f" --output-dir ~/accounting/
done
```
Then run the suggested extraction commands from each classification result.
## OCR Strategy
All scripts share `{baseDir}/scripts/ocr_utils.py` which auto-selects the best extraction method:
1. **Digital PDFs** → pdfplumber (fast, no OCR needed)
2. **Scanned PDFs** → pdf2image + pytesseract at 300 DPI (fallback when pdfplumber gets <50 chars/page)
3. **Images** (JPG/PNG/TIFF) → pytesseract with grayscale preprocessing
Each result includes `ocr_confidence` and `extraction_confidence` percentages. Documents below 85% are automatically flagged `needs_review`.
## Validation Rules
- **Invoices**: Subtotal + VAT = Total (auto-checks math), duplicate detection by invoice number + vendor
- **Bank statements**: Opening balance + credits − debits = closing balance
- **POs**: Delivery date tracking with overdue/urgent alerts
## Reference Documents
Read these for field schemas, Vietnamese format details, and validation logic:
- `{baseDir}/references/invoice-fields.md` — Vietnamese VAT invoice fields, tax rates, patterns
- `{baseDir}/references/bank-formats.md` — Vietnamese bank names, transaction formats, amount patterns
- `{baseDir}/references/po-fields.md` — PO fields, delivery status logic, payment terms
- `{baseDir}/references/ocr-setup.md` — OCR installation, troubleshooting, confidence scoring
FILE:README.md
# Accounting Skill
Extract structured data from accounting documents (invoices, purchase orders, bank statements) using OCR. Outputs Excel tracking sheets and JSON backups with confidence scores and validation alerts.
## Features
- **Multi-format OCR**: Digital PDFs (pdfplumber), scanned PDFs + images (pytesseract), automatic fallback
- **Vietnamese + International Support**: Hóa đơn GTGT (VAT invoices), Vietnamese bank formats, multilingual extraction
- **Intelligent Document Classification**: Automatically classify documents and route to correct extractor
- **Data Validation**: Math checks (invoices), balance continuity (statements), delivery tracking (POs)
- **Duplicate Detection**: Prevents re-entry of duplicate invoices/POs
- **Batch Processing**: Classify folders of mixed documents, then extract each type
- **Confidence Scoring**: OCR confidence + extraction confidence for every document
## Installation
### System Dependencies
Install OCR and PDF tools (required):
```bash
# Ubuntu / Debian
sudo apt install tesseract-ocr tesseract-ocr-vie poppler-utils
# macOS (Homebrew)
brew install tesseract tesseract-lang poppler
```
### Verify Installation
```bash
uv run scripts/ocr_utils.py check
```
Should output:
```
✓ tesseract found at /usr/bin/tesseract
✓ poppler found at /usr/bin/pdftoppm
✓ Vietnamese language pack available
```
### Python Requirements
- Python 3.10+
- `uv` package manager (for running scripts with auto-dependency install)
All scripts use [PEP 723](https://peps.python.org/pep-0723/) inline dependencies, so dependencies are auto-installed with `uv run`.
## Quick Start
### 1. Classify a Document
```bash
uv run scripts/classify_document.py /path/to/document.pdf
```
Output:
```json
{
"type": "invoice",
"confidence": 92,
"action": "extract",
"command": "uv run scripts/extract_invoice.py /path/to/document.pdf --output invoice_tracking.xlsx",
"ocr_method": "pdfplumber",
"ocr_confidence": 95
}
```
### 2. Extract an Invoice
```bash
uv run scripts/extract_invoice.py /path/to/invoice.pdf -o invoice_tracking.xlsx
```
Appends a row to the Excel tracking sheet and saves a JSON backup. Output:
```json
{
  "invoice_number": "2024-001",
  "vendor": "ABC Trading",
  "total": 11000000,
  "status": "processed",
  "ocr_confidence": 94,
  "extraction_confidence": 98
}
```
### 3. Extract Bank Statement
```bash
uv run scripts/extract_statement.py /path/to/statement.pdf
```
Creates `statement_vcb_20260319.xlsx` with all transactions. Validates opening + credits − debits = closing balance.
### 4. Extract Purchase Order
```bash
uv run scripts/extract_po.py /path/to/po.pdf -o po_tracking.xlsx
```
Tracks delivery dates, flags overdue (red) and urgent (yellow) POs.
### 5. Generate Empty Templates
```bash
uv run scripts/generate_templates.py all -o ~/accounting/
```
Creates blank Excel sheets:
- `invoice_tracking.xlsx`
- `po_tracking.xlsx`
- `statement_template.xlsx`
## Usage Guide
### Common Options
All extraction scripts support these flags:
| Option | Default | Description |
|--------|---------|-------------|
| `--output, -o` | Type-specific | Output Excel file path |
| `--format` | `both` | `excel`, `json`, or `both` |
| `--json-dir` | Same as Excel | Directory for JSON backup files |
| `--dry-run` | — | Parse only, print JSON to stdout (no file writes) |
### Dry-Run Mode (Preview Results)
```bash
uv run scripts/extract_invoice.py invoice.pdf --dry-run
```
Parses and validates, outputs JSON to stdout without modifying files. Useful for debugging or testing.
### Batch Processing
Process a folder of mixed documents:
```bash
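for file in ~/documents/*.pdf; do
  echo "Classifying: $file"
  result=$(uv run scripts/classify_document.py "$file")
  # No "command" key is returned when the action is "manual_review"
  command=$(echo "$result" | jq -r '.command // empty')
  if [ -z "$command" ]; then
    echo "Skipping (manual review needed): $file"
    continue
  fi
  echo "Running: $command"
  eval "$command"
done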
```
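The shell loop hands the classifier's output straight to `eval`. The same flow can be sketched in Python without going through a shell. This is a minimal sketch that assumes the JSON shape shown under "Classify a Document"; `command_from_result` and `run_batch` are hypothetical helper names, not part of the skill.

```python
from __future__ import annotations

import json
import shlex
import subprocess
from pathlib import Path


def command_from_result(result: dict) -> list[str] | None:
    """Return the suggested extraction command as an argv list, or None."""
    if result.get("action") != "extract" or not result.get("command"):
        return None
    # shlex.split tokenizes like a POSIX shell but executes nothing.
    return shlex.split(result["command"])


def run_batch(folder: str) -> None:
    """Classify every PDF in `folder`, then run the suggested extractor."""
    for pdf in sorted(Path(folder).expanduser().glob("*.pdf")):
        proc = subprocess.run(
            ["uv", "run", "scripts/classify_document.py", str(pdf)],
            capture_output=True, text=True, check=True,
        )
        argv = command_from_result(json.loads(proc.stdout))
        if argv is None:
            print(f"Skipping (manual review): {pdf}")
            continue
        subprocess.run(argv, check=True)
```

Because the classifier joins its command with plain spaces, a file path that contains spaces will be split into several arguments by either approach.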
### Custom Output Paths
```bash
# Invoice with custom path
uv run scripts/extract_invoice.py invoice.pdf -o ~/accounting/2024/invoices.xlsx
# Statement with JSON backup to specific directory
uv run scripts/extract_statement.py statement.pdf -o statement.xlsx --json-dir ~/accounting/json/
# PO with both formats
uv run scripts/extract_po.py po.pdf --format both -o ~/accounting/po_tracking.xlsx
```
## Data Output
### Excel Format
Each extraction script appends to Excel tracking sheets:
**Invoice Tracking** (`invoice_tracking.xlsx`):
```
Invoice# Date Vendor TaxCode Subtotal VAT Total Status OCR% Extract%
2024-001 2024-03-15 ABC Trading 0100000001 10000000 1000000 11000000 processed 94 98
2024-002 2024-03-16 XYZ Corp 0203000001 5000000 500000 5500000 needs_review 82 85
```
**Bank Statement** (`statement_vcb_20260319.xlsx`):
```
Date Description Debit Credit Balance Status
2024-03-01 Opening Balance 500000000 ✓
2024-03-01 Deposit 50000000 550000000 reconciled
```
**PO Tracking** (`po_tracking.xlsx`):
```
PO# Vendor Total Delivery Days Left Status Priority
PO-2024-1 Supplier A 50000000 2024-03-29 10 ✓ OK normal
PO-2024-2 Supplier B 30000000 2024-03-20 1 ⚠ URGENT urgent
```
### JSON Format
Each extraction also saves JSON backup (default: same directory as Excel):
```json
{
  "invoice_number": "2024-001",
  "vendor_name": "ABC Trading",
  "vendor_tax_code": "0100000001",
  "invoice_date": "2024-03-15",
  "subtotal": 10000000,
  "vat_rate": 10,
  "vat_amount": 1000000,
  "total_amount": 11000000,
  "items": [
    {
      "description": "Product A",
      "quantity": 20,
      "unit_price": 500000,
      "total": 10000000,
      "vat_rate": 10
    }
  ],
  "ocr_confidence": 94,
  "extraction_confidence": 98,
  "ocr_method": "pdfplumber",
  "source_file": "/path/to/invoice.pdf",
  "processed_at": "2024-03-19T10:30:45"
}
```
## OCR & Confidence Scoring
### OCR Strategy
The skill automatically selects the best extraction method per file:
1. **Digital PDFs** (text layer present)
   - Method: `pdfplumber` (fast, accurate)
   - Typical confidence: > 90%
2. **Scanned PDFs** (image-based)
   - Method: `pdf2image` + `pytesseract` at 300 DPI
   - Typical confidence: 70-85%
3. **Images** (JPG, PNG, TIFF, BMP)
   - Method: `pytesseract` with grayscale preprocessing
   - Typical confidence: 75-90%
### Confidence Thresholds
| OCR % | Meaning | Action |
|-------|---------|--------|
| > 90% | Excellent | Trust result |
| 85-90% | Good | Minor review |
| 70-85% | Fair | Review amounts |
| < 70% | Poor | Manual review required |
Documents with `extraction_confidence < 85` are automatically flagged `needs_review`.
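The threshold table and the `needs_review` rule can be sketched as two small functions. This is an illustration only: the table's bands share the values 85 and 70, so the sketch assumes a boundary value belongs to the better band, and `ocr_band` and `needs_review` are hypothetical names.

```python
def ocr_band(ocr_confidence: float) -> str:
    """Map an OCR confidence percentage to the band named in the table."""
    if ocr_confidence > 90:
        return "excellent"
    if ocr_confidence >= 85:  # assumption: 85 counts as "good"
        return "good"
    if ocr_confidence >= 70:  # assumption: 70 counts as "fair"
        return "fair"
    return "poor"


def needs_review(extraction_confidence: float, threshold: float = 85.0) -> bool:
    """Flag a document when extraction confidence falls below the threshold."""
    return extraction_confidence < threshold
```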
### Example: Low Confidence Debug
```bash
uv run scripts/extract_invoice.py scanned_invoice.pdf --dry-run
```
Output shows:
```json
{
  "ocr_confidence": 72,
  "ocr_method": "pytesseract",
  "extraction_confidence": 66.7,
  "status": "needs_review",
  "warnings": [
    "OCR confidence below 85%",
    "Missing vendor tax code",
    "Missing invoice date"
  ]
}
```
## Validation Rules
### Invoices (extract_invoice.py)
- **Math validation**: `subtotal + VAT = total` (auto-checked)
- **VAT rates**: 0%, 5%, 8%, 10% (Vietnamese standards)
- **Duplicate detection**: Checks invoice # + vendor combo in existing Excel
- **Required fields**: Invoice number, date, vendor, total amount
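Duplicate detection on the invoice number plus vendor pair could look roughly like the sketch below. The normalization (trimming, case folding, collapsing whitespace) is an assumption made for illustration, and `invoice_key` and `is_duplicate` are hypothetical names rather than functions from `extract_invoice.py`.

```python
from __future__ import annotations


def invoice_key(invoice_number: str, vendor: str) -> tuple[str, str]:
    """Build a comparison key that ignores case and stray whitespace."""
    return (invoice_number.strip().upper(), " ".join(vendor.split()).casefold())


def is_duplicate(existing_rows: list[tuple[str, str]],
                 invoice_number: str, vendor: str) -> bool:
    """Check a new invoice against (invoice_number, vendor) pairs already in the sheet."""
    seen = {invoice_key(number, name) for number, name in existing_rows}
    return invoice_key(invoice_number, vendor) in seen
```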
### Bank Statements (extract_statement.py)
- **Balance continuity**: `opening + credits − debits = closing` (validates statement integrity)
- **Transaction parsing**: Handles multi-bank formats (VCB, TCB, BIDV, etc.)
- **Amount format**: Detects VND format (dots as thousands: `1.000.000`)
- **Date format**: Handles dd/mm/yyyy, dd-mm-yyyy, Vietnamese labels
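The balance continuity rule is plain arithmetic. A minimal sketch, assuming amounts are whole VND and allowing a 1 VND tolerance (the tolerance and the name `check_balance` are assumptions, not taken from `extract_statement.py`):

```python
from __future__ import annotations


def check_balance(opening: int, credits: list[int], debits: list[int],
                  closing: int, tolerance: float = 1.0) -> tuple[bool, int]:
    """Return (is_consistent, expected_closing) for one statement."""
    expected = opening + sum(credits) - sum(debits)
    return abs(expected - closing) <= tolerance, expected
```

With the sample statement rows shown earlier, an opening balance of 500,000,000 plus a 50,000,000 deposit gives an expected closing balance of 550,000,000.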
### Purchase Orders (extract_po.py)
- **Delivery tracking**: Calculates `days_left = delivery_date - today`
- **Status flags**:
  - ✓ OK: 7+ days left
  - ⚠ URGENT: 1-6 days left (yellow highlight)
  - ✗ OVERDUE: delivery date passed (red highlight)
- **Duplicate detection**: Checks PO # + vendor
## File Structure
```
accounting-skill/
├── README.md ← This file
├── SKILL.md ← OpenClaw skill metadata & quick start
├── scripts/
│ ├── ocr_utils.py ← Shared OCR module (pdfplumber + pytesseract)
│ ├── extract_invoice.py ← Invoice extraction → Excel + JSON
│ ├── extract_statement.py ← Bank statement extraction
│ ├── extract_po.py ← PO extraction with delivery tracking
│ ├── classify_document.py ← Document classifier & router
│ └── generate_templates.py ← Blank Excel template generator
└── references/
├── invoice-fields.md ← Vietnamese VAT invoice field reference
├── bank-formats.md ← Vietnamese bank format patterns
├── po-fields.md ← PO field reference & logic
└── ocr-setup.md ← OCR installation & troubleshooting
```
## Troubleshooting
### "tesseract is not installed"
```bash
# Ubuntu / Debian
sudo apt install tesseract-ocr tesseract-ocr-vie
# macOS
brew install tesseract tesseract-lang
```
### Vietnamese text garbled or missing
Vietnamese language pack is required:
```bash
# Ubuntu / Debian
sudo apt install tesseract-ocr-vie
# Verify
tesseract --list-langs | grep vie
```
### "No text extracted from file"
Usually means:
- PDF is password-protected (decrypt first)
- Scanned image is too faint (try preprocessing)
- Missing `poppler-utils` (needed to convert PDF to image)
```bash
sudo apt install poppler-utils
```
### Low OCR confidence on clean document
Likely a digital PDF — pdfplumber handles these better than OCR. Check logs:
```bash
uv run scripts/extract_invoice.py invoice.pdf --dry-run 2>&1 | grep "OCR method"
```
If it shows `pytesseract`, pdfplumber found fewer than 50 chars/page. Verify that the PDF isn't corrupted.
### Script fails to find output file
Ensure directory exists:
```bash
# Create directory first
mkdir -p ~/accounting/2024/
# Then run extraction
uv run scripts/extract_invoice.py invoice.pdf -o ~/accounting/2024/invoices.xlsx
```
Or use absolute paths:
```bash
uv run scripts/extract_invoice.py /path/to/invoice.pdf -o /home/user/accounting/invoices.xlsx
```
### "Duplicate detected" error
Document already exists in Excel. Options:
1. **Replace it**: Delete the row from Excel, re-run extraction
2. **Skip it**: Mark status as `duplicate` and move on
3. **Force update**: Use `--format json` to save JSON only, then manually update Excel
## Performance Tips
- **Batch classification first**: Classify all documents, then extract only the ones you need
- **Dry-run for testing**: Always use `--dry-run` to preview results before writing to Excel
- **Separate output directories**: Keep invoices, statements, and POs in different folders
- **Monitor OCR confidence**: Set up alerts when `ocr_confidence < 80` for manual review
## References
- See `references/invoice-fields.md` for complete Vietnamese VAT invoice field schema
- See `references/bank-formats.md` for supported bank formats and pattern details
- See `references/po-fields.md` for PO field definitions and delivery logic
- See `references/ocr-setup.md` for OCR installation and confidence scoring guide
## License
Part of OpenClaw Skills collection.
## Support
For issues or feature requests, refer to:
- System dependency issues: `references/ocr-setup.md`
- Field mapping questions: `references/invoice-fields.md`, `references/bank-formats.md`
- PO logic: `references/po-fields.md`
FILE:references/bank-formats.md
# Bank Statement Formats
## Table of Contents
1. [Vietnamese Banks](#vietnamese-banks)
2. [Statement Field Labels](#statement-field-labels)
3. [Transaction Table Columns](#transaction-table-columns)
4. [Amount Formats](#amount-formats)
## Vietnamese Banks
| Bank | Abbreviation | Format Notes |
|------|-------------|--------------|
| Vietcombank | VCB | PDF with tables, clear headers |
| Techcombank | TCB | PDF, often merged cells |
| BIDV | BIDV | PDF with standard table layout |
| VietinBank | CTG | PDF with header block |
| MB Bank | MBB | PDF, sometimes image-based |
| VPBank | VPB | PDF with summary section |
| ACB | ACB | PDF with transaction details |
| Sacombank | STB | PDF, mixed formats |
| TPBank | TPB | PDF, clean digital format |
| HDBank | HDB | PDF with account summary |
| Agribank | AGR | PDF, older format styles |
## Statement Field Labels
### Vietnamese → English
- **Sao kê tài khoản** → Account Statement
- **Số tài khoản** / **STK** → Account Number
- **Chủ tài khoản** → Account Holder
- **Kỳ sao kê** / **Từ ngày ... đến ngày** → Statement Period
- **Số dư đầu kỳ** → Opening Balance
- **Số dư cuối kỳ** → Closing Balance
- **Ngày giao dịch** → Transaction Date
- **Diễn giải** / **Nội dung** → Description
- **Số tiền ghi nợ** / **Chi** → Debit Amount
- **Số tiền ghi có** / **Thu** → Credit Amount
- **Số dư** → Balance
- **Mã giao dịch** / **Số tham chiếu** → Reference Number
## Transaction Table Columns
### Standard layout (most banks)
```
| Ngày GD | Ngày HL | Mô tả | Nợ (Debit) | Có (Credit) | Số dư |
```
### Compact layout
```
| Date | Description | Amount | Balance |
```
## Amount Formats
### Vietnamese Dong (VND)
- No decimal places (smallest unit is 1 VND)
- Thousands separator: dot (.) or comma (,)
- Examples: `1.000.000` or `1,000,000` = 1 million VND
- Suffixes: `VND`, `đ`, `VNĐ`
- Negative: parentheses `(500.000)` or minus `-500.000`
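The amount rules above can be sketched as a small parser. Since VND has no decimal places, the sketch treats both dots and commas as thousands separators, so it would misread a decimal amount in another currency. `parse_vnd_amount` is a hypothetical name; the skill's own helper is `clean_amount` in `ocr_utils.py`, whose behavior may differ.

```python
import re


def parse_vnd_amount(raw: str) -> int:
    """Parse a VND amount string such as '1.000.000 VND' or '(500.000)'."""
    # Drop currency suffixes first so they do not hide the closing parenthesis.
    text = re.sub(r"vnđ|vnd|đ", "", raw, flags=re.IGNORECASE).strip()
    negative = text.startswith("-") or (text.startswith("(") and text.endswith(")"))
    digits = re.sub(r"\D", "", text)  # remove separators and any leftover symbols
    if not digits:
        raise ValueError(f"no digits found in amount: {raw!r}")
    value = int(digits)
    return -value if negative else value
```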
FILE:references/invoice-fields.md
# Invoice Field Reference
## Table of Contents
1. [Required Fields](#required-fields)
2. [Vietnamese Invoice Format (Hóa đơn GTGT)](#vietnamese-invoice-format)
3. [International Invoice Formats](#international-invoice-formats)
4. [VAT Rules (Vietnam)](#vat-rules-vietnam)
## Required Fields
| Field | Type | Description |
|-------|------|-------------|
| `invoice_number` | string | Unique invoice identifier |
| `invoice_date` | YYYY-MM-DD | Date of invoice issuance |
| `vendor_name` | string | Name of the seller/vendor |
| `vendor_tax_code` | string | Tax identification number |
| `items` | array | Line items with description, qty, unit_price, total |
| `subtotal` | number | Sum of line item totals (before tax) |
| `vat_rate` | number | VAT percentage (0, 5, 8, or 10) |
| `vat_amount` | number | Calculated VAT amount |
| `total_amount` | number | Final amount including VAT |
| `payment_terms` | string | Payment conditions |
## Vietnamese Invoice Format
Vietnamese VAT invoices (Hóa đơn GTGT) follow Circular 78/2021/TT-BTC:
### Key Labels (Vietnamese → English)
- **Ký hiệu** → Invoice series/symbol
- **Số** → Invoice number
- **Ngày ... tháng ... năm** → Date format (day month year)
- **Đơn vị bán hàng** → Seller/Vendor
- **Mã số thuế** → Tax code
- **Tên hàng hóa, dịch vụ** → Item description
- **Đơn vị tính** → Unit of measure
- **Số lượng** → Quantity
- **Đơn giá** → Unit price
- **Thành tiền** → Amount (line total)
- **Cộng tiền hàng** → Subtotal
- **Thuế suất GTGT** → VAT rate
- **Tiền thuế GTGT** → VAT amount
- **Tổng cộng tiền thanh toán** → Total amount
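The spelled-out date label (**Ngày ... tháng ... năm**) can be converted to the `YYYY-MM-DD` form used by `invoice_date`. A minimal sketch, assuming the label text survives OCR intact; `parse_vietnamese_date` is a hypothetical name.

```python
from __future__ import annotations

import re
from datetime import date

# Matches e.g. "Ngày 15 tháng 03 năm 2024"
_VN_DATE = re.compile(
    r"ngày\s*(\d{1,2})\s*tháng\s*(\d{1,2})\s*năm\s*(\d{4})", re.IGNORECASE
)


def parse_vietnamese_date(text: str) -> str | None:
    """Return the first Vietnamese long-form date in `text` as YYYY-MM-DD."""
    match = _VN_DATE.search(text)
    if not match:
        return None
    day, month, year = (int(group) for group in match.groups())
    try:
        return date(year, month, day).isoformat()
    except ValueError:  # impossible calendar date, e.g. day 31 of month 2
        return None
```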
### E-Invoice Format
Since July 2022, Vietnam requires electronic invoices (hóa đơn điện tử) with QR code, digital signature, and unique lookup code from tax authority.
## International Invoice Formats
Common English labels: Invoice No, Date, Bill To, Subtotal, Tax/VAT/GST, Total, Payment Terms, Due Date.
## VAT Rules (Vietnam)
| Rate | Applies to |
|------|-----------|
| 0% | Exported goods and services |
| 5% | Essential goods (water, medicine, education) |
| 8% | Reduced rate (temporary policy for certain sectors) |
| 10% | Standard rate (most goods and services) |
### Validation Rules
1. `vat_amount = subtotal × vat_rate / 100`
2. `total_amount = subtotal + vat_amount`
3. `subtotal = sum(item.total for each item)`
4. `item.total = item.quantity × item.unit_price`
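The four rules translate directly into checks. The sketch below assumes items are dictionaries with `quantity`, `unit_price`, and `total` keys and uses a fixed 1 VND rounding tolerance; both are assumptions for illustration, and `validate_invoice_math` is a hypothetical name.

```python
def validate_invoice_math(items: list, subtotal: float, vat_rate: float,
                          vat_amount: float, total_amount: float,
                          tolerance: float = 1.0) -> list:
    """Return a list of rule violations (an empty list means all rules hold)."""
    errors = []
    # Rule 4: each line total equals quantity x unit price
    for index, item in enumerate(items, start=1):
        if abs(item["quantity"] * item["unit_price"] - item["total"]) > tolerance:
            errors.append(f"item {index}: total != quantity x unit_price")
    # Rule 3: subtotal equals the sum of line totals
    if abs(sum(item["total"] for item in items) - subtotal) > tolerance:
        errors.append("subtotal != sum of item totals")
    # Rule 1: VAT amount follows from the rate
    if abs(subtotal * vat_rate / 100 - vat_amount) > tolerance:
        errors.append("vat_amount != subtotal x vat_rate / 100")
    # Rule 2: total equals subtotal plus VAT
    if abs(subtotal + vat_amount - total_amount) > tolerance:
        errors.append("total_amount != subtotal + vat_amount")
    return errors
```

For example, a subtotal of 10,000,000 at 10% VAT should carry a VAT amount of 1,000,000 and a total of 11,000,000.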
FILE:references/ocr-setup.md
# OCR Setup Guide
## System Dependencies
The accounting skill requires these system packages for OCR:
### Ubuntu / Debian
```bash
sudo apt install tesseract-ocr tesseract-ocr-vie poppler-utils
```
### macOS (Homebrew)
```bash
brew install tesseract tesseract-lang poppler
```
### Verify Installation
```bash
uv run {baseDir}/scripts/ocr_utils.py check
```
This checks for:
- `tesseract` — OCR engine for scanned documents / images
- `tesseract-ocr-vie` — Vietnamese language pack (critical for hóa đơn)
- `poppler-utils` — PDF-to-image conversion (needed for scanned PDFs)
## How OCR Works in This Skill
### Strategy (automatic per file)
1. **Digital PDFs**: Extracted via `pdfplumber` (fast, high accuracy, no OCR needed)
2. **Scanned PDFs**: If pdfplumber gets <50 chars/page, falls back to `pdf2image` + `pytesseract` at 300 DPI
3. **Images (JPG/PNG/TIFF)**: Direct OCR via `pytesseract` with grayscale preprocessing
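The selection logic can be sketched as a pure function over the text that `pdfplumber` returned for each page. This is an illustration under assumptions: it averages characters across pages, whereas `ocr_utils.py` may apply the 50-character rule differently, and `choose_method` and the returned labels are hypothetical.

```python
from __future__ import annotations

from pathlib import Path

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tif", ".tiff"}
MIN_CHARS_PER_PAGE = 50  # fallback threshold named in the strategy above


def choose_method(filename: str, page_texts: list[str] | None = None) -> str:
    """Pick an extraction method from the file type and any text-layer output."""
    suffix = Path(filename).suffix.lower()
    if suffix in IMAGE_EXTENSIONS:
        return "pytesseract"
    if suffix != ".pdf":
        raise ValueError(f"unsupported file type: {suffix or filename}")
    pages = page_texts or []
    if not pages:
        return "pdf2image+pytesseract"
    average_chars = sum(len(text.strip()) for text in pages) / len(pages)
    if average_chars >= MIN_CHARS_PER_PAGE:
        return "pdfplumber"
    return "pdf2image+pytesseract"
```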
### Confidence Reporting
Every extraction reports two confidence scores:
- **OCR Confidence**: Quality of text extraction (pytesseract word-level average)
- **Extraction Confidence**: How many expected fields were successfully parsed
| OCR Confidence | Meaning |
|---------------|---------|
| > 90% | Excellent — clean digital document |
| 85-90% | Good — minor OCR artifacts |
| 70-85% | Fair — some words may be wrong, review amounts |
| < 70% | Poor — likely needs manual review |
## Troubleshooting
| Problem | Solution |
|---------|----------|
| "tesseract is not installed" | `sudo apt install tesseract-ocr` |
| Vietnamese text garbled | `sudo apt install tesseract-ocr-vie` |
| "poppler-utils not installed" | `sudo apt install poppler-utils` |
| Low confidence on clean PDF | Likely a digital PDF — pdfplumber should work fine |
| Amounts parsed as 0 | Check VND format (dots as thousands separators) |
| Scanned PDF gets blank text | Increase DPI or check if PDF is password-protected |
FILE:references/po-fields.md
# Purchase Order Field Reference
## Table of Contents
1. [Required Fields](#required-fields)
2. [Vietnamese PO Labels](#vietnamese-po-labels)
3. [Delivery Tracking](#delivery-tracking)
4. [Common Payment Terms](#common-payment-terms)
## Required Fields
| Field | Type | Description |
|-------|------|-------------|
| `po_number` | string | Unique PO identifier |
| `po_date` | YYYY-MM-DD | Date the PO was issued |
| `vendor_name` | string | Supplier/vendor name |
| `delivery_date` | YYYY-MM-DD | Expected delivery date |
| `items` | array | Line items with description, qty, unit_price, total |
| `total_amount` | number | Total PO value |
| `payment_terms` | string | Payment conditions |
## Vietnamese PO Labels
- **Đơn đặt hàng** / **Đơn mua hàng** → Purchase Order
- **Số PO** / **Số ĐĐH** → PO Number
- **Ngày đặt hàng** → Order Date
- **Nhà cung cấp** / **Đơn vị cung cấp** → Vendor/Supplier
- **Ngày giao hàng** / **Hạn giao** → Delivery Date
- **Tên hàng hóa** → Item Description
- **Số lượng** → Quantity
- **Đơn giá** → Unit Price
- **Thành tiền** → Line Total
- **Tổng cộng** / **Tổng giá trị** → Total Amount
- **Điều khoản thanh toán** → Payment Terms
## Delivery Tracking
| Days Left | Status |
|-----------|--------|
| < 0 | **OVERDUE** — delivery date has passed |
| 0-3 | **URGENT** — delivery imminent |
| 4-7 | **APPROACHING** — delivery coming soon |
| > 7 | **ON_TRACK** — within normal timeline |
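The table maps onto a short function. A minimal sketch that takes `today` as a parameter so the result is reproducible; `delivery_status` is a hypothetical name and may not match how `extract_po.py` structures this logic.

```python
from __future__ import annotations

from datetime import date


def delivery_status(delivery_date: date, today: date) -> tuple[int, str]:
    """Return (days_left, status) using the thresholds in the table above."""
    days_left = (delivery_date - today).days
    if days_left < 0:
        status = "OVERDUE"
    elif days_left <= 3:
        status = "URGENT"
    elif days_left <= 7:
        status = "APPROACHING"
    else:
        status = "ON_TRACK"
    return days_left, status
```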
## Common Payment Terms
- **COD** — Cash on Delivery
- **Net 30 / Net 60** — Payment due N days after invoice
- **TT** — Telegraphic Transfer (wire)
- **L/C** — Letter of Credit
- **Trả trước** — Prepayment
- **Trả sau** — Deferred payment
FILE:scripts/classify_document.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pdfplumber>=0.10.0",
# "Pillow>=10.0.0",
# "pytesseract>=0.3.10",
# "pdf2image>=1.17.0",
# ]
# ///
"""Classify accounting documents (Invoice/PO/Statement/Other) and route to the right extractor.
Usage:
uv run classify_document.py /path/to/document.pdf
uv run classify_document.py document.pdf --classify-only
uv run classify_document.py document.pdf --output-dir ~/accounting/
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from ocr_utils import eprint, extract_from_file
# Classification keywords with weights
INVOICE_KEYWORDS = {
"hóa đơn giá trị gia tăng": 20, "hóa đơn gtgt": 20,
"hóa đơn": 15, "hoá đơn": 15,
"invoice": 10, "tax invoice": 15, "invoice no": 12, "invoice number": 12,
"invoice date": 10, "thuế gtgt": 12, "mã số thuế": 10,
"đơn vị bán": 8, "thuế suất": 10, "tiền thuế": 8,
"vat": 8, "subtotal": 6, "payment due": 7, "amount due": 7,
"ký hiệu": 6, "bill to": 8,
}
PO_KEYWORDS = {
"purchase order": 20, "đơn đặt hàng": 20, "đơn mua hàng": 20,
"phiếu mua hàng": 15, "p.o.": 10, "po number": 15, "po no": 15, "po #": 15,
"delivery date": 10, "ngày giao hàng": 10, "hạn giao": 10,
"ship to": 8, "order date": 8, "nhà cung cấp": 6,
"đơn vị cung cấp": 8, "procurement": 6,
}
STATEMENT_KEYWORDS = {
"sao kê tài khoản": 25, "bank statement": 20, "account statement": 20,
"statement of account": 20, "sao kê": 20,
"số dư đầu kỳ": 15, "số dư cuối kỳ": 15,
"opening balance": 12, "closing balance": 12,
"transaction history": 15, "transaction date": 10,
"giao dịch": 8, "ngân hàng": 6, "số tài khoản": 10,
"chủ tài khoản": 8, "debit": 6, "credit": 6,
"account number": 8,
}
DOC_TYPES = {"invoice": INVOICE_KEYWORDS, "po": PO_KEYWORDS, "statement": STATEMENT_KEYWORDS}
def classify_document(text: str) -> dict:
    text_lower = text.lower()
    scores = {}
    for doc_type, keywords in DOC_TYPES.items():
        score = 0
        matched = []
        for keyword, weight in keywords.items():
            count = text_lower.count(keyword)
            if count > 0:
                score += weight * min(count, 3)
                matched.append(keyword)
        scores[doc_type] = {"score": score, "matched": matched}
    max_score = max(s["score"] for s in scores.values())
    if max_score == 0:
        return {"type": "other", "confidence": 0, "scores": {k: v["score"] for k, v in scores.items()}, "matched_keywords": {}}
    winner = max(scores, key=lambda k: scores[k]["score"])
    sorted_scores = sorted(scores.values(), key=lambda x: x["score"], reverse=True)
    if sorted_scores[1]["score"] > 0:
        confidence = min(100, int((1 - sorted_scores[1]["score"] / sorted_scores[0]["score"]) * 100 + 50))
    else:
        confidence = min(100, max(60, sorted_scores[0]["score"]))
    return {
        "type": winner,
        "confidence": confidence,
        "scores": {k: v["score"] for k, v in scores.items()},
        "matched_keywords": {k: v["matched"] for k, v in scores.items() if v["matched"]},
    }
ROUTER_MAP = {
"invoice": {"script": "extract_invoice.py", "output": "invoice_tracking.xlsx"},
"po": {"script": "extract_po.py", "output": "po_tracking.xlsx"},
"statement": {"script": "extract_statement.py", "output": None},
}
def main():
    parser = argparse.ArgumentParser(description="Classify accounting documents and route to extractors")
    parser.add_argument("file", help="Path to the document file (PDF, JPG, PNG)")
    parser.add_argument("--classify-only", action="store_true", help="Only classify, don't generate route command")
    parser.add_argument("--output-dir", default=".", help="Base directory for output files")
    args = parser.parse_args()
    file_path = os.path.abspath(args.file)
    if not os.path.exists(file_path):
        eprint(f"File not found: {file_path}")
        sys.exit(1)
    eprint(f"Classifying: {file_path}")
    ocr = extract_from_file(file_path)
    eprint(f"OCR method: {ocr.method} | Confidence: {ocr.confidence}%")
    if not ocr.text.strip():
        eprint("Warning: No text extracted. Cannot classify.")
        print(json.dumps({
            "file": file_path, "type": "other", "confidence": 0,
            "action": "manual_review",
            "ocr_method": ocr.method, "ocr_confidence": ocr.confidence,
            "message": "No text extracted from file. Check OCR dependencies.",
        }, ensure_ascii=False, indent=2))
        sys.exit(0)
    classification = classify_document(ocr.text)
    doc_type = classification["type"]
    confidence = classification["confidence"]
    eprint(f"Classification: {doc_type} (confidence: {confidence}%)")
    eprint(f"Scores: {classification['scores']}")
    if args.classify_only:
        print(json.dumps({
            "file": file_path,
            "ocr_method": ocr.method, "ocr_confidence": ocr.confidence,
            **classification,
        }, ensure_ascii=False, indent=2))
        return
    if doc_type == "other" or confidence < 50:
        print(json.dumps({
            "file": file_path, "type": doc_type, "confidence": confidence,
            "action": "manual_review",
            "ocr_method": ocr.method, "ocr_confidence": ocr.confidence,
            "message": "Document could not be classified with sufficient confidence. Human review required.",
        }, ensure_ascii=False, indent=2))
        sys.exit(0)
    route = ROUTER_MAP[doc_type]
    scripts_dir = str(Path(__file__).resolve().parent)
    script_path = os.path.join(scripts_dir, route["script"])
    cmd_parts = ["uv", "run", script_path, file_path]
    if route["output"]:
        cmd_parts.extend(["--output", os.path.join(args.output_dir, route["output"])])
    result = {
        "file": file_path,
        "type": doc_type,
        "confidence": confidence,
        "action": "extract",
        "command": " ".join(cmd_parts),
        "ocr_method": ocr.method,
        "ocr_confidence": ocr.confidence,
        "scores": classification["scores"],
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    if confidence < 75:
        eprint(f"Note: Classification confidence is {confidence}%. Consider reviewing the result.")
if __name__ == "__main__":
    main()
FILE:scripts/extract_invoice.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "openpyxl>=3.1.0",
# "pdfplumber>=0.10.0",
# "Pillow>=10.0.0",
# "pytesseract>=0.3.10",
# "pdf2image>=1.17.0",
# ]
# ///
"""Extract structured data from invoice PDF/image files and append to Excel tracking sheet.
Usage:
uv run extract_invoice.py /path/to/invoice.pdf
uv run extract_invoice.py invoice.jpg --output ~/accounting/invoice_tracking.xlsx
uv run extract_invoice.py invoice.pdf --dry-run
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
# Add scripts dir to path so we can import sibling modules
sys.path.insert(0, str(Path(__file__).resolve().parent))
from ocr_utils import OCRResult, clean_amount, eprint, extract_from_file, parse_date
import openpyxl
from openpyxl.styles import Alignment, Font, PatternFill
@dataclass
class InvoiceItem:
    description: str
    quantity: float
    unit_price: float
    total: float
@dataclass
class Invoice:
    invoice_number: str = ""
    invoice_date: str = ""
    vendor_name: str = ""
    vendor_tax_code: str = ""
    items: list[InvoiceItem] = field(default_factory=list)
    subtotal: float = 0.0
    vat_rate: float = 0.0
    vat_amount: float = 0.0
    total_amount: float = 0.0
    payment_terms: str = ""
    ocr_confidence: float = 0.0
    extraction_confidence: float = 0.0
    ocr_method: str = ""
    raw_text: str = ""
    source_file: str = ""
    status: str = "extracted"
def parse_invoice_text(ocr: OCRResult) -> Invoice:
"""Parse OCR result into Invoice structure using regex patterns."""
text = ocr.text
invoice = Invoice(
raw_text=text,
source_file=ocr.source_file,
ocr_confidence=ocr.confidence,
ocr_method=ocr.method,
)
# Invoice number patterns
inv_patterns = [
r"(?:Invoice\s*(?:No|Number|#|Num)[\s.:]*)\s*([A-Z0-9/-]+)",
r"(?:Số\s*(?:hóa đơn|HĐ|hoá đơn)[\s.:]*)\s*([A-Z0-9/-]+)",
r"(?:Mã\s*(?:hóa đơn|HĐ)[\s.:]*)\s*([A-Z0-9/-]+)",
r"(?:Ký hiệu[\s.:]*\s*[A-Z0-9]+\s*Số[\s.:]*)\s*(\d+)",
r"(?:Số[\s.:]*)\s*(\d{7,})",
]
for pat in inv_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
invoice.invoice_number = m.group(1).strip()
break
# Date patterns
date_patterns = [
r"(?:Invoice\s*Date|Ngày\s*\w*\s*tháng\s*\w*\s*năm|Ngày|Date)[\s.:]*(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})",
r"[Nn]gày\s*(\d{1,2})\s*tháng\s*(\d{1,2})\s*năm\s*(\d{4})",
]
for pat in date_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
if len(m.groups()) == 3:
invoice.invoice_date = f"{int(m.group(3)):04d}-{int(m.group(2)):02d}-{int(m.group(1)):02d}"
else:
invoice.invoice_date = parse_date(m.group(1))
break
if not invoice.invoice_date:
m = re.search(r"(\d{1,2}[/.-]\d{1,2}[/.-]\d{4})", text)
if m:
invoice.invoice_date = parse_date(m.group(1))
# Vendor name
vendor_patterns = [
r"(?:Đơn vị bán hàng|Vendor|Supplier|Nhà cung cấp|Tên đơn vị)[\s.:]*(.+?)(?:\n|$)",
r"(?:Company|Công ty)[\s.:]*(.+?)(?:\n|$)",
]
for pat in vendor_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
invoice.vendor_name = m.group(1).strip()
break
# Tax code
tax_patterns = [
r"(?:Tax\s*(?:Code|ID|No)|Mã số thuế|MST)[\s.:]*(\d[\d-]+)",
]
for pat in tax_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
invoice.vendor_tax_code = m.group(1).strip()
break
    # Amounts (the capture must start with a digit so stray punctuation after a label is not captured)
    amount_patterns = {
        "subtotal": [
            r"(?:Sub\s*total|Cộng tiền hàng|Thành tiền chưa thuế)[\s.:]*(\d[\d.,]*)",
        ],
        "vat_amount": [
            r"(?:Tiền thuế GTGT|VAT\s*Amount|Thuế GTGT|Tax Amount)[\s.:]*(\d[\d.,]*)",
        ],
        "total_amount": [
            r"(?:Tổng cộng tiền thanh toán|Total\s*Amount|Grand\s*Total|Tổng\s*(?:cộng|thanh toán))[\s.:]*(\d[\d.,]*)",
        ],
    }
for field_name, patterns in amount_patterns.items():
for pat in patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
setattr(invoice, field_name, clean_amount(m.group(1)))
break
# VAT rate
    vat_rate_patterns = [
        # Also accept "VAT Rate: 10%" and "VAT (10%)"
        r"(?:Thuế suất GTGT|Thuế suất|VAT(?:\s*Rate)?)[\s.:(]*(\d+)\s*%",
    ]
for pat in vat_rate_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
invoice.vat_rate = float(m.group(1))
break
# Payment terms
pay_patterns = [
r"(?:Payment\s*Terms?|Điều khoản thanh toán|Hình thức thanh toán)[\s.:]*(.+?)(?:\n|$)",
]
for pat in pay_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
invoice.payment_terms = m.group(1).strip()
break
# Extraction confidence (how many fields we got)
filled = sum(1 for v in [
invoice.invoice_number, invoice.invoice_date, invoice.vendor_name,
invoice.vendor_tax_code, invoice.subtotal, invoice.total_amount,
] if v)
invoice.extraction_confidence = round(filled / 6 * 100, 1)
return invoice
def validate_invoice(invoice: Invoice) -> list[str]:
"""Validate extracted invoice data. Returns list of warnings."""
warnings = []
if not invoice.invoice_number:
warnings.append("Missing invoice number")
if not invoice.invoice_date:
warnings.append("Missing invoice date")
if not invoice.vendor_name:
warnings.append("Missing vendor name")
if not invoice.total_amount:
warnings.append("Missing total amount")
# Verify subtotal + VAT = total
if invoice.subtotal > 0 and invoice.vat_amount >= 0 and invoice.total_amount > 0:
expected_total = invoice.subtotal + invoice.vat_amount
tolerance = max(invoice.total_amount * 0.01, 1.0)
if abs(expected_total - invoice.total_amount) > tolerance:
warnings.append(
f"Total mismatch: subtotal({invoice.subtotal:,.0f}) + VAT({invoice.vat_amount:,.0f}) "
f"= {expected_total:,.0f}, but total = {invoice.total_amount:,.0f}"
)
# Verify VAT calculation
if invoice.subtotal > 0 and invoice.vat_rate > 0:
expected_vat = invoice.subtotal * invoice.vat_rate / 100
tolerance = max(expected_vat * 0.01, 1.0)
if invoice.vat_amount > 0 and abs(expected_vat - invoice.vat_amount) > tolerance:
warnings.append(
f"VAT mismatch: {invoice.subtotal:,.0f} × {invoice.vat_rate}% = {expected_vat:,.0f}, "
f"but VAT amount = {invoice.vat_amount:,.0f}"
)
# OCR confidence
if invoice.ocr_confidence < 85:
warnings.append(f"Low OCR confidence: {invoice.ocr_confidence}% (method: {invoice.ocr_method})")
# Extraction confidence
if invoice.extraction_confidence < 85:
warnings.append(f"Low extraction confidence: {invoice.extraction_confidence}%")
return warnings
def check_duplicate(invoice: Invoice, excel_path: str) -> bool:
    """Check if invoice already exists in the tracking file.

    A row is a duplicate when invoice number, vendor, and total (within 1.0) all match.
    """
    if not os.path.exists(excel_path):
        return False
    wb = openpyxl.load_workbook(excel_path)
    try:
        ws = wb.active
        for row in ws.iter_rows(min_row=2, values_only=True):
            if not row or len(row) < 7:
                continue
            try:
                existing_total = float(row[6] or 0)
            except (TypeError, ValueError):
                # Skip rows whose Total cell was edited into non-numeric text.
                continue
            if (
                str(row[0] or "") == invoice.invoice_number
                and str(row[2] or "") == invoice.vendor_name
                and abs(existing_total - invoice.total_amount) < 1.0
            ):
                return True
        return False
    finally:
        # Close the workbook on every exit path, including exceptions.
        wb.close()
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_FONT = Font(name="Calibri", bold=True, color="FFFFFF", size=11)
HEADERS = [
"Invoice#", "Date", "Vendor", "TaxCode", "Subtotal",
"VAT", "Total", "Status", "OCR%", "Extract%", "OCR Method", "FilePath", "ProcessedAt",
]
COL_WIDTHS = [15, 12, 30, 15, 15, 15, 15, 12, 8, 8, 10, 40, 20]
def init_excel(excel_path: str) -> openpyxl.Workbook:
    """Create a new tracking workbook with a styled header row, save it, and return it."""
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Invoice Tracking"
for col_idx, (header, width) in enumerate(zip(HEADERS, COL_WIDTHS), 1):
cell = ws.cell(row=1, column=col_idx, value=header)
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal="center")
ws.column_dimensions[openpyxl.utils.get_column_letter(col_idx)].width = width
ws.auto_filter.ref = f"A1:{openpyxl.utils.get_column_letter(len(HEADERS))}1"
ws.freeze_panes = "A2"
wb.save(excel_path)
return wb
def append_to_excel(invoice: Invoice, excel_path: str, status: str = "extracted"):
    """Append one invoice row to the tracking sheet, creating the workbook if needed."""
if not os.path.exists(excel_path):
wb = init_excel(excel_path)
else:
wb = openpyxl.load_workbook(excel_path)
ws = wb.active
next_row = ws.max_row + 1
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
row_data = [
invoice.invoice_number, invoice.invoice_date, invoice.vendor_name,
invoice.vendor_tax_code, invoice.subtotal, invoice.vat_amount,
invoice.total_amount, status, invoice.ocr_confidence,
invoice.extraction_confidence, invoice.ocr_method, invoice.source_file, now,
]
for col_idx, value in enumerate(row_data, 1):
ws.cell(row=next_row, column=col_idx, value=value)
for col_idx in (5, 6, 7):
ws.cell(row=next_row, column=col_idx).number_format = "#,##0"
wb.save(excel_path)
wb.close()
def save_json_backup(invoice: Invoice, output_dir: str) -> str:
    """Write the invoice (without raw_text) to a timestamped JSON file and return its path."""
os.makedirs(output_dir, exist_ok=True)
safe_name = re.sub(r"[^\w-]", "_", invoice.invoice_number or "unknown")
json_path = os.path.join(output_dir, f"invoice_{safe_name}_{datetime.now():%Y%m%d_%H%M%S}.json")
data = asdict(invoice)
data.pop("raw_text", None)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return json_path
def main():
parser = argparse.ArgumentParser(description="Extract invoice data from PDF/image files")
parser.add_argument("file", help="Path to the invoice file (PDF, JPG, PNG)")
parser.add_argument("--output", "-o", default="invoice_tracking.xlsx",
help="Path to the output Excel tracking file")
parser.add_argument("--json-dir", default=None, help="Directory for JSON backup")
parser.add_argument("--format", choices=["excel", "json", "both"], default="both")
parser.add_argument("--dry-run", action="store_true", help="Parse and validate only")
args = parser.parse_args()
file_path = os.path.abspath(args.file)
if not os.path.exists(file_path):
eprint(f"File not found: {file_path}")
sys.exit(1)
# OCR / Extract text
eprint(f"Extracting text from: {file_path}")
ocr = extract_from_file(file_path)
eprint(f"OCR method: {ocr.method} | Confidence: {ocr.confidence}% | Text length: {len(ocr.text)}")
if not ocr.text.strip():
eprint("ERROR: No text extracted from file. Check OCR dependencies (tesseract, poppler).")
sys.exit(1)
# Parse
invoice = parse_invoice_text(ocr)
eprint(f"Parsed invoice: {invoice.invoice_number or '(unknown)'} from {invoice.vendor_name or '(unknown)'}")
# Validate
warnings = validate_invoice(invoice)
if warnings:
for w in warnings:
eprint(f" ⚠ {w}")
if invoice.extraction_confidence < 85 or invoice.ocr_confidence < 85:
invoice.status = "needs_review"
# Check duplicate
excel_path = os.path.abspath(args.output)
if not args.dry_run and check_duplicate(invoice, excel_path):
eprint(f"Duplicate detected: {invoice.invoice_number} from {invoice.vendor_name}")
invoice.status = "duplicate"
if args.dry_run:
data = asdict(invoice)
data.pop("raw_text", None)
data["warnings"] = warnings
print(json.dumps(data, ensure_ascii=False, indent=2))
return
if args.format in ("excel", "both"):
append_to_excel(invoice, excel_path, invoice.status)
eprint(f"Appended to: {excel_path}")
if args.format in ("json", "both"):
json_dir = args.json_dir or os.path.dirname(excel_path) or "."
json_path = save_json_backup(invoice, json_dir)
eprint(f"JSON backup: {json_path}")
result = {
"invoice_number": invoice.invoice_number,
"vendor": invoice.vendor_name,
"total": invoice.total_amount,
"status": invoice.status,
"ocr_confidence": invoice.ocr_confidence,
"ocr_method": invoice.ocr_method,
"extraction_confidence": invoice.extraction_confidence,
"warnings": warnings,
}
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
FILE:scripts/extract_po.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "openpyxl>=3.1.0",
# "pdfplumber>=0.10.0",
# "Pillow>=10.0.0",
# "pytesseract>=0.3.10",
# "pdf2image>=1.17.0",
# ]
# ///
"""Extract structured data from Purchase Order PDF/image files and append to Excel.
Usage:
uv run extract_po.py /path/to/po.pdf
uv run extract_po.py po.jpg --output ~/accounting/po_tracking.xlsx
uv run extract_po.py po.pdf --dry-run
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from ocr_utils import OCRResult, clean_amount, eprint, extract_from_file, parse_date
import openpyxl
from openpyxl.styles import Alignment, Font, PatternFill
@dataclass
class POItem:
description: str
quantity: float
unit_price: float
total: float
@dataclass
class PurchaseOrder:
po_number: str = ""
po_date: str = ""
vendor_name: str = ""
delivery_date: str = ""
items: list[POItem] = field(default_factory=list)
total_amount: float = 0.0
payment_terms: str = ""
ocr_confidence: float = 0.0
extraction_confidence: float = 0.0
ocr_method: str = ""
raw_text: str = ""
source_file: str = ""
status: str = "extracted"
days_left: int | None = None
def parse_po_text(ocr: OCRResult) -> PurchaseOrder:
    """Parse OCR result into a PurchaseOrder using regex patterns."""
text = ocr.text
po = PurchaseOrder(
raw_text=text, source_file=ocr.source_file,
ocr_confidence=ocr.confidence, ocr_method=ocr.method,
)
po_patterns = [
r"(?:P\.?O\.?\s*(?:No|Number|#|Num)|Purchase\s*Order\s*(?:No|#))[\s.:]*\s*([A-Z0-9/-]+)",
r"(?:Đơn\s*(?:đặt hàng|mua hàng)\s*(?:số|#))[\s.:]*\s*([A-Z0-9/-]+)",
r"(?:Số\s*(?:PO|ĐĐH))[\s.:]*\s*([A-Z0-9/-]+)",
]
for pat in po_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.po_number = m.group(1).strip()
break
date_patterns = [
r"(?:P\.?O\.?\s*Date|Order\s*Date|Ngày\s*(?:đặt hàng|đơn hàng)|Date)[\s.:]*(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})",
]
for pat in date_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.po_date = parse_date(m.group(1))
break
if not po.po_date:
m = re.search(r"(\d{1,2}[/.-]\d{1,2}[/.-]\d{4})", text)
if m:
po.po_date = parse_date(m.group(1))
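    vendor_patterns = [
        r"(?:Vendor|Supplier|Nhà cung cấp|Đơn vị cung cấp)[\s.:]*(.+?)(?:\n|$)",
        # "To" needs a word boundary and a colon so it cannot match inside "Total" or "Customer".
        r"\bTo\s*:\s*(.+?)(?:\n|$)",
    ]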
for pat in vendor_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.vendor_name = m.group(1).strip()
break
delivery_patterns = [
r"(?:Deliver(?:y|ed)\s*(?:Date|By)|Ship\s*(?:Date|By)|Ngày\s*giao\s*hàng|Hạn\s*giao|Expected\s*Delivery)[\s.:]*(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})",
]
for pat in delivery_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.delivery_date = parse_date(m.group(1))
break
    total_patterns = [
        r"(?:Total\s*Amount|Grand\s*Total|Tổng\s*(?:cộng|giá trị))[\s.:]*(\d[\d.,]*)",
    ]
for pat in total_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.total_amount = clean_amount(m.group(1))
break
pay_patterns = [
r"(?:Payment\s*Terms?|Điều khoản thanh toán|Hình thức thanh toán)[\s.:]*(.+?)(?:\n|$)",
]
for pat in pay_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
po.payment_terms = m.group(1).strip()
break
if po.delivery_date:
try:
delivery = datetime.strptime(po.delivery_date, "%Y-%m-%d")
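            # Compare calendar dates; subtracting datetimes would report a delivery due today as -1 days.
            po.days_left = (delivery.date() - datetime.now().date()).days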
except ValueError:
pass
filled = sum(1 for v in [
po.po_number, po.po_date, po.vendor_name,
po.delivery_date, po.total_amount,
] if v)
po.extraction_confidence = round(filled / 5 * 100, 1)
return po
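def validate_po(po: PurchaseOrder) -> list[str]:
    """Validate extracted PO data. Returns a list of warnings, including OVERDUE and URGENT delivery flags."""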
warnings = []
if not po.po_number:
warnings.append("Missing PO number")
if not po.po_date:
warnings.append("Missing PO date")
if not po.vendor_name:
warnings.append("Missing vendor name")
if not po.total_amount:
warnings.append("Missing total amount")
if po.days_left is not None and po.days_left < 0:
warnings.append(f"OVERDUE: delivery was {abs(po.days_left)} days ago ({po.delivery_date})")
elif po.days_left is not None and po.days_left <= 3:
warnings.append(f"URGENT: delivery in {po.days_left} days ({po.delivery_date})")
if po.items and po.total_amount > 0:
items_total = sum(item.total for item in po.items)
tolerance = max(po.total_amount * 0.01, 1.0)
if abs(items_total - po.total_amount) > tolerance:
warnings.append(f"Total mismatch: items sum={items_total:,.0f}, total={po.total_amount:,.0f}")
if po.ocr_confidence < 85:
warnings.append(f"Low OCR confidence: {po.ocr_confidence}% (method: {po.ocr_method})")
if po.extraction_confidence < 85:
warnings.append(f"Low extraction confidence: {po.extraction_confidence}%")
return warnings
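def check_duplicate(po: PurchaseOrder, excel_path: str) -> bool:
    """Check if the PO already exists in the tracking file (same PO#, vendor, and total)."""
    if not os.path.exists(excel_path):
        return False
    wb = openpyxl.load_workbook(excel_path)
    try:
        ws = wb.active
        for row in ws.iter_rows(min_row=2, values_only=True):
            if not row or len(row) < 5:
                continue
            try:
                existing_total = float(row[4] or 0)
            except (TypeError, ValueError):
                continue  # Total cell holds non-numeric text; cannot compare
            if (
                str(row[0] or "") == po.po_number
                and str(row[2] or "") == po.vendor_name
                and abs(existing_total - po.total_amount) < 1.0
            ):
                return True
        return False
    finally:
        # Close the workbook on every exit path, including exceptions.
        wb.close()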
HEADER_FILL = PatternFill(start_color="548235", end_color="548235", fill_type="solid")
HEADER_FONT = Font(name="Calibri", bold=True, color="FFFFFF", size=11)
HEADERS = [
"PO#", "Date", "Vendor", "Delivery", "Total",
"Status", "DaysLeft", "PaymentTerms", "OCR%", "Extract%", "OCR Method", "FilePath", "ProcessedAt",
]
COL_WIDTHS = [15, 12, 30, 12, 18, 12, 10, 25, 8, 8, 10, 40, 20]
def init_excel(excel_path: str) -> openpyxl.Workbook:
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "PO Tracking"
for col_idx, (header, width) in enumerate(zip(HEADERS, COL_WIDTHS), 1):
cell = ws.cell(row=1, column=col_idx, value=header)
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal="center")
ws.column_dimensions[openpyxl.utils.get_column_letter(col_idx)].width = width
ws.auto_filter.ref = f"A1:{openpyxl.utils.get_column_letter(len(HEADERS))}1"
ws.freeze_panes = "A2"
wb.save(excel_path)
return wb
def append_to_excel(po: PurchaseOrder, excel_path: str, status: str = "extracted"):
if not os.path.exists(excel_path):
wb = init_excel(excel_path)
else:
wb = openpyxl.load_workbook(excel_path)
ws = wb.active
next_row = ws.max_row + 1
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
row_data = [
po.po_number, po.po_date, po.vendor_name, po.delivery_date,
po.total_amount, status, po.days_left, po.payment_terms,
po.ocr_confidence, po.extraction_confidence, po.ocr_method,
po.source_file, now,
]
for col_idx, value in enumerate(row_data, 1):
ws.cell(row=next_row, column=col_idx, value=value)
ws.cell(row=next_row, column=5).number_format = "#,##0"
wb.save(excel_path)
wb.close()
def save_json_backup(po: PurchaseOrder, output_dir: str) -> str:
os.makedirs(output_dir, exist_ok=True)
safe_name = re.sub(r"[^\w-]", "_", po.po_number or "unknown")
json_path = os.path.join(output_dir, f"po_{safe_name}_{datetime.now():%Y%m%d_%H%M%S}.json")
data = asdict(po)
data.pop("raw_text", None)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return json_path
def main():
parser = argparse.ArgumentParser(description="Extract PO data from PDF/image files")
parser.add_argument("file", help="Path to the PO file (PDF, JPG, PNG)")
parser.add_argument("--output", "-o", default="po_tracking.xlsx")
parser.add_argument("--json-dir", default=None)
parser.add_argument("--format", choices=["excel", "json", "both"], default="both")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
file_path = os.path.abspath(args.file)
if not os.path.exists(file_path):
eprint(f"File not found: {file_path}")
sys.exit(1)
eprint(f"Extracting text from: {file_path}")
ocr = extract_from_file(file_path)
eprint(f"OCR method: {ocr.method} | Confidence: {ocr.confidence}% | Text length: {len(ocr.text)}")
if not ocr.text.strip():
eprint("ERROR: No text extracted. Check OCR dependencies.")
sys.exit(1)
po = parse_po_text(ocr)
eprint(f"Parsed PO: {po.po_number or '(unknown)'} from {po.vendor_name or '(unknown)'}")
warnings = validate_po(po)
if warnings:
for w in warnings:
eprint(f" ⚠ {w}")
if po.extraction_confidence < 85 or po.ocr_confidence < 85:
po.status = "needs_review"
excel_path = os.path.abspath(args.output)
if not args.dry_run and check_duplicate(po, excel_path):
eprint(f"Duplicate detected: {po.po_number} from {po.vendor_name}")
po.status = "duplicate"
if args.dry_run:
data = asdict(po)
data.pop("raw_text", None)
data["warnings"] = warnings
print(json.dumps(data, ensure_ascii=False, indent=2))
return
if args.format in ("excel", "both"):
append_to_excel(po, excel_path, po.status)
eprint(f"Appended to: {excel_path}")
if args.format in ("json", "both"):
json_dir = args.json_dir or os.path.dirname(excel_path) or "."
json_path = save_json_backup(po, json_dir)
eprint(f"JSON backup: {json_path}")
result = {
"po_number": po.po_number,
"vendor": po.vendor_name,
"total": po.total_amount,
"delivery_date": po.delivery_date,
"days_left": po.days_left,
"status": po.status,
"ocr_confidence": po.ocr_confidence,
"ocr_method": po.ocr_method,
"extraction_confidence": po.extraction_confidence,
"warnings": warnings,
}
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
FILE:scripts/extract_statement.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "openpyxl>=3.1.0",
# "pdfplumber>=0.10.0",
# "Pillow>=10.0.0",
# "pytesseract>=0.3.10",
# "pdf2image>=1.17.0",
# ]
# ///
"""Extract transaction data from bank statement PDF/image files and output to Excel.
Usage:
uv run extract_statement.py /path/to/statement.pdf
uv run extract_statement.py statement.pdf -o ~/accounting/vcb_march.xlsx
uv run extract_statement.py statement.pdf --dry-run
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from ocr_utils import clean_amount, eprint, extract_from_file, parse_date
import openpyxl
from openpyxl.styles import Alignment, Font, PatternFill
@dataclass
class Transaction:
transaction_date: str = ""
description: str = ""
transaction_type: str = "" # CREDIT or DEBIT
amount: float = 0.0
balance: float = 0.0
reference: str = ""
@dataclass
class BankStatement:
bank_name: str = ""
account_number: str = ""
account_holder: str = ""
statement_period: str = ""
opening_balance: float = 0.0
closing_balance: float = 0.0
transactions: list[Transaction] = field(default_factory=list)
ocr_confidence: float = 0.0
extraction_confidence: float = 0.0
ocr_method: str = ""
raw_text: str = ""
source_file: str = ""
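def parse_statement_header(text: str) -> dict:
    """Extract bank name, account number, holder, and period. Returns only the fields that were found."""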
info = {}
bank_patterns = [
r"(Vietcombank|VCB|Techcombank|TCB|BIDV|Agribank|MB\s*Bank|MBBank|VPBank|ACB|Sacombank|TPBank|HDBank|VietinBank|CTG)",
r"(?:Ngân hàng|Bank)[\s.:]*(.+?)(?:\n|$)",
]
for pat in bank_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
info["bank_name"] = m.group(1).strip()
break
acct_patterns = [
r"(?:Account\s*(?:No|Number|#)|Số tài khoản|STK)[\s.:]*(\d[\d\s-]+\d)",
r"(?:Acct)[\s.:]*(\d{6,})",
]
for pat in acct_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
info["account_number"] = re.sub(r"[\s-]", "", m.group(1).strip())
break
holder_patterns = [
r"(?:Account\s*(?:Name|Holder)|Chủ tài khoản|Tên TK)[\s.:]*(.+?)(?:\n|$)",
]
for pat in holder_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
info["account_holder"] = m.group(1).strip()
break
    period_patterns = [
        r"(?:Period|Statement\s*Period|Từ ngày|Kỳ sao kê)[\s.:]*(.+?)(?:\n|$)",
        # Separator is an alternation; a character class here would match only a single character.
        r"(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})\s*(?:-|–|to|đến)\s*(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})",
    ]
for pat in period_patterns:
m = re.search(pat, text, re.IGNORECASE)
if m:
info["statement_period"] = m.group(0).strip() if len(m.groups()) > 1 else m.group(1).strip()
break
return info
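def parse_transactions_from_tables(tables: list[list]) -> list[Transaction]:
    """Build transactions from extracted tables by locating a header row and mapping its columns to fields."""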
transactions = []
for table in tables:
if not table or len(table) < 2:
continue
header = None
header_idx = -1
for i, row in enumerate(table):
if row and any(
h and re.search(r"date|ngày|debit|credit|nợ|có|amount|số tiền|balance|số dư", str(h), re.IGNORECASE)
for h in row
):
header = [str(h or "").strip().lower() for h in row]
header_idx = i
break
if header is None:
continue
        # Map header cells to fields. Reference is tested before debit/credit because
        # the short token "chi" would otherwise match inside "chiếu" (as in "số tham chiếu").
        col_map = {}
        for idx, h in enumerate(header):
            if re.search(r"date|ngày", h):
                col_map.setdefault("date", idx)
            elif re.search(r"description|diễn giải|nội dung|mô tả", h):
                col_map.setdefault("description", idx)
            elif re.search(r"ref|reference|mã giao dịch|số (?:tham )?chiếu", h):
                col_map.setdefault("reference", idx)
            elif re.search(r"debit|nợ|chi", h):
                col_map.setdefault("debit", idx)
            elif re.search(r"credit|có|thu", h):
                col_map.setdefault("credit", idx)
            elif re.search(r"amount|số tiền", h):
                col_map.setdefault("amount", idx)
            elif re.search(r"balance|số dư", h):
                col_map.setdefault("balance", idx)
for row in table[header_idx + 1:]:
if not row or all(not cell for cell in row):
continue
txn = Transaction()
if "date" in col_map and col_map["date"] < len(row):
txn.transaction_date = parse_date(str(row[col_map["date"]] or "").strip())
if "description" in col_map and col_map["description"] < len(row):
txn.description = str(row[col_map["description"]] or "").strip()
if "debit" in col_map and col_map["debit"] < len(row):
debit_val = clean_amount(str(row[col_map["debit"]] or ""))
if debit_val > 0:
txn.amount = debit_val
txn.transaction_type = "DEBIT"
if "credit" in col_map and col_map["credit"] < len(row):
credit_val = clean_amount(str(row[col_map["credit"]] or ""))
if credit_val > 0:
txn.amount = credit_val
txn.transaction_type = "CREDIT"
if not txn.transaction_type and "amount" in col_map and col_map["amount"] < len(row):
amt = clean_amount(str(row[col_map["amount"]] or ""))
txn.amount = abs(amt)
txn.transaction_type = "DEBIT" if amt < 0 else "CREDIT"
if "balance" in col_map and col_map["balance"] < len(row):
txn.balance = clean_amount(str(row[col_map["balance"]] or ""))
if "reference" in col_map and col_map["reference"] < len(row):
txn.reference = str(row[col_map["reference"]] or "").strip()
if txn.amount > 0 or txn.transaction_date:
transactions.append(txn)
return transactions
def parse_transactions_from_text(text: str) -> list[Transaction]:
"""Fallback: parse from raw text."""
transactions = []
line_pattern = re.compile(
r"(\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4})\s+(.+?)\s+([\d.,]+)\s*$",
re.MULTILINE,
)
for m in line_pattern.finditer(text):
txn = Transaction()
txn.transaction_date = parse_date(m.group(1))
txn.description = m.group(2).strip()
txn.amount = clean_amount(m.group(3))
txn.transaction_type = "DEBIT" # Default; agent should review
if txn.amount > 0:
transactions.append(txn)
return transactions
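def validate_statement(statement: BankStatement) -> list[str]:
    """Validate a parsed statement. Returns warnings for missing fields, balance gaps, and low OCR confidence."""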
warnings = []
if not statement.bank_name:
warnings.append("Missing bank name")
if not statement.transactions:
warnings.append("No transactions extracted")
return warnings
# Balance continuity
prev_balance = statement.opening_balance
for i, txn in enumerate(statement.transactions):
if txn.balance > 0 and prev_balance > 0:
if txn.transaction_type == "CREDIT":
expected = prev_balance + txn.amount
else:
expected = prev_balance - txn.amount
tolerance = max(abs(expected) * 0.01, 1.0)
if abs(expected - txn.balance) > tolerance:
warnings.append(
f"Balance gap at row {i + 1}: expected {expected:,.0f}, got {txn.balance:,.0f}"
)
prev_balance = txn.balance
if statement.closing_balance > 0 and statement.transactions:
last_balance = statement.transactions[-1].balance
if last_balance > 0 and abs(last_balance - statement.closing_balance) > 1.0:
warnings.append(f"Closing balance mismatch: last={last_balance:,.0f}, stated={statement.closing_balance:,.0f}")
if statement.ocr_confidence < 85:
warnings.append(f"Low OCR confidence: {statement.ocr_confidence}% (method: {statement.ocr_method})")
return warnings
HEADER_FILL = PatternFill(start_color="2E75B6", end_color="2E75B6", fill_type="solid")
HEADER_FONT = Font(name="Calibri", bold=True, color="FFFFFF", size=11)
HEADERS = ["Date", "Description", "Type", "Amount", "Balance", "Reference"]
COL_WIDTHS = [12, 45, 10, 18, 18, 20]
def write_statement_excel(statement: BankStatement, excel_path: str):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Bank Statement"
info_rows = [
("Bank:", statement.bank_name),
("Account:", statement.account_number),
("Holder:", statement.account_holder),
("Period:", statement.statement_period),
("Opening Balance:", statement.opening_balance),
("OCR Method:", statement.ocr_method),
("OCR Confidence:", f"{statement.ocr_confidence}%"),
]
for row_idx, (label, value) in enumerate(info_rows, 1):
ws.cell(row=row_idx, column=1, value=label).font = Font(bold=True)
ws.cell(row=row_idx, column=2, value=value)
header_row = len(info_rows) + 2
for col_idx, (header, width) in enumerate(zip(HEADERS, COL_WIDTHS), 1):
cell = ws.cell(row=header_row, column=col_idx, value=header)
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal="center")
ws.column_dimensions[openpyxl.utils.get_column_letter(col_idx)].width = width
for i, txn in enumerate(statement.transactions):
row = header_row + 1 + i
ws.cell(row=row, column=1, value=txn.transaction_date)
ws.cell(row=row, column=2, value=txn.description)
ws.cell(row=row, column=3, value=txn.transaction_type)
ws.cell(row=row, column=4, value=txn.amount).number_format = "#,##0"
ws.cell(row=row, column=5, value=txn.balance).number_format = "#,##0"
ws.cell(row=row, column=6, value=txn.reference)
close_row = header_row + len(statement.transactions) + 2
ws.cell(row=close_row, column=1, value="Closing Balance:").font = Font(bold=True)
ws.cell(row=close_row, column=5, value=statement.closing_balance).number_format = "#,##0"
total_credits = sum(t.amount for t in statement.transactions if t.transaction_type == "CREDIT")
total_debits = sum(t.amount for t in statement.transactions if t.transaction_type == "DEBIT")
ws.cell(row=close_row + 1, column=1, value="Total Credits:").font = Font(bold=True)
ws.cell(row=close_row + 1, column=4, value=total_credits).number_format = "#,##0"
ws.cell(row=close_row + 2, column=1, value="Total Debits:").font = Font(bold=True)
ws.cell(row=close_row + 2, column=4, value=total_debits).number_format = "#,##0"
ws.auto_filter.ref = f"A{header_row}:{openpyxl.utils.get_column_letter(len(HEADERS))}{header_row}"
ws.freeze_panes = f"A{header_row + 1}"
wb.save(excel_path)
wb.close()
def save_json_backup(statement: BankStatement, output_dir: str) -> str:
os.makedirs(output_dir, exist_ok=True)
safe_name = re.sub(r"[^\w-]", "_", statement.bank_name or "unknown")
json_path = os.path.join(output_dir, f"statement_{safe_name}_{datetime.now():%Y%m%d_%H%M%S}.json")
data = asdict(statement)
data.pop("raw_text", None)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return json_path
def main():
parser = argparse.ArgumentParser(description="Extract bank statement data from PDF/image files")
parser.add_argument("file", help="Path to the bank statement file (PDF, JPG, PNG)")
parser.add_argument("--output", "-o", default=None, help="Output Excel file path")
parser.add_argument("--json-dir", default=None, help="Directory for JSON backup")
parser.add_argument("--format", choices=["excel", "json", "both"], default="both")
parser.add_argument("--dry-run", action="store_true", help="Parse and validate only")
args = parser.parse_args()
file_path = os.path.abspath(args.file)
if not os.path.exists(file_path):
eprint(f"File not found: {file_path}")
sys.exit(1)
eprint(f"Extracting from: {file_path}")
ocr = extract_from_file(file_path)
eprint(f"OCR method: {ocr.method} | Confidence: {ocr.confidence}% | Text length: {len(ocr.text)}")
if not ocr.text.strip():
eprint("ERROR: No text extracted. Check OCR dependencies.")
sys.exit(1)
header_info = parse_statement_header(ocr.text)
statement = BankStatement(
bank_name=header_info.get("bank_name", ""),
account_number=header_info.get("account_number", ""),
account_holder=header_info.get("account_holder", ""),
statement_period=header_info.get("statement_period", ""),
raw_text=ocr.text,
source_file=file_path,
ocr_confidence=ocr.confidence,
ocr_method=ocr.method,
)
# Parse transactions (prefer table extraction, fallback to text)
if ocr.tables:
statement.transactions = parse_transactions_from_tables(ocr.tables)
if not statement.transactions:
statement.transactions = parse_transactions_from_text(ocr.text)
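    # Allow the English label "Opening Balance"; the bare keyword never reached the digits.
    for pat in [r"(?:Opening(?:\s*Balance)?|Số dư đầu kỳ)[\s.:]*(\d[\d.,]*)"]: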
m = re.search(pat, ocr.text, re.IGNORECASE)
if m:
statement.opening_balance = clean_amount(m.group(1))
break
    # Allow the English label "Closing Balance" for the same reason.
    for pat in [r"(?:Closing(?:\s*Balance)?|Số dư cuối kỳ)[\s.:]*(\d[\d.,]*)"]:
m = re.search(pat, ocr.text, re.IGNORECASE)
if m:
statement.closing_balance = clean_amount(m.group(1))
break
filled = sum(1 for v in [
statement.bank_name, statement.account_number,
statement.transactions, statement.opening_balance,
] if v)
statement.extraction_confidence = round(filled / 4 * 100, 1)
eprint(f"Parsed {len(statement.transactions)} transactions from {statement.bank_name or 'unknown bank'}")
warnings = validate_statement(statement)
for w in warnings:
eprint(f" ⚠ {w}")
if args.dry_run:
data = asdict(statement)
data.pop("raw_text", None)
data["warnings"] = warnings
print(json.dumps(data, ensure_ascii=False, indent=2))
return
bank_slug = re.sub(r"[^\w]", "", statement.bank_name or "unknown").lower()
date_slug = datetime.now().strftime("%Y%m%d")
default_name = f"statement_{bank_slug}_{date_slug}.xlsx"
excel_path = os.path.abspath(args.output or default_name)
if args.format in ("excel", "both"):
write_statement_excel(statement, excel_path)
eprint(f"Excel output: {excel_path}")
if args.format in ("json", "both"):
json_dir = args.json_dir or os.path.dirname(excel_path) or "."
json_path = save_json_backup(statement, json_dir)
eprint(f"JSON backup: {json_path}")
result = {
"bank": statement.bank_name,
"account": statement.account_number,
"transactions": len(statement.transactions),
"ocr_confidence": statement.ocr_confidence,
"ocr_method": statement.ocr_method,
"output": excel_path,
"warnings": warnings,
}
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
FILE:scripts/generate_templates.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "openpyxl>=3.1.0",
# ]
# ///
"""Generate empty Excel templates for accounting tracking sheets.
Usage:
uv run generate_templates.py all --output-dir ~/accounting/
uv run generate_templates.py invoice
uv run generate_templates.py po
uv run generate_templates.py statement
"""
from __future__ import annotations
import argparse
import os
import openpyxl
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
def _write_headers(ws, headers, widths, color):
fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
font = Font(name="Calibri", bold=True, color="FFFFFF", size=11)
thin = Side(style="thin")
border = Border(top=thin, bottom=thin, left=thin, right=thin)
for col, (h, w) in enumerate(zip(headers, widths), 1):
cell = ws.cell(row=1, column=col, value=h)
cell.fill = fill
cell.font = font
cell.alignment = Alignment(horizontal="center")
cell.border = border
ws.column_dimensions[openpyxl.utils.get_column_letter(col)].width = w
ws.auto_filter.ref = f"A1:{openpyxl.utils.get_column_letter(len(headers))}1"
ws.freeze_panes = "A2"
def create_invoice_template(path: str):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Invoice Tracking"
    _write_headers(ws, [
        "Invoice#", "Date", "Vendor", "TaxCode", "Subtotal",
        "VAT", "Total", "Status", "OCR%", "Extract%", "OCR Method", "FilePath", "ProcessedAt",
    ], [15, 12, 30, 15, 15, 15, 15, 12, 8, 8, 10, 40, 20], "4472C4")
    wb.save(path)
    print(f"Created: {path}")
def create_po_template(path: str):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "PO Tracking"
    _write_headers(ws, [
        "PO#", "Date", "Vendor", "Delivery", "Total",
        "Status", "DaysLeft", "PaymentTerms", "OCR%", "Extract%", "OCR Method", "FilePath", "ProcessedAt",
    ], [15, 12, 30, 12, 18, 12, 10, 25, 8, 8, 10, 40, 20], "548235")
    wb.save(path)
    print(f"Created: {path}")
def create_statement_template(path: str):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Bank Statement"
    # Rows 1-7 hold the statement metadata labels; the transaction table starts at row 9
    for row, label in enumerate(["Bank:", "Account:", "Holder:", "Period:", "Opening Balance:", "OCR Method:", "OCR Confidence:"], 1):
        ws.cell(row=row, column=1, value=label).font = Font(bold=True)
    header_row = 9
    fill = PatternFill(start_color="2E75B6", end_color="2E75B6", fill_type="solid")
    font = Font(name="Calibri", bold=True, color="FFFFFF", size=11)
    thin = Side(style="thin")
    border = Border(top=thin, bottom=thin, left=thin, right=thin)
    headers = ["Date", "Description", "Type", "Amount", "Balance", "Reference"]
    widths = [12, 45, 10, 18, 18, 20]
    for col, (h, w) in enumerate(zip(headers, widths), 1):
        cell = ws.cell(row=header_row, column=col, value=h)
        cell.fill = fill
        cell.font = font
        cell.alignment = Alignment(horizontal="center")
        cell.border = border
        ws.column_dimensions[openpyxl.utils.get_column_letter(col)].width = w
    ws.auto_filter.ref = f"A{header_row}:{openpyxl.utils.get_column_letter(len(headers))}{header_row}"
    wb.save(path)
    print(f"Created: {path}")
def main():
    parser = argparse.ArgumentParser(description="Generate Excel templates for accounting")
    parser.add_argument("template", choices=["invoice", "po", "statement", "all"])
    parser.add_argument("--output-dir", "-o", default=".")
    args = parser.parse_args()
    os.makedirs(args.output_dir, exist_ok=True)
    if args.template in ("invoice", "all"):
        create_invoice_template(os.path.join(args.output_dir, "invoice_tracking.xlsx"))
    if args.template in ("po", "all"):
        create_po_template(os.path.join(args.output_dir, "po_tracking.xlsx"))
    if args.template in ("statement", "all"):
        create_statement_template(os.path.join(args.output_dir, "statement_template.xlsx"))
if __name__ == "__main__":
    main()
FILE:scripts/ocr_utils.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pdfplumber>=0.10.0",
# "Pillow>=10.0.0",
# "pytesseract>=0.3.10",
# "pdf2image>=1.17.0",
# ]
# ///
"""
Shared OCR and text extraction utilities for accounting document processing.
Supports:
- Digital PDFs (text-layer extraction via pdfplumber)
- Scanned PDFs (image conversion + OCR via pdf2image + pytesseract)
- Image files: JPG, JPEG, PNG, TIFF, BMP (direct OCR via pytesseract)
Dependencies:
- System: tesseract-ocr, tesseract-ocr-vie (for Vietnamese), poppler-utils (for pdf2image)
- Python: pdfplumber, Pillow, pytesseract, pdf2image
"""
from __future__ import annotations
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)
@dataclass
class OCRResult:
    """Result of text extraction from a document."""
    text: str
    pages: list[str]
    tables: list[list]
    method: str  # "pdfplumber", "ocr", "hybrid", or "none" (unsupported file type)
    confidence: float  # 0-100 estimated quality
    source_file: str
def _check_tesseract() -> bool:
    """Check if tesseract is available."""
    import shutil
    return shutil.which("tesseract") is not None
def _check_poppler() -> bool:
    """Check if poppler-utils (pdftoppm) is available for pdf2image."""
    import shutil
    return shutil.which("pdftoppm") is not None
def _get_tesseract_langs() -> list[str]:
    """Get list of installed tesseract languages."""
    import subprocess
    try:
        result = subprocess.run(
            ["tesseract", "--list-langs"],
            capture_output=True, text=True, timeout=10,
        )
        langs = result.stdout.strip().split("\n")[1:]  # skip header line
        return [lang.strip() for lang in langs if lang.strip()]
    except Exception:
        return []
def _build_lang_string() -> str:
    """Build tesseract language string, preferring vie+eng if available."""
    langs = _get_tesseract_langs()
    parts = []
    if "vie" in langs:
        parts.append("vie")
    if "eng" in langs:
        parts.append("eng")
    return "+".join(parts) if parts else "eng"
def extract_text_pdfplumber(pdf_path: str) -> tuple[list[str], list[list]]:
    """Extract text and tables from PDF using pdfplumber (works on digital PDFs)."""
    import pdfplumber
    pages = []
    tables = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            pages.append(page_text or "")
            page_tables = page.extract_tables()
            if page_tables:
                tables.extend(page_tables)
    return pages, tables
def extract_text_ocr_image(image_path: str) -> tuple[str, float]:
    """OCR a single image file. Returns (text, confidence)."""
    import pytesseract
    from PIL import Image
    if not _check_tesseract():
        eprint("ERROR: tesseract is not installed. Install with: sudo apt install tesseract-ocr tesseract-ocr-vie")
        return "", 0.0
    lang = _build_lang_string()
    img = Image.open(image_path)
    # Pre-process for better OCR: convert to grayscale
    img = img.convert("L")
    # Get text with confidence data
    data = pytesseract.image_to_data(img, lang=lang, output_type=pytesseract.Output.DICT)
    text = pytesseract.image_to_string(img, lang=lang)
    # Average the word-level confidences. Non-text boxes report -1 and are skipped.
    # Parse with float() because some Tesseract/pytesseract versions emit values
    # such as "96.5", which int() would reject.
    confidences = [float(c) for c in data["conf"] if float(c) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
    return text, avg_confidence
def extract_text_ocr_pdf(pdf_path: str) -> tuple[list[str], float]:
    """OCR a scanned PDF by converting pages to images first."""
    import tempfile
    if not _check_poppler():
        eprint("ERROR: poppler-utils not installed. Install with: sudo apt install poppler-utils")
        return [], 0.0
    from pdf2image import convert_from_path
    images = convert_from_path(pdf_path, dpi=300)
    pages = []
    confidences = []
    for img in images:
        # Reserve a temp path and close the handle before writing to it,
        # so the file can be reopened on platforms that lock open files
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            img.save(tmp_path, "PNG")
            text, conf = extract_text_ocr_image(tmp_path)
            pages.append(text)
            confidences.append(conf)
        finally:
            # Remove the temp image even if OCR raises
            os.unlink(tmp_path)
    avg_conf = sum(confidences) / len(confidences) if confidences else 0.0
    return pages, avg_conf
def extract_from_file(file_path: str) -> OCRResult:
    """
    Extract text from any supported document file.
    Strategy:
    1. For PDFs: try pdfplumber first (digital layer). If little text found,
       fall back to OCR (scanned PDF).
    2. For images: direct OCR.
    3. Returns OCRResult with text, tables, method used, and confidence.
    """
    file_path = os.path.abspath(file_path)
    ext = Path(file_path).suffix.lower()
    if ext == ".pdf":
        # Try digital extraction first
        pages, tables = extract_text_pdfplumber(file_path)
        full_text = "\n".join(pages)
        # Heuristic: 50 chars per page or fewer on average means the PDF is likely scanned
        avg_chars = len(full_text.strip()) / max(len(pages), 1)
        if avg_chars > 50:
            # Good digital text
            confidence = min(98.0, 70.0 + avg_chars / 20)
            return OCRResult(
                text=full_text,
                pages=pages,
                tables=tables,
                method="pdfplumber",
                confidence=round(confidence, 1),
                source_file=file_path,
            )
        # Scanned PDF: fall back to OCR
        eprint(f"Low text from pdfplumber ({avg_chars:.0f} chars/page), falling back to OCR...")
        ocr_pages, ocr_conf = extract_text_ocr_pdf(file_path)
        if ocr_pages and sum(len(p) for p in ocr_pages) > len(full_text.strip()):
            return OCRResult(
                text="\n".join(ocr_pages),
                pages=ocr_pages,
                tables=tables,  # Keep any tables from pdfplumber
                method="ocr",
                confidence=round(ocr_conf, 1),
                source_file=file_path,
            )
        # Hybrid: OCR did not yield more text, so keep the pdfplumber text
        # if there is any and use the OCR pages only when it is empty
        return OCRResult(
            text=full_text if full_text.strip() else "\n".join(ocr_pages),
            pages=pages if full_text.strip() else ocr_pages,
            tables=tables,
            method="hybrid",
            confidence=round(max(ocr_conf, 50.0), 1),
            source_file=file_path,
        )
    elif ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
        text, confidence = extract_text_ocr_image(file_path)
        return OCRResult(
            text=text,
            pages=[text],
            tables=[],
            method="ocr",
            confidence=round(confidence, 1),
            source_file=file_path,
        )
    else:
        eprint(f"Unsupported file type: {ext}")
        return OCRResult(
            text="", pages=[], tables=[], method="none",
            confidence=0.0, source_file=file_path,
        )
# ─── Amount / Date Cleaning Utilities ───────────────────────────────────────
def clean_amount(text: str) -> float:
    """Clean a currency amount string into a float.
    Handles:
    - Vietnamese VND: 1.000.000 or 1,000,000 (no decimals)
    - International: 1,000.50 or 1.000,50
    - Currency symbols: VND, đ, $, €, £
    - Negative in parentheses: (500.000)
    """
    if not text:
        return 0.0
    cleaned = re.sub(r"[VNDvndđĐ$€£\s]", "", str(text).strip())
    is_negative = "(" in cleaned and ")" in cleaned
    cleaned = cleaned.replace("(", "").replace(")", "")
    if "." in cleaned and "," in cleaned:
        # Both separators present: whichever comes last is the decimal mark
        if cleaned.rindex(".") > cleaned.rindex(","):
            cleaned = cleaned.replace(",", "")
        else:
            cleaned = cleaned.replace(".", "").replace(",", ".")
    elif "." in cleaned:
        # Dots only: several dots, or exactly three digits after one dot, mean thousands separators
        parts = cleaned.split(".")
        if len(parts) > 2 or (len(parts) == 2 and len(parts[-1]) == 3):
            cleaned = cleaned.replace(".", "")
    elif "," in cleaned:
        # Commas only: same rule, otherwise treat the comma as a decimal mark
        parts = cleaned.split(",")
        if len(parts) > 2 or (len(parts) == 2 and len(parts[-1]) == 3):
            cleaned = cleaned.replace(",", "")
        else:
            cleaned = cleaned.replace(",", ".")
    cleaned = re.sub(r"[^\d.\-]", "", cleaned)
    try:
        val = float(cleaned)
        return -val if is_negative else val
    except ValueError:
        return 0.0
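def parse_date(text: str) -> str:
    """Parse a date string into YYYY-MM-DD format.
    Supports: dd/mm/yyyy, dd-mm-yyyy, dd.mm.yyyy, mm/dd/yyyy, yyyy-mm-dd
    Formats are tried in that order, so an ambiguous slash date such as
    03/04/2024 is read day-first (2024-04-03); mm/dd/yyyy only matches when
    the day-first parse fails (e.g. 12/25/2024). Unparseable input is
    returned unchanged.
    """
    if not text:
        return ""
    text = text.strip()
    for fmt in ("%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%m/%d/%Y", "%Y-%m-%d"):
        try:
            return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return text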
def check_system_deps() -> dict:
    """Check all system dependencies and return status report."""
    import shutil
    report = {
        "tesseract": shutil.which("tesseract") is not None,
        "tesseract_langs": _get_tesseract_langs() if shutil.which("tesseract") else [],
        "poppler": shutil.which("pdftoppm") is not None,
    }
    report["vie_support"] = "vie" in report["tesseract_langs"]
    report["ready"] = report["tesseract"] and report["poppler"]
    return report
if __name__ == "__main__":
    import argparse
    import json
    parser = argparse.ArgumentParser(description="OCR utilities for accounting documents")
    sub = parser.add_subparsers(dest="command")
    # Check dependencies
    sub.add_parser("check", help="Check system dependencies")
    # Extract text
    ext = sub.add_parser("extract", help="Extract text from a file")
    ext.add_argument("file", help="File to extract text from")
    ext.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()
    if args.command == "check":
        report = check_system_deps()
        print(json.dumps(report, indent=2))
        if not report["ready"]:
            eprint("\nMissing dependencies. Install with:")
            if not report["tesseract"]:
                eprint(" sudo apt install tesseract-ocr tesseract-ocr-vie")
            if not report["poppler"]:
                eprint(" sudo apt install poppler-utils")
            sys.exit(1)
        else:
            eprint("All dependencies OK.")
            if not report["vie_support"]:
                eprint(" Note: Vietnamese language pack not installed. Run: sudo apt install tesseract-ocr-vie")
    elif args.command == "extract":
        result = extract_from_file(args.file)
        if args.json:
            print(json.dumps({
                "text": result.text,
                "pages": len(result.pages),
                "tables": len(result.tables),
                "method": result.method,
                "confidence": result.confidence,
                "source_file": result.source_file,
            }, ensure_ascii=False, indent=2))
        else:
            print(result.text)
            eprint(f"\n--- Method: {result.method} | Confidence: {result.confidence}% | Pages: {len(result.pages)} | Tables: {len(result.tables)} ---")
    else:
        parser.print_help()
Use when the user wants to interact with a public Google Drive — listing, reading, creating, updating, or deleting files and folders. Handles both read-only...
---
name: google-drive-skill
description: |
Use when the user wants to interact with a public Google Drive — listing, reading, creating, updating, or deleting files and folders.
Handles both read-only access (API key only) and write access (service account or OAuth2).
Covers folder navigation, file upload/download, permissions, MIME types, and error handling.
USE FOR: integrating Google Drive into scripts or applications; reading publicly shared files; CRUD on service-account-owned drives; automating Drive folder structures.
DO NOT USE FOR: Google Workspace Admin tasks; Google Docs/Sheets API-specific operations beyond Drive metadata.
---
# Google Drive Skill
## Resolve `SKILL_SCRIPTS`
**IMPORTANT:** All commands below use `SKILL_SCRIPTS` as shorthand for the absolute path to the `scripts/` directory of this skill. Resolve it before running any script:
```bash
SKILL_SCRIPTS="$(
  _ws_root="$(git rev-parse --show-toplevel 2>/dev/null || pwd)"
  _ws_path="$_ws_root/.github/skills/google-drive-skill/scripts"
  _global_path="$HOME/.openclaw/workspace/skills/google-drive-skill/scripts"
  _global_path2="$HOME/.openclaw/skills/google-drive-skill/scripts"
  if [ -d "$_ws_path" ]; then
    echo "$_ws_path"
  elif [ -d "$_global_path" ]; then
    echo "$_global_path"
  elif [ -d "$_global_path2" ]; then
    echo "$_global_path2"
  else
    find "$HOME" /opt -type d -name google-drive-skill -path '*/skills/*' 2>/dev/null \
      | head -1 | sed 's|$|/scripts|'
  fi
)"
echo "SKILL_SCRIPTS=$SKILL_SCRIPTS"
```
---
## Overview
This skill provides patterns and **ready-to-run scripts** for CRUD operations on Google Drive using the **Drive API v3**.
| Script | Operation | Auth Required |
|--------|-----------|---------------|
| `list_files.py` | List files/folders in a folder | API key (public) or service account |
| `download_file.py` | Download a file to disk | API key (public) or service account |
| `create_folder.py` | Create a new folder | Service account |
| `upload_file.py` | Upload a local file | Service account |
| `update_file.py` | Rename, move, or replace content | Service account |
| `delete_file.py` | Trash or permanently delete | Service account |
| `set_permissions.py` | Make public / share / revoke | Service account |
**Supporting files:**
- `scripts/drive_client.py` — shared auth factory (imported by all scripts)
- `assets/service_account_template.json` — template for your service account JSON
- `references/mime_types.md` — MIME type quick reference
- `references/error_codes.md` — HTTP error codes + retry patterns
---
## Quick Start
### 1. Install dependencies
```bash
pip install google-api-python-client google-auth google-auth-httplib2
```
### 2. Set credentials
```bash
# Read-only (public files):
export GOOGLE_API_KEY="your_api_key_here"
# Read + write (service account):
export GOOGLE_SERVICE_ACCOUNT_JSON="/path/to/service_account.json"
```
See `assets/service_account_template.json` for the expected JSON structure.
### 3. Resolve `SKILL_SCRIPTS` (see top of this file), then run scripts
---
## Script Commands
### List files in a folder
```bash
# Table output (default):
python "$SKILL_SCRIPTS/list_files.py" --folder-id FOLDER_ID
# JSON output:
python "$SKILL_SCRIPTS/list_files.py" --folder-id FOLDER_ID --json
```
### Download a file
```bash
# Save to specific path:
python "$SKILL_SCRIPTS/download_file.py" --file-id FILE_ID --dest ./downloads/report.pdf
# Save to directory (auto filename from Drive):
python "$SKILL_SCRIPTS/download_file.py" --file-id FILE_ID --dest ./downloads/
```
### Create a folder
```bash
# In Drive root:
python "$SKILL_SCRIPTS/create_folder.py" --name "My New Folder"
# Inside a specific parent folder:
python "$SKILL_SCRIPTS/create_folder.py" --name "Subfolder" --parent-id PARENT_FOLDER_ID
```
### Upload a file
```bash
# Basic upload (MIME type auto-detected):
python "$SKILL_SCRIPTS/upload_file.py" --src ./report.pdf --parent-id FOLDER_ID
# Custom name on Drive:
python "$SKILL_SCRIPTS/upload_file.py" --src ./report.pdf --name "Q1-Report.pdf" --parent-id FOLDER_ID
# Upload and make publicly readable immediately:
python "$SKILL_SCRIPTS/upload_file.py" --src ./photo.png --parent-id FOLDER_ID --make-public
```
### Update a file
```bash
# Rename:
python "$SKILL_SCRIPTS/update_file.py" --file-id FILE_ID --name "New Name.pdf"
# Move to a different folder:
python "$SKILL_SCRIPTS/update_file.py" --file-id FILE_ID --move-to NEW_PARENT_ID --old-parent OLD_PARENT_ID
# Replace file content:
python "$SKILL_SCRIPTS/update_file.py" --file-id FILE_ID --src ./updated_report.pdf
# Rename + replace content:
python "$SKILL_SCRIPTS/update_file.py" --file-id FILE_ID --name "v2.pdf" --src ./v2.pdf
```
### Delete or trash a file
```bash
# Move to trash (safe, recoverable):
python "$SKILL_SCRIPTS/delete_file.py" --file-id FILE_ID
# Permanently delete (prompts for confirmation):
python "$SKILL_SCRIPTS/delete_file.py" --file-id FILE_ID --permanent
# Permanently delete (non-interactive):
python "$SKILL_SCRIPTS/delete_file.py" --file-id FILE_ID --permanent --yes
```
### Manage permissions
```bash
# Make file publicly readable (anyone with link):
python "$SKILL_SCRIPTS/set_permissions.py" --file-id FILE_ID --public
# Share with a user as writer:
python "$SKILL_SCRIPTS/set_permissions.py" --file-id FILE_ID --email user@example.com --role writer
# Share with a user as reader:
python "$SKILL_SCRIPTS/set_permissions.py" --file-id FILE_ID --email user@example.com --role reader
# List current permissions (returns JSON):
python "$SKILL_SCRIPTS/set_permissions.py" --file-id FILE_ID --list
# Remove a specific permission:
python "$SKILL_SCRIPTS/set_permissions.py" --file-id FILE_ID --remove PERMISSION_ID
```
---
## Auth Setup
### Option A — API Key (read-only public files)
Use when the file/folder is shared as **"Anyone with the link"** and you only need to **read**.
```python
import os
from googleapiclient.discovery import build
API_KEY = os.environ["GOOGLE_API_KEY"]
drive = build("drive", "v3", developerKey=API_KEY)
```
### Option B — Service Account (read + write on shared/public drives)
Use when you need to **create, modify, or delete** files.
```python
from google.oauth2 import service_account
from googleapiclient.discovery import build
import os
SCOPES = ["https://www.googleapis.com/auth/drive"]
creds = service_account.Credentials.from_service_account_file(
    os.environ["GOOGLE_SERVICE_ACCOUNT_JSON"], scopes=SCOPES
)
drive = build("drive", "v3", credentials=creds)
```
> See `assets/service_account_template.json` for the expected structure.
> Store the real file outside the repo and never commit it.
> Add `service_account.json` to `.gitignore`.
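The two options above can be folded into a single selection step. The sketch below is only an assumption about how a shared factory might pick between them; it is not taken from `scripts/drive_client.py`, and `choose_auth_mode` is a hypothetical name. It prefers the service account when `GOOGLE_SERVICE_ACCOUNT_JSON` is set and otherwise falls back to `GOOGLE_API_KEY`.

```python
import os


def choose_auth_mode(env=None) -> str:
    """Return "service_account" or "api_key" based on the environment (hypothetical helper)."""
    env = os.environ if env is None else env
    if env.get("GOOGLE_SERVICE_ACCOUNT_JSON"):
        return "service_account"  # read + write (Option B)
    if env.get("GOOGLE_API_KEY"):
        return "api_key"  # read-only access to public files (Option A)
    raise RuntimeError(
        "Set GOOGLE_SERVICE_ACCOUNT_JSON (read/write) or GOOGLE_API_KEY (read-only)."
    )
```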
---
## File & Folder ID
Every Drive resource has a unique `fileId`. Extract it from the share URL:
```
https://drive.google.com/drive/folders/FOLDER_ID_HERE
https://drive.google.com/file/d/FILE_ID_HERE/view
```
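Pulling the ID out of a share URL can be automated. This is a minimal sketch that handles only the two URL shapes shown above; `extract_drive_id` is a hypothetical helper, and other Drive URL variants are not covered.

```python
import re
from typing import Optional

# Matches /drive/folders/<id> and /file/d/<id>; IDs are assumed to use
# letters, digits, underscores, and hyphens.
_ID_PATTERN = re.compile(r"/(?:drive/folders|file/d)/([A-Za-z0-9_-]+)")


def extract_drive_id(url: str) -> Optional[str]:
    """Return the ID embedded in a Drive share URL, or None if none is found."""
    match = _ID_PATTERN.search(url)
    return match.group(1) if match else None
```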
---
## CRUD Operations
### List Files in a Public Folder
```python
def list_files(drive, folder_id: str) -> list[dict]:
    results = []
    page_token = None
    # Follow nextPageToken until every page has been collected
    while True:
        resp = drive.files().list(
            q=f"'{folder_id}' in parents and trashed=false",
            fields="nextPageToken, files(id, name, mimeType, size, modifiedTime)",
            pageToken=page_token,
            supportsAllDrives=True,
            includeItemsFromAllDrives=True,
        ).execute()
        results.extend(resp.get("files", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return results
```
### Download a Public File
```python
import io
from googleapiclient.http import MediaIoBaseDownload
def download_file(drive, file_id: str, dest_path: str) -> None:
    request = drive.files().get_media(fileId=file_id, supportsAllDrives=True)
    with open(dest_path, "wb") as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        # Each next_chunk() call writes one chunk to fh and reports completion
        while not done:
            _, done = downloader.next_chunk()
```
### Create a Folder
```python
def create_folder(drive, name: str, parent_id: str | None = None) -> str:
    metadata = {
        "name": name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    if parent_id:
        metadata["parents"] = [parent_id]
    folder = drive.files().create(
        body=metadata,
        fields="id",
        supportsAllDrives=True,
    ).execute()
    return folder["id"]
```
### Upload / Create a File
```python
from googleapiclient.http import MediaFileUpload
def upload_file(
    drive,
    local_path: str,
    name: str,
    parent_id: str | None = None,
    mime_type: str = "application/octet-stream",
) -> str:
    metadata = {"name": name}
    if parent_id:
        metadata["parents"] = [parent_id]
    media = MediaFileUpload(local_path, mimetype=mime_type, resumable=True)
    file = drive.files().create(
        body=metadata,
        media_body=media,
        fields="id",
        supportsAllDrives=True,
    ).execute()
    return file["id"]
```
### Upload from In-Memory Bytes
```python
import io
from googleapiclient.http import MediaIoBaseUpload
def upload_bytes(
    drive,
    data: bytes,
    name: str,
    parent_id: str | None = None,
    mime_type: str = "application/octet-stream",
) -> str:
    metadata = {"name": name}
    if parent_id:
        metadata["parents"] = [parent_id]
    # Wrap the bytes in a file-like object so the client can stream them
    media = MediaIoBaseUpload(io.BytesIO(data), mimetype=mime_type, resumable=True)
    file = drive.files().create(
        body=metadata,
        media_body=media,
        fields="id",
        supportsAllDrives=True,
    ).execute()
    return file["id"]
```
### Update File Metadata (rename, move)
```python
def rename_file(drive, file_id: str, new_name: str) -> None:
    drive.files().update(
        fileId=file_id,
        body={"name": new_name},
        supportsAllDrives=True,
    ).execute()
def move_file(drive, file_id: str, new_parent_id: str, old_parent_id: str) -> None:
    # Moving is an update that adds the new parent and removes the old one
    drive.files().update(
        fileId=file_id,
        addParents=new_parent_id,
        removeParents=old_parent_id,
        fields="id, parents",
        supportsAllDrives=True,
    ).execute()
```
### Update File Content
```python
from googleapiclient.http import MediaFileUpload
def update_file_content(
    drive,
    file_id: str,
    local_path: str,
    mime_type: str = "application/octet-stream",
) -> None:
    media = MediaFileUpload(local_path, mimetype=mime_type, resumable=True)
    drive.files().update(
        fileId=file_id,
        media_body=media,
        supportsAllDrives=True,
    ).execute()
```
### Delete a File or Folder
```python
def delete_file(drive, file_id: str) -> None:
    drive.files().delete(fileId=file_id, supportsAllDrives=True).execute()
```
> `files().delete` skips the trash and cannot be undone; deleting a folder also permanently deletes its children. Prefer trashing:
> ```python
> drive.files().update(fileId=file_id, body={"trashed": True}, supportsAllDrives=True).execute()
> ```
---
## Set Permissions (Make Public)
To make a file publicly readable (anyone with the link):
```python
def make_public(drive, file_id: str) -> None:
    drive.permissions().create(
        fileId=file_id,
        body={"role": "reader", "type": "anyone"},
        supportsAllDrives=True,
    ).execute()
```
To grant write access to a specific user:
```python
def share_with_user(drive, file_id: str, email: str, role: str = "writer") -> None:
    # role: "reader", "commenter", "writer", "fileOrganizer", "organizer", "owner"
    drive.permissions().create(
        fileId=file_id,
        body={"role": role, "type": "user", "emailAddress": email},
        sendNotificationEmail=False,
        supportsAllDrives=True,
    ).execute()
```
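Listing and revoking permissions follow the same pattern. The sketch below assumes the same `drive` client as above and uses the Drive API v3 `permissions().list` and `permissions().delete` calls; the function names are illustrative and are not taken from `set_permissions.py`.

```python
def list_permissions(drive, file_id: str) -> list[dict]:
    """Return all permission entries on a file, following pagination."""
    permissions = []
    page_token = None
    while True:
        resp = drive.permissions().list(
            fileId=file_id,
            fields="nextPageToken, permissions(id, type, role, emailAddress)",
            pageToken=page_token,
            supportsAllDrives=True,
        ).execute()
        permissions.extend(resp.get("permissions", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return permissions


def remove_permission(drive, file_id: str, permission_id: str) -> None:
    """Revoke one permission by the ID reported in list_permissions()."""
    drive.permissions().delete(
        fileId=file_id,
        permissionId=permission_id,
        supportsAllDrives=True,
    ).execute()
```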
---
## Common MIME Types
| Content | MIME Type |
|---------|-----------|
| Folder | `application/vnd.google-apps.folder` |
| Google Doc | `application/vnd.google-apps.document` |
| Google Sheet | `application/vnd.google-apps.spreadsheet` |
| PDF | `application/pdf` |
| Plain text | `text/plain` |
| JSON | `application/json` |
| PNG | `image/png` |
| ZIP | `application/zip` |
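For regular (non-Google) files, the MIME type can usually be guessed from the file extension with the standard library. This is a sketch of one way to do it, not necessarily how `upload_file.py` auto-detects types; `guess_mime_type` is a hypothetical helper. Google-native types such as `application/vnd.google-apps.folder` have no file extension and must still be set explicitly.

```python
import mimetypes


def guess_mime_type(path: str, default: str = "application/octet-stream") -> str:
    """Guess a MIME type from the extension, falling back to a generic binary type."""
    mime_type, _encoding = mimetypes.guess_type(path)
    return mime_type or default
```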
---
## Error Handling
```python
from googleapiclient.errors import HttpError
def safe_delete(drive, file_id: str) -> bool:
    try:
        drive.files().delete(fileId=file_id, supportsAllDrives=True).execute()
        return True
    except HttpError as e:
        if e.resp.status == 404:
            return False  # already gone
        raise
```
Common HTTP status codes:
- `400` — bad request (check field names, MIME types)
- `403` — permission denied (check auth scope or file sharing settings)
- `404` — file not found (check `fileId`, `supportsAllDrives`)
- `429` — rate limited (back off and retry)
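The "back off and retry" advice for `429` can be sketched as a small wrapper. The helper below is illustrative: `call_with_backoff`, its parameters, and the jitter range are assumptions. It also retries on `500` and `503`, which `references/error_codes.md` lists as transient. For Drive calls, `get_status` could be `lambda e: getattr(getattr(e, "resp", None), "status", None)`, matching the `e.resp.status` access used in `safe_delete`.

```python
import random
import time

RETRYABLE_STATUSES = {429, 500, 503}


def call_with_backoff(request_fn, get_status, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call request_fn(); on a retryable status, wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_attempts):
        try:
            return request_fn()
        except Exception as exc:
            status = get_status(exc)
            is_last_attempt = attempt == max_attempts - 1
            # Re-raise anything that is not retryable, or the final failure
            if status not in RETRYABLE_STATUSES or is_last_attempt:
                raise
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```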
---
## Required Python Packages
```
google-api-python-client>=2.100.0
google-auth>=2.23.0
google-auth-httplib2>=0.1.1
```
Install:
```bash
pip install google-api-python-client google-auth google-auth-httplib2
```
---
## Security Notes
- Never hardcode API keys or service account credentials in source code. Use environment variables or secret managers.
- Restrict the API key to the Drive API (under API restrictions) in the Google Cloud Console.
- Service account credentials (`service_account.json`) must be in `.gitignore`.
- When sharing files, prefer time-limited access tokens over permanent public links where possible.
- `"type": "anyone"` with `"role": "writer"` on a production folder is dangerous — audit permissions regularly.
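A permission audit can start from a simple filter. The sketch below assumes a list of permission dicts with `type` and `role` keys, in the shape the Drive API v3 returns for permission resources; `find_risky_permissions` is a hypothetical helper, and the set of roles treated as write-capable is a judgment call.

```python
def find_risky_permissions(permissions: list[dict]) -> list[dict]:
    """Return entries that let anyone with the link modify the file."""
    write_roles = {"writer", "fileOrganizer", "organizer", "owner"}
    return [
        p for p in permissions
        if p.get("type") == "anyone" and p.get("role") in write_roles
    ]
```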
---
## Reference
**Local references (this skill):**
- [references/mime_types.md](references/mime_types.md) — full MIME type table
- [references/error_codes.md](references/error_codes.md) — HTTP errors, Drive reasons, retry pattern
- [assets/service_account_template.json](assets/service_account_template.json) — service account JSON structure
**Official docs:**
- [Drive API v3 documentation](https://developers.google.com/drive/api/v3/reference)
- [Python client library](https://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.html)
- [Service accounts guide](https://developers.google.com/identity/protocols/oauth2/service-account)
- [Drive MIME types](https://developers.google.com/drive/api/guides/mime-types)
FILE:README.md
# Google Drive Skill
A comprehensive skill providing patterns and ready-to-run scripts for CRUD operations on Google Drive using the **Google Drive API v3**.
## Overview
This skill wraps common Google Drive operations in Python scripts. It supports:
- **Read-only access** (API key for public files)
- **Full read/write access** (service account authentication)
- Folder navigation, file management, permissions, and error handling
### Use Cases
- Integrate Google Drive into scripts and applications
- Read publicly shared files
- Automate CRUD operations on service-account-owned drives
- Manage Drive folder structures programmatically
## Project Structure
```
google-drive-skill/
├── README.md                          # This file
├── SKILL.md                           # Detailed skill documentation
├── assets/
│   └── service_account_template.json  # Service account credentials template
├── references/
│   ├── error_codes.md                 # HTTP error codes & retry patterns
│   └── mime_types.md                  # MIME type quick reference
└── scripts/
    ├── drive_client.py                # Shared auth factory (imported by all)
    ├── list_files.py                  # List files/folders in a directory
    ├── download_file.py               # Download files from Drive
    ├── upload_file.py                 # Upload files to Drive
    ├── create_folder.py               # Create new folders
    ├── update_file.py                 # Rename, move, or update files
    ├── delete_file.py                 # Delete or trash files
    └── set_permissions.py             # Manage file permissions & sharing
```
## Quick Start
### 1. Install Dependencies
```bash
pip install google-api-python-client google-auth google-auth-httplib2
```
### 2. Set Up Credentials
**For read-only access (public files):**
```bash
export GOOGLE_API_KEY="your_api_key_here"
```
**For read/write access (service account):**
```bash
export GOOGLE_SERVICE_ACCOUNT_JSON="/path/to/service_account.json"
```
See `assets/service_account_template.json` for the expected JSON structure.
### 3. Run Scripts
```bash
# List files in a folder
python scripts/list_files.py --folder-id FOLDER_ID
# Download a file
python scripts/download_file.py --file-id FILE_ID --dest ./downloads/
# Upload a file
python scripts/upload_file.py --src ./report.pdf --parent-id FOLDER_ID
# Create a folder
python scripts/create_folder.py --name "New Folder"
# Update/rename a file
python scripts/update_file.py --file-id FILE_ID --name "Updated Name.pdf"
# Delete a file
python scripts/delete_file.py --file-id FILE_ID
# Manage permissions
python scripts/set_permissions.py --file-id FILE_ID --public
```
## Available Scripts
| Script | Operation | Auth Required |
|--------|-----------|---------------|
| `list_files.py` | List files/folders in a directory | API key or service account |
| `download_file.py` | Download a file to disk | API key or service account |
| `create_folder.py` | Create a new folder | Service account |
| `upload_file.py` | Upload a local file | Service account |
| `update_file.py` | Rename, move, or update content | Service account |
| `delete_file.py` | Trash or permanently delete | Service account |
| `set_permissions.py` | Make public / share / revoke access | Service account |
## Authentication Methods
### API Key (Read-Only)
Use for public, shared files when you only need read access.
```python
from googleapiclient.discovery import build
import os
API_KEY = os.environ["GOOGLE_API_KEY"]
drive = build("drive", "v3", developerKey=API_KEY)
```
### Service Account (Read/Write)
Use when you need to create, modify, or delete files.
```python
from google.oauth2 import service_account
from googleapiclient.discovery import build
import os
SCOPES = ["https://www.googleapis.com/auth/drive"]
creds = service_account.Credentials.from_service_account_file(
    os.environ["GOOGLE_SERVICE_ACCOUNT_JSON"], scopes=SCOPES
)
drive = build("drive", "v3", credentials=creds)
```
## Finding File & Folder IDs
Every Drive resource has a unique `fileId`. Extract it from the share URL:
```
Folder: https://drive.google.com/drive/folders/FOLDER_ID_HERE
File: https://drive.google.com/file/d/FILE_ID_HERE/view
```
## Common Operations
### List Files
```bash
python scripts/list_files.py --folder-id FOLDER_ID
python scripts/list_files.py --folder-id FOLDER_ID --json # JSON output
```
### Download Files
```bash
# Save to specific path
python scripts/download_file.py --file-id FILE_ID --dest ./report.pdf
# Save to directory (auto filename)
python scripts/download_file.py --file-id FILE_ID --dest ./downloads/
```
### Create Folders
```bash
# In Drive root
python scripts/create_folder.py --name "My New Folder"
# In a specific parent folder
python scripts/create_folder.py --name "Subfolder" --parent-id PARENT_FOLDER_ID
```
### Upload Files
```bash
# Basic upload
python scripts/upload_file.py --src ./report.pdf --parent-id FOLDER_ID
# Custom name and make public
python scripts/upload_file.py --src ./photo.png --name "Photo.png" --parent-id FOLDER_ID --make-public
```
### Update Files
```bash
# Rename
python scripts/update_file.py --file-id FILE_ID --name "New Name.pdf"
# Move to different folder
python scripts/update_file.py --file-id FILE_ID --move-to NEW_PARENT_ID --old-parent OLD_PARENT_ID
# Replace content
python scripts/update_file.py --file-id FILE_ID --src ./updated.pdf
```
### Delete Files
```bash
# Move to trash
python scripts/delete_file.py --file-id FILE_ID
# Permanently delete (with confirmation)
python scripts/delete_file.py --file-id FILE_ID --permanent
# Permanently delete (non-interactive)
python scripts/delete_file.py --file-id FILE_ID --permanent --yes
```
### Manage Permissions
```bash
# Make file publicly readable
python scripts/set_permissions.py --file-id FILE_ID --public
# Share with a user as writer
python scripts/set_permissions.py --file-id FILE_ID --email user@example.com --role writer
# Share as reader
python scripts/set_permissions.py --file-id FILE_ID --email user@example.com --role reader
# List current permissions
python scripts/set_permissions.py --file-id FILE_ID --list
# Remove a permission
python scripts/set_permissions.py --file-id FILE_ID --remove PERMISSION_ID
```
## References
- **[error_codes.md](references/error_codes.md)** — HTTP error codes and retry patterns
- **[mime_types.md](references/mime_types.md)** — MIME type quick reference
- **[SKILL.md](SKILL.md)** — Complete skill documentation and code patterns
## Security Best Practices
1. **Never commit credentials** — Always store `service_account.json` outside the repository
2. **Use `.gitignore`** — Add service account files to `.gitignore`
3. **Rotate service account keys** — Replace keys on a regular schedule and delete unused ones, following Google Cloud security policies
4. **Limit permissions** — Grant only necessary Drive API scopes to service accounts
## Troubleshooting
- **Authentication errors**: Verify environment variables are set correctly
- **Permission denied**: Check that your service account has access to the target files/folders
- **Not found errors**: Confirm file/folder IDs are correct
- See [error_codes.md](references/error_codes.md) for detailed error handling
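The first troubleshooting step, checking the environment variables, can be scripted. This is a minimal sketch under stated assumptions: `check_credentials` is a hypothetical helper, and it only verifies that the variables are set and that the service account file parses as JSON with `"type": "service_account"`. It does not contact Google or validate the key itself.

```python
import json
import os


def check_credentials(env=None) -> list[str]:
    """Return a list of problems found; an empty list means the setup looks usable."""
    env = os.environ if env is None else env
    problems = []
    sa_path = env.get("GOOGLE_SERVICE_ACCOUNT_JSON")
    api_key = env.get("GOOGLE_API_KEY")
    if not sa_path and not api_key:
        problems.append("Neither GOOGLE_SERVICE_ACCOUNT_JSON nor GOOGLE_API_KEY is set.")
    if sa_path:
        if not os.path.isfile(sa_path):
            problems.append(f"Service account file not found: {sa_path}")
        else:
            try:
                with open(sa_path, encoding="utf-8") as fh:
                    data = json.load(fh)
            except (OSError, json.JSONDecodeError) as exc:
                problems.append(f"Cannot read service account JSON: {exc}")
            else:
                if not isinstance(data, dict) or data.get("type") != "service_account":
                    problems.append('Service account JSON should contain "type": "service_account".')
    return problems
```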
## For More Information
See [SKILL.md](SKILL.md) for comprehensive documentation including:
- Detailed CRUD operation patterns
- Advanced usage examples
- Error handling strategies
- Full script documentation
FILE:assets/service_account_template.json
{
  "type": "service_account",
  "project_id": "YOUR_PROJECT_ID",
  "private_key_id": "KEY_ID",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "your-service-account@YOUR_PROJECT_ID.iam.gserviceaccount.com",
  "client_id": "NUMERIC_CLIENT_ID",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-service-account%40YOUR_PROJECT_ID.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}
FILE:references/error_codes.md
# Google Drive Errors Reference
## HTTP Status Codes
| Code | Reason | Common Cause | Fix |
|------|--------|--------------|-----|
| 400 | Bad Request | Invalid field name, wrong MIME type, malformed query | Check `q` syntax, field names, and MIME strings |
| 401 | Unauthorized | Expired or missing credentials | Refresh token / regenerate service account key |
| 403 | Forbidden | Insufficient permissions on the file or Drive | Check sharing settings; ensure service account has access |
| 404 | Not Found | Wrong `fileId`, file deleted, `supportsAllDrives` missing | Verify ID from URL; add `supportsAllDrives=True` |
| 409 | Conflict | Duplicate file name conflict (rare) | Rename or check for existing file first |
| 429 | Too Many Requests | API quota exceeded | Exponential back-off and retry |
| 500 | Internal Server Error | Transient Google-side error | Retry with back-off |
| 503 | Service Unavailable | Drive temporarily unavailable | Retry with back-off |
## Drive API Error Reasons (inside 403 responses)
| Reason | Meaning |
|--------|---------|
| `appNotAuthorizedToFile` | App is not authorized to access this file |
| `domainPolicy` | File sharing blocked by org policy |
| `forbidden` | Authenticated user lacks permissions |
| `insufficientFilePermissions` | Read-only access on a write operation |
| `rateLimitExceeded` | Per-user or per-project rate limit hit |
| `sharingNotSupported` | Drive doesn't support sharing (e.g., shared drive restrictions) |
| `storageQuotaExceeded` | Drive or user storage quota full |
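Because `rateLimitExceeded` arrives with status 403, a retry policy keyed on the status code alone would treat throttling as a permission failure. The sketch below shows one way to inspect the error body before deciding. It assumes the standard Drive error payload (`{"error": {"errors": [{"reason": ...}]}}`); the function names are hypothetical, and `userRateLimitExceeded` is included as an assumption beyond the table above.

```python
import json

# Reasons that signal throttling rather than a real permission problem.
RETRYABLE_403_REASONS = {"rateLimitExceeded", "userRateLimitExceeded"}
RETRYABLE_STATUSES = {429, 500, 503}


def error_reasons(content):
    """Extract the `reason` strings from a Drive error response body."""
    if isinstance(content, bytes):
        content = content.decode("utf-8", errors="replace")
    try:
        payload = json.loads(content)
    except (TypeError, ValueError):
        return []
    error = payload.get("error") if isinstance(payload, dict) else None
    if not isinstance(error, dict):
        return []
    return [e.get("reason", "") for e in error.get("errors", []) if isinstance(e, dict)]


def is_retryable(status, content):
    """Return True when a failed call is worth retrying with back-off."""
    if status in RETRYABLE_STATUSES:
        return True
    if status == 403:
        return any(r in RETRYABLE_403_REASONS for r in error_reasons(content))
    return False
```

With `googleapiclient`, the inputs would come from a caught `HttpError` as `e.resp.status` and `e.content`.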
## Handling in Python
```python
import time
from googleapiclient.errors import HttpError
def with_retry(fn, max_attempts: int = 5):
    """Simple exponential back-off wrapper for Drive API calls."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except HttpError as e:
            # Retry only transient failures; re-raise the real error on the
            # last attempt instead of sleeping and masking it.
            retryable = e.resp.status in (429, 500, 503)
            if not retryable or attempt == max_attempts - 1:
                raise
            wait = 2 ** attempt
            print(f"Rate-limited or server error, retrying in {wait}s...")
            time.sleep(wait)
```
## Common Query (`q`) Syntax Errors
```
# Wrong — missing quotes around folder ID
q="FOLDER_ID in parents"
# Correct
q="'FOLDER_ID' in parents and trashed=false"
# Wrong — spaces in MIME type value
q="mimeType = 'application/vnd.google-apps.folder '"
# Correct
q="mimeType='application/vnd.google-apps.folder'"
```
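Both mistakes above can be avoided by building the `q` string in one place. The following is a minimal sketch with hypothetical helper names (`quote_q`, `children_query`); it quotes every value, escapes embedded single quotes and backslashes, and strips stray whitespace from MIME types.

```python
from typing import Optional


def quote_q(value: str) -> str:
    """Wrap a value in single quotes, escaping backslashes and quotes for `q`."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def children_query(
    folder_id: str,
    mime_type: Optional[str] = None,
    name: Optional[str] = None,
) -> str:
    """Build a query for the non-trashed children of a folder."""
    clauses = [f"{quote_q(folder_id)} in parents", "trashed=false"]
    if mime_type:
        # Trailing or leading spaces inside the quotes would never match.
        clauses.append(f"mimeType={quote_q(mime_type.strip())}")
    if name:
        clauses.append(f"name={quote_q(name)}")
    return " and ".join(clauses)
```

For example, `children_query("FOLDER_ID")` yields the "Correct" folder query shown above.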
## `supportsAllDrives` Requirement
Pass `supportsAllDrives=True` on every call that may touch a file in a **Shared Drive** (formerly Team Drive), and add `includeItemsFromAllDrives=True` to `files().list()` calls. Without them, the API returns 404 for Shared Drive files, and `files().list()` omits Shared Drive items.
FILE:references/mime_types.md
# Google Drive MIME Types Reference
## Google Workspace Types
| Resource | MIME Type |
|----------|-----------|
| Folder | `application/vnd.google-apps.folder` |
| Google Docs | `application/vnd.google-apps.document` |
| Google Sheets | `application/vnd.google-apps.spreadsheet` |
| Google Slides | `application/vnd.google-apps.presentation` |
| Google Forms | `application/vnd.google-apps.form` |
| Google Drawings | `application/vnd.google-apps.drawing` |
| Google Maps | `application/vnd.google-apps.map` |
| Google Sites | `application/vnd.google-apps.site` |
| Google Apps Script | `application/vnd.google-apps.script` |
| Shortcut | `application/vnd.google-apps.shortcut` |
**Note:** Google Workspace files (Docs, Sheets, etc.) cannot be downloaded directly with `get_media`. Export them first:
```python
# Export Google Doc as PDF
request = drive.files().export_media(
fileId=file_id,
mimeType="application/pdf"
)
```
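A download routine therefore has to choose between `get_media` and `export_media` based on the file's MIME type. The sketch below illustrates that decision. The helper name `download_plan` and the choice of export targets (Office formats for Docs, Sheets, and Slides, PDF for Drawings) are assumptions for illustration, and other Workspace types such as folders and forms are skipped rather than guessed at.

```python
WORKSPACE_PREFIX = "application/vnd.google-apps."

# Assumed default export targets for the editor types that support export.
EXPORT_TARGETS = {
    "application/vnd.google-apps.document":
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.google-apps.spreadsheet":
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.google-apps.presentation":
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.google-apps.drawing": "application/pdf",
}


def download_plan(mime_type: str):
    """Return (method, export_mime) where method is get_media, export_media, or skip."""
    if mime_type in EXPORT_TARGETS:
        return ("export_media", EXPORT_TARGETS[mime_type])
    if mime_type.startswith(WORKSPACE_PREFIX):
        # Folders, shortcuts, forms, and similar types have no binary content here.
        return ("skip", None)
    return ("get_media", None)
```

The returned `export_mime` would be passed as the `mimeType` argument of `export_media`, as in the snippet above.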
## Common Binary / Text Types
| Content | MIME Type |
|---------|-----------|
| PDF | `application/pdf` |
| ZIP | `application/zip` |
| Plain text | `text/plain` |
| HTML | `text/html` |
| Markdown | `text/markdown` |
| CSV | `text/csv` |
| JSON | `application/json` |
| XML | `application/xml` |
| JavaScript | `text/javascript` |
| Python | `text/x-python` |
## Image Types
| Format | MIME Type |
|--------|-----------|
| PNG | `image/png` |
| JPEG | `image/jpeg` |
| GIF | `image/gif` |
| WebP | `image/webp` |
| SVG | `image/svg+xml` |
| TIFF | `image/tiff` |
| BMP | `image/bmp` |
| ICO | `image/x-icon` |
## Audio / Video Types
| Format | MIME Type |
|--------|-----------|
| MP4 | `video/mp4` |
| WebM | `video/webm` |
| AVI | `video/x-msvideo` |
| MP3 | `audio/mpeg` |
| WAV | `audio/wav` |
| OGG | `audio/ogg` |
## Office / Document Types
| Format | MIME Type |
|--------|-----------|
| DOCX | `application/vnd.openxmlformats-officedocument.wordprocessingml.document` |
| XLSX | `application/vnd.openxmlformats-officedocument.spreadsheetml.sheet` |
| PPTX | `application/vnd.openxmlformats-officedocument.presentationml.presentation` |
| ODT | `application/vnd.oasis.opendocument.text` |
| ODS | `application/vnd.oasis.opendocument.spreadsheet` |
## Auto-detection in Python
```python
import mimetypes
mime_type, _ = mimetypes.guess_type("report.pdf")
# mime_type = "application/pdf"
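# Unregistered extensions return None, so fall back explicitly.
# The MIME database is platform-dependent; some systems may map ".xyz" to a chemical format.
mime_type, _ = mimetypes.guess_type("upload.unknownext")
mime_type = mime_type or "application/octet-stream"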
```
FILE:scripts/create_folder.py
#!/usr/bin/env python3
"""
Create a folder in Google Drive.
Usage:
python create_folder.py --name <FOLDER_NAME> [--parent-id <PARENT_FOLDER_ID>]
Output:
Prints the new folder ID.
Auth:
GOOGLE_SERVICE_ACCOUNT_JSON — required (write operation)
"""
import argparse
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
def create_folder(drive, name: str, parent_id: str | None = None) -> str:
    metadata: dict = {
        "name": name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    if parent_id:
        metadata["parents"] = [parent_id]
    folder = (
        drive.files()
        .create(body=metadata, fields="id, name", supportsAllDrives=True)
        .execute()
    )
    return folder["id"]
def main():
    parser = argparse.ArgumentParser(description="Create a folder in Google Drive")
    parser.add_argument("--name", required=True, help="Folder name")
    parser.add_argument(
        "--parent-id", default=None, help="Parent folder ID (root if omitted)"
    )
    args = parser.parse_args()
    drive = build_drive(readonly=False)
    folder_id = create_folder(drive, args.name, args.parent_id)
    print(f"Created folder '{args.name}'")
    print(f"Folder ID: {folder_id}")
if __name__ == "__main__":
    main()
FILE:scripts/delete_file.py
#!/usr/bin/env python3
"""
Delete or trash a file / folder in Google Drive.
Usage:
# Move to trash (safe, recoverable):
python delete_file.py --file-id <FILE_ID>
# Permanently delete (irreversible):
python delete_file.py --file-id <FILE_ID> --permanent
# Skip confirmation prompt:
python delete_file.py --file-id <FILE_ID> --permanent --yes
Warning:
Permanently deleting a folder also deletes ALL its children.
Auth:
GOOGLE_SERVICE_ACCOUNT_JSON — required (write operation)
"""
import argparse
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
from googleapiclient.errors import HttpError
def trash_file(drive, file_id: str) -> None:
    drive.files().update(
        fileId=file_id, body={"trashed": True}, supportsAllDrives=True
    ).execute()
def delete_file_permanent(drive, file_id: str) -> bool:
    try:
        drive.files().delete(fileId=file_id, supportsAllDrives=True).execute()
        return True
    except HttpError as e:
        # A 404 means the file is already gone; report it instead of failing.
        if e.resp.status == 404:
            return False
        raise
def main():
    parser = argparse.ArgumentParser(
        description="Delete or trash a Google Drive file/folder"
    )
    parser.add_argument("--file-id", required=True, help="Google Drive file/folder ID")
    parser.add_argument(
        "--permanent",
        action="store_true",
        help="Permanently delete instead of moving to trash",
    )
    parser.add_argument(
        "--yes", "-y", action="store_true", help="Skip confirmation prompt"
    )
    args = parser.parse_args()
    if args.permanent and not args.yes:
        confirm = input(
            f"Permanently delete {args.file_id}? This cannot be undone. [y/N] "
        ).strip().lower()
        if confirm != "y":
            print("Aborted.")
            sys.exit(0)
    drive = build_drive(readonly=False)
    if args.permanent:
        found = delete_file_permanent(drive, args.file_id)
        if found:
            print(f"Permanently deleted: {args.file_id}")
        else:
            print(f"Not found (already deleted?): {args.file_id}")
    else:
        trash_file(drive, args.file_id)
        print(f"Moved to trash: {args.file_id}")
if __name__ == "__main__":
    main()
FILE:scripts/download_file.py
#!/usr/bin/env python3
"""
Download a file from Google Drive to a local path.
Usage:
python download_file.py --file-id <FILE_ID> --dest <LOCAL_PATH>
Auth:
GOOGLE_API_KEY — publicly shared files
GOOGLE_SERVICE_ACCOUNT_JSON — private / service-account files
"""
import argparse
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
from googleapiclient.http import MediaIoBaseDownload
def download_file(drive, file_id: str, dest_path: str) -> None:
    # Get filename if dest_path is a directory
    if os.path.isdir(dest_path):
        meta = (
            drive.files()
            .get(fileId=file_id, fields="name", supportsAllDrives=True)
            .execute()
        )
        dest_path = os.path.join(dest_path, meta["name"])
    request = drive.files().get_media(fileId=file_id, supportsAllDrives=True)
    with open(dest_path, "wb") as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        # Fetch the file chunk by chunk, reporting progress after each one.
        while not done:
            status, done = downloader.next_chunk()
            pct = int(status.progress() * 100)
            print(f"\rDownloading... {pct}%", end="", flush=True)
    print(f"\nSaved to: {dest_path}")
def main():
    parser = argparse.ArgumentParser(description="Download a file from Google Drive")
    parser.add_argument("--file-id", required=True, help="Google Drive file ID")
    parser.add_argument(
        "--dest", required=True, help="Local destination path or directory"
    )
    args = parser.parse_args()
    drive = build_drive(readonly=True)
    download_file(drive, args.file_id, args.dest)
if __name__ == "__main__":
    main()
FILE:scripts/drive_client.py
"""
Shared Google Drive API v3 client factory.
Usage (imported by other scripts):
from drive_client import build_drive
Environment variables:
GOOGLE_API_KEY — for read-only access to public files
GOOGLE_SERVICE_ACCOUNT_JSON — path to service-account JSON file (read+write)
"""
import os
import sys
try:
    from googleapiclient.discovery import build
    from google.oauth2 import service_account
except ImportError:
    sys.exit(
        "Missing dependencies. Run:\n"
        "  pip install google-api-python-client google-auth google-auth-httplib2"
    )
SCOPES = ["https://www.googleapis.com/auth/drive"]
def build_drive(readonly: bool = False):
    """
    Return an authenticated Drive API v3 service object.
    Auth resolution order:
      1. GOOGLE_SERVICE_ACCOUNT_JSON → service account credentials (read+write)
      2. GOOGLE_API_KEY → API key (read-only public files only)
    Args:
        readonly: When True, accept API-key-only auth. When False, require
                  a service account for write operations.
    Returns:
        googleapiclient.discovery.Resource
    """
    sa_path = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON")
    api_key = os.environ.get("GOOGLE_API_KEY")
    if sa_path:
        if not os.path.isfile(sa_path):
            sys.exit(f"Service account file not found: {sa_path}")
        creds = service_account.Credentials.from_service_account_file(
            sa_path, scopes=SCOPES
        )
        return build("drive", "v3", credentials=creds)
    if api_key and readonly:
        return build("drive", "v3", developerKey=api_key)
    if not readonly:
        sys.exit(
            "Write operation requires GOOGLE_SERVICE_ACCOUNT_JSON env var.\n"
            "Set it to the path of your service account JSON file."
        )
    sys.exit(
        "No credentials found.\n"
        "Set GOOGLE_API_KEY (read-only) or GOOGLE_SERVICE_ACCOUNT_JSON (read+write)."
    )
FILE:scripts/list_files.py
#!/usr/bin/env python3
"""
List files and folders inside a Google Drive folder.
Usage:
python list_files.py --folder-id <FOLDER_ID> [--json]
Output:
Tab-separated table (default) or JSON array (--json)
Auth:
GOOGLE_API_KEY — public folders (read-only)
GOOGLE_SERVICE_ACCOUNT_JSON — service-account owned folders
"""
import argparse
import json
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
def list_files(drive, folder_id: str) -> list[dict]:
    results = []
    page_token = None
    # Follow nextPageToken until the listing is exhausted.
    while True:
        resp = (
            drive.files()
            .list(
                q=f"'{folder_id}' in parents and trashed=false",
                fields="nextPageToken, files(id, name, mimeType, size, modifiedTime)",
                pageToken=page_token,
                supportsAllDrives=True,
                includeItemsFromAllDrives=True,
            )
            .execute()
        )
        results.extend(resp.get("files", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return results
def main():
    parser = argparse.ArgumentParser(description="List files in a Google Drive folder")
    parser.add_argument("--folder-id", required=True, help="Google Drive folder ID")
    parser.add_argument(
        "--json", action="store_true", dest="as_json", help="Output as JSON"
    )
    args = parser.parse_args()
    drive = build_drive(readonly=True)
    files = list_files(drive, args.folder_id)
    if args.as_json:
        print(json.dumps(files, indent=2))
    else:
        print(f"{'ID':<45} {'MIME TYPE':<45} {'SIZE':>10} NAME")
        print("-" * 120)
        for f in files:
            size = f.get("size", "-")
            print(f"{f['id']:<45} {f['mimeType']:<45} {size:>10} {f['name']}")
        print(f"\n{len(files)} item(s) found.")
if __name__ == "__main__":
    main()
FILE:scripts/set_permissions.py
#!/usr/bin/env python3
"""
Manage permissions on a Google Drive file or folder.
Usage:
# Make public (anyone with link can read):
python set_permissions.py --file-id <ID> --public
# Share with a specific user:
python set_permissions.py --file-id <ID> --email user@example.com [--role writer]
# List current permissions:
python set_permissions.py --file-id <ID> --list
# Remove a permission by permission ID:
python set_permissions.py --file-id <ID> --remove <PERMISSION_ID>
Roles: reader | commenter | writer | fileOrganizer | organizer
(owner is not accepted by --role; ownership transfer is not implemented)
Auth:
GOOGLE_SERVICE_ACCOUNT_JSON — required (write operation)
"""
import argparse
import json
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
def list_permissions(drive, file_id: str) -> list[dict]:
    resp = drive.permissions().list(
        fileId=file_id,
        fields="permissions(id, role, type, emailAddress, displayName)",
        supportsAllDrives=True,
    ).execute()
    return resp.get("permissions", [])
def make_public(drive, file_id: str) -> str:
    result = drive.permissions().create(
        fileId=file_id,
        body={"role": "reader", "type": "anyone"},
        fields="id",
        supportsAllDrives=True,
    ).execute()
    return result["id"]
def share_with_user(
    drive, file_id: str, email: str, role: str = "writer"
) -> str:
    result = drive.permissions().create(
        fileId=file_id,
        body={"role": role, "type": "user", "emailAddress": email},
        sendNotificationEmail=False,
        fields="id",
        supportsAllDrives=True,
    ).execute()
    return result["id"]
def remove_permission(drive, file_id: str, permission_id: str) -> None:
    drive.permissions().delete(
        fileId=file_id,
        permissionId=permission_id,
        supportsAllDrives=True,
    ).execute()
def main():
    parser = argparse.ArgumentParser(
        description="Manage Google Drive file/folder permissions"
    )
    parser.add_argument("--file-id", required=True, help="Google Drive file/folder ID")
    # Exactly one action must be chosen per invocation.
    action = parser.add_mutually_exclusive_group(required=True)
    action.add_argument(
        "--public", action="store_true", help="Make file readable by anyone with link"
    )
    action.add_argument("--email", default=None, help="Share with a specific user email")
    action.add_argument(
        "--list", action="store_true", dest="list_perms", help="List current permissions"
    )
    action.add_argument(
        "--remove", default=None, metavar="PERMISSION_ID", help="Remove a permission by ID"
    )
    parser.add_argument(
        "--role",
        default="writer",
        choices=["reader", "commenter", "writer", "fileOrganizer", "organizer"],
        help="Role for --email (default: writer)",
    )
    args = parser.parse_args()
    drive = build_drive(readonly=False)
    file_id = args.file_id
    if args.list_perms:
        perms = list_permissions(drive, file_id)
        print(json.dumps(perms, indent=2))
    elif args.public:
        perm_id = make_public(drive, file_id)
        print("Made public (reader, anyone with link)")
        print(f"Permission ID: {perm_id}")
    elif args.email:
        perm_id = share_with_user(drive, file_id, args.email, args.role)
        print(f"Shared with {args.email} as {args.role}")
        print(f"Permission ID: {perm_id}")
    elif args.remove:
        remove_permission(drive, file_id, args.remove)
        print(f"Removed permission: {args.remove}")
if __name__ == "__main__":
    main()
FILE:scripts/update_file.py
#!/usr/bin/env python3
"""
Update a Google Drive file — rename, move, or replace content.
Usage:
# Rename:
python update_file.py --file-id <ID> --name <NEW_NAME>
# Move to a different folder:
python update_file.py --file-id <ID> --move-to <NEW_PARENT_ID> --old-parent <OLD_PARENT_ID>
# Replace file content:
python update_file.py --file-id <ID> --src <LOCAL_PATH> [--mime <MIME_TYPE>]
# Combine rename + content update:
python update_file.py --file-id <ID> --name <NEW_NAME> --src <LOCAL_PATH>
Auth:
GOOGLE_SERVICE_ACCOUNT_JSON — required (write operation)
"""
import argparse
import mimetypes
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
from googleapiclient.http import MediaFileUpload
def main():
    parser = argparse.ArgumentParser(description="Update a Google Drive file")
    parser.add_argument("--file-id", required=True, help="ID of the file to update")
    parser.add_argument("--name", default=None, help="New file name (rename)")
    parser.add_argument("--move-to", default=None, help="New parent folder ID (move)")
    parser.add_argument(
        "--old-parent",
        default=None,
        help="Current parent folder ID (required when --move-to is used)",
    )
    parser.add_argument("--src", default=None, help="Local file to replace content with")
    parser.add_argument("--mime", default=None, help="MIME type for content update")
    args = parser.parse_args()
    if args.move_to and not args.old_parent:
        sys.exit("--old-parent is required when using --move-to")
    if not any([args.name, args.move_to, args.src]):
        sys.exit("Specify at least one of: --name, --move-to, --src")
    drive = build_drive(readonly=False)
    file_id = args.file_id
    # Build metadata update body
    body: dict = {}
    if args.name:
        body["name"] = args.name
    # Prepare media if replacing content
    media = None
    if args.src:
        if not os.path.isfile(args.src):
            sys.exit(f"File not found: {args.src}")
        mime_type = args.mime or mimetypes.guess_type(args.src)[0] or "application/octet-stream"
        media = MediaFileUpload(args.src, mimetype=mime_type, resumable=True)
    # Prepare kwargs
    kwargs: dict = dict(
        fileId=file_id,
        body=body,
        fields="id, name",
        supportsAllDrives=True,
    )
    if args.move_to:
        kwargs["addParents"] = args.move_to
        kwargs["removeParents"] = args.old_parent
    if media:
        kwargs["media_body"] = media
    result = drive.files().update(**kwargs).execute()
    if args.name:
        print(f"Renamed to: {result.get('name', args.name)}")
    if args.move_to:
        print(f"Moved to folder: {args.move_to}")
    if args.src:
        print(f"Content replaced from: {args.src}")
    print(f"File ID: {file_id}")
if __name__ == "__main__":
    main()
FILE:scripts/upload_file.py
#!/usr/bin/env python3
"""
Upload a local file to Google Drive.
Usage:
# Upload from local path:
python upload_file.py --src <LOCAL_PATH> [--name <DRIVE_NAME>] [--parent-id <FOLDER_ID>] [--mime <MIME_TYPE>]
# Make newly uploaded file public (anyone with link can read):
python upload_file.py --src photo.png --parent-id <ID> --make-public
Output:
Prints the new file ID.
Auth:
GOOGLE_SERVICE_ACCOUNT_JSON — required (write operation)
"""
import argparse
import mimetypes
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from drive_client import build_drive
from googleapiclient.http import MediaFileUpload
def upload_file(
    drive,
    local_path: str,
    name: str,
    parent_id: str | None = None,
    mime_type: str | None = None,
) -> str:
    # Guess the MIME type from the file extension when none is given.
    if not mime_type:
        guessed, _ = mimetypes.guess_type(local_path)
        mime_type = guessed or "application/octet-stream"
    metadata: dict = {"name": name}
    if parent_id:
        metadata["parents"] = [parent_id]
    media = MediaFileUpload(local_path, mimetype=mime_type, resumable=True)
    file = (
        drive.files()
        .create(body=metadata, media_body=media, fields="id", supportsAllDrives=True)
        .execute()
    )
    return file["id"]
def make_public(drive, file_id: str) -> None:
    drive.permissions().create(
        fileId=file_id,
        body={"role": "reader", "type": "anyone"},
        supportsAllDrives=True,
    ).execute()
def main():
    parser = argparse.ArgumentParser(description="Upload a file to Google Drive")
    parser.add_argument("--src", required=True, help="Local file path to upload")
    parser.add_argument("--name", default=None, help="Name on Drive (default: filename)")
    parser.add_argument("--parent-id", default=None, help="Parent folder ID")
    parser.add_argument("--mime", default=None, help="MIME type (auto-detected if omitted)")
    parser.add_argument(
        "--make-public",
        action="store_true",
        help="Make the uploaded file publicly readable",
    )
    args = parser.parse_args()
    if not os.path.isfile(args.src):
        sys.exit(f"File not found: {args.src}")
    drive_name = args.name or os.path.basename(args.src)
    drive = build_drive(readonly=False)
    file_id = upload_file(drive, args.src, drive_name, args.parent_id, args.mime)
    print(f"Uploaded '{drive_name}'")
    print(f"File ID: {file_id}")
    if args.make_public:
        make_public(drive, file_id)
        print("Visibility: public (anyone with the link can read)")
if __name__ == "__main__":
    main()
Integrate with Odoo 17 via XML-RPC API. Use when: managing projects, tasks, calendar events, time off requests, helpdesk tickets, knowledge articles, documen...
---
name: vnclaw-odoo-skill
description: "Integrate with Odoo 17 via XML-RPC API. Use when: managing projects, tasks, calendar events, time off requests, helpdesk tickets, knowledge articles, documents, or timesheets in Odoo 17. Supports read, create, and update operations only. No delete allowed."
argument-hint: "Describe what you want to do in Odoo 17 (e.g., 'list all open tasks in project X', 'create a time off request')"
---
# VNClaw — Odoo 17 Integration Skill
## Execution Rules (MUST follow)
1. **ALWAYS run the command immediately.** Never ask the user to confirm or explain what the command does before running it. Translate the request → pick the command → execute it in one step.
2. **Never say "Would you like me to execute this?"** — just execute it.
3. **Never ask "Do you want me to run this?"** — just run it.
4. **If the command fails**, show the error and try to fix it. Do not ask the user for help diagnosing unless you have exhausted all options.
5. **Credentials**: If env vars are missing, inform the user which variable is missing and stop. Do not ask for the value interactively.
## Path Resolution
**IMPORTANT:** All commands below use `SKILL_SCRIPTS` as shorthand for the absolute path to the scripts directory. The skill may be installed in the workspace or in one of two user-global locations; resolve the path by checking each in order:
```bash
# Resolve SKILL_SCRIPTS from workspace (.github/skills/) or global (~/.vnclaw/skills/ or ~/.openclaw/skills/)
SKILL_SCRIPTS="$(
# 1. Check workspace: .github/skills/vnclaw-odoo-skill/scripts
_ws_root="$(git rev-parse --show-toplevel 2>/dev/null || pwd)"
_ws_path="$_ws_root/.github/skills/vnclaw-odoo-skill/scripts"
# 2. Global fallback locations
_global_path="$HOME/.vnclaw/skills/vnclaw-odoo-skill/scripts"
_global_path2="$HOME/.openclaw/skills/vnclaw-odoo-skill/scripts"
if [ -d "$_ws_path" ]; then
echo "$_ws_path"
elif [ -d "$_global_path" ]; then
echo "$_global_path"
elif [ -d "$_global_path2" ]; then
echo "$_global_path2"
else
# Last resort: search filesystem
find "$HOME" /opt -type d -name vnclaw-odoo-skill -path '*/skills/*' 2>/dev/null | head -1 | sed 's|$|/scripts|'
fi
)"
echo "SKILL_SCRIPTS=$SKILL_SCRIPTS"
```
Possible install locations:
| Location type | Path |
|---------------|------|
| Workspace skill | `<git-root>/.github/skills/vnclaw-odoo-skill/scripts/` |
| User global (vnclaw) | `~/.vnclaw/skills/vnclaw-odoo-skill/scripts/` |
| User global (openclaw) | `~/.openclaw/skills/vnclaw-odoo-skill/scripts/` |
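The same lookup order can be expressed in Python when a caller needs the path programmatically. This is a minimal sketch under stated assumptions: the function name `resolve_skill_scripts` is hypothetical, both roots are passed in explicitly, and the last-resort filesystem search from the shell snippet is deliberately left out.

```python
from pathlib import Path

SKILL_NAME = "vnclaw-odoo-skill"


def resolve_skill_scripts(workspace_root, home):
    """Return the first existing scripts directory, or None if none is found."""
    candidates = [
        # 1. Workspace skill
        Path(workspace_root) / ".github" / "skills" / SKILL_NAME / "scripts",
        # 2. User-global locations, in the order listed in the table above
        Path(home) / ".vnclaw" / "skills" / SKILL_NAME / "scripts",
        Path(home) / ".openclaw" / "skills" / SKILL_NAME / "scripts",
    ]
    for path in candidates:
        if path.is_dir():
            return path
    return None
```

A caller might pass the Git root (or the current directory) and `Path.home()`, and stop with an error when the result is `None`.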
## Environment Variables (Required)
Credentials are loaded from environment variables. **Never hardcode credentials.**
| Variable | Description |
|----------|-------------|
| `ODOO_URL` | Base URL (e.g., `https://mycompany.odoo.com`) |
| `ODOO_DB` | Database name |
| `ODOO_USERNAME` | Login username (email) |
| `ODOO_API_KEY` | API key or password |
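A pre-flight check along the following lines would report exactly which variable is missing before any request is made. It is a sketch, not the skill's actual `odoo_core.py`: the function names are hypothetical, and the login step assumes Odoo's standard external API endpoint `/xmlrpc/2/common` with its `authenticate` method.

```python
import os
import xmlrpc.client

REQUIRED_VARS = ("ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_API_KEY")


def missing_odoo_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def authenticate(env=None):
    """Return the Odoo user id, or raise if variables are missing or login fails."""
    env = os.environ if env is None else env
    missing = missing_odoo_vars(env)
    if missing:
        # Name the missing variables and stop; never prompt for their values.
        raise RuntimeError("Missing environment variable(s): " + ", ".join(missing))
    common = xmlrpc.client.ServerProxy(env["ODOO_URL"].rstrip("/") + "/xmlrpc/2/common")
    uid = common.authenticate(
        env["ODOO_DB"], env["ODOO_USERNAME"], env["ODOO_API_KEY"], {}
    )
    if not uid:
        raise RuntimeError("Authentication failed: check ODOO_USERNAME and ODOO_API_KEY")
    return uid
```

Raising before the network call keeps the failure message specific, which matches the rule that missing credentials should be reported rather than requested interactively.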
## Common Features (All Modules)
All module scripts share these capabilities:
- **`--my`** — Filter to current authenticated user's records
- **Name-based lookups** — Use `--user "Alice"`, `--project "Website"` etc. instead of IDs
- **Date shortcuts** — `--today`, `--yesterday`, `--this-week`, `--last-week`, `--this-month`, `--last-month`, `--this-year`
- **Custom date range** — `--date-from 2026-03-01 --date-to 2026-03-31`
- **Log notes** — `log-note` action to post internal notes on records (where supported)
- **Notify** — `notify` action to schedule activity notifications by user name (where supported)
- **JSON output** — All scripts output JSON to stdout, logs go to stderr
> **DATE FIELD RULE for tasks.py**: Date shortcuts filter `date_deadline` by default.
> To filter by **when a task was created**, always add `--date-field created`.
> To filter by **when a task was last updated**, use `--date-field updated`.
## Natural Language → Command Mapping
Use this table to translate common user requests into the correct command:
| User says... | Command |
|---|---|
| "my tasks" | `tasks.py list --my` |
| "my tasks this week" / "tasks with deadline this week" | `tasks.py list --my --this-week` |
| "tasks I created this week" / "tasks created this week" | `tasks.py list --my --this-week --date-field created` |
| "tasks created today" | `tasks.py list --my --today --date-field created` |
| "tasks created this month" | `tasks.py list --my --this-month --date-field created` |
| "tasks updated today" / "recently modified tasks" | `tasks.py list --my --today --date-field updated` |
| "my timesheets this week" | `timesheets.py list --my --this-week` |
| "log 2 hours on project X" | `timesheets.py log --project "X" --hours 2 --description "..."` |
| "my timesheet summary this month" | `timesheets.py summary --my --this-month` |
| "my calendar today" / "my meetings today" | `calendar_events.py list --my --today` |
| "my tickets" / "tickets assigned to me" | `helpdesk.py list --my` |
| "my leave requests this year" | `time_off.py list --my --this-year` |
## Quick Decision: Which Script to Use
| User wants to... | Script | Example |
|-------------------|--------|---------|
| List/view/create/update **tasks** | `tasks.py` | `python3 $SKILL_SCRIPTS/tasks.py list --my --this-week` |
| List/view/create/update **projects** | `projects.py` | `python3 $SKILL_SCRIPTS/projects.py list --my` |
| Log/view/update **timesheets** | `timesheets.py` | `python3 $SKILL_SCRIPTS/timesheets.py log --project "Website" --hours 2 --description "review"` |
| List/view/create/update **calendar events** | `calendar_events.py` | `python3 $SKILL_SCRIPTS/calendar_events.py list --my --today` |
| List/view/create/update **helpdesk tickets** | `helpdesk.py` | `python3 $SKILL_SCRIPTS/helpdesk.py list --my --this-week` |
| List/view/create/update **time off requests** | `time_off.py` | `python3 $SKILL_SCRIPTS/time_off.py list --my --this-year` |
| List/view/create/update **knowledge articles** | `knowledge.py` | `python3 $SKILL_SCRIPTS/knowledge.py list --my --published` |
| List/view/create/update **documents** | `documents.py` | `python3 $SKILL_SCRIPTS/documents.py list --folder "HR"` |
| Access **any user-defined / custom model** | `custom_app.py` | `python3 $SKILL_SCRIPTS/custom_app.py list crm.lead --my --this-month` |
| **Test connection** to Odoo | `odoo_core.py` | `python3 $SKILL_SCRIPTS/odoo_core.py test-connection` |
---
## Module: Tasks (`tasks.py`)
Manages `project.task`. Actions: `list`, `get`, `create`, `update`, `log-note`, `notify`, `stages`.
**Date filter field** — use `--date-field` to choose which date to filter on:
- `deadline` *(default)* — filters by `date_deadline`
- `created` — filters by `create_date` (when the task was created)
- `updated` — filters by `write_date` (when the task was last modified)
```bash
# My tasks
python3 $SKILL_SCRIPTS/tasks.py list --my
# My tasks CREATED this week ← "created" requires --date-field created
python3 $SKILL_SCRIPTS/tasks.py list --my --this-week --date-field created
# My tasks CREATED today
python3 $SKILL_SCRIPTS/tasks.py list --my --today --date-field created
# My tasks CREATED this month
python3 $SKILL_SCRIPTS/tasks.py list --my --this-month --date-field created
# Tasks with a DEADLINE this week (default behavior, --date-field deadline is implicit)
python3 $SKILL_SCRIPTS/tasks.py list --my --this-week
# Tasks assigned to a specific user (by name) created this week
python3 $SKILL_SCRIPTS/tasks.py list --user "Alice" --this-week --date-field created
# Tasks in a project (by name)
python3 $SKILL_SCRIPTS/tasks.py list --project "Website Redesign"
# Search tasks by keyword
python3 $SKILL_SCRIPTS/tasks.py list --search "login bug"
# Overdue tasks
python3 $SKILL_SCRIPTS/tasks.py list --overdue
# Tasks in a stage (by name)
python3 $SKILL_SCRIPTS/tasks.py list --stage "In Progress"
# Tasks by tag
python3 $SKILL_SCRIPTS/tasks.py list --tag "urgent"
# Get task detail
python3 $SKILL_SCRIPTS/tasks.py get 42
# Create a task (project and assignee by name)
python3 $SKILL_SCRIPTS/tasks.py create --name "Fix login bug" --project "Website" --assign "Alice" --deadline 2026-04-01
# Update task stage (by name) and assignee (by name)
python3 $SKILL_SCRIPTS/tasks.py update 42 --stage "Done" --assign "Bob"
# Log an internal note on a task
python3 $SKILL_SCRIPTS/tasks.py log-note 42 --body "Waiting for client feedback"
# Notify a user via activity
python3 $SKILL_SCRIPTS/tasks.py notify 42 --user "Alice" --summary "Please review this task"
# List available stages
python3 $SKILL_SCRIPTS/tasks.py stages
```
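To make the `--date-field` behavior concrete, the sketch below shows how a date shortcut combined with a field choice could translate into an Odoo search domain. It is an assumption about the approach, not the code in `tasks.py`: the function names are hypothetical, weeks are assumed to run Monday to Sunday, only three shortcuts are covered, and time zones are ignored.

```python
from datetime import date, timedelta

# Mapping described above: CLI choice -> project.task field
DATE_FIELDS = {
    "deadline": "date_deadline",
    "created": "create_date",
    "updated": "write_date",
}


def shortcut_range(shortcut, today):
    """Return (first_day, last_day) for a subset of the date shortcuts."""
    if shortcut == "today":
        return today, today
    if shortcut == "this-week":  # assumes weeks run Monday to Sunday
        start = today - timedelta(days=today.weekday())
        return start, start + timedelta(days=6)
    if shortcut == "this-month":
        start = today.replace(day=1)
        next_month = (start + timedelta(days=32)).replace(day=1)
        return start, next_month - timedelta(days=1)
    raise ValueError(f"unsupported shortcut: {shortcut}")


def date_domain(shortcut, date_field="deadline", today=None):
    """Build an Odoo domain that limits the chosen field to the shortcut's range."""
    field = DATE_FIELDS[date_field]
    start, end = shortcut_range(shortcut, today or date.today())
    return [(field, ">=", f"{start} 00:00:00"), (field, "<=", f"{end} 23:59:59")]
```

Under these assumptions, `--this-week --date-field created` becomes a pair of conditions on `create_date`, while omitting `--date-field` constrains `date_deadline` instead.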
## Module: Projects (`projects.py`)
Manages `project.project`. Actions: `list`, `get`, `create`, `update`, `log-note`, `notify`, `stages`.
```bash
# Projects I manage
python3 $SKILL_SCRIPTS/projects.py list --my
# Filter by manager name
python3 $SKILL_SCRIPTS/projects.py list --manager "Alice"
# Search projects
python3 $SKILL_SCRIPTS/projects.py list --search "Website"
# Active projects only + favorites
python3 $SKILL_SCRIPTS/projects.py list --active-only --favorites
# Get project details
python3 $SKILL_SCRIPTS/projects.py get 1
# Create a project with manager by name
python3 $SKILL_SCRIPTS/projects.py create --name "Website Redesign" --manager "Alice"
# Update project stage and dates
python3 $SKILL_SCRIPTS/projects.py update 1 --stage "In Progress" --date-start 2026-04-01 --date-end 2026-06-30
# Log note on project
python3 $SKILL_SCRIPTS/projects.py log-note 1 --body "Kickoff meeting completed"
# Notify project manager
python3 $SKILL_SCRIPTS/projects.py notify 1 --user "Alice" --summary "Budget review needed"
# List project stages
python3 $SKILL_SCRIPTS/projects.py stages
```
## Module: Timesheets (`timesheets.py`)
Manages `account.analytic.line`. Actions: `list`, `summary`, `log`, `update`.
```bash
# My timesheets this week
python3 $SKILL_SCRIPTS/timesheets.py list --my --this-week
# Timesheets by user name
python3 $SKILL_SCRIPTS/timesheets.py list --user "Alice" --this-month
# Timesheets by employee name
python3 $SKILL_SCRIPTS/timesheets.py list --employee "Alice Smith"
# Timesheets for a project (by name)
python3 $SKILL_SCRIPTS/timesheets.py list --project "Website Redesign"
# Custom date range
python3 $SKILL_SCRIPTS/timesheets.py list --date-from 2026-03-01 --date-to 2026-03-31
# Summary (hours grouped by project/task)
python3 $SKILL_SCRIPTS/timesheets.py summary --my --this-month
# Log a timesheet entry (project by name)
python3 $SKILL_SCRIPTS/timesheets.py log --project "Website Redesign" --hours 2.5 --description "Code review"
# Log for specific date
python3 $SKILL_SCRIPTS/timesheets.py log --project "Website" --hours 3 --description "Dev" --date 2026-03-17
# Update a timesheet entry
python3 $SKILL_SCRIPTS/timesheets.py update 15 --hours 4 --description "Updated"
```
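The `summary` action reports hours grouped by project/task. The script's own aggregation is not shown here, so the following is only a minimal sketch of that kind of grouping. It assumes rows shaped like Odoo `search_read` results, where a many2one field such as `project_id` arrives as an `[id, name]` pair (or `False` when empty). The function name `hours_by_project` is hypothetical.

```python
from collections import defaultdict


def hours_by_project(lines):
    """Sum `unit_amount` per project name over search_read-style rows (illustrative helper)."""
    totals = defaultdict(float)
    for line in lines:
        project = line.get("project_id")
        # Many2one values come back as [id, display_name]; empty ones as False.
        name = project[1] if project else "(no project)"
        totals[name] += line.get("unit_amount", 0.0)
    return dict(totals)


sample = [
    {"project_id": [1, "Website Redesign"], "unit_amount": 2.5},
    {"project_id": [1, "Website Redesign"], "unit_amount": 3.0},
    {"project_id": False, "unit_amount": 1.0},
]
print(hours_by_project(sample))
```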
## Module: Calendar Events (`calendar_events.py`)
Manages `calendar.event`. Actions: `list`, `get`, `create`, `update`.
```bash
# My events today
python3 $SKILL_SCRIPTS/calendar_events.py list --my --today
# My events this week
python3 $SKILL_SCRIPTS/calendar_events.py list --my --this-week
# Events where a specific person is attending (by name)
python3 $SKILL_SCRIPTS/calendar_events.py list --attendee "Alice"
# Events by organizer
python3 $SKILL_SCRIPTS/calendar_events.py list --organizer "Bob"
# Search events
python3 $SKILL_SCRIPTS/calendar_events.py list --search "Sprint"
# Custom date range
python3 $SKILL_SCRIPTS/calendar_events.py list --date-from 2026-03-20 --date-to 2026-03-25
# Get event details
python3 $SKILL_SCRIPTS/calendar_events.py get 10
# Create a meeting with attendees by name
python3 $SKILL_SCRIPTS/calendar_events.py create --name "Sprint Planning" --start "2026-03-20 09:00:00" --stop "2026-03-20 10:00:00" --attendees "Alice, Bob, Carol" --location "Room A"
# Reschedule
python3 $SKILL_SCRIPTS/calendar_events.py update 10 --start "2026-03-20 14:00:00" --stop "2026-03-20 15:00:00"
```
## Module: Helpdesk (`helpdesk.py`)
Manages `helpdesk.ticket`. Actions: `list`, `get`, `create`, `update`, `log-note`, `notify`, `stages`.
```bash
# My assigned tickets
python3 $SKILL_SCRIPTS/helpdesk.py list --my
# Tickets assigned to a user (by name)
python3 $SKILL_SCRIPTS/helpdesk.py list --user "Alice"
# Tickets from a customer (by name)
python3 $SKILL_SCRIPTS/helpdesk.py list --customer "Acme Corp"
# Search + date filter
python3 $SKILL_SCRIPTS/helpdesk.py list --search "login" --this-week
# High-priority tickets by team
python3 $SKILL_SCRIPTS/helpdesk.py list --priority 2 --team "Support"
# Filter by stage name
python3 $SKILL_SCRIPTS/helpdesk.py list --stage "In Progress"
# Get ticket details
python3 $SKILL_SCRIPTS/helpdesk.py get 5
# Create a ticket (assign and customer by name)
python3 $SKILL_SCRIPTS/helpdesk.py create --name "Login page broken" --team "Support" --assign "Alice" --customer "Acme Corp"
# Update ticket stage and assignee (by name)
python3 $SKILL_SCRIPTS/helpdesk.py update 5 --stage "Solved" --assign "Bob"
# Log note on ticket
python3 $SKILL_SCRIPTS/helpdesk.py log-note 5 --body "Escalated to engineering"
# Notify a user
python3 $SKILL_SCRIPTS/helpdesk.py notify 5 --user "Alice" --summary "Please investigate"
# List available stages
python3 $SKILL_SCRIPTS/helpdesk.py stages
```
## Module: Time Off (`time_off.py`)
Manages `hr.leave`. Actions: `list`, `get`, `create`, `update`, `leave-types`.
```bash
# My leave requests
python3 $SKILL_SCRIPTS/time_off.py list --my
# Leave requests by user name
python3 $SKILL_SCRIPTS/time_off.py list --user "Alice"
# By employee name
python3 $SKILL_SCRIPTS/time_off.py list --employee "Alice Smith"
# Filter by state
python3 $SKILL_SCRIPTS/time_off.py list --state validate
# Filter by leave type name
python3 $SKILL_SCRIPTS/time_off.py list --leave-type "Sick Time Off"
# This year's leaves
python3 $SKILL_SCRIPTS/time_off.py list --my --this-year
# List available leave types
python3 $SKILL_SCRIPTS/time_off.py leave-types
# Create a leave request (leave type by name)
python3 $SKILL_SCRIPTS/time_off.py create --date-from "2026-03-25 08:00:00" --date-to "2026-03-26 17:00:00" --leave-type "Paid Time Off" --name "Personal day"
# Get leave details
python3 $SKILL_SCRIPTS/time_off.py get 12
```
## Module: Knowledge (`knowledge.py`)
Manages `knowledge.article`. Actions: `list`, `get`, `create`, `update`.
```bash
# My articles
python3 $SKILL_SCRIPTS/knowledge.py list --my
# Articles by author name
python3 $SKILL_SCRIPTS/knowledge.py list --author "Alice"
# Published articles only
python3 $SKILL_SCRIPTS/knowledge.py list --published
# Root-level articles only
python3 $SKILL_SCRIPTS/knowledge.py list --root-only
# Search articles
python3 $SKILL_SCRIPTS/knowledge.py list --search "onboarding"
# Filter by category
python3 $SKILL_SCRIPTS/knowledge.py list --category workspace
# Get article with body content
python3 $SKILL_SCRIPTS/knowledge.py get 5
# Create an article under a parent (by name)
python3 $SKILL_SCRIPTS/knowledge.py create --name "Setup Guide" --parent-name "Engineering" --body "<h1>Getting Started</h1>"
# Update article content
python3 $SKILL_SCRIPTS/knowledge.py update 5 --body "<p>Updated content</p>"
```
## Module: Documents (`documents.py`)
Manages `documents.document`. Actions: `list`, `get`, `create`, `update`, `folders`.
```bash
# My documents
python3 $SKILL_SCRIPTS/documents.py list --my
# Documents by owner name
python3 $SKILL_SCRIPTS/documents.py list --owner "Alice"
# Documents in a folder (by name)
python3 $SKILL_SCRIPTS/documents.py list --folder "HR Documents"
# Search documents
python3 $SKILL_SCRIPTS/documents.py list --search "invoice"
# Filter by type and date
python3 $SKILL_SCRIPTS/documents.py list --type binary --this-month
# Filter by tag
python3 $SKILL_SCRIPTS/documents.py list --tag "contracts"
# List available folders
python3 $SKILL_SCRIPTS/documents.py folders
# Get document details
python3 $SKILL_SCRIPTS/documents.py get 3
# Create a URL-type document (folder by name, owner by name)
python3 $SKILL_SCRIPTS/documents.py create --name "Design Spec" --folder "Engineering" --owner "Alice" --url "https://example.com/spec"
# Update document folder
python3 $SKILL_SCRIPTS/documents.py update 3 --folder "Archive"
```
## Module: Custom App (`custom_app.py`)
For **any Odoo model not covered by the dedicated scripts** — CRM, Sales, Purchase, Inventory, Manufacturing, Accounting, or any custom app installed in the Odoo instance.
Recommended workflow:
1. Discover the model technical name with `models`
2. Inspect available fields with `fields`
3. Then `list`, `get`, `create`, `update` as needed
```bash
# --- Discovery ---
# Search for models by name (find the technical model name)
python3 $SKILL_SCRIPTS/custom_app.py models --search "CRM"
python3 $SKILL_SCRIPTS/custom_app.py models --search "Sale Order"
python3 $SKILL_SCRIPTS/custom_app.py models --module crm
# Inspect fields of a model
python3 $SKILL_SCRIPTS/custom_app.py fields crm.lead
python3 $SKILL_SCRIPTS/custom_app.py fields crm.lead --search "stage"
python3 $SKILL_SCRIPTS/custom_app.py fields sale.order --type many2one
# --- Reading ---
# List records with auto-detected default fields
python3 $SKILL_SCRIPTS/custom_app.py list crm.lead
# My CRM leads this month
python3 $SKILL_SCRIPTS/custom_app.py list crm.lead --my --this-month
# Search by name + assigned user
python3 $SKILL_SCRIPTS/custom_app.py list crm.lead --search "Acme" --user "Alice"
# Custom domain + explicit fields
python3 $SKILL_SCRIPTS/custom_app.py list sale.order \
--domain '[["state","=","sale"]]' \
--fields '["id","name","partner_id","amount_total","date_order"]' \
--order "date_order desc" --limit 20
# Filter by date field (custom field name)
python3 $SKILL_SCRIPTS/custom_app.py list sale.order --date-field date_order --this-month
# Get a single record (all fields)
python3 $SKILL_SCRIPTS/custom_app.py get crm.lead 42
# Count matching records
python3 $SKILL_SCRIPTS/custom_app.py count crm.lead --domain '[["stage_id.name","=","Won"]]'
# --- Writing ---
# Create a record
python3 $SKILL_SCRIPTS/custom_app.py create crm.lead \
--values '{"name":"New Opportunity","partner_id":5,"expected_revenue":10000}'
# Update a single record
python3 $SKILL_SCRIPTS/custom_app.py update crm.lead 42 \
--values '{"stage_id":3,"priority":"1"}'
# Update multiple records at once
python3 $SKILL_SCRIPTS/custom_app.py update crm.lead --ids '[42,43,44]' \
--values '{"user_id":7}'
# --- Notes & Activities ---
# Log an internal note on any record
python3 $SKILL_SCRIPTS/custom_app.py log-note crm.lead 42 --body "Called the client, follow up next week"
# Schedule an activity notification (user resolved by name)
python3 $SKILL_SCRIPTS/custom_app.py notify crm.lead 42 \
--user "Alice" --summary "Please follow up with this lead"
```
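Hand-writing JSON inside shell quotes for `--domain` and `--fields` is easy to get wrong. As a hedged sketch, the argument list can be built in Python instead, letting `json.dumps` produce the JSON. The helper name `build_list_command` and the default script path are assumptions for illustration; only the flags (`--domain`, `--fields`, `--order`, `--limit`) come from the examples above.

```python
import json
import shlex


def build_list_command(model, domain=None, fields=None, order=None, limit=None,
                       script="custom_app.py"):
    """Return an argv list for `custom_app.py list` (hypothetical helper)."""
    argv = ["python3", script, "list", model]
    if domain is not None:
        argv += ["--domain", json.dumps(domain)]
    if fields is not None:
        argv += ["--fields", json.dumps(fields)]
    if order:
        argv += ["--order", order]
    if limit is not None:
        argv += ["--limit", str(limit)]
    return argv


argv = build_list_command(
    "sale.order",
    domain=[["state", "=", "sale"]],
    fields=["id", "name", "amount_total"],
    order="date_order desc",
    limit=20,
)
# shlex.join shows the shell-quoted equivalent of the argv list.
print(shlex.join(argv))
```

Passing the list to `subprocess.run(argv)` avoids shell quoting entirely, since each argument is delivered as-is.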
## Generic Client (`odoo_core.py`)
For raw low-level access or testing the connection:
```bash
# Test connection
python3 $SKILL_SCRIPTS/odoo_core.py test-connection
# Search/read with raw domain
python3 $SKILL_SCRIPTS/odoo_core.py search-read res.partner --fields '["name","email","phone"]' --limit 20
# Get field definitions
python3 $SKILL_SCRIPTS/odoo_core.py fields res.partner
```
## Security Rules
1. **No delete** — `unlink` is blocked in all scripts.
2. **Sensitive models blocked** — `ir.rule`, `ir.model.access`, `ir.config_parameter`, etc.
3. **Read-only models** — `res.users`, `hr.employee`, stage/type models cannot be modified.
4. **Credentials from environment only** — Never pass credentials as arguments.
5. **Audit logging** — All operations are logged to stderr with timestamps.
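The first three rules can be pictured as a single guard that runs before every call. The sketch below is not the skill's actual implementation. The set contents are an illustrative subset taken from the rules above, and `SecurityError` and `check_allowed` are hypothetical names.

```python
# Illustrative subsets only; the real lists live in odoo_core.py and may differ.
BLOCKED_OPERATIONS = {"unlink"}
BLOCKED_MODELS = {"ir.rule", "ir.model.access", "ir.config_parameter"}
READ_ONLY_MODELS = {"res.users", "hr.employee"}
WRITE_OPERATIONS = {"create", "write"}


class SecurityError(Exception):
    """Raised when a call violates one of the security rules (hypothetical)."""


def check_allowed(model, operation):
    """Raise SecurityError if the model/operation pair breaks rules 1 to 3."""
    if operation in BLOCKED_OPERATIONS:
        raise SecurityError(f"Operation '{operation}' is blocked")
    if model in BLOCKED_MODELS:
        raise SecurityError(f"Model '{model}' is blocked")
    if model in READ_ONLY_MODELS and operation in WRITE_OPERATIONS:
        raise SecurityError(f"Model '{model}' is read-only")


check_allowed("project.task", "write")  # passes silently
```

Checking the operation first means a delete is refused even on models that are otherwise fully writable.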
## Troubleshooting
| Issue | Solution |
|-------|----------|
| `Access Denied` | Check all 4 env vars: `ODOO_URL`, `ODOO_DB`, `ODOO_USERNAME`, `ODOO_API_KEY` |
| `Model not found` | Module may not be installed in your Odoo instance |
| `No module named odoo_core` | Run the script from the scripts directory or use an absolute path |
| Script not found | Resolve `SKILL_SCRIPTS` path as shown in Path Resolution above |
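For the `Access Denied` case, a quick way to see which of the four variables is missing is a small check like the one below. This is a sketch, not part of the skill; the function name `missing_odoo_vars` is hypothetical.

```python
import os

REQUIRED_VARS = ("ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_API_KEY")


def missing_odoo_vars(env=None):
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


missing = missing_odoo_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
```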
## Field Reference
See [Odoo Models Reference](./references/odoo-models.md) for detailed field lists per model.
FILE:README.md
# VNClaw — Odoo Integration Skill
A GitHub Copilot skill that lets you interact with **Odoo 17** via the XML-RPC API directly from your AI assistant. Supports reading, creating, and updating records across the most common Odoo modules.
> **No delete operations are supported** — by design, to prevent accidental data loss.
---
## Table of Contents
- [Overview](#overview)
- [Installation](#installation)
- [Configuration](#configuration)
- [Scripts](#scripts)
- [Common Flags](#common-flags)
- [Usage Examples](#usage-examples)
- [Security](#security)
---
## Overview
This skill translates natural language requests into Python XML-RPC commands against your Odoo 17 instance. It covers:
| Module | Script | Odoo Model |
|---|---|---|
| Tasks | `tasks.py` | `project.task` |
| Projects | `projects.py` | `project.project` |
| Timesheets | `timesheets.py` | `account.analytic.line` |
| Calendar Events | `calendar_events.py` | `calendar.event` |
| Helpdesk Tickets | `helpdesk.py` | `helpdesk.ticket` |
| Time Off | `time_off.py` | `hr.leave` |
| Knowledge Articles | `knowledge.py` | `knowledge.article` |
| Documents | `documents.py` | `documents.document` |
| Any Custom Model | `custom_app.py` | *(any)* |
| Connection Test | `odoo_core.py` | — |
---
## Installation
The skill can be installed in any of three locations:
| Type | Path |
|---|---|
| Workspace skill | `<git-root>/.github/skills/vnclaw-odoo-skill/` |
| User global (vnclaw) | `~/.vnclaw/skills/vnclaw-odoo-skill/` |
| User global (openclaw) | `~/.openclaw/skills/vnclaw-odoo-skill/` |
The skill auto-detects its location at runtime — no path configuration needed.
---
## Configuration
Credentials are loaded from **environment variables**. Never hardcode them.
| Variable | Description | Example |
|---|---|---|
| `ODOO_URL` | Base URL of your Odoo instance (no trailing slash) | `https://mycompany.odoo.com` |
| `ODOO_DB` | Database name (case-sensitive) | `mycompany-production` |
| `ODOO_USERNAME` | Login username (email) | `user@example.com` |
| `ODOO_API_KEY` | API key or password | `abc123...` |
Copy [`assets/example.env`](assets/example.env) to `.env` and fill in your values:
```bash
cp assets/example.env .env
# Edit .env with your credentials
```
> Generate an API key in Odoo: **Settings → Users → Select user → Account Security → API Keys**
**Never commit `.env` to version control.**
### Test your connection
```bash
python3 scripts/odoo_core.py test-connection
```
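Under the hood, a connection test against Odoo's XML-RPC API amounts to calling `authenticate` on the `/xmlrpc/2/common` endpoint. The sketch below shows that call with the standard-library `xmlrpc.client`. It is an assumption about what `test-connection` does, not a copy of `odoo_core.py`, and the helper names are hypothetical.

```python
import os
import xmlrpc.client


def xmlrpc_endpoints(base_url):
    """Return the (common, object) XML-RPC endpoint URLs for an Odoo base URL."""
    base = base_url.rstrip("/")
    return f"{base}/xmlrpc/2/common", f"{base}/xmlrpc/2/object"


def check_connection(env=None):
    """Authenticate with the four ODOO_* variables; return the user id or None."""
    env = os.environ if env is None else env
    common_url, _ = xmlrpc_endpoints(env["ODOO_URL"])
    common = xmlrpc.client.ServerProxy(common_url)
    # authenticate returns the numeric uid on success and False on failure.
    uid = common.authenticate(env["ODOO_DB"], env["ODOO_USERNAME"], env["ODOO_API_KEY"], {})
    return uid or None


print(xmlrpc_endpoints("https://mycompany.odoo.com/"))
```

`check_connection()` needs a reachable Odoo instance, so it is defined here but not called.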
---
## Scripts
### `tasks.py` — Tasks
```bash
# List my tasks
python3 scripts/tasks.py list --my
# Tasks I created this week
python3 scripts/tasks.py list --my --this-week --date-field created
# Tasks with a deadline this week
python3 scripts/tasks.py list --my --this-week
# Overdue tasks
python3 scripts/tasks.py list --overdue
# Create a task
python3 scripts/tasks.py create --name "Fix login bug" --project "Website" --assign "Alice" --deadline 2026-04-01
# Update task stage and assignee
python3 scripts/tasks.py update 42 --stage "Done" --assign "Bob"
# Log an internal note
python3 scripts/tasks.py log-note 42 --body "Waiting for client feedback"
```
---
### `projects.py` — Projects
```bash
# Projects I manage
python3 scripts/projects.py list --my
# Create a project
python3 scripts/projects.py create --name "Website Redesign" --manager "Alice"
# Update project stage and dates
python3 scripts/projects.py update 1 --stage "In Progress" --date-start 2026-04-01 --date-end 2026-06-30
```
---
### `timesheets.py` — Timesheets
```bash
# My timesheets this week
python3 scripts/timesheets.py list --my --this-week
# Summary grouped by project/task
python3 scripts/timesheets.py summary --my --this-month
# Log 2.5 hours
python3 scripts/timesheets.py log --project "Website Redesign" --hours 2.5 --description "Code review"
```
---
### `calendar_events.py` — Calendar Events
```bash
# My events today
python3 scripts/calendar_events.py list --my --today
# Create a meeting with attendees
python3 scripts/calendar_events.py create \
--name "Sprint Planning" \
--start "2026-03-20 09:00:00" \
--stop "2026-03-20 10:00:00" \
--attendees "Alice, Bob, Carol" \
--location "Room A"
```
---
### `helpdesk.py` — Helpdesk Tickets
```bash
# My assigned tickets
python3 scripts/helpdesk.py list --my
# Create a ticket
python3 scripts/helpdesk.py create --name "Login page broken" --team "Support" --assign "Alice" --customer "Acme Corp"
# Update ticket stage
python3 scripts/helpdesk.py update 5 --stage "Solved"
```
---
### `time_off.py` — Time Off
```bash
# My leave requests this year
python3 scripts/time_off.py list --my --this-year
# List available leave types
python3 scripts/time_off.py leave-types
# Create a leave request
python3 scripts/time_off.py create \
--date-from "2026-03-25 08:00:00" \
--date-to "2026-03-26 17:00:00" \
--leave-type "Paid Time Off"
```
---
### `knowledge.py` — Knowledge Articles
```bash
# My published articles
python3 scripts/knowledge.py list --my --published
# Create an article
python3 scripts/knowledge.py create --name "Setup Guide" --parent-name "Engineering" --body "<h1>Getting Started</h1>"
```
---
### `documents.py` — Documents
```bash
# Documents in a folder
python3 scripts/documents.py list --folder "HR Documents"
# List available folders
python3 scripts/documents.py folders
# Create a URL document
python3 scripts/documents.py create --name "Design Spec" --folder "Engineering" --url "https://example.com/spec"
```
---
### `custom_app.py` — Any Odoo Model
Use this for CRM, Sales, Purchase, Inventory, or any custom app not covered by the dedicated scripts.
```bash
# Discover model technical names
python3 scripts/custom_app.py models --search "CRM"
# Inspect available fields
python3 scripts/custom_app.py fields crm.lead
# List records
python3 scripts/custom_app.py list crm.lead --my --this-month
```
---
## Common Flags
These flags work across all module scripts:
| Flag | Description |
|---|---|
| `--my` | Filter to the authenticated user's records |
| `--user "Name"` | Filter by user name |
| `--today` | Records for today |
| `--yesterday` | Records for yesterday |
| `--this-week` | Records for this week |
| `--last-week` | Records for last week |
| `--this-month` | Records for this month |
| `--last-month` | Records for last month |
| `--this-year` | Records for this year |
| `--date-from YYYY-MM-DD` | Custom date range start |
| `--date-to YYYY-MM-DD` | Custom date range end |
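Each relative flag resolves to a start and end date. The skill's own `get_date_bounds` helper is not reproduced here; the sketch below is an independent illustration that assumes weeks start on Monday and that bounds are inclusive. The function name `date_bounds` is hypothetical.

```python
from datetime import date, timedelta


def date_bounds(flag, today=None):
    """Return inclusive (start, end) dates for a relative flag such as 'this-week'."""
    today = today or date.today()
    one_day = timedelta(days=1)
    if flag == "today":
        return today, today
    if flag == "yesterday":
        return today - one_day, today - one_day
    if flag == "this-week":
        start = today - timedelta(days=today.weekday())  # Monday of this week
        return start, start + timedelta(days=6)
    if flag == "last-week":
        start = today - timedelta(days=today.weekday() + 7)
        return start, start + timedelta(days=6)
    if flag == "this-month":
        start = today.replace(day=1)
        next_start = (start + timedelta(days=32)).replace(day=1)
        return start, next_start - one_day
    if flag == "last-month":
        end = today.replace(day=1) - one_day
        return end.replace(day=1), end
    if flag == "this-year":
        return date(today.year, 1, 1), date(today.year, 12, 31)
    raise ValueError(f"Unknown flag: {flag}")


print(date_bounds("this-week", today=date(2026, 3, 18)))
```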
### `tasks.py` — `--date-field` flag
By default, date filters on tasks apply to `date_deadline`. Use `--date-field` to change this:
| Value | Filters on |
|---|---|
| `deadline` *(default)* | `date_deadline` |
| `created` | `create_date` |
| `updated` | `write_date` |
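The mapping in this table can be turned into a domain as sketched below. This is an assumption about how `tasks.py` might apply it, not its actual code. Following the pattern used in `calendar_events.py`, Datetime fields get explicit start-of-day and end-of-day times, while `date_deadline` is treated as a plain Date as listed in the field reference. The names `TASK_DATE_FIELDS` and `task_date_domain` are hypothetical.

```python
TASK_DATE_FIELDS = {
    "deadline": "date_deadline",
    "created": "create_date",
    "updated": "write_date",
}
# Datetime fields need time components so the end day is fully included.
DATETIME_FIELDS = {"create_date", "write_date"}


def task_date_domain(date_from, date_to, date_field="deadline"):
    """Build a domain restricting the chosen task date field to [date_from, date_to]."""
    field = TASK_DATE_FIELDS[date_field]
    if field in DATETIME_FIELDS:
        date_from, date_to = f"{date_from} 00:00:00", f"{date_to} 23:59:59"
    return [[field, ">=", date_from], [field, "<=", date_to]]


print(task_date_domain("2026-03-16", "2026-03-22", date_field="created"))
```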
---
## Security
- **No delete operations** — only `search_read`, `read`, `create`, and `write` are permitted.
- **Blocked models** — security-sensitive models (`ir.rule`, `ir.model.access`, `res.groups`, etc.) cannot be accessed.
- **Read-only models** — `res.users`, `hr.employee`, and similar models cannot be written to.
- **Credentials via environment variables only** — never hardcoded.
- **All output goes to stdout as JSON** — logs go to stderr for clean piping.
FILE:assets/example.env
# VNClaw — Odoo 17 Environment Variables
# Copy this file to .env and fill in your values.
# NEVER commit the .env file to version control.
# Base URL of your Odoo 17 instance (no trailing slash)
ODOO_URL=https://mycompany.odoo.com
# Database name (exact, case-sensitive)
ODOO_DB=mycompany-production
# Your Odoo login username (usually email)
ODOO_USERNAME=user@example.com
# API key (recommended) or password
# Generate an API key: Odoo → Settings → Users → Select user → Account Security → API Keys
ODOO_API_KEY=your-api-key-here
FILE:references/odoo-models.md
# Odoo 17 Models Reference
Detailed field reference for the Odoo 17 models supported by VNClaw.
---
## project.project — Projects
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Project name |
| `user_id` | Many2one → res.users | Project manager |
| `partner_id` | Many2one → res.partner | Customer |
| `date_start` | Date | Start date |
| `date` | Date | End date |
| `description` | Html | Project description |
| `tag_ids` | Many2many → project.tags | Tags |
| `task_count` | Integer | Number of tasks (computed) |
| `active` | Boolean | Active (archived if false) |
| `company_id` | Many2one → res.company | Company |
| `analytic_account_id` | Many2one → account.analytic.account | Analytic account |
| `allow_timesheets` | Boolean | Allow timesheets |
| `label_tasks` | Char | Task label (e.g., "Tasks", "Issues") |
---
## project.task — Tasks
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Task title |
| `project_id` | Many2one → project.project | Project |
| `user_ids` | Many2many → res.users | Assignees |
| `stage_id` | Many2one → project.task.type | Stage |
| `date_deadline` | Date | Deadline |
| `date_assign` | Datetime | Assignment date |
| `priority` | Selection | Priority: `0` (Normal), `1` (Important) |
| `tag_ids` | Many2many → project.tags | Tags |
| `description` | Html | Description |
| `parent_id` | Many2one → project.task | Parent task |
| `child_ids` | One2many → project.task | Sub-tasks |
| `timesheet_ids` | One2many → account.analytic.line | Timesheets |
| `effective_hours` | Float | Hours spent (computed) |
| `planned_hours` | Float | Initially planned hours |
| `remaining_hours` | Float | Remaining hours (computed) |
| `kanban_state` | Selection | `normal`, `done`, `blocked` |
| `active` | Boolean | Active |
| `partner_id` | Many2one → res.partner | Customer |
| `company_id` | Many2one → res.company | Company |
---
## calendar.event — Calendar Events
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Event summary |
| `start` | Datetime | Start date/time |
| `stop` | Datetime | End date/time |
| `allday` | Boolean | All-day event |
| `start_date` | Date | Start date (all-day events) |
| `stop_date` | Date | End date (all-day events) |
| `duration` | Float | Duration in hours |
| `description` | Html | Description |
| `location` | Char | Location |
| `partner_ids` | Many2many → res.partner | Attendees |
| `user_id` | Many2one → res.users | Organizer |
| `recurrency` | Boolean | Recurring event |
| `privacy` | Selection | `public`, `private`, `confidential` |
| `show_as` | Selection | `busy`, `free` |
| `alarm_ids` | Many2many → calendar.alarm | Reminders |
| `categ_ids` | Many2many → calendar.event.type | Tags |
| `videocall_location` | Char | Video call URL |
---
## hr.leave — Time Off (Leave Requests)
| Field | Type | Description |
|-------|------|-------------|
| `employee_id` | Many2one → hr.employee | Employee |
| `holiday_status_id` | Many2one → hr.leave.type | Leave type |
| `date_from` | Datetime | Start date/time |
| `date_to` | Datetime | End date/time |
| `number_of_days` | Float | Duration in days (computed) |
| `name` | Char | Description / reason |
| `state` | Selection | `draft`, `confirm`, `validate1`, `validate`, `refuse` |
| `user_id` | Many2one → res.users | User |
| `department_id` | Many2one → hr.department | Department |
| `request_date_from` | Date | Requested start date |
| `request_date_to` | Date | Requested end date |
| `request_hour_from` | Selection | Start hour (half-day) |
| `request_hour_to` | Selection | End hour (half-day) |
| `request_unit_half` | Boolean | Half-day request |
---
## helpdesk.ticket — Helpdesk Tickets
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Ticket subject |
| `team_id` | Many2one → helpdesk.team | Helpdesk team |
| `user_id` | Many2one → res.users | Assigned to |
| `partner_id` | Many2one → res.partner | Customer |
| `partner_email` | Char | Customer email |
| `description` | Html | Description |
| `stage_id` | Many2one → helpdesk.stage | Stage |
| `priority` | Selection | `0` (Low), `1` (Medium), `2` (High), `3` (Urgent) |
| `tag_ids` | Many2many → helpdesk.tag | Tags |
| `create_date` | Datetime | Created on |
| `close_date` | Datetime | Closed on |
| `assign_date` | Datetime | Assigned on |
| `sla_deadline` | Datetime | SLA deadline |
| `kanban_state` | Selection | `normal`, `done`, `blocked` |
| `ticket_type_id` | Many2one → helpdesk.ticket.type | Ticket type |
---
## knowledge.article — Knowledge Articles
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Article title |
| `body` | Html | Article content |
| `parent_id` | Many2one → knowledge.article | Parent article |
| `child_ids` | One2many → knowledge.article | Child articles |
| `category` | Selection | `workspace`, `private`, `shared` |
| `create_uid` | Many2one → res.users | Author |
| `write_date` | Datetime | Last modified |
| `is_published` | Boolean | Published |
| `icon` | Char | Emoji icon |
| `sequence` | Integer | Sort order |
| `internal_permission` | Selection | `write`, `read`, `none` |
| `article_member_ids` | One2many → knowledge.article.member | Members |
| `favorite_count` | Integer | Favorite count |
| `is_article_item` | Boolean | Is article item |
---
## documents.document — Documents
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Document name |
| `folder_id` | Many2one → documents.folder | Workspace/folder |
| `owner_id` | Many2one → res.users | Owner |
| `partner_id` | Many2one → res.partner | Contact |
| `type` | Selection | `binary` (file), `url`, `empty` |
| `url` | Char | URL (if type=url) |
| `datas` | Binary | File content (base64) |
| `mimetype` | Char | MIME type |
| `file_size` | Integer | File size in bytes |
| `tag_ids` | Many2many → documents.tag | Tags |
| `description` | Text | Description |
| `create_date` | Datetime | Upload date |
| `lock_uid` | Many2one → res.users | Locked by |
| `is_locked` | Boolean | Is locked |
---
## account.analytic.line — Timesheets
| Field | Type | Description |
|-------|------|-------------|
| `employee_id` | Many2one → hr.employee | Employee |
| `project_id` | Many2one → project.project | Project |
| `task_id` | Many2one → project.task | Task |
| `name` | Char | Description of work |
| `date` | Date | Date |
| `unit_amount` | Float | Duration in hours |
| `user_id` | Many2one → res.users | User |
| `company_id` | Many2one → res.company | Company |
| `amount` | Monetary | Cost (computed) |
| `product_uom_id` | Many2one → uom.uom | Unit of measure |
---
## res.partner — Contacts
| Field | Type | Description |
|-------|------|-------------|
| `name` | Char | Name |
| `email` | Char | Email |
| `phone` | Char | Phone |
| `mobile` | Char | Mobile |
| `street` | Char | Street |
| `city` | Char | City |
| `country_id` | Many2one → res.country | Country |
| `company_type` | Selection | `person`, `company` |
| `is_company` | Boolean | Is a company |
| `parent_id` | Many2one → res.partner | Parent company |
| `child_ids` | One2many → res.partner | Contacts |
| `category_id` | Many2many → res.partner.category | Tags |
---
## Domain Filter Syntax
An Odoo domain is a list of `[field, operator, value]` conditions, combined with logical operators written in prefix (Polish) notation:
```json
[["field", "operator", "value"]]
```
**Operators:** `=`, `!=`, `>`, `>=`, `<`, `<=`, `like`, `ilike`, `in`, `not in`, `child_of`, `parent_of`
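**Logic:** `&` (AND, the implicit default between conditions), `|` (OR), `!` (NOT). Each operator precedes its operands: `&` and `|` apply to the next two terms, `!` to the next one.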
**Examples:**
```json
// Tasks assigned to user 2 in project 1
[["project_id", "=", 1], ["user_ids", "in", [2]]]
// Open helpdesk tickets with high priority
["&", ["stage_id.is_close", "=", false], ["priority", ">=", "2"]]
// Calendar events this month
[["start", ">=", "2026-03-01"], ["start", "<", "2026-04-01"]]
// Time off requests pending approval
[["state", "in", ["confirm", "validate1"]]]
```
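To make the prefix notation concrete, here is a small local evaluator that checks a domain against a plain dictionary. It is a teaching sketch only: Odoo evaluates domains server-side in SQL, and this version handles just flat scalar fields and a handful of operators (no dotted paths such as `stage_id.is_close`, no relational semantics). The function name `matches` is hypothetical.

```python
import operator

_OPS = {
    "=": operator.eq, "!=": operator.ne,
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "in": lambda value, options: value in options,
    "not in": lambda value, options: value not in options,
    "ilike": lambda value, pattern: str(pattern).lower() in str(value).lower(),
}


def _eval(tokens, record):
    """Consume one expression from the front of `tokens` and return its truth value."""
    token = tokens.pop(0)
    if token in ("&", "|"):
        # Evaluate both operands first so both are always consumed.
        left = _eval(tokens, record)
        right = _eval(tokens, record)
        return (left and right) if token == "&" else (left or right)
    if token == "!":
        return not _eval(tokens, record)
    field, op, value = token
    return _OPS[op](record.get(field), value)


def matches(domain, record):
    """Return True if `record` satisfies `domain`; top-level terms are AND-ed."""
    tokens = list(domain)
    result = True
    while tokens:
        result = _eval(tokens, record) and result
    return result


ticket = {"priority": "2", "state": "confirm", "name": "Login page broken"}
print(matches(["|", ["priority", "=", "3"], ["name", "ilike", "login"]], ticket))
```

Because `|` only covers the next two terms, `["|", A, B, C]` reads as `(A or B) and C`.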
---
## Many2many Write Syntax
When writing Many2many fields, use command tuples:
| Command | Syntax | Description |
|---------|--------|-------------|
| Add | `[[4, id, 0]]` | Link existing record |
| Remove | `[[3, id, 0]]` | Unlink (don't delete) |
| Replace | `[[6, 0, [id1, id2]]]` | Replace all links |
Example — set attendees on a calendar event:
```json
{"partner_ids": [[6, 0, [1, 2, 3]]]}
```
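The effect of the three commands in the table can be simulated on a plain list of IDs. The sketch below only illustrates the semantics; the real work happens on the Odoo server, and `apply_commands` is a hypothetical name.

```python
def apply_commands(current_ids, commands):
    """Apply Many2many commands 3 (unlink), 4 (link) and 6 (replace) to a list of IDs."""
    ids = list(current_ids)
    for command in commands:
        code = command[0]
        if code == 4:  # link an existing record
            if command[1] not in ids:
                ids.append(command[1])
        elif code == 3:  # remove the link, keep the record
            ids = [i for i in ids if i != command[1]]
        elif code == 6:  # replace all links
            ids = list(command[2])
        else:
            raise ValueError(f"Unsupported command code: {code}")
    return ids


print(apply_commands([1, 2], [[4, 3, 0], [3, 1, 0]]))
```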
FILE:scripts/calendar_events.py
#!/usr/bin/env python3
"""VNClaw — Calendar Events module (calendar.event)"""
import argparse, json, sys, os
from datetime import datetime, timedelta

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_partner,
                       resolve_users_multi, add_date_filter_args, get_date_bounds,
                       log_note, schedule_activity, log)

MODEL = "calendar.event"
DEFAULT_FIELDS = ["id", "name", "start", "stop", "allday", "location", "partner_ids", "user_id", "description"]


def cmd_list(args):
    uid, m, db, key = connect()
    domain = []
    # --my: events where the current user is an attendee
    if args.my:
        # Get current user's partner_id
        user_rec = execute(uid, m, db, key, "res.users", "read", [uid], fields=["partner_id"])
        if user_rec:
            partner_id = user_rec[0]["partner_id"][0]
            domain.append(["partner_ids", "in", [partner_id]])
    # --attendee: filter by attendee name
    if args.attendee:
        partner_id = resolve_partner(uid, m, db, key, args.attendee)
        domain.append(["partner_ids", "in", [partner_id]])
    # --organizer: filter by organizer name
    if args.organizer:
        org_id = resolve_user(uid, m, db, key, args.organizer)
        domain.append(["user_id", "=", org_id])
    # --search
    if args.search:
        domain.append(["name", "ilike", args.search])
    # Date filters (applied to the event start, covering whole days)
    df, dt = get_date_bounds(args)
    if df:
        domain.append(["start", ">=", f"{df} 00:00:00"])
    if dt:
        domain.append(["start", "<=", f"{dt} 23:59:59"])
    fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
    records = execute(uid, m, db, key, MODEL, "search_read", domain,
                      fields=fields, limit=args.limit, order="start asc")
    output_json(records)


def cmd_get(args):
    uid, m, db, key = connect()
    fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
        "duration", "privacy", "show_as", "alarm_ids", "categ_ids",
        "videocall_location", "recurrency"
    ]
    records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
    output_json(records[0] if records else {"error": f"Event {args.id} not found"})


def cmd_create(args):
    uid, m, db, key = connect()
    vals = {"name": args.name, "start": args.start, "stop": args.stop}
    if args.location:
        vals["location"] = args.location
    if args.description:
        vals["description"] = args.description
    # --attendees: resolve names; command 6 replaces the full attendee list
    if args.attendees:
        names = [n.strip() for n in args.attendees.split(",") if n.strip()]
        partner_ids = [resolve_partner(uid, m, db, key, n) for n in names]
        vals["partner_ids"] = [[6, 0, partner_ids]]
    elif args.partner_ids:
        vals["partner_ids"] = [[6, 0, json.loads(args.partner_ids)]]
    if args.allday:
        vals["allday"] = True
    if args.extra:
        vals.update(json.loads(args.extra))
    rid = execute(uid, m, db, key, MODEL, "create", vals)
    output_json({"created_id": rid, "name": args.name, "start": args.start})


def cmd_update(args):
    uid, m, db, key = connect()
    vals = {}
    if args.name:
        vals["name"] = args.name
    if args.start:
        vals["start"] = args.start
    if args.stop:
        vals["stop"] = args.stop
    if args.location:
        vals["location"] = args.location
    if args.description:
        vals["description"] = args.description
    if args.attendees:
        names = [n.strip() for n in args.attendees.split(",") if n.strip()]
        partner_ids = [resolve_partner(uid, m, db, key, n) for n in names]
        vals["partner_ids"] = [[6, 0, partner_ids]]
    elif args.partner_ids:
        vals["partner_ids"] = [[6, 0, json.loads(args.partner_ids)]]
    if args.extra:
        vals.update(json.loads(args.extra))
    if not vals:
        print("Error: No fields to update.", file=sys.stderr)
        sys.exit(1)
    ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
    output_json({"updated_id": args.id, "success": ok})


def main():
    p = argparse.ArgumentParser(description="VNClaw — Calendar Events")
    sub = p.add_subparsers(dest="action")
    # list
    s = sub.add_parser("list", help="List calendar events")
    s.add_argument("--my", action="store_true", help="My events only")
    s.add_argument("--attendee", help="Filter by attendee NAME")
    s.add_argument("--organizer", help="Filter by organizer NAME")
    s.add_argument("--search", help="Search event name")
    add_date_filter_args(s)
    s.add_argument("--fields", help="JSON array of fields")
    s.add_argument("--limit", type=int, default=50)
    # get
    s = sub.add_parser("get", help="Get event by ID")
    s.add_argument("id", type=int)
    s.add_argument("--fields", help="JSON array of fields")
    # create
    s = sub.add_parser("create", help="Create a calendar event")
    s.add_argument("--name", required=True, help="Event title")
    s.add_argument("--start", required=True, help="Start datetime (YYYY-MM-DD HH:MM:SS)")
    s.add_argument("--stop", required=True, help="End datetime (YYYY-MM-DD HH:MM:SS)")
    s.add_argument("--location")
    s.add_argument("--description")
    s.add_argument("--attendees", help="Attendee names, comma-separated (e.g. 'Alice, Bob')")
    s.add_argument("--partner-ids", help="Attendee partner IDs as JSON array (alternative)")
    s.add_argument("--allday", action="store_true")
    s.add_argument("--extra", help="Additional fields as JSON")
    # update
    s = sub.add_parser("update", help="Update a calendar event")
    s.add_argument("id", type=int)
    s.add_argument("--name")
    s.add_argument("--start")
    s.add_argument("--stop")
    s.add_argument("--location")
    s.add_argument("--description")
    s.add_argument("--attendees", help="New attendee names, comma-separated")
    s.add_argument("--partner-ids", help="New attendee IDs as JSON array")
    s.add_argument("--extra", help="Additional fields as JSON")
    args = p.parse_args()
    if not args.action:
        p.print_help()
        sys.exit(1)
    # Dispatch to the handler for the chosen sub-command
    {"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update}[args.action](args)


if __name__ == "__main__":
    main()
FILE:scripts/custom_app.py
#!/usr/bin/env python3
"""VNClaw — Custom App module
Generic interface for any Odoo model/app not covered by the dedicated scripts.
All security rules from odoo_core still apply (no delete, no blocked models).
Typical workflow:
1. Discover the model technical name: custom_app.py models --search "CRM"
2. Inspect its fields: custom_app.py fields crm.lead
3. List records: custom_app.py list crm.lead --search "Acme" --limit 20
4. Read one record: custom_app.py get crm.lead 42
5. Create a record: custom_app.py create crm.lead --values '{...}'
6. Update a record: custom_app.py update crm.lead 42 --values '{...}'
7. Count matching records: custom_app.py count crm.lead --domain '[["stage_id.name","=","Won"]]'
8. Log an internal note: custom_app.py log-note crm.lead 42 --body "Called the client"
9. Schedule an activity notification: custom_app.py notify crm.lead 42 --user "Alice" --summary "Follow up"
"""
import argparse
import json
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user,
add_date_filter_args, get_date_bounds,
log_note, schedule_activity, log, BLOCKED_MODELS)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _safe_execute(uid, m, db, key, model, operation, *args, **kwargs):
"""Thin wrapper that surfaces friendly errors for unknown models."""
try:
return execute(uid, m, db, key, model, operation, *args, **kwargs)
except Exception as exc:
err = str(exc)
if "Object" in err and "doesn't exist" in err:
print(f"Error: Model '{model}' not found. "
"Run `custom_app.py models --search <keyword>` to discover the right model name.",
file=sys.stderr)
else:
print(f"Error: {err}", file=sys.stderr)
sys.exit(1)
def _get_name_field(uid, m, db, key, model):
    """Return the primary name field for a model (usually 'name' or 'subject')."""
    try:
        fields_info = m.execute_kw(db, uid, key, model, "fields_get",
                                   [], {"attributes": ["string", "type"]})
        # display_name exists on every model, so it is tried last;
        # placed earlier it would shadow the candidates after it.
        for candidate in ("name", "subject", "title", "ref", "display_name"):
            if candidate in fields_info:
                return candidate
    except Exception:
        pass
    return "display_name"
def _build_default_fields(uid, m, db, key, model, extra=()):
"""Return a safe default field list for search_read by inspecting the model."""
try:
fields_info = m.execute_kw(db, uid, key, model, "fields_get",
[], {"attributes": ["string", "type", "store"]})
except Exception:
return ["id", "display_name"]
# Always include these if they exist
priority = ["id", "name", "display_name", "create_date", "write_date",
"user_id", "state", "active", "sequence"]
result = [f for f in priority if f in fields_info and fields_info[f].get("store", True)]
for f in extra:
if f in fields_info and f not in result:
result.append(f)
# Pad to a reasonable set with other stored scalar fields
SCALAR = {"char", "integer", "float", "monetary", "boolean", "selection",
"date", "datetime", "text", "many2one"}
for fname, fmeta in fields_info.items():
if len(result) >= 12:
break
if (fname not in result and fmeta.get("store", True)
and fmeta.get("type") in SCALAR):
result.append(fname)
return result
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_models(args):
    """List installed Odoo models, optionally filtered by name/description."""
    uid, m, db, key = connect()
    domain = [["transient", "=", False]]
    if args.search:
        # "|" is a prefix operator: it ORs the two conditions that follow it
        domain.append("|")
        domain.append(["name", "ilike", args.search])
        domain.append(["model", "ilike", args.search])
    if args.module:
        # ir.model doesn't have a direct module field; use ir.model.data instead
        module_models = m.execute_kw(db, uid, key, "ir.model.data", "search_read",
                                     [[["module", "=", args.module], ["model", "=", "ir.model"]]],
                                     {"fields": ["res_id"], "limit": 500})
        if not module_models:
            # An unknown module matches nothing; return an empty list
            # rather than falling back to every model.
            log(f"Warning: no models found for module '{args.module}'")
            output_json([])
            return
        ids = [r["res_id"] for r in module_models]
        domain.append(["id", "in", ids])
    records = m.execute_kw(db, uid, key, "ir.model", "search_read",
                           [domain],
                           {"fields": ["id", "name", "model", "info"],
                            "limit": args.limit, "order": "model asc"})
    output_json(records)
def cmd_fields(args):
    """Return field definitions for a model; use this to find the right field names."""
    uid, m, db, key = connect()
    attrs = json.loads(args.attributes) if args.attributes else \
        ["string", "type", "required", "readonly", "store", "relation", "selection", "help"]
    # _safe_execute goes through odoo_core.execute, which rejects blocked models
    fields_info = _safe_execute(uid, m, db, key, args.model, "fields_get",
                                attributes=attrs)
    # Filter by type if requested
    if args.type:
        fields_info = {k: v for k, v in fields_info.items() if v.get("type") == args.type}
    if args.search:
        kw = args.search.lower()
        fields_info = {k: v for k, v in fields_info.items()
                       if kw in k.lower() or kw in v.get("string", "").lower()}
    # Sort by field name for readability
    ordered = dict(sorted(fields_info.items()))
    output_json(ordered)
def cmd_list(args):
    """Search and read records from any model."""
    uid, m, db, key = connect()
    domain = json.loads(args.domain) if args.domain else []
    # --my / --user: filter on user_id when the model has it, else on create_uid
    owner_ids = []
    if args.my:
        owner_ids.append(uid)
    if args.user:
        owner_ids.append(resolve_user(uid, m, db, key, args.user))
    if owner_ids:
        try:
            finfo = m.execute_kw(db, uid, key, args.model, "fields_get",
                                 [], {"attributes": ["type"]})
        except Exception:
            finfo = {}
        owner_field = next((f for f in ("user_id", "create_uid") if f in finfo), None)
        if owner_field:
            for owner_id in owner_ids:
                domain.append([owner_field, "=", owner_id])
        else:
            log(f"Warning: '{args.model}' has no user_id/create_uid field, ignoring user filter")
    # --search: ilike on the name field
    if args.search:
        name_f = _get_name_field(uid, m, db, key, args.model)
        domain.append([name_f, "ilike", args.search])
    # Date filters (applied to create_date by default, or --date-field)
    date_field = args.date_field or "create_date"
    df, dt = get_date_bounds(args)
    if df:
        domain.append([date_field, ">=", f"{df} 00:00:00"])
    if dt:
        domain.append([date_field, "<=", f"{dt} 23:59:59"])
    fields = json.loads(args.fields) if args.fields else \
        _build_default_fields(uid, m, db, key, args.model)
    order = args.order or "id desc"
    records = _safe_execute(uid, m, db, key, args.model, "search_read", domain,
                            fields=fields, limit=args.limit, offset=args.offset, order=order)
    output_json(records)
def cmd_get(args):
    """Read a single record by ID."""
    uid, m, db, key = connect()
    fields = json.loads(args.fields) if args.fields else []
    kw = {"fields": fields} if fields else {}
    # _safe_execute goes through odoo_core.execute, which rejects blocked models
    records = _safe_execute(uid, m, db, key, args.model, "read", [args.id], **kw)
    output_json(records[0] if records else {"error": f"Record {args.id} not found in '{args.model}'"})
def cmd_create(args):
"""Create a record in any model."""
uid, m, db, key = connect()
vals = json.loads(args.values)
rid = _safe_execute(uid, m, db, key, args.model, "create", vals)
output_json({"created_id": rid, "model": args.model})
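def cmd_update(args):
    """Update a record (or multiple records) in any model."""
    # The positional id is optional (nargs="?"), so require one of id / --ids
    if args.ids:
        ids = json.loads(args.ids)
    elif args.id is not None:
        ids = [args.id]
    else:
        print("Error: Provide a record ID or --ids.", file=sys.stderr)
        sys.exit(1)
    uid, m, db, key = connect()
    vals = json.loads(args.values)
    ok = _safe_execute(uid, m, db, key, args.model, "write", ids, vals)
    output_json({"updated_ids": ids, "success": ok, "model": args.model})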
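def cmd_count(args):
    """Count records matching a domain."""
    uid, m, db, key = connect()
    domain = json.loads(args.domain) if args.domain else []
    # search_count is not in odoo_core's ALLOWED_OPERATIONS, so it is called
    # directly; the blocked-model rule is therefore enforced here.
    if args.model in BLOCKED_MODELS:
        print(f"Error: Access to model '{args.model}' is blocked for security.", file=sys.stderr)
        sys.exit(1)
    try:
        count = m.execute_kw(db, uid, key, args.model, "search_count", [domain])
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    output_json({"model": args.model, "count": count, "domain": domain})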
def cmd_log_note(args):
"""Post an internal log note on any record."""
uid, m, db, key = connect()
msg_id = log_note(uid, m, db, key, args.model, args.id, args.body)
output_json({"logged_note": True, "model": args.model, "record_id": args.id, "message_id": msg_id})
def cmd_notify(args):
"""Schedule an activity notification on any record, targeting a user by name."""
uid, m, db, key = connect()
user_id = resolve_user(uid, m, db, key, args.user)
activity_id = schedule_activity(uid, m, db, key, args.model, args.id,
user_id, args.summary, args.note or "")
output_json({"activity_scheduled": True, "model": args.model,
"record_id": args.id, "for_user": args.user, "activity_id": activity_id})
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
p = argparse.ArgumentParser(
description="VNClaw — Custom App: generic access to any Odoo model",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
sub = p.add_subparsers(dest="action")
# models
s = sub.add_parser("models", help="Discover installed Odoo models")
s.add_argument("--search", help="Search by model name or technical name")
s.add_argument("--module", help="Filter by Odoo module name (e.g. crm, sale)")
s.add_argument("--limit", type=int, default=100)
# fields
s = sub.add_parser("fields", help="List fields of a model")
s.add_argument("model", help="Model technical name (e.g. crm.lead)")
s.add_argument("--search", help="Filter fields by name or label")
s.add_argument("--type", help="Filter fields by type (char, many2one, selection, ...)")
s.add_argument("--attributes", help="JSON array of field attributes to return")
# list
s = sub.add_parser("list", help="Search and read records from any model")
s.add_argument("model", help="Model technical name (e.g. crm.lead)")
s.add_argument("--domain", help='Odoo domain as JSON (e.g. \'[["stage_id.name","=","Won"]]\')')
s.add_argument("--my", action="store_true", help="Records assigned to / created by me")
s.add_argument("--user", help="Filter by user NAME (resolves via res.users)")
s.add_argument("--search", help="ilike search on the primary name field")
s.add_argument("--date-field", help="Field to apply date filters to (default: create_date)")
add_date_filter_args(s)
s.add_argument("--fields", help="JSON array of fields to return")
s.add_argument("--limit", type=int, default=80)
s.add_argument("--offset", type=int, default=0)
s.add_argument("--order", help="Order by clause (e.g. 'create_date desc')")
# get
s = sub.add_parser("get", help="Read one record by ID")
s.add_argument("model", help="Model technical name")
s.add_argument("id", type=int, help="Record ID")
s.add_argument("--fields", help="JSON array of fields to return (default: all)")
# create
s = sub.add_parser("create", help="Create a record")
s.add_argument("model", help="Model technical name")
s.add_argument("--values", required=True, help='Field values as JSON (e.g. \'{"name":"Test","user_id":2}\')')
# update
s = sub.add_parser("update", help="Update a record")
s.add_argument("model", help="Model technical name")
s.add_argument("id", type=int, nargs="?", help="Record ID (single record)")
s.add_argument("--ids", help="Multiple record IDs as JSON array (e.g. '[1,2,3]')")
s.add_argument("--values", required=True, help='Field values as JSON')
# count
s = sub.add_parser("count", help="Count records matching a domain")
s.add_argument("model", help="Model technical name")
s.add_argument("--domain", help="Odoo domain as JSON (default: [])")
# log-note
s = sub.add_parser("log-note", help="Post an internal note on a record")
s.add_argument("model", help="Model technical name")
s.add_argument("id", type=int, help="Record ID")
s.add_argument("--body", required=True, help="Note body (HTML or plain text)")
# notify
s = sub.add_parser("notify", help="Schedule an activity notification on a record")
s.add_argument("model", help="Model technical name")
s.add_argument("id", type=int, help="Record ID")
s.add_argument("--user", required=True, help="Target user NAME (resolved by name/email)")
s.add_argument("--summary", required=True, help="Activity summary")
s.add_argument("--note", help="Optional longer note")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
dispatch = {
"models": cmd_models,
"fields": cmd_fields,
"list": cmd_list,
"get": cmd_get,
"create": cmd_create,
"update": cmd_update,
"count": cmd_count,
"log-note": cmd_log_note,
"notify": cmd_notify,
}
dispatch[args.action](args)
if __name__ == "__main__":
main()
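
The `models` subcommand above combines its filters into a single Odoo search domain. A minimal sketch of that composition, runnable without an Odoo server; `build_model_domain` is a hypothetical helper introduced only for illustration and is not part of `custom_app.py`:

```python
def build_model_domain(search=None, model_ids=None):
    """Compose an ir.model search domain in Odoo's prefix notation."""
    # Top-level conditions are ANDed implicitly.
    domain = [["transient", "=", False]]
    if search:
        # "|" applies to the next two conditions: name OR technical name.
        domain += ["|", ["name", "ilike", search], ["model", "ilike", search]]
    if model_ids:
        domain.append(["id", "in", list(model_ids)])
    return domain


if __name__ == "__main__":
    print(build_model_domain(search="CRM", model_ids=[3, 7]))
```

Because `|` only consumes the two conditions that immediately follow it, the `transient` and `id` conditions stay ANDed with the name match.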
FILE:scripts/documents.py
#!/usr/bin/env python3
"""VNClaw — Documents module (documents.document)"""
import argparse, json, sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_partner,
add_date_filter_args, get_date_bounds, log)
MODEL = "documents.document"
DEFAULT_FIELDS = ["id", "name", "owner_id", "partner_id", "folder_id", "type",
"create_date", "write_date", "tag_ids", "mimetype"]
def _resolve_folder(uid, m, db, key, name):
recs = execute(uid, m, db, key, "documents.folder", "search_read",
[["name", "ilike", name]], fields=["id", "name"])
if not recs:
print(f"Error: Folder '{name}' not found.", file=sys.stderr)
sys.exit(1)
return recs[0]["id"]
def cmd_list(args):
    uid, m, db, key = connect()
    domain = []
    if args.my:
        domain.append(["owner_id", "=", uid])
    if args.owner:
        owner_id = resolve_user(uid, m, db, key, args.owner)
        domain.append(["owner_id", "=", owner_id])
    if args.folder:
        folder_id = _resolve_folder(uid, m, db, key, args.folder)
        domain.append(["folder_id", "=", folder_id])
    if args.search:
        domain.append(["name", "ilike", args.search])
    if args.type:
        domain.append(["type", "=", args.type])
    if args.tag:
        tags = execute(uid, m, db, key, "documents.tag", "search_read",
                       [["name", "ilike", args.tag]], fields=["id"])
        if tags:
            domain.append(["tag_ids", "in", [t["id"] for t in tags]])
        else:
            log(f"Warning: tag '{args.tag}' not found, ignoring filter")
    # Date filters on create_date
    df, dt = get_date_bounds(args)
    if df:
        domain.append(["create_date", ">=", f"{df} 00:00:00"])
    if dt:
        domain.append(["create_date", "<=", f"{dt} 23:59:59"])
    fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
    records = execute(uid, m, db, key, MODEL, "search_read", domain,
                      fields=fields, limit=args.limit, order="create_date desc")
    output_json(records)
def cmd_get(args):
uid, m, db, key = connect()
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
"description", "res_model", "res_id", "url", "active"
]
records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
output_json(records[0] if records else {"error": f"Document {args.id} not found"})
def cmd_create(args):
uid, m, db, key = connect()
vals = {"name": args.name}
if args.folder:
vals["folder_id"] = _resolve_folder(uid, m, db, key, args.folder)
elif args.folder_id:
vals["folder_id"] = args.folder_id
if args.owner:
vals["owner_id"] = resolve_user(uid, m, db, key, args.owner)
if args.partner:
vals["partner_id"] = resolve_partner(uid, m, db, key, args.partner)
if args.url:
vals["url"] = args.url
vals["type"] = "url"
if args.description:
vals["description"] = args.description
if args.extra:
vals.update(json.loads(args.extra))
rid = execute(uid, m, db, key, MODEL, "create", vals)
output_json({"created_id": rid, "name": args.name})
def cmd_update(args):
uid, m, db, key = connect()
vals = {}
if args.name:
vals["name"] = args.name
if args.folder:
vals["folder_id"] = _resolve_folder(uid, m, db, key, args.folder)
if args.owner:
vals["owner_id"] = resolve_user(uid, m, db, key, args.owner)
if args.partner:
vals["partner_id"] = resolve_partner(uid, m, db, key, args.partner)
if args.description:
vals["description"] = args.description
if args.extra:
vals.update(json.loads(args.extra))
if not vals:
print("Error: No fields to update.", file=sys.stderr)
sys.exit(1)
ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
output_json({"updated_id": args.id, "success": ok})
def cmd_folders(args):
uid, m, db, key = connect()
recs = execute(uid, m, db, key, "documents.folder", "search_read", [],
fields=["id", "name", "parent_folder_id", "description"],
order="name asc")
output_json(recs)
def main():
p = argparse.ArgumentParser(description="VNClaw — Documents")
sub = p.add_subparsers(dest="action")
# list
s = sub.add_parser("list", help="List documents")
s.add_argument("--my", action="store_true", help="My documents only")
s.add_argument("--owner", help="Filter by owner NAME")
s.add_argument("--folder", help="Filter by folder NAME")
s.add_argument("--search", help="Search document name")
s.add_argument("--type", help="binary or url")
s.add_argument("--tag", help="Filter by tag NAME")
add_date_filter_args(s)
s.add_argument("--fields", help="JSON array of fields")
s.add_argument("--limit", type=int, default=50)
# get
s = sub.add_parser("get", help="Get document detail by ID")
s.add_argument("id", type=int)
s.add_argument("--fields", help="JSON array of fields")
# create
s = sub.add_parser("create", help="Create a document record")
s.add_argument("--name", required=True, help="Document name")
s.add_argument("--folder", help="Folder NAME")
s.add_argument("--folder-id", type=int, help="Folder ID (alternative)")
s.add_argument("--owner", help="Owner NAME")
s.add_argument("--partner", help="Contact/partner NAME")
s.add_argument("--url", help="URL (creates a URL-type document)")
s.add_argument("--description")
s.add_argument("--extra", help="Additional fields as JSON")
# update
s = sub.add_parser("update", help="Update a document")
s.add_argument("id", type=int)
s.add_argument("--name")
s.add_argument("--folder", help="Folder NAME")
s.add_argument("--owner", help="Owner NAME")
s.add_argument("--partner", help="Contact/partner NAME")
s.add_argument("--description")
s.add_argument("--extra", help="Additional fields as JSON")
# folders
sub.add_parser("folders", help="List available folders")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
{"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update,
"folders": cmd_folders}[args.action](args)
if __name__ == "__main__":
main()
FILE:scripts/helpdesk.py
#!/usr/bin/env python3
"""VNClaw — Helpdesk Tickets module (helpdesk.ticket)"""
import argparse, json, sys, os
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_partner,
add_date_filter_args, get_date_bounds, log_note, schedule_activity, log)
MODEL = "helpdesk.ticket"
DEFAULT_FIELDS = ["id", "name", "partner_id", "user_id", "team_id", "stage_id",
"priority", "create_date", "description", "ticket_type_id", "tag_ids"]
def cmd_list(args):
uid, m, db, key = connect()
domain = []
if args.my:
domain.append(["user_id", "=", uid])
if args.user:
user_id = resolve_user(uid, m, db, key, args.user)
domain.append(["user_id", "=", user_id])
if args.customer:
partner_id = resolve_partner(uid, m, db, key, args.customer)
domain.append(["partner_id", "=", partner_id])
if args.search:
domain.append(["name", "ilike", args.search])
if args.team:
teams = execute(uid, m, db, key, "helpdesk.team", "search_read",
[["name", "ilike", args.team]], fields=["id"])
if teams:
domain.append(["team_id", "=", teams[0]["id"]])
else:
log(f"Warning: team '{args.team}' not found, ignoring filter")
if args.stage:
stages = execute(uid, m, db, key, "helpdesk.stage", "search_read",
[["name", "ilike", args.stage]], fields=["id"])
if stages:
domain.append(["stage_id", "in", [s["id"] for s in stages]])
else:
log(f"Warning: stage '{args.stage}' not found, ignoring filter")
if args.priority:
domain.append(["priority", "=", args.priority])
# Date filters on create_date
df, dt = get_date_bounds(args)
if df:
domain.append(["create_date", ">=", f"{df} 00:00:00"])
if dt:
domain.append(["create_date", "<=", f"{dt} 23:59:59"])
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
records = execute(uid, m, db, key, MODEL, "search_read", domain,
fields=fields, limit=args.limit, order="create_date desc")
output_json(records)
def cmd_get(args):
uid, m, db, key = connect()
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
"kanban_state", "sla_status_ids", "message_ids"
]
records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
output_json(records[0] if records else {"error": f"Ticket {args.id} not found"})
def cmd_create(args):
    uid, m, db, key = connect()
    vals = {"name": args.name}
    if args.description:
        vals["description"] = args.description
    if args.assign:
        vals["user_id"] = resolve_user(uid, m, db, key, args.assign)
    if args.customer:
        vals["partner_id"] = resolve_partner(uid, m, db, key, args.customer)
    if args.team:
        teams = execute(uid, m, db, key, "helpdesk.team", "search_read",
                        [["name", "ilike", args.team]], fields=["id"])
        if teams:
            vals["team_id"] = teams[0]["id"]
        else:
            log(f"Warning: team '{args.team}' not found, creating ticket without it")
    if args.priority:
        vals["priority"] = args.priority
    if args.ticket_type:
        types = execute(uid, m, db, key, "helpdesk.ticket.type", "search_read",
                        [["name", "ilike", args.ticket_type]], fields=["id"])
        if types:
            vals["ticket_type_id"] = types[0]["id"]
        else:
            log(f"Warning: ticket type '{args.ticket_type}' not found, creating ticket without it")
    if args.extra:
        vals.update(json.loads(args.extra))
    rid = execute(uid, m, db, key, MODEL, "create", vals)
    output_json({"created_id": rid, "name": args.name})
def cmd_update(args):
uid, m, db, key = connect()
vals = {}
if args.name:
vals["name"] = args.name
if args.description:
vals["description"] = args.description
if args.assign:
vals["user_id"] = resolve_user(uid, m, db, key, args.assign)
if args.customer:
vals["partner_id"] = resolve_partner(uid, m, db, key, args.customer)
if args.stage:
stages = execute(uid, m, db, key, "helpdesk.stage", "search_read",
[["name", "ilike", args.stage]], fields=["id"])
if stages:
vals["stage_id"] = stages[0]["id"]
if args.priority:
vals["priority"] = args.priority
if args.team:
teams = execute(uid, m, db, key, "helpdesk.team", "search_read",
[["name", "ilike", args.team]], fields=["id"])
if teams:
vals["team_id"] = teams[0]["id"]
if args.extra:
vals.update(json.loads(args.extra))
if not vals:
print("Error: No fields to update.", file=sys.stderr)
sys.exit(1)
ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
output_json({"updated_id": args.id, "success": ok})
def cmd_log_note(args):
uid, m, db, key = connect()
log_note(uid, m, db, key, MODEL, args.id, args.body)
output_json({"logged_note": True, "ticket_id": args.id})
def cmd_notify(args):
uid, m, db, key = connect()
user_id = resolve_user(uid, m, db, key, args.user)
schedule_activity(uid, m, db, key, MODEL, args.id, user_id, args.summary, args.note or "")
output_json({"activity_scheduled": True, "ticket_id": args.id, "for_user": args.user})
def cmd_stages(args):
uid, m, db, key = connect()
stages = execute(uid, m, db, key, "helpdesk.stage", "search_read", [],
fields=["id", "name", "sequence"], order="sequence asc")
output_json(stages)
def main():
p = argparse.ArgumentParser(description="VNClaw — Helpdesk Tickets")
sub = p.add_subparsers(dest="action")
# list
s = sub.add_parser("list", help="List helpdesk tickets")
s.add_argument("--my", action="store_true", help="My assigned tickets only")
s.add_argument("--user", help="Filter by assigned user NAME")
s.add_argument("--customer", help="Filter by customer NAME")
s.add_argument("--search", help="Search ticket name")
s.add_argument("--team", help="Filter by team NAME")
s.add_argument("--stage", help="Filter by stage NAME")
s.add_argument("--priority", help="Priority: 0=Low, 1=Medium, 2=High, 3=Urgent")
add_date_filter_args(s)
s.add_argument("--fields", help="JSON array of fields")
s.add_argument("--limit", type=int, default=50)
# get
s = sub.add_parser("get", help="Get ticket detail by ID")
s.add_argument("id", type=int)
s.add_argument("--fields", help="JSON array of fields")
# create
s = sub.add_parser("create", help="Create a helpdesk ticket")
s.add_argument("--name", required=True, help="Ticket title")
s.add_argument("--description")
s.add_argument("--assign", help="Assign to user NAME")
s.add_argument("--customer", help="Customer NAME")
s.add_argument("--team", help="Team NAME")
s.add_argument("--priority", help="0=Low, 1=Medium, 2=High, 3=Urgent")
s.add_argument("--ticket-type", help="Ticket type NAME")
s.add_argument("--extra", help="Additional fields as JSON")
# update
s = sub.add_parser("update", help="Update a helpdesk ticket")
s.add_argument("id", type=int)
s.add_argument("--name")
s.add_argument("--description")
s.add_argument("--assign", help="Assign to user NAME")
s.add_argument("--customer", help="Customer NAME")
s.add_argument("--stage", help="Stage NAME")
s.add_argument("--priority")
s.add_argument("--team", help="Team NAME")
s.add_argument("--extra", help="Additional fields as JSON")
# log-note
s = sub.add_parser("log-note", help="Log internal note on a ticket")
s.add_argument("id", type=int)
s.add_argument("--body", required=True, help="Note body (HTML or plain text)")
# notify
s = sub.add_parser("notify", help="Schedule activity notification")
s.add_argument("id", type=int)
s.add_argument("--user", required=True, help="Target user NAME")
s.add_argument("--summary", required=True, help="Activity summary")
s.add_argument("--note", help="Activity note")
# stages
sub.add_parser("stages", help="List available stages")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
{"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update,
"log-note": cmd_log_note, "notify": cmd_notify, "stages": cmd_stages}[args.action](args)
if __name__ == "__main__":
main()
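
Both `documents.py` and `helpdesk.py` turn the `(df, dt)` pair returned by `get_date_bounds` into `create_date` conditions. A minimal sketch of that step, assuming the bounds arrive as `YYYY-MM-DD` strings or `None` (`get_date_bounds` lives in `odoo_core` and its return format is not shown here, so this is an assumption); `date_domain` is a hypothetical helper, not part of the scripts:

```python
def date_domain(field, date_from=None, date_to=None):
    """Build inclusive datetime conditions for an Odoo domain."""
    conditions = []
    if date_from:
        # Start of the first day
        conditions.append([field, ">=", f"{date_from} 00:00:00"])
    if date_to:
        # End of the last day, so the whole day is included
        conditions.append([field, "<=", f"{date_to} 23:59:59"])
    return conditions


if __name__ == "__main__":
    print(date_domain("create_date", "2024-01-01", "2024-01-31"))
```

Odoo stores datetime fields in UTC, so the bounds built this way are compared as UTC timestamps rather than in the user's local timezone.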
FILE:scripts/knowledge.py
#!/usr/bin/env python3
"""VNClaw — Knowledge Articles module (knowledge.article)"""
import argparse, json, sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, log)
MODEL = "knowledge.article"
DEFAULT_FIELDS = ["id", "name", "create_uid", "write_date", "is_published",
"parent_id", "sequence", "category"]
def cmd_list(args):
uid, m, db, key = connect()
domain = []
if args.my:
domain.append(["create_uid", "=", uid])
if args.author:
author_id = resolve_user(uid, m, db, key, args.author)
domain.append(["create_uid", "=", author_id])
if args.published:
domain.append(["is_published", "=", True])
if args.search:
domain.append(["name", "ilike", args.search])
if args.parent_id:
domain.append(["parent_id", "=", args.parent_id])
if args.root_only:
domain.append(["parent_id", "=", False])
if args.category:
domain.append(["category", "=", args.category])
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
records = execute(uid, m, db, key, MODEL, "search_read", domain,
fields=fields, limit=args.limit, order="sequence asc, write_date desc")
output_json(records)
def cmd_get(args):
uid, m, db, key = connect()
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
"body", "icon", "child_ids", "is_article_item", "article_member_ids"
]
records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
output_json(records[0] if records else {"error": f"Article {args.id} not found"})
def cmd_create(args):
uid, m, db, key = connect()
vals = {"name": args.name}
if args.body:
vals["body"] = args.body
if args.parent_id:
vals["parent_id"] = args.parent_id
if args.parent_name:
parents = execute(uid, m, db, key, MODEL, "search_read",
[["name", "ilike", args.parent_name]], fields=["id"], limit=1)
if parents:
vals["parent_id"] = parents[0]["id"]
else:
log(f"Warning: parent article '{args.parent_name}' not found, creating as root")
if args.is_published:
vals["is_published"] = True
if args.icon:
vals["icon"] = args.icon
if args.extra:
vals.update(json.loads(args.extra))
rid = execute(uid, m, db, key, MODEL, "create", vals)
output_json({"created_id": rid, "name": args.name})
def cmd_update(args):
uid, m, db, key = connect()
vals = {}
if args.name:
vals["name"] = args.name
if args.body:
vals["body"] = args.body
if args.parent_id is not None:
vals["parent_id"] = args.parent_id if args.parent_id > 0 else False
if args.is_published is not None:
vals["is_published"] = args.is_published
if args.icon:
vals["icon"] = args.icon
if args.extra:
vals.update(json.loads(args.extra))
if not vals:
print("Error: No fields to update.", file=sys.stderr)
sys.exit(1)
ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
output_json({"updated_id": args.id, "success": ok})
def main():
p = argparse.ArgumentParser(description="VNClaw — Knowledge Articles")
sub = p.add_subparsers(dest="action")
# list
s = sub.add_parser("list", help="List knowledge articles")
s.add_argument("--my", action="store_true", help="My articles only")
s.add_argument("--author", help="Filter by author NAME")
s.add_argument("--published", action="store_true", help="Published articles only")
s.add_argument("--search", help="Search title")
s.add_argument("--parent-id", type=int, help="Filter by parent article ID")
s.add_argument("--root-only", action="store_true", help="Only root-level articles")
s.add_argument("--category", help="Filter by category: workspace, private, shared")
s.add_argument("--fields", help="JSON array of fields")
s.add_argument("--limit", type=int, default=50)
# get
s = sub.add_parser("get", help="Get article by ID (includes body)")
s.add_argument("id", type=int)
s.add_argument("--fields", help="JSON array of fields")
# create
s = sub.add_parser("create", help="Create a knowledge article")
s.add_argument("--name", required=True, help="Article title")
s.add_argument("--body", help="Article content (HTML)")
s.add_argument("--parent-id", type=int, help="Parent article ID")
s.add_argument("--parent-name", help="Parent article NAME (alternative)")
s.add_argument("--is-published", action="store_true")
s.add_argument("--icon", help="Emoji icon")
s.add_argument("--extra", help="Additional fields as JSON")
# update
s = sub.add_parser("update", help="Update a knowledge article")
s.add_argument("id", type=int)
s.add_argument("--name")
s.add_argument("--body", help="New content (HTML)")
s.add_argument("--parent-id", type=int, help="New parent ID (0 = root)")
s.add_argument("--is-published", type=lambda v: v.strip().lower() in ("true", "1", "yes"), help="true/false")
s.add_argument("--icon")
s.add_argument("--extra", help="Additional fields as JSON")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
{"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update}[args.action](args)
if __name__ == "__main__":
main()
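
A true/false option such as `--is-published` needs an explicit converter, because `bool("false")` is `True` in Python (any non-empty string is truthy). A minimal sketch of one way to parse such a flag with `argparse`; `parse_bool` is a hypothetical helper, not part of `knowledge.py`:

```python
import argparse


def parse_bool(value):
    """Convert a true/false command-line string into a real bool."""
    text = value.strip().lower()
    if text in ("true", "1", "yes"):
        return True
    if text in ("false", "0", "no"):
        return False
    # argparse reports this as a normal usage error
    raise argparse.ArgumentTypeError(f"expected true/false, got {value!r}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--is-published", type=parse_bool)
    args = parser.parse_args(["--is-published", "false"])
    print(args.is_published)  # prints: False
```

Leaving the default as `None` keeps "flag not given" distinct from "flag set to false", which matters for a check like `args.is_published is not None`.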
FILE:scripts/odoo_core.py
#!/usr/bin/env python3
"""
VNClaw — Odoo 17 XML-RPC Core Library
Shared connection, security, and execution logic.
Imported by all module scripts. Can also be run directly for generic operations.
"""
import argparse
import json
import os
import sys
import xmlrpc.client
from datetime import datetime
# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------
BLOCKED_MODELS = frozenset({
"ir.rule", "ir.model.access", "ir.config_parameter", "ir.module.module",
"res.groups", "base.module.upgrade", "base.module.uninstall",
"ir.cron", "ir.actions.server", "ir.actions.act_window",
})
READ_ONLY_MODELS = frozenset({
"res.users", "hr.employee", "project.task.type",
"hr.leave.type", "helpdesk.team", "documents.folder",
})
ALLOWED_OPERATIONS = frozenset({"search_read", "read", "create", "write", "search", "fields_get"})
def log(msg: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {msg}", file=sys.stderr)
def get_env(name: str) -> str:
value = os.environ.get(name)
if not value:
print(f"Error: Environment variable '{name}' is not set.", file=sys.stderr)
sys.exit(1)
return value
def connect():
"""Authenticate and return (uid, models_proxy, db, api_key)."""
url = get_env("ODOO_URL").rstrip("/")
db = get_env("ODOO_DB")
username = get_env("ODOO_USERNAME")
api_key = get_env("ODOO_API_KEY")
common = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/common", allow_none=True, context=None)
uid = common.authenticate(db, username, api_key, {})
if not uid:
print("Error: Authentication failed. Check your credentials.", file=sys.stderr)
sys.exit(1)
models = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/object", allow_none=True, context=None)
log(f"Authenticated as uid={uid} on {url} (db={db})")
return uid, models, db, api_key
def validate_operation(model: str, operation: str) -> None:
if operation not in ALLOWED_OPERATIONS:
print(f"Error: Operation '{operation}' is not allowed.", file=sys.stderr)
sys.exit(1)
if model in BLOCKED_MODELS:
print(f"Error: Access to model '{model}' is blocked for security.", file=sys.stderr)
sys.exit(1)
if operation in ("create", "write") and model in READ_ONLY_MODELS:
print(f"Error: Model '{model}' is read-only.", file=sys.stderr)
sys.exit(1)
def execute(uid, models, db, api_key, model, operation, *args, **kwargs):
validate_operation(model, operation)
log(f"{model}.{operation}")
return models.execute_kw(db, uid, api_key, model, operation, list(args), kwargs)
def output_json(data):
print(json.dumps(data, indent=2, default=str))
# ---------------------------------------------------------------------------
# Name → ID Resolvers (so users never need to know IDs)
# ---------------------------------------------------------------------------
_user_cache = {}
_partner_cache = {}
_employee_cache = {}
def resolve_user(uid, models, db, api_key, name_or_email):
    """Resolve a user by name or login (typically the email) → user ID. Cached per session."""
key = name_or_email.lower().strip()
if key in _user_cache:
return _user_cache[key]
domain = ["|", ["name", "ilike", key], ["login", "ilike", key]]
users = models.execute_kw(db, uid, api_key, "res.users", "search_read",
[domain], {"fields": ["id", "name", "login"], "limit": 5})
if not users:
print(f"Error: No user found matching '{name_or_email}'.", file=sys.stderr)
sys.exit(1)
if len(users) > 1:
log(f"Multiple users match '{name_or_email}': {[u['name'] for u in users]}. Using first: {users[0]['name']} (id={users[0]['id']})")
_user_cache[key] = users[0]["id"]
return users[0]["id"]
def resolve_partner(uid, models, db, api_key, name_or_email):
"""Resolve a partner/contact by name or email → partner ID."""
key = name_or_email.lower().strip()
if key in _partner_cache:
return _partner_cache[key]
domain = ["|", ["name", "ilike", key], ["email", "ilike", key]]
partners = models.execute_kw(db, uid, api_key, "res.partner", "search_read",
[domain], {"fields": ["id", "name", "email"], "limit": 5})
if not partners:
print(f"Error: No partner found matching '{name_or_email}'.", file=sys.stderr)
sys.exit(1)
if len(partners) > 1:
log(f"Multiple partners match '{name_or_email}': {[p['name'] for p in partners]}. Using first: {partners[0]['name']} (id={partners[0]['id']})")
_partner_cache[key] = partners[0]["id"]
return partners[0]["id"]
def resolve_employee(uid, models, db, api_key, name):
"""Resolve an employee by name → employee ID."""
key = name.lower().strip()
if key in _employee_cache:
return _employee_cache[key]
employees = models.execute_kw(db, uid, api_key, "hr.employee", "search_read",
[[["name", "ilike", key]]], {"fields": ["id", "name"], "limit": 5})
if not employees:
print(f"Error: No employee found matching '{name}'.", file=sys.stderr)
sys.exit(1)
if len(employees) > 1:
log(f"Multiple employees match '{name}': {[e['name'] for e in employees]}. Using first: {employees[0]['name']} (id={employees[0]['id']})")
_employee_cache[key] = employees[0]["id"]
return employees[0]["id"]
def resolve_project(uid, models, db, api_key, name):
"""Resolve a project by name → project ID."""
projects = models.execute_kw(db, uid, api_key, "project.project", "search_read",
[[["name", "ilike", name.strip()]]], {"fields": ["id", "name"], "limit": 5})
if not projects:
print(f"Error: No project found matching '{name}'.", file=sys.stderr)
sys.exit(1)
if len(projects) > 1:
log(f"Multiple projects match '{name}': {[p['name'] for p in projects]}. Using first: {projects[0]['name']} (id={projects[0]['id']})")
return projects[0]["id"]
def resolve_users_multi(uid, models, db, api_key, names_csv):
"""Resolve comma-separated user names → list of user IDs."""
names = [n.strip() for n in names_csv.split(",") if n.strip()]
return [resolve_user(uid, models, db, api_key, n) for n in names]
# ---------------------------------------------------------------------------
# Date Range Helpers
# ---------------------------------------------------------------------------
def date_range(shortcut):
"""Convert a shortcut to (date_from, date_to) strings (YYYY-MM-DD).
Shortcuts: today, yesterday, this-week, last-week, this-month, last-month, this-year.
"""
from datetime import date as _date, timedelta
today = _date.today()
if shortcut == "today":
return str(today), str(today)
elif shortcut == "yesterday":
y = today - timedelta(days=1)
return str(y), str(y)
elif shortcut == "this-week":
monday = today - timedelta(days=today.weekday())
sunday = monday + timedelta(days=6)
return str(monday), str(sunday)
elif shortcut == "last-week":
monday = today - timedelta(days=today.weekday() + 7)
sunday = monday + timedelta(days=6)
return str(monday), str(sunday)
elif shortcut == "this-month":
first = today.replace(day=1)
if today.month == 12:
last = today.replace(year=today.year + 1, month=1, day=1) - timedelta(days=1)
else:
last = today.replace(month=today.month + 1, day=1) - timedelta(days=1)
return str(first), str(last)
elif shortcut == "last-month":
first_this = today.replace(day=1)
last_prev = first_this - timedelta(days=1)
first_prev = last_prev.replace(day=1)
return str(first_prev), str(last_prev)
elif shortcut == "this-year":
return f"{today.year}-01-01", f"{today.year}-12-31"
else:
print(f"Error: Unknown date shortcut '{shortcut}'. Use: today, yesterday, this-week, last-week, this-month, last-month, this-year.", file=sys.stderr)
sys.exit(1)
def add_date_filter_args(parser):
"""Add standard date filter arguments to a subparser."""
parser.add_argument("--today", action="store_const", const="today", dest="date_shortcut", help="Filter to today")
parser.add_argument("--yesterday", action="store_const", const="yesterday", dest="date_shortcut", help="Filter to yesterday")
parser.add_argument("--this-week", action="store_const", const="this-week", dest="date_shortcut", help="Filter to this week")
parser.add_argument("--last-week", action="store_const", const="last-week", dest="date_shortcut", help="Filter to last week")
parser.add_argument("--this-month", action="store_const", const="this-month", dest="date_shortcut", help="Filter to this month")
parser.add_argument("--last-month", action="store_const", const="last-month", dest="date_shortcut", help="Filter to last month")
parser.add_argument("--this-year", action="store_const", const="this-year", dest="date_shortcut", help="Filter to this year")
parser.add_argument("--date-from", help="Custom start date (YYYY-MM-DD)")
parser.add_argument("--date-to", help="Custom end date (YYYY-MM-DD)")
def get_date_bounds(args):
"""Extract (date_from, date_to) from parsed args. Returns (None, None) if no filter."""
shortcut = getattr(args, "date_shortcut", None)
if shortcut:
return date_range(shortcut)
df = getattr(args, "date_from", None)
dt = getattr(args, "date_to", None)
return df, dt
# ---------------------------------------------------------------------------
# Log Notes & Schedule Activities
# ---------------------------------------------------------------------------
def log_note(uid, models, db, api_key, model, record_id, body):
"""Post an internal note (log note) on a record."""
log(f"Posting note on {model}/{record_id}")
msg_id = models.execute_kw(db, uid, api_key, model, "message_post", [record_id], {
"body": body,
"message_type": "comment",
"subtype_xmlid": "mail.mt_note",
})
return msg_id
def schedule_activity(uid, models, db, api_key, model, record_id, user_id, summary,
                      note="", date_deadline=None, activity_type="mail.mail_activity_data_todo"):
    """Schedule an activity on a record for a specific user."""
    from datetime import date as _date
    # Get model ID
    model_ids = models.execute_kw(db, uid, api_key, "ir.model", "search",
                                  [[["model", "=", model]]], {"limit": 1})
    if not model_ids:
        print(f"Error: Model '{model}' not found in ir.model.", file=sys.stderr)
        sys.exit(1)
    # Get activity type ID from its XML ID ("module.name"). Filter on the stored
    # `module` and `name` columns: `complete_name` is a computed field and may not
    # be searchable, in which case the filter would be silently ignored.
    xml_module, _, xml_name = activity_type.partition(".")
    act_type_ids = models.execute_kw(db, uid, api_key, "ir.model.data", "search_read",
                                     [[["module", "=", xml_module], ["name", "=", xml_name]]],
                                     {"fields": ["res_id"], "limit": 1})
    act_type_id = act_type_ids[0]["res_id"] if act_type_ids else False
    vals = {
        "res_model_id": model_ids[0],
        "res_id": record_id,
        "user_id": user_id,
        "summary": summary,
        "note": note or "",
        "date_deadline": date_deadline or str(_date.today()),
    }
    if act_type_id:
        vals["activity_type_id"] = act_type_id
    log(f"Scheduling activity on {model}/{record_id} for user_id={user_id}")
    activity_id = models.execute_kw(db, uid, api_key, "mail.activity", "create", [vals])
    return activity_id
def test_connection():
uid, models, db, api_key = connect()
url = get_env("ODOO_URL").rstrip("/")
common = xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/common", allow_none=True)
version = common.version()
output_json({
"status": "connected", "uid": uid,
"server_version": version.get("server_version", "unknown"),
})
# ---------------------------------------------------------------------------
# Generic CLI (for uncommon models)
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="VNClaw — Odoo 17 Generic Client")
sub = parser.add_subparsers(dest="command")
sub.add_parser("test-connection", help="Test connection to Odoo")
p = sub.add_parser("search-read", help="Search and read any model")
p.add_argument("model")
p.add_argument("--domain", default="[]")
p.add_argument("--fields", default="[]")
p.add_argument("--limit", type=int, default=80)
p.add_argument("--offset", type=int, default=0)
p.add_argument("--order", default="")
p = sub.add_parser("read", help="Read records by ID")
p.add_argument("model")
p.add_argument("--ids", required=True)
p.add_argument("--fields", default="[]")
p = sub.add_parser("create", help="Create a record")
p.add_argument("model")
p.add_argument("--values", required=True)
p = sub.add_parser("write", help="Update records")
p.add_argument("model")
p.add_argument("--ids", required=True)
p.add_argument("--values", required=True)
p = sub.add_parser("search", help="Search for IDs")
p.add_argument("model")
p.add_argument("--domain", default="[]")
p.add_argument("--limit", type=int, default=80)
p.add_argument("--offset", type=int, default=0)
p = sub.add_parser("fields", help="Get field definitions")
p.add_argument("model")
p.add_argument("--attributes", default='["string","type","required","readonly","relation"]')
p = sub.add_parser("count", help="Count matching records")
p.add_argument("model")
p.add_argument("--domain", default="[]")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "test-connection":
test_connection()
elif args.command == "search-read":
uid, m, db, key = connect()
kw = {"fields": json.loads(args.fields), "limit": args.limit, "offset": args.offset}
if args.order:
kw["order"] = args.order
output_json(execute(uid, m, db, key, args.model, "search_read", json.loads(args.domain), **kw))
elif args.command == "read":
uid, m, db, key = connect()
output_json(execute(uid, m, db, key, args.model, "read", json.loads(args.ids), **{"fields": json.loads(args.fields)}))
elif args.command == "create":
uid, m, db, key = connect()
rid = execute(uid, m, db, key, args.model, "create", json.loads(args.values))
output_json({"created_id": rid, "model": args.model})
elif args.command == "write":
uid, m, db, key = connect()
ok = execute(uid, m, db, key, args.model, "write", json.loads(args.ids), json.loads(args.values))
output_json({"updated_ids": json.loads(args.ids), "model": args.model, "success": ok})
elif args.command == "search":
uid, m, db, key = connect()
ids = execute(uid, m, db, key, args.model, "search", json.loads(args.domain), **{"limit": args.limit, "offset": args.offset})
output_json({"ids": ids, "count": len(ids)})
elif args.command == "fields":
uid, m, db, key = connect()
output_json(execute(uid, m, db, key, args.model, "fields_get", **{"attributes": json.loads(args.attributes)}))
elif args.command == "count":
uid, m, db, key = connect()
validate_operation(args.model, "search_read")
c = m.execute_kw(db, uid, key, args.model, "search_count", [json.loads(args.domain)])
output_json({"model": args.model, "count": c})
if __name__ == "__main__":
main()
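
The date shortcuts handled by `date_range` in `odoo_core.py` can be hard to verify because they read the system clock. Below is a minimal sketch of the same month and week arithmetic with the reference date passed in. `month_bounds` and `week_bounds` are hypothetical helper names, not part of the library.

```python
from datetime import date, timedelta


def month_bounds(today: date, previous: bool = False) -> tuple[str, str]:
    """Return (first_day, last_day) of today's month, or of the month before it."""
    first_this = today.replace(day=1)
    if previous:
        last_prev = first_this - timedelta(days=1)
        return str(last_prev.replace(day=1)), str(last_prev)
    # Day 1 plus 32 days always lands in the next month; snap to its first day
    # and step back one day to get the last day of the current month.
    first_next = (first_this + timedelta(days=32)).replace(day=1)
    return str(first_this), str(first_next - timedelta(days=1))


def week_bounds(today: date, weeks_back: int = 0) -> tuple[str, str]:
    """Return (monday, sunday) of the week `weeks_back` weeks before today's week."""
    monday = today - timedelta(days=today.weekday() + 7 * weeks_back)
    return str(monday), str(monday + timedelta(days=6))


print(month_bounds(date(2024, 12, 15)))                # ('2024-12-01', '2024-12-31')
print(month_bounds(date(2024, 3, 10), previous=True))  # ('2024-02-01', '2024-02-29')
print(week_bounds(date(2024, 5, 8), weeks_back=1))     # ('2024-04-29', '2024-05-05')
```

Passing the date explicitly covers the December rollover and leap-year February without special cases.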
FILE:scripts/projects.py
#!/usr/bin/env python3
"""VNClaw — Projects module (project.project)"""
import argparse, json, sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user,
add_date_filter_args, get_date_bounds, log_note, schedule_activity, log)
MODEL = "project.project"
DEFAULT_FIELDS = ["id", "name", "user_id", "partner_id", "date_start", "date",
"task_count", "tag_ids", "stage_id", "active"]
def cmd_list(args):
uid, m, db, key = connect()
domain = []
if args.my:
domain.append(["user_id", "=", uid])
if args.manager:
manager_id = resolve_user(uid, m, db, key, args.manager)
domain.append(["user_id", "=", manager_id])
if args.search:
domain.append(["name", "ilike", args.search])
if args.stage:
stages = execute(uid, m, db, key, "project.project.stage", "search_read",
[["name", "ilike", args.stage]], fields=["id"])
if stages:
domain.append(["stage_id", "in", [s["id"] for s in stages]])
if args.active_only:
domain.append(["active", "=", True])
if args.favorites:
domain.append(["is_favorite", "=", True])
    # Date filters apply to create_date
df, dt = get_date_bounds(args)
if df:
domain.append(["create_date", ">=", f"{df} 00:00:00"])
if dt:
domain.append(["create_date", "<=", f"{dt} 23:59:59"])
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
records = execute(uid, m, db, key, MODEL, "search_read", domain,
fields=fields, limit=args.limit, order="name asc")
output_json(records)
def cmd_get(args):
    uid, m, db, key = connect()
    # DEFAULT_FIELDS already contains date_start and date, so only the extras are appended
    fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
        "description", "label_tasks", "allow_timesheets", "privacy_visibility",
        "analytic_account_id",
    ]
    records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
    output_json(records[0] if records else {"error": f"Project {args.id} not found"})
def cmd_create(args):
uid, m, db, key = connect()
vals = {"name": args.name}
if args.manager:
vals["user_id"] = resolve_user(uid, m, db, key, args.manager)
if args.description:
vals["description"] = args.description
if args.privacy:
vals["privacy_visibility"] = args.privacy
if args.allow_timesheets:
vals["allow_timesheets"] = True
if args.date_start:
vals["date_start"] = args.date_start
if args.date_end:
vals["date"] = args.date_end
if args.extra:
vals.update(json.loads(args.extra))
rid = execute(uid, m, db, key, MODEL, "create", vals)
output_json({"created_id": rid, "name": args.name})
def cmd_update(args):
    uid, m, db, key = connect()
    vals = {}
    if args.name:
        vals["name"] = args.name
    if args.manager:
        vals["user_id"] = resolve_user(uid, m, db, key, args.manager)
    if args.description:
        vals["description"] = args.description
    if args.stage:
        stages = execute(uid, m, db, key, "project.project.stage", "search_read",
                         [["name", "ilike", args.stage]], fields=["id"])
        if not stages:
            # Fail loudly instead of silently skipping the requested stage change
            print(f"Error: No stage matching '{args.stage}'.", file=sys.stderr)
            sys.exit(1)
        vals["stage_id"] = stages[0]["id"]
    if args.privacy:
        vals["privacy_visibility"] = args.privacy
    if args.date_start:
        vals["date_start"] = args.date_start
    if args.date_end:
        vals["date"] = args.date_end
    if args.extra:
        vals.update(json.loads(args.extra))
    if not vals:
        print("Error: No fields to update.", file=sys.stderr)
        sys.exit(1)
    ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
    output_json({"updated_id": args.id, "success": ok})
def cmd_log_note(args):
uid, m, db, key = connect()
log_note(uid, m, db, key, MODEL, args.id, args.body)
output_json({"logged_note": True, "project_id": args.id})
def cmd_notify(args):
uid, m, db, key = connect()
user_id = resolve_user(uid, m, db, key, args.user)
schedule_activity(uid, m, db, key, MODEL, args.id, user_id, args.summary, args.note or "")
output_json({"activity_scheduled": True, "project_id": args.id, "for_user": args.user})
def cmd_stages(args):
uid, m, db, key = connect()
recs = execute(uid, m, db, key, "project.project.stage", "search_read", [],
fields=["id", "name"], order="id asc")
output_json(recs)
def main():
p = argparse.ArgumentParser(description="VNClaw — Projects")
sub = p.add_subparsers(dest="action")
# list
s = sub.add_parser("list", help="List projects")
s.add_argument("--my", action="store_true", help="Projects I manage")
s.add_argument("--manager", help="Filter by manager NAME")
s.add_argument("--search", help="Search project name")
s.add_argument("--stage", help="Filter by stage NAME")
s.add_argument("--active-only", action="store_true", help="Active projects only")
s.add_argument("--favorites", action="store_true", help="Favorites only")
add_date_filter_args(s)
s.add_argument("--fields", help="JSON array of fields")
s.add_argument("--limit", type=int, default=50)
# get
s = sub.add_parser("get", help="Get project detail by ID")
s.add_argument("id", type=int)
s.add_argument("--fields", help="JSON array of fields")
# create
s = sub.add_parser("create", help="Create a project")
s.add_argument("--name", required=True, help="Project name")
s.add_argument("--manager", help="Manager NAME")
s.add_argument("--description")
s.add_argument("--privacy", help="portal, employees, followers")
s.add_argument("--allow-timesheets", action="store_true")
s.add_argument("--date-start", help="Start date (YYYY-MM-DD)")
s.add_argument("--date-end", help="End date (YYYY-MM-DD)")
s.add_argument("--extra", help="Additional fields as JSON")
# update
s = sub.add_parser("update", help="Update a project")
s.add_argument("id", type=int)
s.add_argument("--name")
s.add_argument("--manager", help="Manager NAME")
s.add_argument("--description")
s.add_argument("--stage", help="Stage NAME")
s.add_argument("--privacy", help="portal, employees, followers")
s.add_argument("--date-start")
s.add_argument("--date-end")
s.add_argument("--extra", help="Additional fields as JSON")
# log-note
s = sub.add_parser("log-note", help="Log internal note on project")
s.add_argument("id", type=int)
s.add_argument("--body", required=True, help="Note body (HTML or plain text)")
# notify
s = sub.add_parser("notify", help="Schedule activity notification")
s.add_argument("id", type=int)
s.add_argument("--user", required=True, help="Target user NAME")
s.add_argument("--summary", required=True, help="Activity summary")
s.add_argument("--note", help="Activity note")
# stages
sub.add_parser("stages", help="List project stages")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
{"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update,
"log-note": cmd_log_note, "notify": cmd_notify, "stages": cmd_stages}[args.action](args)
if __name__ == "__main__":
main()
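
The list commands turn the `(date_from, date_to)` pair from `get_date_bounds` into domain leaves by padding datetime fields to the start and end of the day. A minimal sketch of that pattern as one reusable function follows. `date_domain` and its `is_datetime` flag are hypothetical and not part of `odoo_core`.

```python
def date_domain(field, date_from=None, date_to=None, is_datetime=True):
    """Build Odoo domain leaves restricting `field` to an inclusive date window.

    Datetime fields get 00:00:00 / 23:59:59 appended so the whole day is covered;
    plain date fields are compared against the bare YYYY-MM-DD string.
    """
    leaves = []
    if date_from:
        leaves.append([field, ">=", f"{date_from} 00:00:00" if is_datetime else date_from])
    if date_to:
        leaves.append([field, "<=", f"{date_to} 23:59:59" if is_datetime else date_to])
    return leaves


# Example: everything created in May 2024
domain = [["active", "=", True]] + date_domain("create_date", "2024-05-01", "2024-05-31")
print(domain)
```

Because the function returns a list of leaves, it can be concatenated onto any existing domain, and an empty window adds nothing.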
FILE:scripts/tasks.py
#!/usr/bin/env python3
"""VNClaw — Tasks module (project.task)"""
import argparse, json, sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_project,
resolve_users_multi, add_date_filter_args, get_date_bounds,
log_note, schedule_activity, log)
MODEL = "project.task"
DEFAULT_FIELDS = ["id", "name", "project_id", "user_ids", "stage_id",
"date_deadline", "create_date", "priority", "tag_ids"]
# Map friendly --date-field values to actual Odoo field names
_DATE_FIELD_MAP = {
"deadline": "date_deadline",
"created": "create_date",
"updated": "write_date",
}
def cmd_list(args):
uid, m, db, key = connect()
domain = []
# --my: current user's tasks
if args.my:
domain.append(["user_ids", "in", [uid]])
# --user: resolve by name
if args.user:
user_id = resolve_user(uid, m, db, key, args.user)
domain.append(["user_ids", "in", [user_id]])
# --project / --project-id
if args.project:
pid = resolve_project(uid, m, db, key, args.project)
domain.append(["project_id", "=", pid])
elif args.project_id:
domain.append(["project_id", "=", args.project_id])
# --stage
if args.stage:
domain.append(["stage_id.name", "ilike", args.stage])
# --search
if args.search:
domain.append(["name", "ilike", args.search])
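    # --overdue: deadline in the past and task not yet closed
    if args.overdue:
        from datetime import date
        domain.append(["date_deadline", "<", str(date.today())])
        # Odoo 17 tracks closure on the task's own `state` field; task stages
        # (project.task.type) no longer carry an `is_closed` flag.
        domain.append(["state", "not in", ["1_done", "1_canceled"]])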
# --priority
if args.priority:
domain.append(["priority", "=", args.priority])
# --tag
if args.tag:
domain.append(["tag_ids.name", "ilike", args.tag])
# Date filters — field selected by --date-field (deadline / created / updated)
date_field = _DATE_FIELD_MAP.get(getattr(args, "date_field", "deadline") or "deadline", "date_deadline")
df, dt = get_date_bounds(args)
if df:
date_val_from = f"{df} 00:00:00" if date_field != "date_deadline" else df
domain.append([date_field, ">=", date_val_from])
if dt:
date_val_to = f"{dt} 23:59:59" if date_field != "date_deadline" else dt
domain.append([date_field, "<=", date_val_to])
if args.active_only:
domain.append(["active", "=", True])
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
records = execute(uid, m, db, key, MODEL, "search_read", domain,
fields=fields, limit=args.limit, offset=args.offset,
order=args.order or "priority desc, date_deadline asc")
output_json(records)
def cmd_get(args):
uid, m, db, key = connect()
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
"description", "child_ids", "parent_id", "activity_ids",
"allocated_hours", "effective_hours", "remaining_hours",
"timesheet_ids",
]
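    # Fetch available fields first to avoid crashing on missing optional fields
    try:
        available = set(execute(uid, m, db, key, MODEL, "fields_get", [],
                                attributes=["type"]).keys())
        fields = [f for f in fields if f in available]
    except Exception as exc:
        # Fall back to the unfiltered list, but record why filtering was skipped
        log(f"fields_get failed ({exc}); reading with the unfiltered field list")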
records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
output_json(records[0] if records else {"error": f"Task {args.id} not found"})
def cmd_create(args):
uid, m, db, key = connect()
    # Resolve project by name or ID; leave project_id out when neither is given
    vals = {"name": args.name}
    if args.project:
        vals["project_id"] = resolve_project(uid, m, db, key, args.project)
    elif args.project_id:
        vals["project_id"] = args.project_id
# --assign: resolve user names
if args.assign:
user_ids = resolve_users_multi(uid, m, db, key, args.assign)
vals["user_ids"] = [[6, 0, user_ids]]
elif args.user_ids:
vals["user_ids"] = [[6, 0, json.loads(args.user_ids)]]
if args.deadline:
vals["date_deadline"] = args.deadline
if args.description:
vals["description"] = args.description
if args.priority:
vals["priority"] = args.priority
if args.planned_hours:
vals["allocated_hours"] = args.planned_hours
if args.parent_id:
vals["parent_id"] = args.parent_id
if args.extra:
vals.update(json.loads(args.extra))
rid = execute(uid, m, db, key, MODEL, "create", vals)
output_json({"created_id": rid, "name": args.name})
def cmd_update(args):
uid, m, db, key = connect()
vals = {}
if args.name:
vals["name"] = args.name
if args.stage_id:
vals["stage_id"] = args.stage_id
if args.stage:
# Resolve stage by name
stages = execute(uid, m, db, key, "project.task.type", "search_read",
[["name", "ilike", args.stage]], fields=["id", "name"], limit=1)
if stages:
vals["stage_id"] = stages[0]["id"]
else:
print(f"Error: No stage matching '{args.stage}'.", file=sys.stderr)
sys.exit(1)
if args.assign:
user_ids = resolve_users_multi(uid, m, db, key, args.assign)
vals["user_ids"] = [[6, 0, user_ids]]
elif args.user_ids:
vals["user_ids"] = [[6, 0, json.loads(args.user_ids)]]
if args.priority:
vals["priority"] = args.priority
if args.deadline:
vals["date_deadline"] = args.deadline
if args.description:
vals["description"] = args.description
if args.extra:
vals.update(json.loads(args.extra))
if not vals:
print("Error: No fields to update.", file=sys.stderr)
sys.exit(1)
ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
output_json({"updated_id": args.id, "success": ok, "fields_updated": list(vals.keys())})
def cmd_log_note(args):
"""Post an internal note on a task."""
uid, m, db, key = connect()
msg_id = log_note(uid, m, db, key, MODEL, args.id, args.message)
output_json({"task_id": args.id, "message_id": msg_id, "status": "note posted"})
def cmd_notify(args):
"""Schedule an activity notification for a user on a task."""
uid, m, db, key = connect()
user_id = resolve_user(uid, m, db, key, args.user)
act_id = schedule_activity(uid, m, db, key, MODEL, args.id, user_id,
summary=args.summary, note=args.note or "",
date_deadline=args.deadline)
output_json({"task_id": args.id, "activity_id": act_id, "assigned_to": args.user, "summary": args.summary})
def cmd_stages(args):
"""List available task stages."""
uid, m, db, key = connect()
domain = []
if args.project_id:
domain.append(["project_ids", "in", [args.project_id]])
records = execute(uid, m, db, key, "project.task.type", "search_read", domain,
fields=["id", "name", "sequence", "fold"], limit=100, order="sequence asc")
output_json(records)
def main():
p = argparse.ArgumentParser(description="VNClaw — Task Management")
sub = p.add_subparsers(dest="action")
# list
s = sub.add_parser("list", help="List tasks")
s.add_argument("--my", action="store_true", help="My tasks (current user)")
s.add_argument("--user", help="Filter by user NAME (partial match)")
s.add_argument("--project", help="Filter by project NAME (partial match)")
s.add_argument("--project-id", type=int, help="Filter by project ID")
s.add_argument("--stage", help="Filter by stage name (partial match)")
s.add_argument("--search", help="Search task name (partial match)")
s.add_argument("--overdue", action="store_true", help="Only overdue tasks")
s.add_argument("--priority", choices=["0", "1"], help="Filter by priority")
s.add_argument("--tag", help="Filter by tag name")
add_date_filter_args(s)
s.add_argument("--date-field", choices=["deadline", "created", "updated"], default="deadline",
help="Which date to filter on: deadline (default), created, or updated")
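    # store_true with default=True made this flag a no-op; BooleanOptionalAction
    # (Python 3.9+) adds a matching --no-active-only switch.
    s.add_argument("--active-only", action=argparse.BooleanOptionalAction, default=True,
                   help="Add an explicit active=True filter (on by default)")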
s.add_argument("--fields", help="JSON array of fields")
s.add_argument("--limit", type=int, default=50)
s.add_argument("--offset", type=int, default=0)
s.add_argument("--order", default="")
# get
s = sub.add_parser("get", help="Get task by ID")
s.add_argument("id", type=int)
s.add_argument("--fields", help="JSON array of fields")
# create
s = sub.add_parser("create", help="Create a task")
s.add_argument("--name", required=True, help="Task name")
s.add_argument("--project", help="Project NAME (resolved automatically)")
s.add_argument("--project-id", type=int, help="Project ID (alternative to --project)")
s.add_argument("--assign", help="Assignee names, comma-separated (e.g. 'Alice, Bob')")
s.add_argument("--user-ids", help="Assignee IDs as JSON array (alternative to --assign)")
s.add_argument("--deadline", help="Deadline (YYYY-MM-DD)")
s.add_argument("--description", help="Task description")
s.add_argument("--priority", choices=["0", "1"], help="0=Normal, 1=Important")
s.add_argument("--planned-hours", type=float, help="Planned/allocated hours")
s.add_argument("--parent-id", type=int, help="Parent task ID (sub-task)")
s.add_argument("--extra", help="Additional fields as JSON object")
# update
s = sub.add_parser("update", help="Update a task")
s.add_argument("id", type=int, help="Task ID")
s.add_argument("--name", help="New name")
s.add_argument("--stage-id", type=int, help="New stage ID")
s.add_argument("--stage", help="New stage NAME (resolved automatically)")
s.add_argument("--assign", help="New assignees by name, comma-separated")
s.add_argument("--user-ids", help="New assignee IDs as JSON array")
s.add_argument("--priority", choices=["0", "1"])
s.add_argument("--deadline", help="New deadline (YYYY-MM-DD)")
s.add_argument("--description")
s.add_argument("--extra", help="Additional fields as JSON object")
# log-note
s = sub.add_parser("log-note", help="Post an internal note on a task")
s.add_argument("id", type=int, help="Task ID")
s.add_argument("--message", required=True, help="Note content (plain text or HTML)")
# notify
s = sub.add_parser("notify", help="Schedule an activity notification on a task")
s.add_argument("id", type=int, help="Task ID")
s.add_argument("--user", required=True, help="User NAME to notify")
s.add_argument("--summary", required=True, help="Activity summary")
s.add_argument("--note", help="Detailed note")
s.add_argument("--deadline", help="Due date (YYYY-MM-DD, default: today)")
# stages
s = sub.add_parser("stages", help="List available task stages")
s.add_argument("--project-id", type=int, help="Filter stages by project ID")
args = p.parse_args()
if not args.action:
p.print_help()
sys.exit(1)
actions = {
"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update,
"log-note": cmd_log_note, "notify": cmd_notify, "stages": cmd_stages,
}
actions[args.action](args)
if __name__ == "__main__":
main()
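
The task commands assign users with `[[6, 0, ids]]`, which is Odoo's x2many "set" command: it replaces every existing link with the given IDs. A short sketch of that command next to the "link" command, together with the comma-splitting that `resolve_users_multi` performs, is shown below. The helper names are hypothetical.

```python
def split_names(names_csv):
    """Split 'Alice, Bob,,' into ['Alice', 'Bob'], dropping blanks."""
    return [n.strip() for n in names_csv.split(",") if n.strip()]


def set_links(ids):
    """x2many command 6: replace all current links with exactly `ids`."""
    return [[6, 0, list(ids)]]


def add_link(record_id):
    """x2many command 4: link one existing record, keeping the others."""
    return [[4, record_id, 0]]


print(split_names("Alice, Bob,, "))   # ['Alice', 'Bob']
print({"user_ids": set_links([7, 9])})  # {'user_ids': [[6, 0, [7, 9]]]}
print({"user_ids": add_link(12)})       # {'user_ids': [[4, 12, 0]]}
```

Using command 6 in `update --assign` means the new list overwrites the previous assignees; command 4 would be the choice for adding one person without removing anyone.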
FILE:scripts/time_off.py
#!/usr/bin/env python3
"""VNClaw — Time Off module (hr.leave)"""
import argparse, json, sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_employee,
add_date_filter_args, get_date_bounds, log)
MODEL = "hr.leave"
DEFAULT_FIELDS = ["id", "name", "employee_id", "holiday_status_id", "state",
"date_from", "date_to", "number_of_days", "user_id"]
def _resolve_leave_type(uid, m, db, key, name):
recs = execute(uid, m, db, key, "hr.leave.type", "search_read",
[["name", "ilike", name]], fields=["id", "name"])
if not recs:
print(f"Error: Leave type '{name}' not found.", file=sys.stderr)
sys.exit(1)
return recs[0]["id"]
def cmd_list(args):
uid, m, db, key = connect()
domain = []
if args.my:
domain.append(["user_id", "=", uid])
if args.user:
user_id = resolve_user(uid, m, db, key, args.user)
domain.append(["user_id", "=", user_id])
if args.employee:
emp_id = resolve_employee(uid, m, db, key, args.employee)
domain.append(["employee_id", "=", emp_id])
if args.state:
domain.append(["state", "=", args.state])
if args.leave_type:
lt_id = _resolve_leave_type(uid, m, db, key, args.leave_type)
domain.append(["holiday_status_id", "=", lt_id])
# Date filters on date_from
df, dt = get_date_bounds(args)
if df:
domain.append(["date_from", ">=", f"{df} 00:00:00"])
if dt:
domain.append(["date_from", "<=", f"{dt} 23:59:59"])
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
records = execute(uid, m, db, key, MODEL, "search_read", domain,
fields=fields, limit=args.limit, order="date_from desc")
output_json(records)
def cmd_get(args):
uid, m, db, key = connect()
fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS + [
"notes", "department_id", "category_id", "payslip_status"
]
records = execute(uid, m, db, key, MODEL, "read", [args.id], fields=fields)
output_json(records[0] if records else {"error": f"Leave {args.id} not found"})
def cmd_create(args):
uid, m, db, key = connect()
vals = {
"date_from": args.date_from,
"date_to": args.date_to,
}
if args.name:
vals["name"] = args.name
if args.employee:
vals["employee_id"] = resolve_employee(uid, m, db, key, args.employee)
if args.leave_type:
vals["holiday_status_id"] = _resolve_leave_type(uid, m, db, key, args.leave_type)
if args.extra:
vals.update(json.loads(args.extra))
rid = execute(uid, m, db, key, MODEL, "create", vals)
output_json({"created_id": rid, "date_from": args.date_from, "date_to": args.date_to})
def cmd_update(args):
uid, m, db, key = connect()
vals = {}
if args.name:
vals["name"] = args.name
if args.date_from:
vals["date_from"] = args.date_from
if args.date_to:
vals["date_to"] = args.date_to
if args.leave_type:
vals["holiday_status_id"] = _resolve_leave_type(uid, m, db, key, args.leave_type)
if args.extra:
vals.update(json.loads(args.extra))
if not vals:
print("Error: No fields to update.", file=sys.stderr)
sys.exit(1)
ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
output_json({"updated_id": args.id, "success": ok})
def cmd_leave_types(args):
uid, m, db, key = connect()
recs = execute(uid, m, db, key, "hr.leave.type", "search_read", [],
fields=["id", "name", "requires_allocation", "leave_validation_type"],
order="name asc")
output_json(recs)
def main():
    p = argparse.ArgumentParser(description="VNClaw — Time Off (Leave Requests)")
    sub = p.add_subparsers(dest="action")
    # list
    s = sub.add_parser("list", help="List leave requests")
    s.add_argument("--my", action="store_true", help="My leave requests only")
    s.add_argument("--user", help="Filter by user NAME")
    s.add_argument("--employee", help="Filter by employee NAME")
    s.add_argument("--state", help="confirm, validate, refuse, draft, cancel")
    s.add_argument("--leave-type", help="Leave type NAME (e.g. 'Sick Time Off')")
    add_date_filter_args(s)
    s.add_argument("--fields", help="JSON array of fields")
    s.add_argument("--limit", type=int, default=50)
    # get
    s = sub.add_parser("get", help="Get leave request detail by ID")
    s.add_argument("id", type=int)
    s.add_argument("--fields", help="JSON array of fields")
    # create
    s = sub.add_parser("create", help="Create a leave request")
    s.add_argument("--name", help="Description/reason")
    s.add_argument("--date-from", required=True, help="Start (YYYY-MM-DD HH:MM:SS)")
    s.add_argument("--date-to", required=True, help="End (YYYY-MM-DD HH:MM:SS)")
    s.add_argument("--employee", help="Employee NAME (defaults to current user's employee)")
    s.add_argument("--leave-type", help="Leave type NAME")
    s.add_argument("--extra", help="Additional fields as JSON")
    # update
    s = sub.add_parser("update", help="Update a leave request (only in draft state)")
    s.add_argument("id", type=int)
    s.add_argument("--name")
    s.add_argument("--date-from")
    s.add_argument("--date-to")
    s.add_argument("--leave-type", help="Leave type NAME")
    s.add_argument("--extra", help="Additional fields as JSON")
    # leave-types
    sub.add_parser("leave-types", help="List available leave types")
    args = p.parse_args()
    if not args.action:
        p.print_help()
        sys.exit(1)
    {"list": cmd_list, "get": cmd_get, "create": cmd_create, "update": cmd_update,
     "leave-types": cmd_leave_types}[args.action](args)
if __name__ == "__main__":
    main()
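Both `cmd_create` and `cmd_update` above pass `--extra` straight to `json.loads` and then to `vals.update`, so a malformed string or a JSON array would surface as a raw traceback. A minimal sketch of a stricter parser follows. `parse_extra` is a hypothetical helper introduced here for illustration and is not part of `odoo_core`.

```python
import json


def parse_extra(raw):
    """Parse an --extra value and insist that it decodes to a JSON object.

    Hypothetical helper: returns {} when the flag was not given, and raises
    ValueError with a readable message on bad input.
    """
    if not raw:
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"--extra is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        # vals.update() needs field/value pairs, so arrays and scalars are rejected.
        raise ValueError('--extra must be a JSON object, e.g. {"notes": "..."}')
    return data


if __name__ == "__main__":
    print(parse_extra('{"notes": "Doctor appointment"}'))
```

A caller could then write `vals.update(parse_extra(args.extra))` and turn a `ValueError` into the same stderr-and-exit handling the scripts already use for "No fields to update."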
FILE:scripts/timesheets.py
#!/usr/bin/env python3
"""VNClaw — Timesheets module (account.analytic.line)"""
import argparse, json, sys, os
from datetime import date, timedelta
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from odoo_core import (connect, execute, output_json, resolve_user, resolve_employee,
                       resolve_project, add_date_filter_args, get_date_bounds, log)
MODEL = "account.analytic.line"
DEFAULT_FIELDS = ["id", "employee_id", "project_id", "task_id", "name", "date", "unit_amount"]
def cmd_list(args):
    uid, m, db, key = connect()
    domain = [["project_id", "!=", False]]
    # --my
    if args.my:
        domain.append(["user_id", "=", uid])
    # --user by name
    if args.user:
        user_id = resolve_user(uid, m, db, key, args.user)
        domain.append(["user_id", "=", user_id])
    # --employee by name / --employee-id
    if args.employee:
        emp_id = resolve_employee(uid, m, db, key, args.employee)
        domain.append(["employee_id", "=", emp_id])
    elif args.employee_id:
        domain.append(["employee_id", "=", args.employee_id])
    # --project by name / --project-id
    if args.project:
        pid = resolve_project(uid, m, db, key, args.project)
        domain.append(["project_id", "=", pid])
    elif args.project_id:
        domain.append(["project_id", "=", args.project_id])
    if args.task_id:
        domain.append(["task_id", "=", args.task_id])
    # Date filters
    df, dt = get_date_bounds(args)
    if df:
        domain.append(["date", ">=", df])
    if dt:
        domain.append(["date", "<=", dt])
    fields = json.loads(args.fields) if args.fields else DEFAULT_FIELDS
    records = execute(uid, m, db, key, MODEL, "search_read", domain,
                      fields=fields, limit=args.limit, offset=args.offset, order="date desc")
    output_json(records)
def cmd_summary(args):
    """Show total hours grouped by project and task."""
    uid, m, db, key = connect()
    domain = [["project_id", "!=", False]]
    if args.my:
        domain.append(["user_id", "=", uid])
    if args.user:
        user_id = resolve_user(uid, m, db, key, args.user)
        domain.append(["user_id", "=", user_id])
    if args.project:
        pid = resolve_project(uid, m, db, key, args.project)
        domain.append(["project_id", "=", pid])
    elif args.project_id:
        domain.append(["project_id", "=", args.project_id])
    df, dt = get_date_bounds(args)
    if df:
        domain.append(["date", ">=", df])
    if dt:
        domain.append(["date", "<=", dt])
    # The fetch is capped; report when the cap is hit so a truncated
    # summary is not mistaken for a complete one.
    limit = 500
    records = execute(uid, m, db, key, MODEL, "search_read", domain,
                      fields=["project_id", "task_id", "unit_amount"], limit=limit)
    # Aggregate hours per project, then per task within each project
    by_project = {}
    for r in records:
        proj_name = r["project_id"][1] if r["project_id"] else "No Project"
        task_name = r["task_id"][1] if r["task_id"] else "No Task"
        by_project.setdefault(proj_name, {"total": 0, "tasks": {}})
        by_project[proj_name]["total"] += r["unit_amount"]
        by_project[proj_name]["tasks"].setdefault(task_name, 0)
        by_project[proj_name]["tasks"][task_name] += r["unit_amount"]
    grand_total = sum(p["total"] for p in by_project.values())
    output_json({"projects": by_project, "grand_total_hours": round(grand_total, 2),
                 "limit_reached": len(records) >= limit})
def cmd_log(args):
    uid, m, db, key = connect()
    # Resolve project: one of --project / --project-id is required, because
    # entries without a project are excluded by the list and summary domains.
    if args.project:
        project_id = resolve_project(uid, m, db, key, args.project)
    elif args.project_id:
        project_id = args.project_id
    else:
        print("Error: Provide --project or --project-id.", file=sys.stderr)
        sys.exit(1)
    vals = {
        "project_id": project_id,
        "name": args.description,
        "date": args.date or str(date.today()),
        "unit_amount": args.hours,
    }
    if args.task_id:
        vals["task_id"] = args.task_id
    if args.extra:
        vals.update(json.loads(args.extra))
    rid = execute(uid, m, db, key, MODEL, "create", vals)
    output_json({"created_id": rid, "hours": args.hours, "date": vals["date"]})
def cmd_update(args):
    uid, m, db, key = connect()
    vals = {}
    if args.hours is not None:
        vals["unit_amount"] = args.hours
    if args.description:
        vals["name"] = args.description
    if args.date:
        vals["date"] = args.date
    if args.extra:
        vals.update(json.loads(args.extra))
    if not vals:
        print("Error: No fields to update.", file=sys.stderr)
        sys.exit(1)
    ok = execute(uid, m, db, key, MODEL, "write", [args.id], vals)
    output_json({"updated_id": args.id, "success": ok})
def main():
    p = argparse.ArgumentParser(description="VNClaw — Timesheet Management")
    sub = p.add_subparsers(dest="action")
    # list
    s = sub.add_parser("list", help="List timesheet entries")
    s.add_argument("--my", action="store_true", help="My timesheets only")
    s.add_argument("--user", help="Filter by user NAME")
    s.add_argument("--employee", help="Filter by employee NAME")
    s.add_argument("--employee-id", type=int)
    s.add_argument("--project", help="Filter by project NAME")
    s.add_argument("--project-id", type=int)
    s.add_argument("--task-id", type=int)
    add_date_filter_args(s)
    s.add_argument("--fields", help="JSON array of fields")
    s.add_argument("--limit", type=int, default=100)
    s.add_argument("--offset", type=int, default=0)
    # summary
    s = sub.add_parser("summary", help="Show hours summary grouped by project/task")
    s.add_argument("--my", action="store_true", help="My timesheets only")
    s.add_argument("--user", help="Filter by user NAME")
    s.add_argument("--project", help="Filter by project NAME")
    s.add_argument("--project-id", type=int)
    add_date_filter_args(s)
    # log
    s = sub.add_parser("log", help="Log a timesheet entry")
    s.add_argument("--project", help="Project NAME (resolved automatically)")
    s.add_argument("--project-id", type=int, help="Project ID (alternative to --project)")
    s.add_argument("--task-id", type=int, help="Task ID (optional)")
    s.add_argument("--description", required=True, help="Work description")
    s.add_argument("--hours", type=float, required=True, help="Hours spent")
    s.add_argument("--date", help="Date (YYYY-MM-DD, default: today)")
    s.add_argument("--extra", help="Additional fields as JSON")
    # update
    s = sub.add_parser("update", help="Update a timesheet entry")
    s.add_argument("id", type=int)
    s.add_argument("--hours", type=float)
    s.add_argument("--description")
    s.add_argument("--date")
    s.add_argument("--extra", help="Additional fields as JSON")
    args = p.parse_args()
    if not args.action:
        p.print_help()
        sys.exit(1)
    {"list": cmd_list, "summary": cmd_summary, "log": cmd_log, "update": cmd_update}[args.action](args)
if __name__ == "__main__":
    main()
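`cmd_summary` folds `search_read` rows into per-project and per-task totals. The sketch below isolates that aggregation as a pure function so it can be exercised without an Odoo connection. `summarize` and the sample rows are illustrative and not part of the script. Many2one values are assumed to arrive as `[id, name]` pairs (or `False` when unset), which is what the indexing in `cmd_summary` implies.

```python
def summarize(records):
    """Group timesheet rows by project name, then by task name.

    Hypothetical stand-alone version of the aggregation loop in cmd_summary.
    Each record is expected to carry project_id, task_id and unit_amount.
    """
    by_project = {}
    for r in records:
        proj = r["project_id"][1] if r["project_id"] else "No Project"
        task = r["task_id"][1] if r["task_id"] else "No Task"
        entry = by_project.setdefault(proj, {"total": 0.0, "tasks": {}})
        entry["total"] += r["unit_amount"]
        entry["tasks"][task] = entry["tasks"].get(task, 0.0) + r["unit_amount"]
    grand_total = sum(p["total"] for p in by_project.values())
    return {"projects": by_project, "grand_total_hours": round(grand_total, 2)}


if __name__ == "__main__":
    # Made-up sample rows in the assumed search_read shape.
    rows = [
        {"project_id": [7, "Website"], "task_id": [21, "Design"], "unit_amount": 1.5},
        {"project_id": [7, "Website"], "task_id": False, "unit_amount": 2.0},
        {"project_id": [9, "Internal"], "task_id": [30, "Meetings"], "unit_amount": 0.5},
    ]
    print(summarize(rows))
```

Keeping the aggregation separate from the RPC call also makes it straightforward to feed it rows fetched page by page if the result set is larger than a single `search_read` limit.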