@clawhub-chipmunkrpa-c0d8c72a64
Perform bank reconciliation between bank statements and general ledger files. Supports bank statement PDF ingestion, conversion of PDF statements into struct...
---
name: bank-recon
description: Perform bank reconciliation between bank statements and general ledger files. Supports bank statement PDF ingestion, conversion of PDF statements into structured Excel data, custom amount thresholds, ID/key matching, and semantic description matching. Use when the user wants to read a bank statement PDF or Excel file, convert statement activity into a workbook, reconcile bank activity to GL transactions, identify matched and unmatched items, and generate an Excel workbook with reconciliation results, a summary tab, and separate unreconciled-bank and unreconciled-GL tabs.
---
# Bank Reconciliation Skill
Reconcile bank statement rows against GL rows and produce an `.xlsx` workbook that is immediately reviewable by an accountant.
## Workflow
1. Identify the bank statement path and GL workbook path.
2. Accept either a bank statement `.xlsx` file or a bank statement `.pdf` file.
3. If the bank statement is a PDF, run the workflow so it first extracts the bank statement lines into a structured workbook, then reconciles that extracted workbook to the GL.
4. Confirm the reconciliation threshold. Default to `0.00` unless the user asks for a tolerance.
5. Run `scripts/recon_logic.py` with the bank file, GL file, output file, and threshold.
6. Return the generated workbook and summarize:
   - matched bank row count
   - matched GL row count
   - unreconciled bank row count
   - unreconciled GL row count
7. If the user asks for follow-up analysis, use the `Summary`, `Unreconciled Bank`, and `Unreconciled GL` tabs first.
## Output Workbook
The generated workbook should contain these tabs:
- `Summary`: threshold, matched counts, unreconciled counts, and basic totals
- `Recon Results`: matched groupings with match basis and variance notes
- `Unreconciled Bank`: bank rows not matched to the GL
- `Unreconciled GL`: GL rows not matched to the bank
## Command
```bash
python3 scripts/recon_logic.py <bank_xlsx_or_pdf> <gl_xlsx> <output_xlsx> [threshold]
```
When the bank input is a PDF, the script also creates a companion extracted workbook beside the PDF (same basename with `_extracted.xlsx`) before running reconciliation.
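As a small illustration of the naming rule above, the companion path can be derived from the PDF path as follows. This is a sketch that mirrors what `load_bank_rows` in `scripts/recon_logic.py` does; the helper name `extracted_workbook_path` is hypothetical and not part of the script.

```python
from pathlib import Path


def extracted_workbook_path(bank_pdf: str) -> str:
    """Return the companion workbook path: same location and basename, plus `_extracted.xlsx`."""
    # with_suffix('') drops only the final extension (here `.pdf`).
    return str(Path(bank_pdf).with_suffix("")) + "_extracted.xlsx"


if __name__ == "__main__":
    print(extracted_workbook_path("jan.pdf"))
```

Only the last extension is removed, so a file named `statement.v2.pdf` would yield `statement.v2_extracted.xlsx`.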
## Matching Logic
Use a layered approach:
1. Preserve the original signs from both source files in the output.
2. Compare bank and GL amounts using absolute values for matching so bank polarity and accounting debit/credit polarity can reconcile without rewriting displayed source amounts.
3. Match by shared extracted keys such as batch IDs, invoice IDs, vendor IDs, customer IDs, and tax/payment references.
4. Allow one-to-one, one-to-many, many-to-one, and grouped many-to-many matches when totals fall within threshold.
5. For remaining items, use semantic name grouping plus summed-amount comparison.
6. Preserve unmatched rows in dedicated tabs instead of dropping them from the deliverable.
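The key-matching and threshold steps above can be sketched in a few lines. This is a simplified illustration, not the full script: the key pattern is the one used in `scripts/recon_logic.py`, while the helper `within_threshold` and the sample rows are hypothetical.

```python
import re
from decimal import Decimal

# Same key shape the script looks for: 2-4 letters, optional dash or space, digits.
KEY_PATTERN = re.compile(r"[A-Z]{2,4}[- ]?\d+")


def extract_keys(text):
    """Return the set of ID-like keys found in a description or memo."""
    return set(KEY_PATTERN.findall(text.upper()))


def within_threshold(bank_amounts, gl_amounts, threshold=Decimal("0.00")):
    """Compare grouped totals on absolute values so bank and GL polarity can differ."""
    bank_total = sum((abs(Decimal(a)) for a in bank_amounts), Decimal("0"))
    gl_total = sum((abs(Decimal(a)) for a in gl_amounts), Decimal("0"))
    return abs(bank_total - gl_total) <= threshold


if __name__ == "__main__":
    # Hypothetical sample rows: one bank debit settled by two GL entries.
    bank = [("-1500.00", "ACH BAT-1042 VENDOR PAYMENTS")]
    gl = [("1000.00", "AP BATCH BAT-1042 PART A"), ("500.00", "AP BATCH BAT-1042 PART B")]
    shared = extract_keys(bank[0][1]) & extract_keys(gl[0][1]) & extract_keys(gl[1][1])
    print(shared, within_threshold([a for a, _ in bank], [a for a, _ in gl]))
```

Using `Decimal` rather than `float` keeps a `0.00` threshold meaningful, since cent values are compared exactly.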
## Notes
- Read the first worksheet from each input workbook.
- Expect simple three-column inputs: date, amount, description/memo.
- For text-based bank statement PDFs, the script extracts transaction rows by reading the PDF content streams and reconstructing the transaction table into a workbook.
- The PDF path is best for digital statements with selectable text; scanned-image PDFs would still need OCR or a multimodal extraction path.
- Keep the workbook generation dependency-light so it can run in minimal Python environments.
FILE:scripts/recon_logic.py
import sys
import zipfile
import xml.etree.ElementTree as ET
from decimal import Decimal
import re
from collections import defaultdict
from xml.sax.saxutils import escape
import base64
import zlib
from pathlib import Path
NS_MAIN = 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'
NS_REL = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'
def parse_xlsx(path):
    """Return the rows of the first worksheet as lists of raw cell values."""
    ns = {'a': NS_MAIN}
    with zipfile.ZipFile(path) as z:
        # Shared strings are optional; cells of type 's' index into this list.
        shared_strings = []
        if 'xl/sharedStrings.xml' in z.namelist():
            ss_root = ET.fromstring(z.read('xl/sharedStrings.xml'))
            for si in ss_root.findall('a:si', ns):
                shared_strings.append(''.join(tn.text or '' for tn in si.findall('.//a:t', ns)))
        root = ET.fromstring(z.read('xl/worksheets/sheet1.xml'))
        rows = []
        for row in root.findall('.//a:row', ns):
            vals = []
            for c in row.findall('a:c', ns):
                t = c.get('t')
                if t == 'inlineStr':
                    vals.append(''.join(tn.text or '' for tn in c.findall('.//a:t', ns)))
                else:
                    vn = c.find('a:v', ns)
                    if t == 's' and vn is not None and vn.text is not None:
                        vals.append(shared_strings[int(vn.text)])
                    else:
                        vals.append(vn.text if vn is not None else '')
            rows.append(vals)
        return rows
def decode_pdf_streams(path):
    """Decode every ASCII85 + Flate content stream in the PDF into text."""
    pdf = Path(path).read_bytes()
    decoded = []
    pattern = re.compile(rb'(\d+) 0 obj\s*<<.*?/Filter \[ /ASCII85Decode /FlateDecode \] /Length \d+\s*>>\s*stream\n(.*?)~>endstream', re.S)
    for m in pattern.finditer(pdf):
        # Restore the ASCII85 end marker consumed by the pattern.
        data = m.group(2) + b'~>'
        try:
            dec = base64.a85decode(data, adobe=True)
            out = zlib.decompress(dec).decode('latin1', 'ignore')
            decoded.append(out)
        except Exception:
            # Skip streams that do not decode cleanly.
            continue
    return decoded
def unescape_pdf_text(text):
    return text.replace(r'\(', '(').replace(r'\)', ')').replace(r'\\', '\\')
def parse_bank_pdf(path):
    """Extract [date, amount, description, balance] rows from a text-based statement PDF."""
    streams = decode_pdf_streams(path)
    tokens = []
    for s in streams:
        tokens.extend(unescape_pdf_text(t) for t in re.findall(r'\((.*?)\)\s*Tj', s))
    rows = []
    in_table = False
    i = 0
    amount_pat = re.compile(r'^-?\$[\d,]+\.\d{2}$')
    date_pat = re.compile(r'^\d{2}/\d{2}$')
    while i < len(tokens):
        tok = tokens[i].strip()
        # The table starts after a Date / Description / Amount / Balance header.
        if tok == 'Date' and i + 3 < len(tokens) and tokens[i+1].strip() == 'Description' and tokens[i+2].strip() == 'Amount' and tokens[i+3].strip() == 'Balance':
            in_table = True
            i += 4
            continue
        if in_table and date_pat.match(tok):
            tx_date = tok
            i += 1
            # Collect description tokens until the amount token is reached.
            desc_parts = []
            while i < len(tokens) and not amount_pat.match(tokens[i].strip()):
                nxt = tokens[i].strip()
                if nxt in {'Date', 'Description', 'Amount', 'Balance', 'Important Account Information'}:
                    break
                desc_parts.append(nxt)
                i += 1
            if i + 1 >= len(tokens) or not amount_pat.match(tokens[i].strip()):
                continue
            amt_text = tokens[i].strip()
            bal_text = tokens[i+1].strip() if i + 1 < len(tokens) else ''
            # A transaction row needs both an amount and a running balance.
            if not amount_pat.match(bal_text):
                i += 1
                continue
            rows.append([tx_date, amt_text.replace('$', '').replace(',', ''), ' '.join(p for p in desc_parts if p), bal_text.replace('$', '').replace(',', '')])
            i += 2
            continue
        i += 1
    return rows
def col_letter(n):
    """Convert a 1-based column index to its spreadsheet letter (1 -> A, 27 -> AA)."""
    res = ''
    while n > 0:
        n, r = divmod(n - 1, 26)
        res = chr(65 + r) + res
    return res
def cell_xml(ref, val):
    """Render one cell: numbers as <v> values, everything else as an inline string."""
    if val is None or val == '':
        return f'<c r="{ref}"/>'
    if isinstance(val, Decimal):
        return f'<c r="{ref}"><v>{val}</v></c>'
    if isinstance(val, (int, float)):
        return f'<c r="{ref}"><v>{val}</v></c>'
    text = str(val)
    if re.fullmatch(r'-?\d+(\.\d+)?', text):
        return f'<c r="{ref}"><v>{text}</v></c>'
    return f'<c r="{ref}" t="inlineStr"><is><t>{escape(text)}</t></is></c>'
def build_sheet(rows):
    """Build the worksheet XML for a list of rows."""
    max_cols = max((len(r) for r in rows), default=1)
    dim = f'A1:{col_letter(max_cols)}{max(len(rows), 1)}'
    sheet_data = []
    for r_idx, row in enumerate(rows, 1):
        cells = [cell_xml(f"{col_letter(c_idx)}{r_idx}", v) for c_idx, v in enumerate(row, 1)]
        sheet_data.append(f'<row r="{r_idx}">{"".join(cells)}</row>')
    return (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        f'<worksheet xmlns="{NS_MAIN}" xmlns:r="{NS_REL}">'
        f'<dimension ref="{dim}"/>'
        '<sheetViews><sheetView workbookViewId="0"/></sheetViews>'
        '<sheetFormatPr defaultRowHeight="15"/>'
        f'<sheetData>{"".join(sheet_data)}</sheetData>'
        '</worksheet>'
    )
def write_xlsx(path, sheets):
    """Write a minimal .xlsx package; `sheets` is a list of (sheet_name, rows) pairs."""
    workbook_sheets = []
    workbook_rels = []
    content_types = [
        '<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>',
        '<Default Extension="xml" ContentType="application/xml"/>',
        '<Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>',
        '<Override PartName="/xl/styles.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.styles+xml"/>'
    ]
    package = {
        '_rels/.rels': (
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
            '<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/>'
            '</Relationships>'
        ),
        'xl/styles.xml': (
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            '<styleSheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">'
            '<fonts count="1"><font><sz val="11"/><name val="Aptos"/></font></fonts>'
            '<fills count="2"><fill><patternFill patternType="none"/></fill><fill><patternFill patternType="gray125"/></fill></fills>'
            '<borders count="1"><border><left/><right/><top/><bottom/><diagonal/></border></borders>'
            '<cellStyleXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0"/></cellStyleXfs>'
            '<cellXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0" xfId="0"/></cellXfs>'
            '<cellStyles count="1"><cellStyle name="Normal" xfId="0" builtinId="0"/></cellStyles>'
            '</styleSheet>'
        ),
    }
    # One worksheet part, relationship, and content-type override per sheet.
    for idx, (sheet_name, rows) in enumerate(sheets, 1):
        workbook_sheets.append(f'<sheet name="{escape(sheet_name)}" sheetId="{idx}" r:id="rId{idx}"/>')
        workbook_rels.append(
            f'<Relationship Id="rId{idx}" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Target="worksheets/sheet{idx}.xml"/>'
        )
        content_types.append(
            f'<Override PartName="/xl/worksheets/sheet{idx}.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/>'
        )
        package[f'xl/worksheets/sheet{idx}.xml'] = build_sheet(rows)
    # The styles relationship takes the next free relationship id.
    workbook_rel_style_id = len(sheets) + 1
    workbook_rels.append(
        f'<Relationship Id="rId{workbook_rel_style_id}" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/styles" Target="styles.xml"/>'
    )
    package['[Content_Types].xml'] = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
        + ''.join(content_types) +
        '</Types>'
    )
    package['xl/workbook.xml'] = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        f'<workbook xmlns="{NS_MAIN}" xmlns:r="{NS_REL}"><sheets>'
        + ''.join(workbook_sheets) +
        '</sheets></workbook>'
    )
    package['xl/_rels/workbook.xml.rels'] = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
        + ''.join(workbook_rels) +
        '</Relationships>'
    )
    with zipfile.ZipFile(path, 'w', compression=zipfile.ZIP_DEFLATED) as z:
        for file_name, content in package.items():
            z.writestr(file_name, content)
def extract_keys(text):
    """Return ID-like keys (batch, invoice, vendor, customer references) found in the text."""
    # Normalize common prefixes so bank and GL spellings produce the same key.
    clean = (
        text.upper()
        .replace('LEASE ', 'LS')
        .replace('PAYRUN ', 'PR')
        .replace('AUTOPAY ', 'AP')
        .replace('VEND-', 'VEND')
        .replace('CUST-', 'CUST')
    )
    return set(re.findall(r'[A-Z]{2,4}[- ]?\d+', clean))
def load_bank_rows(bank_path):
    """Return (rows, extracted_xlsx_path); the path is None unless the input is a PDF."""
    bank_path = str(bank_path)
    if bank_path.lower().endswith('.pdf'):
        parsed = parse_bank_pdf(bank_path)
        # Write the companion workbook beside the PDF before reconciling.
        extracted_xlsx = str(Path(bank_path).with_suffix('')) + '_extracted.xlsx'
        write_xlsx(extracted_xlsx, [('Bank Statement Extracted', [['date', 'transaction amount', 'description', 'balance']] + parsed)])
        return [['date', 'transaction amount', 'description']] + [r[:3] for r in parsed], extracted_xlsx
    return parse_xlsx(bank_path), None
def recon(bank_path, gl_path, output_path, threshold=Decimal('0.00')):
    bank_raw, extracted_xlsx = load_bank_rows(bank_path)
    gl_raw = parse_xlsx(gl_path)
    # Skip the header row. Keep signed amounts for display and absolute amounts for matching.
    bank_data = [
        {
            'id': i,
            'date': r[0],
            'amount': Decimal(r[1]),
            'match_amount': abs(Decimal(r[1])),
            'desc': r[2],
            'keys': extract_keys(r[2])
        }
        for i, r in enumerate(bank_raw[1:]) if len(r) >= 3 and r[1] not in ('', None)
    ]
    gl_data = [
        {
            'id': i,
            'date': r[0],
            'amount': Decimal(r[1]),
            'match_amount': abs(Decimal(r[1])),
            'memo': r[2],
            'keys': extract_keys(r[2])
        }
        for i, r in enumerate(gl_raw[1:]) if len(r) >= 3 and r[1] not in ('', None)
    ]
    matched_bank_ids = set()
    matched_gl_ids = set()
    results = []
    # Pass 1: group rows that share an extracted key and compare the group totals.
    all_keys = set()
    for b in bank_data:
        all_keys.update(b['keys'])
    for g in gl_data:
        all_keys.update(g['keys'])
    key_to_bank = defaultdict(list)
    for b in bank_data:
        for k in b['keys']:
            key_to_bank[k].append(b)
    key_to_gl = defaultdict(list)
    for g in gl_data:
        for k in g['keys']:
            key_to_gl[k].append(g)
    # Keys starting with 'BAT' are processed first, then the rest alphabetically.
    sorted_keys = sorted(all_keys, key=lambda x: (not x.startswith('BAT'), x))
    for k in sorted_keys:
        b_group = [b for b in key_to_bank[k] if b['id'] not in matched_bank_ids]
        g_group = [g for g in key_to_gl[k] if g['id'] not in matched_gl_ids]
        if not b_group or not g_group:
            continue
        b_sum = sum(b['match_amount'] for b in b_group)
        g_sum = sum(g['match_amount'] for g in g_group)
        diff = abs(b_sum - g_sum)
        if diff <= threshold:
            for b in b_group:
                matched_bank_ids.add(b['id'])
            for g in g_group:
                matched_gl_ids.add(g['id'])
            basis = f'Group Match ({k})'
            if len(b_group) == 1 and len(g_group) == 1:
                basis = '1:1 Match'
            elif len(b_group) == 1:
                basis = f'1:{len(g_group)} Match'
            elif len(g_group) == 1:
                basis = f'{len(b_group)}:1 Match'
            # Emit bank and GL rows side by side, padding the shorter side with blanks.
            max_len = max(len(b_group), len(g_group))
            for i in range(max_len):
                b = b_group[i] if i < len(b_group) else {'date': '', 'amount': '', 'desc': ''}
                g = g_group[i] if i < len(g_group) else {'date': '', 'amount': '', 'memo': ''}
                results.append([
                    b['date'], b['amount'], b['desc'],
                    g['date'], g['amount'], g['memo'],
                    basis if i == 0 else '',
                    f'Diff: {diff}' if i == 0 else ''
                ])
    # Pass 2: group the remaining rows by counterparty name and compare summed amounts.
    remaining_b = [b for b in bank_data if b['id'] not in matched_bank_ids]
    remaining_g = [g for g in gl_data if g['id'] not in matched_gl_ids]
    def get_name(text):
        text = text.upper()
        m = re.search(r'(FEDERAL TAX|GREENLEAF FOODS|METRO PROPERTIES|FRESHBEAN COFFEE|REDSTONE LABS|DELTA PAYROLL|NORTHWIND STORES|HARBOR MEDICAL|RIVERTON SCHOOL|APEX LOGISTICS|STATE TAX AGENCY|OAK PINE INTERIORS|ATLAS INDUSTRIAL|IRONGATE EQUIPMENT|CLEARVIEW DENTAL|SUMMIT DESIGN STUDIO|ZENITH TELECOM|LIBERTY INSURANCE GROUP|PIONEER PACKAGING|OFFICEHUB SUPPLY|BRIGHTLINE MARKETING|CLOUDTRAIL SOFTWARE)', text)
        return m.group(1) if m else None
    b_by_name = defaultdict(list)
    for b in remaining_b:
        name = get_name(b['desc'])
        if name:
            b_by_name[name].append(b)
    g_by_name = defaultdict(list)
    for g in remaining_g:
        name = get_name(g['memo'])
        if name:
            g_by_name[name].append(g)
    for name in b_by_name:
        b_list = [b for b in b_by_name[name] if b['id'] not in matched_bank_ids]
        g_list = [g for g in g_by_name[name] if g['id'] not in matched_gl_ids]
        if not b_list or not g_list:
            continue
        diff = abs(sum(b['match_amount'] for b in b_list) - sum(g['match_amount'] for g in g_list))
        if diff <= threshold:
            for b in b_list:
                matched_bank_ids.add(b['id'])
            for g in g_list:
                matched_gl_ids.add(g['id'])
            basis = f'AI Semantic Match: {name}'
            max_len = max(len(b_list), len(g_list))
            for i in range(max_len):
                b = b_list[i] if i < len(b_list) else {'date': '', 'amount': '', 'desc': ''}
                g = g_list[i] if i < len(g_list) else {'date': '', 'amount': '', 'memo': ''}
                results.append([
                    b['date'], b['amount'], b['desc'],
                    g['date'], g['amount'], g['memo'],
                    basis if i == 0 else '',
                    f'Diff: {diff}' if i == 0 else ''
                ])
    # Unmatched rows are kept in their own tabs rather than dropped.
    unreconciled_bank_rows = [
        [b['date'], b['amount'], b['desc']]
        for b in bank_data if b['id'] not in matched_bank_ids
    ]
    unreconciled_gl_rows = [
        [g['date'], g['amount'], g['memo']]
        for g in gl_data if g['id'] not in matched_gl_ids
    ]
    matched_bank_total = sum(b['amount'] for b in bank_data if b['id'] in matched_bank_ids)
    matched_gl_total = sum(g['amount'] for g in gl_data if g['id'] in matched_gl_ids)
    unreconciled_bank_total = sum(b['amount'] for b in bank_data if b['id'] not in matched_bank_ids)
    unreconciled_gl_total = sum(g['amount'] for g in gl_data if g['id'] not in matched_gl_ids)
    summary_rows = [
        ['Metric', 'Value'],
        ['Threshold', str(threshold)],
        ['Matched Bank Rows', len(matched_bank_ids)],
        ['Matched GL Rows', len(matched_gl_ids)],
        ['Unreconciled Bank Rows', len(unreconciled_bank_rows)],
        ['Unreconciled GL Rows', len(unreconciled_gl_rows)],
        ['Matched Bank Total', matched_bank_total],
        ['Matched GL Total', matched_gl_total],
        ['Matched Total Difference', abs(matched_bank_total - matched_gl_total)],
        ['Unreconciled Bank Total', unreconciled_bank_total],
        ['Unreconciled GL Total', unreconciled_gl_total],
        ['Extracted Bank Workbook', extracted_xlsx or 'n/a'],
    ]
    out_headers = ['Bank Date', 'Bank Amount', 'Bank Desc', 'GL Date', 'GL Amount', 'GL Memo', 'Match Basis', 'Notes']
    write_xlsx(output_path, [
        ('Summary', summary_rows),
        ('Recon Results', [out_headers] + results),
        ('Unreconciled Bank', [['date', 'transaction amount', 'description']] + unreconciled_bank_rows),
        ('Unreconciled GL', [['date', 'amount', 'G/L memo']] + unreconciled_gl_rows),
    ])
    return len(matched_bank_ids), len(matched_gl_ids), len(unreconciled_bank_rows), len(unreconciled_gl_rows), extracted_xlsx
if __name__ == '__main__':
    # Fail with a usage message instead of an unpacking error when arguments are missing.
    if len(sys.argv) < 4:
        sys.exit('Usage: python3 scripts/recon_logic.py <bank_xlsx_or_pdf> <gl_xlsx> <output_xlsx> [threshold]')
    b_p, g_p, o_p = sys.argv[1:4]
    t = Decimal(sys.argv[4]) if len(sys.argv) > 4 else Decimal('0.00')
    b_count, g_count, ub_count, ug_count, extracted_xlsx = recon(b_p, g_p, o_p, t)
    extra = f' Extracted bank workbook: {extracted_xlsx}.' if extracted_xlsx else ''
    print(
        f'Recon complete. Matched {b_count} bank and {g_count} GL records. '
        f'Unreconciled: {ub_count} bank, {ug_count} GL. Output: {o_p}.{extra}'
    )
Finance, accounting, and investment research automation via the FinResearchClaw repo. Use when asked to run autonomous finance research workflows such as eve...
---
name: finresearchclaw
description: "Finance, accounting, and investment research automation via the FinResearchClaw repo. Use when asked to run autonomous finance research workflows such as event studies, factor-model research, accounting regressions, forecast-error analysis, valuation research, or investment research pipelines. Prefer execution in this order: (1) Codex/ACP Codex when available, (2) Claude Code/ACP Claude, (3) direct API/config mode only as a fallback."
---
# FinResearchClaw
Use this skill to operate the FinResearchClaw repo as a finance-research engine.
## Execution preference
Always prefer these execution modes in order:
1. **Codex first**
   - Prefer Codex / ACP Codex for repo-driven execution, code edits, and iterative finance research runs.
2. **Claude Code second**
   - Use Claude Code if Codex is unavailable or the user explicitly asks for Claude.
3. **API mode last**
   - Use direct `researchclaw` CLI / config / API-style execution only as a fallback when coding-agent execution is unavailable or unsuitable.
Do not choose API mode first when Codex or Claude Code is available.
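The preference order can be expressed as a simple availability check. The sketch below mirrors the logic of `scripts/choose_runner.sh` in Python and assumes, as that script does, that the agents are exposed as `codex` and `claude` executables on `PATH`; the function name `choose_runner` and its `which` parameter are illustrative only.

```python
import shutil


def choose_runner(which=shutil.which):
    """Return 'codex', 'claude', or 'api' following the documented preference order."""
    # `which` is injectable so the order can be exercised without the tools installed.
    if which("codex"):
        return "codex"
    if which("claude"):
        return "claude"
    return "api"


if __name__ == "__main__":
    print(f"preferred_runner={choose_runner()}")
```

An explicit user request for Claude is a separate consideration and is not modeled here.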
## Core workflow
1. Ensure the FinResearchClaw repo exists locally.
   - Default repo path: `~/.openclaw/workspace/AutoResearchClaw`
   - GitHub repo: `https://github.com/ChipmunkRPA/FinResearchClaw`
2. Select the closest finance workflow:
   - event study
   - factor model
   - accounting forecast error
   - accounting panel regression
   - valuation / investment research
3. Prefer a coding-agent run path first.
4. Fall back to direct CLI/config mode only if coding-agent paths are unavailable.
5. Use example configs and starter plans when they fit.
## Local helper scripts
Use these scripts when helpful:
- `scripts/install_or_update.sh` — clone or update the repo locally
- `scripts/choose_runner.sh` — print preferred execution order and basic availability
- `scripts/run_finance_example.sh` — launch a chosen example in direct CLI mode
When the skill is installed from ClawHub, executable bits on bundled shell scripts may not be preserved on every system. If a direct script call fails with `permission denied`, run it with `bash`, for example:
```bash
bash scripts/choose_runner.sh
bash scripts/install_or_update.sh
```
## Repo paths and examples
If needed, inspect:
- `references/examples.md`
It documents the main example configs, starter experiment plans, and preferred mode selection.
FILE:_meta.json
{
"ownerId": "kn7b09fxn5sf58jmrh56qdddwh819rrs",
"slug": "finresearchclaw",
"version": "0.1.1",
"publishedAt": 1773955685252
}
FILE:references/examples.md
# FinResearchClaw Examples
## Preferred execution order
1. Codex
2. Claude Code
3. Direct API / CLI mode
## Default repo
- Local path: `~/.openclaw/workspace/AutoResearchClaw`
- Remote: `https://github.com/ChipmunkRPA/FinResearchClaw`
## Example configs
- `examples/finance_event_study.config.yaml`
- `examples/accounting_forecast_error.config.yaml`
## Example experiment plans
- `examples/finance_event_study.exp_plan.yaml`
- `examples/factor_model_starter.exp_plan.yaml`
## Suggested workflow mapping
### Event study
- Config: `examples/finance_event_study.config.yaml`
- Plan: `examples/finance_event_study.exp_plan.yaml`
### Accounting forecast error / accrual quality
- Config: `examples/accounting_forecast_error.config.yaml`
### Factor model / alpha research
- Plan: `examples/factor_model_starter.exp_plan.yaml`
## Direct CLI fallback
If no coding-agent route is available:
```bash
cd ~/.openclaw/workspace/AutoResearchClaw
python3 -m venv .venv && source .venv/bin/activate
pip install -e .
researchclaw run --config examples/finance_event_study.config.yaml --auto-approve
```
## Notes
- Prefer Codex for iterative repo work and experiment refinement.
- Prefer Claude Code only if Codex is unavailable or explicitly requested.
- Use direct CLI mode as the last resort.
- The install/update helper fetches remotes but does not auto-merge existing working clones.
- If an installed script is not executable after ClawHub install, invoke it as `bash scripts/<name>.sh`.
FILE:scripts/choose_runner.sh
#!/usr/bin/env bash
set -euo pipefail
# Report the first available runner in preference order: codex, claude, then api.
has() { command -v "$1" >/dev/null 2>&1; }
if has codex; then
  echo "preferred_runner=codex"
  exit 0
fi
if has claude; then
  echo "preferred_runner=claude"
  exit 0
fi
echo "preferred_runner=api"
FILE:scripts/install_or_update.sh
#!/usr/bin/env bash
set -euo pipefail
REPO_DIR="$HOME/.openclaw/workspace/AutoResearchClaw"
REPO_URL="https://github.com/ChipmunkRPA/FinResearchClaw.git"
mkdir -p "$(dirname "$REPO_DIR")"
if [ -d "$REPO_DIR/.git" ]; then
  echo "Updating existing repo metadata at $REPO_DIR"
  git -C "$REPO_DIR" remote get-url origin >/dev/null 2>&1 || true
  git -C "$REPO_DIR" fetch --all --prune
  CURRENT_BRANCH="$(git -C "$REPO_DIR" branch --show-current || true)"
  echo "Current branch: ${CURRENT_BRANCH:-unknown}"
  echo "Fetched latest refs only; nothing was merged. Review and merge manually."
else
  echo "Cloning FinResearchClaw into $REPO_DIR"
  git clone "$REPO_URL" "$REPO_DIR"
fi
echo "Repo ready: $REPO_DIR"
FILE:scripts/run_finance_example.sh
#!/usr/bin/env bash
set -euo pipefail
REPO_DIR="$HOME/.openclaw/workspace/AutoResearchClaw"
# First argument selects the example; defaults to finance_event_study.
EXAMPLE_NAME="${1:-finance_event_study}"
case "$EXAMPLE_NAME" in
  finance_event_study)
    CONFIG_PATH="examples/finance_event_study.config.yaml"
    ;;
  accounting_forecast_error)
    CONFIG_PATH="examples/accounting_forecast_error.config.yaml"
    ;;
  *)
    echo "Unknown example: $EXAMPLE_NAME" >&2
    exit 1
    ;;
esac
cd "$REPO_DIR"
python3 -m venv .venv >/dev/null 2>&1 || true
source .venv/bin/activate
python -m pip install -e .
researchclaw run --config "$CONFIG_PATH" --auto-approve
Review and redline DOCX contracts paragraph by paragraph with tracked changes, clause-level risk analysis, and draft comment responses. Use when a user wants...
---
name: redline
description: Review and redline DOCX contracts paragraph by paragraph with tracked changes, clause-level risk analysis, and draft comment responses. Use when a user wants contract revisions that are specific to each paragraph or bullet, especially for privacy, security, data-processing, liability, AI, or other negotiated legal terms.
---
# Redline
## Overview
Use this skill to review contract `.docx` files at paragraph level and generate:
- a tracked-changes amended `.docx`
- a risk-report `.docx`
- a `.review.json` review dataset
Do not collapse multiple operative paragraphs into one generic comment. Each non-empty paragraph or bullet must be reviewed on its own merits, with distinct risk analysis and replacement language where needed.
## Workflow
1. Confirm the supported party and the priority risk areas.
2. Run `scripts/contract_review_pipeline.py init-review` for each source `.docx`.
3. Review the generated `.review.json` paragraph by paragraph.
4. For each `clauses[]` entry, write a specific assessment tied to that paragraph only:
   - `favorability`
   - `risk_level`
   - `risk_summary`
   - `why_it_matters`
   - `proposed_replacement`
5. Draft specific responses for any opponent comments in `opponent_comments[]`.
6. Run `materialize` to create the amended `.docx` and risk report `.docx`.
7. Verify the outputs exist and the tracked changes are readable.
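Before running `materialize`, it can help to confirm that every `clauses[]` entry has actually been assessed. The following is a minimal sketch, assuming the review JSON has a top-level `clauses` list with the fields named in step 4; the helper `incomplete_clauses` and the sample values such as `"unfavorable"` and `"high"` are hypothetical, and the accepted values are documented in `references/review-json-schema.md`.

```python
import json

# Fields every reviewed clause should carry; `proposed_replacement` is only needed where a redraft applies.
REQUIRED_FIELDS = ("favorability", "risk_level", "risk_summary", "why_it_matters")


def incomplete_clauses(review):
    """Return 'clause_id: missing fields' strings for clauses that still have blank assessments."""
    problems = []
    for clause in review.get("clauses", []):
        empty = [f for f in REQUIRED_FIELDS if not str(clause.get(f, "")).strip()]
        if empty:
            problems.append(f"{clause.get('clause_id', '?')}: {', '.join(empty)}")
    return problems


if __name__ == "__main__":
    # Hypothetical review data; a real run would load it with json.load() from the .review.json file.
    sample = json.loads("""
    {"clauses": [
      {"clause_id": "1.1", "favorability": "unfavorable", "risk_level": "high",
       "risk_summary": "Uncapped liability", "why_it_matters": "Exposure exceeds fees",
       "proposed_replacement": ""},
      {"clause_id": "1.2", "favorability": "", "risk_level": " ",
       "risk_summary": "Audit scope", "why_it_matters": "Operational burden",
       "proposed_replacement": ""}
    ]}
    """)
    for line in incomplete_clauses(sample):
        print(line)
```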
## Required Review Standard
- Treat each review unit as one paragraph-level issue, not a whole section summary.
- Do not reuse the same replacement text across unrelated paragraphs.
- If several bullets in the same section have different obligations, analyze and redraft them separately.
- Keep replacement language narrow and operational. Match the exact risk in the source paragraph.
- When reviewing privacy and security language, check for:
  - uncapped or super-capped liability exposure
  - audit overreach
  - subprocessor approval friction
  - cross-border transfer restrictions
  - incident notification deadlines
  - certifications, penetration testing, and customer testing rights
  - unilateral policy updates
  - AI/security terms that exceed the actual service model
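The rule against reusing replacement text lends itself to a quick mechanical check. The sketch below is one possible approach, not part of the shipped tool: it groups `clauses[]` entries by whitespace- and case-normalized `proposed_replacement` and reports any text shared by more than one clause. The helper name `reused_replacements` is hypothetical, and a hit still needs human judgment, since closely related paragraphs may legitimately share wording.

```python
from collections import defaultdict


def reused_replacements(clauses):
    """Map each normalized replacement text used more than once to the clause IDs that share it."""
    seen = defaultdict(list)
    for clause in clauses:
        # Collapse whitespace and case so trivial variations still count as reuse.
        text = " ".join(str(clause.get("proposed_replacement", "")).split()).lower()
        if text:
            seen[text].append(clause.get("clause_id", "?"))
    return {text: ids for text, ids in seen.items() if len(ids) > 1}


if __name__ == "__main__":
    # Hypothetical clause entries for illustration.
    sample = [
        {"clause_id": "4.1", "proposed_replacement": "Liability is capped at fees paid."},
        {"clause_id": "7.3", "proposed_replacement": "liability is  capped at fees paid."},
        {"clause_id": "9.2", "proposed_replacement": ""},
    ]
    print(reused_replacements(sample))
```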
## Commands
Initialize a review file:
```bash
python scripts/contract_review_pipeline.py init-review \
--input <contract.docx> \
--output <contract.review.json> \
--supported-party "<party>" \
--focus-area "<area-1>" \
--focus-area "<area-2>" \
--opponent-comment-author "<author-1>"
```
Materialize the outputs:
```bash
python scripts/contract_review_pipeline.py materialize \
--input <contract.docx> \
--review-json <contract.review.json> \
--amended-output <contract.amended.docx> \
--report-output <contract.risk-report.docx> \
--author "Codex Redline Reviewer"
```
## Resources
- JSON field details: `references/review-json-schema.md`
- Main tool: `scripts/contract_review_pipeline.py`
FILE:README.md
# redline
`redline` is a standalone Codex skill for paragraph-by-paragraph contract review and DOCX redlining.
It is built to avoid coarse section-level comments by extracting one review unit per non-empty paragraph, then generating:
- a tracked-changes amended `.docx`
- a `.risk-report.docx`
- a `.review.json` dataset for the review
## Contents
- `SKILL.md`: skill instructions and workflow
- `scripts/contract_review_pipeline.py`: review extraction and materialization tool
- `references/review-json-schema.md`: review JSON field guide
- `agents/openai.yaml`: UI metadata
## Core Behavior
- review contracts paragraph by paragraph
- preserve heading context in each review unit
- draft distinct amendment text for each paragraph
- support opponent Word comment response drafting
- generate tracked changes and risk reports from the review JSON
## Usage
Initialize a review file:
```bash
python scripts/contract_review_pipeline.py init-review \
--input <contract.docx> \
--output <contract.review.json> \
--supported-party "<party>" \
--focus-area "<area-1>"
```
Materialize the outputs:
```bash
python scripts/contract_review_pipeline.py materialize \
--input <contract.docx> \
--review-json <contract.review.json> \
--amended-output <contract.amended.docx> \
--report-output <contract.risk-report.docx> \
--author "Codex Redline Reviewer"
```
## License
MIT. See `LICENSE`.
FILE:scripts/contract_review_pipeline.py
#!/usr/bin/env python3
"""
Legal contract review helper for DOCX workflows.
Commands:
- init-review: extract paragraph-level review units from a DOCX contract into a review JSON template.
- materialize: generate (1) amended DOCX with tracked revisions and (2) risk analysis DOCX.
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import re
import zipfile
from pathlib import Path
from typing import Any
try:
    from lxml import etree
except ImportError as exc:  # pragma: no cover
    raise SystemExit(
        "Missing dependency: lxml. Install with `python -m pip install --user lxml`."
    ) from exc
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
XML_NS = "http://www.w3.org/XML/1998/namespace"
NS = {"w": W_NS}
W = f"{{{W_NS}}}"
XML_SPACE = f"{{{XML_NS}}}space"
HEADING_NUMBER_RE = re.compile(
r"^(?:(?:Section|Article)\s+)?(\d+(?:\.\d+)*[A-Za-z]?)[\)\.\-:]?\s+.+$",
re.IGNORECASE,
)
CLAUSE_ID_RE = re.compile(
r"^(?:(?:Section|Article)\s+)?(\d+(?:\.\d+)*[A-Za-z]?)(?:[\)\.\-:]|\s).*$",
re.IGNORECASE,
)
def utc_now_iso() -> str:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp instead.
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def read_docx_member(docx_path: Path, member_name: str) -> bytes:
    with zipfile.ZipFile(docx_path, "r") as archive:
        return archive.read(member_name)
def parse_xml(data: bytes) -> etree._Element:
    return etree.fromstring(data)
def normalize_author(value: str) -> str:
    return " ".join(value.strip().lower().split())
def paragraph_text(paragraph: etree._Element) -> str:
    # Include deleted-run text so paragraphs with tracked deletions are not read as empty.
    parts: list[str] = []
    for node in paragraph.xpath(".//w:t | .//w:delText", namespaces=NS):
        if node.text:
            parts.append(node.text)
    return "".join(parts).strip()
def paragraph_style_id(paragraph: etree._Element) -> str:
    style = paragraph.xpath("./w:pPr/w:pStyle/@w:val", namespaces=NS)
    return style[0].strip() if style else ""
def comment_text(comment: etree._Element) -> str:
    parts: list[str] = []
    for node in comment.xpath(".//w:t", namespaces=NS):
        if node.text:
            parts.append(node.text)
    return "".join(parts).strip()
def extract_comment_threads(
    input_docx: Path,
    opponent_comment_authors: list[str] | None = None,
) -> list[dict[str, Any]]:
    opponent_set = {
        normalize_author(author)
        for author in (opponent_comment_authors or [])
        if str(author).strip()
    }
    with zipfile.ZipFile(input_docx, "r") as archive:
        member_names = set(archive.namelist())
        if "word/comments.xml" not in member_names:
            return []
        comments_root = parse_xml(archive.read("word/comments.xml"))
        document_root = parse_xml(archive.read("word/document.xml"))
    # Map each comment id to the paragraphs it is anchored in.
    paragraph_anchors: dict[str, list[dict[str, Any]]] = {}
    paragraphs = document_root.xpath(".//w:body//w:p", namespaces=NS)
    for paragraph_index, paragraph in enumerate(paragraphs):
        visible_text = paragraph_text(paragraph)
        comment_ids = set(paragraph.xpath(".//w:commentRangeStart/@w:id", namespaces=NS))
        comment_ids.update(paragraph.xpath(".//w:commentReference/@w:id", namespaces=NS))
        for comment_id in comment_ids:
            paragraph_anchors.setdefault(comment_id, []).append(
                {
                    "paragraph_index": paragraph_index,
                    "anchor_text": visible_text,
                }
            )
    extracted: list[dict[str, Any]] = []
    for comment in comments_root.xpath(".//w:comment", namespaces=NS):
        comment_id = str(comment.get(f"{W}id", "")).strip()
        if not comment_id:
            continue
        author = str(comment.get(f"{W}author", "")).strip()
        normalized_author = normalize_author(author)
        # With no opponent authors configured, every comment is treated as an opponent comment.
        is_opponent_comment = normalized_author in opponent_set if opponent_set else True
        anchors = paragraph_anchors.get(comment_id, [])
        first_anchor = anchors[0] if anchors else {}
        extracted.append(
            {
                "comment_id": comment_id,
                "author": author,
                "date": str(comment.get(f"{W}date", "")).strip(),
                "parent_comment_id": str(comment.get(f"{W}parentId", "")).strip(),
                "anchor_text": str(first_anchor.get("anchor_text", "")).strip(),
                "paragraph_index": int(first_anchor.get("paragraph_index", -1)),
                "comment_text": comment_text(comment),
                "is_opponent_comment": is_opponent_comment,
                "response_stance": "",
                "draft_response": "",
                "additional_proposal": "",
            }
        )
    def sort_key(item: dict[str, Any]) -> tuple[int, str]:
        # Numeric ids sort first, in numeric order; other ids follow lexically.
        raw_id = str(item.get("comment_id", ""))
        if raw_id.isdigit():
            return (0, f"{int(raw_id):08d}")
        return (1, raw_id)
    return sorted(extracted, key=sort_key)
def is_heading(paragraph: etree._Element, text: str) -> bool:
    if not text:
        return False
    style_id = paragraph_style_id(paragraph).lower()
    if style_id.startswith("heading"):
        return True
    if HEADING_NUMBER_RE.match(text):
        return True
    # Short all-caps lines are treated as headings.
    words = text.split()
    if len(words) <= 12 and len(text) <= 120 and text.upper() == text and any(
        char.isalpha() for char in text
    ):
        return True
    if text.lower().startswith(("section ", "article ")):
        return True
    return False
def infer_clause_id(title: str, fallback_index: int) -> str:
    match = CLAUSE_ID_RE.match(title)
    if match:
        return match.group(1)
    return f"clause-{fallback_index}"
def finalize_clause(
clauses: list[dict[str, Any]],
clause_index: int,
title: str,
body_lines: list[str],
paragraph_indexes: list[int],
target_index: int,
) -> None:
source_text = "\n".join(line for line in body_lines if line).strip()
if not source_text:
source_text = title
clauses.append(
{
"clause_id": infer_clause_id(title, clause_index),
"clause_title": title,
"source_text": source_text,
"paragraph_indexes": paragraph_indexes,
"target_paragraph_index": target_index,
"favorability": "",
"risk_level": "",
"risk_summary": "",
"why_it_matters": "",
"proposed_replacement": "",
}
)
def make_review_unit_title(current_heading: str, text: str, clause_index: int) -> str:
snippet = " ".join(text.split())
if len(snippet) > 90:
snippet = snippet[:87].rstrip() + "..."
if current_heading and current_heading != text:
return f"{current_heading} :: {snippet}"
if snippet:
return snippet
return f"Clause {clause_index}"
def build_review_template(
    input_docx: Path,
    supported_party: str,
    priority_focus_areas: list[str] | None = None,
    opponent_comment_authors: list[str] | None = None,
) -> dict[str, Any]:
    document_xml = read_docx_member(input_docx, "word/document.xml")
    root = parse_xml(document_xml)
    paragraphs = root.xpath(".//w:body//w:p", namespaces=NS)

    clauses: list[dict[str, Any]] = []
    current_title = ""
    clause_counter = 1

    for index, paragraph in enumerate(paragraphs):
        text = paragraph_text(paragraph)
        if not text:
            continue
        if is_heading(paragraph, text):
            current_title = text
        unit_title = make_review_unit_title(current_title, text, clause_counter)
        finalize_clause(
            clauses=clauses,
            clause_index=clause_counter,
            title=unit_title,
            body_lines=[text],
            paragraph_indexes=[index],
            target_index=index,
        )
        clause_counter += 1

    if not clauses:
        raise ValueError("No readable clauses were found in the contract.")

    template = {
        "source_docx": str(input_docx),
        "supported_party": supported_party,
        "priority_focus_areas": priority_focus_areas or [],
        "opponent_comment_authors": opponent_comment_authors or [],
        "prepared_at_utc": utc_now_iso(),
        "reviewer": "",
        "overall_notes": "",
        "clauses": clauses,
        "opponent_comments": extract_comment_threads(
            input_docx=input_docx,
            opponent_comment_authors=opponent_comment_authors or [],
        ),
    }
    return template
def ensure_parent(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def validate_docx_output_path(path: Path, label: str) -> None:
    if path.suffix.lower() != ".docx":
        raise ValueError(f"{label} must be a .docx file: {path}")


def assert_docx_written(path: Path, label: str) -> None:
    if not path.exists():
        raise FileNotFoundError(f"{label} was not created: {path}")
    if path.suffix.lower() != ".docx":
        raise ValueError(f"{label} must be a .docx file: {path}")
    if path.stat().st_size == 0:
        raise ValueError(f"{label} is empty: {path}")


def write_json(path: Path, payload: dict[str, Any]) -> None:
    ensure_parent(path)
    path.write_text(json.dumps(payload, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")


def load_json(path: Path) -> dict[str, Any]:
    with path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, dict):
        raise ValueError("Review JSON must be a top-level object.")
    if "clauses" not in payload or not isinstance(payload["clauses"], list):
        raise ValueError("Review JSON must include a 'clauses' list.")
    return payload


def set_preserve_space(node: etree._Element, text: str) -> None:
    if text != text.strip():
        node.set(XML_SPACE, "preserve")
def replace_paragraph_with_revision(
    paragraph: etree._Element,
    original_text: str,
    replacement_text: str,
    revision_id: int,
    author: str,
    revision_date: str,
) -> None:
    paragraph_properties = paragraph.find(f"{W}pPr")
    for child in list(paragraph):
        if child is not paragraph_properties:
            paragraph.remove(child)

    deleted = etree.Element(f"{W}del")
    deleted.set(f"{W}id", str(revision_id))
    deleted.set(f"{W}author", author)
    deleted.set(f"{W}date", revision_date)
    deleted_run = etree.SubElement(deleted, f"{W}r")
    deleted_text = etree.SubElement(deleted_run, f"{W}delText")
    deleted_text.text = original_text
    set_preserve_space(deleted_text, original_text)

    inserted = etree.Element(f"{W}ins")
    inserted.set(f"{W}id", str(revision_id + 1))
    inserted.set(f"{W}author", author)
    inserted.set(f"{W}date", revision_date)
    inserted_run = etree.SubElement(inserted, f"{W}r")
    inserted_text = etree.SubElement(inserted_run, f"{W}t")
    inserted_text.text = replacement_text
    set_preserve_space(inserted_text, replacement_text)

    paragraph.append(deleted)
    paragraph.append(inserted)


def ensure_track_revisions(settings_root: etree._Element) -> None:
    existing = settings_root.xpath("./w:trackRevisions", namespaces=NS)
    if existing:
        return
    settings_root.append(etree.Element(f"{W}trackRevisions"))


def build_edit_list(review_payload: dict[str, Any]) -> list[dict[str, Any]]:
    edits: list[dict[str, Any]] = []
    for clause in review_payload.get("clauses", []):
        if not isinstance(clause, dict):
            continue
        replacement = str(clause.get("proposed_replacement", "")).strip()
        if not replacement:
            continue
        target_index = clause.get("target_paragraph_index")
        if not isinstance(target_index, int):
            continue
        edits.append(
            {
                "target_paragraph_index": target_index,
                "replacement": replacement,
            }
        )
    return edits
def apply_tracked_revisions(
    input_docx: Path,
    output_docx: Path,
    edits: list[dict[str, Any]],
    author: str,
) -> int:
    revision_date = utc_now_iso()
    applied = 0

    with zipfile.ZipFile(input_docx, "r") as source_archive:
        original_files = {
            item.filename: source_archive.read(item.filename)
            for item in source_archive.infolist()
        }

    document_root = parse_xml(original_files["word/document.xml"])
    settings_root = parse_xml(original_files["word/settings.xml"])
    paragraphs = document_root.xpath(".//w:body//w:p", namespaces=NS)

    for edit_id, edit in enumerate(edits, start=1):
        target_index = edit["target_paragraph_index"]
        if target_index < 0 or target_index >= len(paragraphs):
            continue
        paragraph = paragraphs[target_index]
        original_text = paragraph_text(paragraph)
        replacement = edit["replacement"].strip()
        if not replacement or not original_text:
            continue
        replace_paragraph_with_revision(
            paragraph=paragraph,
            original_text=original_text,
            replacement_text=replacement,
            revision_id=edit_id * 10,
            author=author,
            revision_date=revision_date,
        )
        applied += 1

    ensure_track_revisions(settings_root)

    original_files["word/document.xml"] = etree.tostring(
        document_root,
        xml_declaration=True,
        encoding="UTF-8",
        standalone=True,
    )
    original_files["word/settings.xml"] = etree.tostring(
        settings_root,
        xml_declaration=True,
        encoding="UTF-8",
        standalone=True,
    )

    ensure_parent(output_docx)
    with zipfile.ZipFile(input_docx, "r") as source_archive, zipfile.ZipFile(
        output_docx, "w", compression=zipfile.ZIP_DEFLATED
    ) as output_archive:
        for item in source_archive.infolist():
            output_archive.writestr(item, original_files[item.filename])

    return applied
def write_risk_report(review_payload: dict[str, Any], output_docx: Path) -> None:
    try:
        from docx import Document
    except ImportError as exc:  # pragma: no cover
        raise SystemExit(
            "Missing dependency: python-docx. Install with `python -m pip install --user python-docx`."
        ) from exc

    document = Document()
    document.add_heading("Contract Risk Review", level=0)
    document.add_paragraph(f"Supported party: {review_payload.get('supported_party', '')}")
    document.add_paragraph(f"Source contract: {review_payload.get('source_docx', '')}")
    document.add_paragraph(f"Generated at (UTC): {utc_now_iso()}")

    focus_areas = review_payload.get("priority_focus_areas", [])
    if isinstance(focus_areas, list):
        normalized_focus_areas = [str(item).strip() for item in focus_areas if str(item).strip()]
    else:
        normalized_focus_areas = []
    if normalized_focus_areas:
        document.add_heading("Priority Focus Areas", level=1)
        for area in normalized_focus_areas:
            document.add_paragraph(area, style="List Bullet")

    notes = str(review_payload.get("overall_notes", "")).strip()
    if notes:
        document.add_heading("Overall Notes", level=1)
        document.add_paragraph(notes)

    table = document.add_table(rows=1, cols=6)
    table.style = "Table Grid"
    headers = [
        "Clause",
        "Favorability",
        "Risk Level",
        "Risk Summary",
        "Why It Matters",
        "Proposed Replacement",
    ]
    for idx, title in enumerate(headers):
        table.rows[0].cells[idx].text = title

    for clause in review_payload.get("clauses", []):
        if not isinstance(clause, dict):
            continue
        row = table.add_row().cells
        row[0].text = str(clause.get("clause_title", clause.get("clause_id", "")))
        row[1].text = str(clause.get("favorability", ""))
        row[2].text = str(clause.get("risk_level", ""))
        row[3].text = str(clause.get("risk_summary", ""))
        row[4].text = str(clause.get("why_it_matters", ""))
        row[5].text = str(clause.get("proposed_replacement", ""))

    comment_entries = review_payload.get("opponent_comments", [])
    if isinstance(comment_entries, list):
        opponent_comments = [
            item
            for item in comment_entries
            if isinstance(item, dict) and bool(item.get("is_opponent_comment", True))
        ]
    else:
        opponent_comments = []

    if opponent_comments:
        document.add_heading("Opponent Comment Responses", level=1)
        comment_table = document.add_table(rows=1, cols=7)
        comment_table.style = "Table Grid"
        comment_headers = [
            "Comment ID",
            "Author",
            "Anchor Text",
            "Opponent Comment",
            "Stance",
            "Draft Response",
            "Additional Proposal",
        ]
        for idx, title in enumerate(comment_headers):
            comment_table.rows[0].cells[idx].text = title
        for item in opponent_comments:
            row = comment_table.add_row().cells
            row[0].text = str(item.get("comment_id", ""))
            row[1].text = str(item.get("author", ""))
            row[2].text = str(item.get("anchor_text", ""))
            row[3].text = str(item.get("comment_text", ""))
            row[4].text = str(item.get("response_stance", ""))
            row[5].text = str(item.get("draft_response", ""))
            row[6].text = str(item.get("additional_proposal", ""))

    ensure_parent(output_docx)
    document.save(output_docx)
def command_init_review(args: argparse.Namespace) -> None:
    payload = build_review_template(
        input_docx=args.input,
        supported_party=args.supported_party or "",
        priority_focus_areas=args.focus_areas or [],
        opponent_comment_authors=args.opponent_comment_authors or [],
    )
    write_json(args.output, payload)
    print(f"[OK] Wrote review template: {args.output}")
    print(f"[OK] Clauses detected: {len(payload['clauses'])}")
    print(f"[OK] Comments detected: {len(payload.get('opponent_comments', []))}")


def command_materialize(args: argparse.Namespace) -> None:
    validate_docx_output_path(args.amended_output, "Amended output")
    validate_docx_output_path(args.report_output, "Risk report output")
    payload = load_json(args.review_json)
    edits = build_edit_list(payload)
    applied = apply_tracked_revisions(
        input_docx=args.input,
        output_docx=args.amended_output,
        edits=edits,
        author=args.author,
    )
    write_risk_report(payload, args.report_output)
    assert_docx_written(args.amended_output, "Amended contract")
    assert_docx_written(args.report_output, "Risk report")
    print(f"[OK] Wrote amended contract: {args.amended_output}")
    print(f"[OK] Wrote risk report: {args.report_output}")
    print(f"[OK] Tracked revisions applied: {applied}")
def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Legal contract DOCX extraction + redline materialization tool."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    init_review = subparsers.add_parser(
        "init-review",
        help="Extract paragraph-level contract review units into a JSON review template.",
    )
    init_review.add_argument("--input", type=Path, required=True, help="Source contract .docx")
    init_review.add_argument("--output", type=Path, required=True, help="Output review .json")
    init_review.add_argument(
        "--supported-party",
        default="",
        help="Party to support (e.g., Buyer, Seller, Licensor).",
    )
    init_review.add_argument(
        "--focus-area",
        dest="focus_areas",
        action="append",
        default=[],
        help="Add a priority focus area; repeat to include multiple areas.",
    )
    init_review.add_argument(
        "--opponent-comment-author",
        dest="opponent_comment_authors",
        action="append",
        default=[],
        help="Word comment author to classify as opponent; repeat to include multiple names.",
    )
    init_review.set_defaults(handler=command_init_review)

    materialize = subparsers.add_parser(
        "materialize",
        help="Create amended DOCX and risk report DOCX from a completed review JSON.",
    )
    materialize.add_argument("--input", type=Path, required=True, help="Source contract .docx")
    materialize.add_argument(
        "--review-json",
        type=Path,
        required=True,
        help="Completed review JSON file.",
    )
    materialize.add_argument(
        "--amended-output",
        type=Path,
        required=True,
        help="Output path for amended tracked-change .docx",
    )
    materialize.add_argument(
        "--report-output",
        type=Path,
        required=True,
        help="Output path for risk analysis .docx",
    )
    materialize.add_argument(
        "--author",
        default="Codex Legal Reviewer",
        help="Revision author recorded in tracked changes.",
    )
    materialize.set_defaults(handler=command_materialize)
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    args.handler(args)


if __name__ == "__main__":
    main()
FILE:references/review-json-schema.md
# Contract Review JSON Schema
`init-review` creates one review entry per non-empty paragraph, with heading context included in `clause_title`. Review each entry independently.
Use `scripts/contract_review_pipeline.py init-review` to create a starter JSON file.
Fill these fields before running `materialize`:
- `supported_party`: Party you represent (for example, `Buyer` or `Licensor`).
- `priority_focus_areas`: List of contract terms that need the closest scrutiny (for example, `indemnity`, `limitation of liability`, `IP ownership`).
- `opponent_comment_authors`: Word comment authors to classify as opponent side.
- `reviewer`: Name or role of reviewer.
- `overall_notes`: Cross-clause summary notes.
- `clauses[*].favorability`: `favorable`, `neutral`, or `unfavorable` for your party.
- `clauses[*].risk_level`: `low`, `medium`, `high`, or `critical`.
- `clauses[*].risk_summary`: Specific risk in this clause.
- `clauses[*].why_it_matters`: Business or legal impact for your party.
- `clauses[*].proposed_replacement`: Replacement sentence or paragraph to insert as a tracked change for that paragraph-level review unit. Leave it blank to keep the paragraph unchanged.
Opponent comments section:
- `opponent_comments[*].is_opponent_comment`: Boolean set by author matching.
- `opponent_comments[*].response_stance`: `agree`, `partial`, or `disagree`.
- `opponent_comments[*].draft_response`: Draft response to the opponent comment.
- `opponent_comments[*].additional_proposal`: Optional fallback language or compromise proposal.
Do not change:
- `clauses[*].target_paragraph_index`
- `clauses[*].paragraph_indexes`
- `clauses[*].source_text`
- `opponent_comments[*].comment_id`
- `opponent_comments[*].anchor_text`
These fields map review entries to DOCX paragraph positions and Word comment threads.
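
The allowed values listed above can be checked before running `materialize`. The following is a minimal sketch, not part of the pipeline: `check_review_values` is a hypothetical helper, it assumes the value lists in this schema are exhaustive, and it treats a blank field as "not reviewed yet" rather than as an error.

```python
from typing import Any

# Allowed values as listed in this schema; treating them as exhaustive is an assumption.
FAVORABILITY = {"favorable", "neutral", "unfavorable"}
RISK_LEVELS = {"low", "medium", "high", "critical"}
STANCES = {"agree", "partial", "disagree"}


def check_review_values(payload: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a review payload (hypothetical helper)."""
    problems: list[str] = []
    for index, clause in enumerate(payload.get("clauses", [])):
        if not isinstance(clause, dict):
            problems.append(f"clauses[{index}] is not an object")
            continue
        for field, allowed in (("favorability", FAVORABILITY), ("risk_level", RISK_LEVELS)):
            value = str(clause.get(field, "")).strip().lower()
            # Blank means "not reviewed yet" in this sketch, so only unknown values are flagged.
            if value and value not in allowed:
                problems.append(f"clauses[{index}].{field}: unexpected value {value!r}")
    for index, comment in enumerate(payload.get("opponent_comments", [])):
        if not isinstance(comment, dict):
            continue
        stance = str(comment.get("response_stance", "")).strip().lower()
        if stance and stance not in STANCES:
            problems.append(
                f"opponent_comments[{index}].response_stance: unexpected value {stance!r}"
            )
    return problems
```

An empty list means every filled-in field uses one of the documented values; the check does not touch the fields listed under "Do not change".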
FILE:agents/openai.yaml
interface:
  display_name: "Redline"
  short_description: "Paragraph-level contract redlines for DOCX."
  default_prompt: "Review this DOCX contract paragraph by paragraph, propose distinct replacement language for each issue, and generate tracked changes plus a risk report."
---
name: accounting-finance-system-research
description: Research and solve "how do I do this?" questions inside accounting and finance software systems (ERP, GL, AP/AR, billing, close, and reporting tools). Use when a user needs operational steps, setup guidance, or troubleshooting help in a specific system and wants the result documented as a quick memo or simple Q-and-A DOCX.
---
# Accounting And Finance System Research
## Overview
Follow a fixed process for system-how-to support: collect facts, ask clarifying questions, confirm output format, confirm understanding, research external guidance, analyze the best path, and generate a DOCX deliverable.
## Required Behavior
- Ask clarifying questions before proposing a solution.
- Confirm whether output should be `quick memo` or `simple q-and-a`.
- Restate understanding and wait for confirmation before web research.
- Research the internet after confirmation, prioritizing official vendor guidance.
- Separate source-backed guidance from assumptions or inference.
- Include source links and accessed dates in the deliverable.
- Generate a DOCX report in the user-selected format.
## Workflow
### 1) Intake And Scope
- Capture the user objective in one sentence.
- Confirm system name and version/edition (for example: NetSuite, SAP S/4HANA Cloud, Dynamics 365 Finance, QuickBooks).
- Confirm module or workflow area (for example: AP, AR, close, reconciliations, reporting).
- Confirm role/permission constraints and any deadline pressure.
### 2) Clarification Questions (Mandatory)
- Use [references/clarification-question-bank.md](references/clarification-question-bank.md).
- Ask only missing critical questions; avoid redundant prompts.
- Pause solutioning until enough facts are available.
- If facts remain unknown, continue with explicit assumptions and conditional guidance.
### 3) Output Format Confirmation (Mandatory)
- Ask the user to choose one:
  - `quick memo`: concise professional summary with recommendation and steps.
  - `simple q-and-a`: direct answer format with numbered actions.
- Default to `quick memo` only when the user gives no preference.
### 4) Understanding Confirmation (Mandatory)
- Restate:
  - problem statement
  - system context
  - open questions or assumptions
  - chosen output format
- Ask for explicit confirmation before researching.
### 5) Research External Guidance
- Follow source ranking in [references/source-priority.md](references/source-priority.md).
- Gather at least two relevant sources where possible.
- Include at least one official vendor source when available.
- Track per-source metadata: title, publisher, URL, updated/published date if available, accessed date.
- Prefer current version-specific guidance over older generic content.
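
One way to keep the per-source metadata consistent while researching is a small record type. This is a sketch under assumptions: `SourceRecord` is a hypothetical name, and the field names are chosen to match the keys that `scripts/build_system_guidance_docx.py` reads from each entry in `sources`.

```python
from dataclasses import asdict, dataclass, field
from datetime import date


@dataclass
class SourceRecord:
    """Metadata for one researched source (hypothetical helper type)."""

    title: str
    publisher: str
    url: str
    published_or_updated: str = ""  # leave blank when the page shows no date
    # Default the accessed date to today in ISO format (YYYY-MM-DD).
    accessed: str = field(default_factory=lambda: date.today().isoformat())

    def to_payload(self) -> dict[str, str]:
        """Return a plain dict suitable for the report's `sources` array."""
        return asdict(self)
```

Recording `accessed` at creation time makes it harder to forget the date that the quality check later requires.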
### 6) Analyze And Build Recommendations
- Translate research into concrete user actions.
- Provide prerequisites and permissions needed for each action.
- Add fallback steps when primary path is blocked.
- Include validation checks to confirm completion.
- Call out risks and unresolved dependencies.
### 7) Generate DOCX Deliverable
- Build JSON input using [references/report-json-schema.md](references/report-json-schema.md).
- Run:
```bash
python scripts/build_system_guidance_docx.py \
--input-json <analysis.json> \
--output-docx <system-guidance.docx> \
--format <memo|q-and-a>
```
- Map `quick memo` to `memo`.
- Map `simple q-and-a` to `q-and-a`.
- Confirm the document includes:
  - request summary and system context
  - clarifications and assumptions
  - guidance sources with URLs
  - recommended steps
  - validation checks
  - risks and open items
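
The JSON input for this step can be assembled programmatically. The sketch below is illustrative only: `build_minimal_payload` and `write_payload` are hypothetical helpers, and it covers just the three fields the generator rejects when missing or empty (`question`, `sources`, `recommended_steps`). See the report JSON schema for the optional fields.

```python
import json
from pathlib import Path
from typing import Any


def build_minimal_payload(
    question: str,
    sources: list[dict[str, str]],
    steps: list[str],
) -> dict[str, Any]:
    """Assemble the fields the generator requires (hypothetical helper)."""
    if not question.strip() or not sources or not steps:
        raise ValueError("question, sources, and steps must all be non-empty")
    return {
        "question": question.strip(),
        "format": "memo",  # used by the generator only when --format is omitted
        "sources": sources,
        "recommended_steps": [{"step": text} for text in steps],
    }


def write_payload(payload: dict[str, Any], path: Path) -> None:
    """Write the payload as UTF-8 JSON, creating parent folders as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
```

The resulting file can then be passed to the generator through `--input-json`.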
### 8) Quality Check
- Verify recommendations are consistent with cited guidance.
- Verify assumptions are explicit and easy to review.
- Verify deliverable format matches user request.
- Verify every source has a URL and accessed date.
- Verify output file is `.docx` and readable.
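
The URL and accessed-date check can be automated against the JSON payload before the document is generated. This is a minimal sketch; `sources_missing_metadata` is a hypothetical helper and is not part of `scripts/build_system_guidance_docx.py`, which currently only verifies that `sources` is a non-empty array.

```python
from typing import Any


def sources_missing_metadata(sources: list[Any]) -> list[int]:
    """Return 1-based positions of sources lacking a URL or an accessed date."""
    missing: list[int] = []
    for position, source in enumerate(sources, start=1):
        # A bare string (or any non-object entry) carries no URL or date at all.
        if not isinstance(source, dict):
            missing.append(position)
            continue
        url = str(source.get("url") or "").strip()
        accessed = str(source.get("accessed") or "").strip()
        if not url or not accessed:
            missing.append(position)
    return missing
```

The positions match the `#` column of the Sources table in the generated report, so a reviewer can find the incomplete rows directly.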
## Resources
- Clarification checklist: [references/clarification-question-bank.md](references/clarification-question-bank.md)
- Source hierarchy: [references/source-priority.md](references/source-priority.md)
- Report JSON schema: [references/report-json-schema.md](references/report-json-schema.md)
- Example report payload: [references/example_report_input.json](references/example_report_input.json)
- DOCX generator: `scripts/build_system_guidance_docx.py`
## Dependency
Install once if needed:
```bash
python -m pip install --user python-docx
```
FILE:README.md
# Accounting & Finance System Research Skill
Codex skill for handling accounting/finance system "how-to" and troubleshooting questions with a clarification-first workflow, web research, analysis, and DOCX deliverables.
## What It Does
- Ask required clarifying questions before solutioning.
- Confirm deliverable format as either:
  - `quick memo`
  - `simple q-and-a`
- Confirm understanding before internet research.
- Research official system documentation and consultant publications.
- Produce actionable guidance with assumptions, risks, and validation checks.
- Generate `.docx` output from structured JSON.
## Skill Contents
- `SKILL.md`: Workflow and behavior rules.
- `references/`: Clarification bank, source hierarchy, schema, and example payload.
- `scripts/build_system_guidance_docx.py`: DOCX generator (`memo` or `q-and-a`).
- `agents/openai.yaml`: UI metadata for skill listing.
## Usage
1. Trigger with `$accounting-finance-system-research`.
2. Collect missing facts and confirm format with the user.
3. Research relevant official and secondary sources.
4. Build a JSON payload matching `references/report-json-schema.md`.
5. Generate document:
```bash
python scripts/build_system_guidance_docx.py \
--input-json references/example_report_input.json \
--output-docx system-guidance.docx \
--format memo
```
## Dependency
```bash
python -m pip install --user python-docx
```
## License
MIT. See [LICENSE](LICENSE).
FILE:scripts/build_system_guidance_docx.py
#!/usr/bin/env python3
"""Build a DOCX report for accounting and finance system guidance."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Iterable

try:
    from docx import Document
except ImportError:
    print("python-docx is required. Install with: python -m pip install --user python-docx", file=sys.stderr)
    raise SystemExit(1)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Create a system-guidance DOCX from structured JSON input."
    )
    parser.add_argument("--input-json", required=True, help="Path to report payload JSON")
    parser.add_argument("--output-docx", required=True, help="Path to output .docx")
    parser.add_argument(
        "--format",
        choices=["memo", "q-and-a"],
        help="Output format. If omitted, falls back to payload format or memo.",
    )
    return parser.parse_args()


def to_text(value: Any, default: str = "") -> str:
    if value is None:
        return default
    text = str(value).strip()
    return text if text else default


def load_payload(path: Path) -> dict[str, Any]:
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError as exc:
        raise ValueError(f"Input JSON not found: {path}") from exc
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("Input JSON must be an object at the top level")
    return data
def validate_payload(payload: dict[str, Any]) -> None:
    errors: list[str] = []
    if not to_text(payload.get("question")):
        errors.append("Missing required field: question")
    sources = payload.get("sources")
    if not isinstance(sources, list) or not sources:
        errors.append("Field 'sources' must be a non-empty array")
    steps = payload.get("recommended_steps")
    if not isinstance(steps, list) or not steps:
        errors.append("Field 'recommended_steps' must be a non-empty array")
    if errors:
        raise ValueError("\n".join(errors))


def normalize_format(cli_format: str | None, payload: dict[str, Any]) -> str:
    raw = to_text(cli_format or payload.get("format") or "memo").lower()
    aliases = {
        "memo": "memo",
        "quick memo": "memo",
        "quick-memo": "memo",
        "q-and-a": "q-and-a",
        "q&a": "q-and-a",
        "qa": "q-and-a",
        "simple q-and-a": "q-and-a",
        "simple q&a": "q-and-a",
    }
    normalized = aliases.get(raw)
    if normalized:
        return normalized
    raise ValueError(f"Unsupported format '{raw}'. Use memo or q-and-a.")
def add_key_value_bullets(doc: Document, values: dict[str, Any]) -> None:
    for key, value in values.items():
        text = to_text(value)
        if not text:
            continue
        paragraph = doc.add_paragraph(style="List Bullet")
        prefix = key.replace("_", " ").strip().title()
        paragraph.add_run(f"{prefix}: ").bold = True
        paragraph.add_run(text)


def add_string_list(doc: Document, title: str, items: Any) -> None:
    if not isinstance(items, list) or not items:
        return
    doc.add_heading(title, level=1)
    count = 0
    for item in items:
        text = to_text(item)
        if not text:
            continue
        count += 1
        doc.add_paragraph(text, style="List Bullet")
    if count == 0:
        doc.add_paragraph("None.")


def add_clarifications(doc: Document, clarifications: Any) -> None:
    if not isinstance(clarifications, list) or not clarifications:
        return
    doc.add_heading("Clarifications", level=1)
    table = doc.add_table(rows=1, cols=2)
    table.style = "Table Grid"
    headers = table.rows[0].cells
    headers[0].text = "Question"
    headers[1].text = "Answer"
    for item in clarifications:
        if isinstance(item, dict):
            question = to_text(item.get("question"), "Clarification")
            answer = to_text(item.get("answer"), "")
        else:
            question = "Clarification"
            answer = to_text(item)
        row = table.add_row().cells
        row[0].text = question
        row[1].text = answer
def normalize_step(step: Any) -> tuple[str, str, list[str]]:
    if isinstance(step, str):
        return to_text(step), "", []
    if isinstance(step, dict):
        action = to_text(
            step.get("step")
            or step.get("action")
            or step.get("instruction")
            or step.get("title")
        )
        rationale = to_text(step.get("rationale"))
        refs = step.get("source_refs")
        ref_values: list[str] = []
        if isinstance(refs, list):
            ref_values = [to_text(r) for r in refs if to_text(r)]
        elif refs is not None:
            single = to_text(refs)
            if single:
                ref_values = [single]
        return action, rationale, ref_values
    return to_text(step), "", []


def add_steps(doc: Document, title: str, steps: Any) -> None:
    doc.add_heading(title, level=1)
    if not isinstance(steps, list) or not steps:
        doc.add_paragraph("No steps provided.")
        return
    count = 0
    for raw_step in steps:
        action, rationale, refs = normalize_step(raw_step)
        if not action:
            continue
        count += 1
        doc.add_paragraph(action, style="List Number")
        if rationale:
            rationale_line = doc.add_paragraph()
            rationale_line.add_run("Rationale: ").bold = True
            rationale_line.add_run(rationale)
        if refs:
            refs_line = doc.add_paragraph()
            refs_line.add_run("Sources: ").bold = True
            refs_line.add_run(", ".join(refs))
    if count == 0:
        doc.add_paragraph("No valid steps provided.")


def add_sources(doc: Document, sources: Any) -> None:
    doc.add_heading("Sources", level=1)
    if not isinstance(sources, list) or not sources:
        doc.add_paragraph("No sources listed.")
        return
    table = doc.add_table(rows=1, cols=6)
    table.style = "Table Grid"
    headers = ["#", "Title", "Publisher", "URL", "Published/Updated", "Accessed"]
    for index, label in enumerate(headers):
        table.rows[0].cells[index].text = label
    for index, source in enumerate(sources, start=1):
        if not isinstance(source, dict):
            source = {"title": to_text(source)}
        row = table.add_row().cells
        row[0].text = str(index)
        row[1].text = to_text(source.get("title"))
        row[2].text = to_text(source.get("publisher"))
        row[3].text = to_text(source.get("url"))
        row[4].text = to_text(source.get("published_or_updated"))
        row[5].text = to_text(source.get("accessed"))
def build_report(payload: dict[str, Any], format_name: str) -> Document:
    document = Document()
    title = to_text(payload.get("title"), "Accounting And Finance System Guidance")
    document.add_heading(title, level=0)

    metadata_parts: list[str] = []
    for label, key in (
        ("Prepared For", "prepared_for"),
        ("Prepared By", "prepared_by"),
        ("Date", "date"),
    ):
        value = to_text(payload.get(key))
        if value:
            metadata_parts.append(f"{label}: {value}")
    if metadata_parts:
        document.add_paragraph(" | ".join(metadata_parts))

    question = to_text(payload.get("question"))
    document.add_heading("Request", level=1)
    document.add_paragraph(question)

    context = payload.get("system_context")
    if isinstance(context, dict) and context:
        document.add_heading("System Context", level=1)
        add_key_value_bullets(document, context)

    add_clarifications(document, payload.get("clarifications"))
    add_string_list(document, "Assumptions", payload.get("assumptions"))

    summary = to_text(payload.get("analysis_summary") or payload.get("short_answer"))
    if format_name == "memo":
        document.add_heading("Recommendation Summary", level=1)
        document.add_paragraph(summary or "No summary provided.")
        add_steps(document, "Recommended Steps", payload.get("recommended_steps"))
    else:
        document.add_heading("Question", level=1)
        document.add_paragraph(question)
        document.add_heading("Answer", level=1)
        document.add_paragraph(summary or "No direct answer provided.")
        add_steps(document, "Actions", payload.get("recommended_steps"))

    add_string_list(document, "Validation Checks", payload.get("validation_checks"))
    add_string_list(document, "Risks And Open Items", payload.get("risks_open_items"))
    add_sources(document, payload.get("sources"))
    add_string_list(document, "Open Questions", payload.get("open_questions"))
    return document


def main() -> int:
    args = parse_args()
    input_path = Path(args.input_json)
    output_path = Path(args.output_docx)

    try:
        payload = load_payload(input_path)
        validate_payload(payload)
        format_name = normalize_format(args.format, payload)
        document = build_report(payload, format_name)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1

    output_path.parent.mkdir(parents=True, exist_ok=True)
    document.save(output_path)
    print(f"DOCX report written: {output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:agents/openai.yaml
interface:
  display_name: "Accounting & Finance System Research"
  short_description: "Research and solve accounting system how-to questions"
  default_prompt: "Use $accounting-finance-system-research to ask clarifying questions, research system guidance, and produce a DOCX memo or Q&A answer."
FILE:references/clarification-question-bank.md
# Clarification Question Bank
Ask only the questions needed to remove decision risk. Do not ask all questions by default.
## System Identity
- What system and exact version/edition are you using?
- Is this production, sandbox, or test environment?
- Are there customizations, add-ons, or integrations that affect this workflow?
## Business Objective
- What exact outcome do you need (create, post, reverse, reconcile, report, close)?
- What deadline or period are you working under?
- What is the impact if this is not resolved today?
## Process Scope
- Which module/process is involved (AP, AR, GL, fixed assets, billing, close, reporting)?
- What is the current step where the issue occurs?
- What have you already tried?
## Security And Permissions
- What role/profile are you using?
- Are there any permission errors or missing menu/actions?
- Can an admin grant temporary access if needed?
## Data Context
- Which company/entity, ledger, book, or business unit is affected?
- Which transaction type and volume are involved?
- Is there sample data or a transaction ID to reproduce the issue?
## Configuration Context
- Are there accounting period locks, workflow approvals, or posting controls enabled?
- Are there relevant setup options already enabled/disabled?
- Is this a one-time case or standard recurring process?
## Output Preference
- Confirm output format: `quick memo` or `simple q-and-a`.
- Confirm required detail level: short operational steps or deeper explanation.
FILE:references/source-priority.md
# Source Priority And Evidence Rules
Use sources in this order whenever available.
## Priority Order
1. Official vendor documentation and release notes (highest trust)
2. Official vendor knowledge base and support articles
3. Major implementation partner or consultant publications (Big 4 and known system integrators)
4. Reputable practitioner resources and community posts (supporting context only)
## Rules
- Prefer version-specific guidance over generic guidance.
- Prefer newer guidance when process behavior changed across versions.
- Do not rely on a single non-official source for final recommendations.
- Label assumptions when documentation does not fully resolve the scenario.
## Citation Minimum
For each cited source, capture:
- Source title
- Publisher/author
- URL
- Published or updated date when available
- Accessed date
## Contradictions
If sources conflict:
- Prioritize official vendor guidance.
- Document the conflict briefly.
- Recommend the lowest-risk path and include fallback steps.
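The priority order and rules above can be sketched as a small ranking helper. This is an illustrative sketch only: `official-doc` and `consultant-article` are the type labels shown in the report schema, while `vendor-kb`, `community-post`, and the helper names are hypothetical.

```python
from dataclasses import dataclass

# Lower rank means higher trust. Labels marked "hypothetical" do not appear
# in the skill files and are assumed here for illustration.
PRIORITY = {
    "official-doc": 1,
    "vendor-kb": 2,           # hypothetical label
    "consultant-article": 3,
    "community-post": 4,      # hypothetical label
}


@dataclass
class Source:
    title: str
    type: str
    published_or_updated: str = ""  # ISO date string, may be empty


def rank_sources(sources):
    """Order by trust tier first, then newest first within a tier."""
    newest_first = sorted(sources, key=lambda s: s.published_or_updated, reverse=True)
    # sorted() is stable, so the date order survives inside each tier.
    return sorted(newest_first, key=lambda s: PRIORITY.get(s.type, len(PRIORITY) + 1))


def needs_corroboration(sources):
    """True when the only evidence is a single non-official source (or none)."""
    official = [s for s in sources if PRIORITY.get(s.type, 99) <= 2]
    return not official and len(sources) < 2
```

Unknown type labels sort last rather than raising, so a mislabeled source is demoted instead of dropped.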
FILE:references/example_report_input.json
{
"title": "NetSuite AP Workflow Guidance",
"prepared_for": "Finance Operations",
"prepared_by": "Codex",
"date": "2026-03-07",
"question": "How do we create and approve a recurring vendor bill in NetSuite?",
"system_context": {
"system": "NetSuite",
"version": "2025.2",
"module": "Accounts Payable",
"environment": "Production",
"role": "AP Specialist"
},
"clarifications": [
{
"question": "Do you use approval routing?",
"answer": "Yes, manager approval is required above $5,000."
},
{
"question": "Is recurring billing enabled?",
"answer": "Yes, memorized transactions are enabled."
}
],
"analysis_summary": "Use memorized transactions for recurring bills and apply approval routing rules before posting.",
"assumptions": [
"Role has permission to create memorized vendor bills.",
"Approval workflow is active for AP transactions."
],
"recommended_steps": [
{
"step": "Navigate to Transactions > Payables > Enter Bills and create a baseline vendor bill template.",
"rationale": "A complete template ensures recurring entries inherit coding and approvals.",
"source_refs": [1]
},
{
"step": "Save the bill as a memorized transaction with schedule settings.",
"rationale": "Schedule drives automatic recurring bill creation.",
"source_refs": [1, 2]
},
{
"step": "Submit generated bills through approval routing before posting.",
"rationale": "Posting control prevents unauthorized expense recognition.",
"source_refs": [2]
}
],
"validation_checks": [
"Confirm recurring bill appears on scheduled date.",
"Confirm approval status moves to Approved before posting.",
"Confirm GL impact matches the template account mapping."
],
"risks_open_items": [
"Permission gaps can block memorized transaction setup.",
"Workflow thresholds may reroute approvals unexpectedly."
],
"sources": [
{
"title": "Creating Vendor Bills",
"publisher": "Oracle NetSuite Help Center",
"url": "https://docs.oracle.com/en/cloud/saas/netsuite",
"published_or_updated": "2025-10-01",
"accessed": "2026-03-07",
"type": "official-doc"
},
{
"title": "Accounts Payable Workflow Configuration",
"publisher": "Oracle NetSuite Help Center",
"url": "https://docs.oracle.com/en/cloud/saas/netsuite",
"published_or_updated": "2025-09-15",
"accessed": "2026-03-07",
"type": "official-doc"
}
],
"open_questions": [
"Should bills under $5,000 bypass manager approval?"
]
}
FILE:references/report-json-schema.md
# Report JSON Schema
Use this schema to build the input for `scripts/build_system_guidance_docx.py`.
## Required Fields
- `question` (string): User question or task objective.
- `sources` (array): At least one source object.
- `recommended_steps` (array): At least one recommended action.
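A pre-flight check for the three required fields might look like the sketch below. The function name `validate_report` is hypothetical, and how `scripts/build_system_guidance_docx.py` itself validates input is not shown here.

```python
def validate_report(payload):
    """Return a list of problems; an empty list means the required fields are present."""
    problems = []
    question = payload.get("question")
    if not isinstance(question, str) or not question.strip():
        problems.append("question must be a non-empty string")
    # Both arrays must exist and contain at least one entry.
    for key in ("sources", "recommended_steps"):
        value = payload.get(key)
        if not isinstance(value, list) or not value:
            problems.append(f"{key} must be a non-empty array")
    return problems
```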
## Optional Top-Level Fields
- `title` (string)
- `prepared_for` (string)
- `prepared_by` (string)
- `date` (string)
- `analysis_summary` (string)
- `assumptions` (array of strings)
- `validation_checks` (array of strings)
- `risks_open_items` (array of strings)
- `open_questions` (array of strings)
- `system_context` (object)
- `clarifications` (array)
## Source Object
- `title` (string)
- `publisher` (string)
- `url` (string)
- `published_or_updated` (string)
- `accessed` (string)
- `type` (string, example: `official-doc`, `consultant-article`)
## Recommended Step Object
Each step may be either a string or object.
Object form:
- `step` (string) or `action` (string)
- `rationale` (string, optional)
- `source_refs` (array, optional): Reference numbers aligned to source order (1-based)
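Because a step may be a bare string or an object keyed by `step` or `action`, a consumer will probably want to normalize it before rendering. The following is a minimal sketch under that assumption; `normalize_step` and `resolve_refs` are hypothetical helper names, not functions from the bundled script.

```python
def normalize_step(raw):
    """Coerce a string or object step into one uniform dict shape."""
    if isinstance(raw, str):
        return {"step": raw, "rationale": "", "source_refs": []}
    return {
        "step": raw.get("step") or raw.get("action") or "",
        "rationale": raw.get("rationale", ""),
        "source_refs": list(raw.get("source_refs", [])),
    }


def resolve_refs(step, sources):
    """Map 1-based source_refs to source titles, skipping out-of-range numbers."""
    return [
        sources[number - 1]["title"]
        for number in step["source_refs"]
        if isinstance(number, int) and 1 <= number <= len(sources)
    ]
```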
## Clarification Item
Each item may be either a string or object.
Object form:
- `question` (string)
- `answer` (string)
## Minimal Example
See [example_report_input.json](example_report_input.json).
Develop deposition question sets from Relativity-exported PDF productions using a user-provided legal theory. Use when tasks involve reviewing opponent-produ...
---
name: relativity-deposition-question-builder
description: Develop deposition question sets from Relativity-exported PDF productions using a user-provided legal theory. Use when tasks involve reviewing opponent-produced PDFs, extracting per-page document IDs from the bottom-right corner (if two numbers appear, use the smaller one), mapping document relevance to a legal theory, and drafting questions grouped by document ID with required rationale and supporting quotes.
---
# Relativity Deposition Question Builder
## Overview
Analyze one or more Relativity-exported PDF productions against a legal theory and generate deposition questions organized by document ID. Each question includes a reason for asking and a document quote that can be used if the witness denies.
## Required Behavior
- Ask for the legal theory first before producing analysis or questions.
- Ask for PDF path(s) if not already provided.
- Extract page-level document IDs from the bottom-right area of each page.
- If two numeric IDs appear in that area, choose the smaller number as the page document ID.
- Keep quotes verbatim and include source file plus page number for each quote.
- Group outputs by document ID and place each question under its document ID heading.
- Under every question, include:
  - `Reason why we ask this question`
  - `Quote from the document to use in deposition in case the opponent party denies`
## Workflow
1. Gather inputs.
   - Confirm the user's legal theory.
   - Confirm PDF source path(s).
   - Optionally gather witness name/role and priority topics.
2. Extract per-page text and document IDs.
   - Run:
     ```bash
     python scripts/extract_relativity_pages.py \
       --input <pdf-folder-or-file> \
       --recurse \
       --output <relativity_pages.json>
     ```
   - Review pages where `selected_document_id` is null and flag for manual check.
3. Build a relevance map for the legal theory.
   - For each page, classify relation to theory: `supports`, `undermines`, `neutral`.
   - Focus question drafting on `supports` pages first, then `undermines` pages for impeachment.
4. Draft deposition questions grouped by document ID.
   - Use short, concrete, single-issue questions.
   - Start with authentication/foundation, then admission and contradiction questions.
   - Use direct quotes from page text for denial follow-up.
5. Return output in required structure.
   - Follow `references/deposition_output_template.md`.
   - Keep document IDs in ascending numeric order.
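Steps 2 and 5 can be sketched as a small post-processing helper over the extractor output. The field names `pages`, `selected_document_id`, `pdf_name`, and `page_number` come from the JSON written by `scripts/extract_relativity_pages.py`; the helper name `group_pages` is hypothetical.

```python
from collections import defaultdict


def group_pages(result):
    """Split extractor output into pages grouped by document ID and pages needing manual review."""
    grouped = defaultdict(list)
    needs_review = []
    for page in result.get("pages", []):
        doc_id = page.get("selected_document_id")
        if doc_id is None:
            # No ID detected in the bottom-right corner: flag for a manual check.
            needs_review.append((page.get("pdf_name"), page.get("page_number")))
        else:
            grouped[doc_id].append(page)
    # Ascending numeric order, as the output structure requires.
    return dict(sorted(grouped.items())), needs_review
```

In practice `result` would be loaded with `json.load` from the file passed to `--output`.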
## Output Rules
- Do not merge different document IDs into one section.
- Do not omit `Reason` or `Quote` sections under any question.
- If no reliable quote exists, mark:
  - `Quote from the document to use in deposition in case the opponent party denies: [No direct quote located - manual verification required]`
- Prefer quotes that are short, specific, and tied to one factual proposition.
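The output rules above could be enforced with a small rendering helper such as the sketch below. The labels mirror `references/deposition_output_template.md`; the function name `render_question` and its parameters are hypothetical.

```python
NO_QUOTE = "[No direct quote located - manual verification required]"
QUOTE_LABEL = "Quote from the document to use in deposition in case the opponent party denies"


def render_question(number, question, reason, quote="", source=""):
    """Render one question block; the Reason and Quote lines are always present."""
    cleaned = quote.strip() if quote else ""
    # Fall back to the manual-verification marker instead of omitting the line.
    quote_text = f'"{cleaned}"' if cleaned else NO_QUOTE
    lines = [
        f"### Question {number}",
        f"- Question: {question}",
        f"- Reason why we ask this question: {reason}",
        f"- {QUOTE_LABEL}: {quote_text}",
    ]
    if source:
        lines.append(f"- Source: {source}")
    return "\n".join(lines)
```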
## Resources
- Extraction tool: `scripts/extract_relativity_pages.py`
- Output template: `references/deposition_output_template.md`
## Dependencies
Install once if missing:
```bash
python -m pip install --user pdfplumber
```
FILE:README.md
# Relativity Deposition Question Builder
Codex skill for legal deposition question development from Relativity-exported PDFs.
The skill:
- asks for the legal theory first,
- extracts bottom-right page IDs from PDFs,
- selects the smaller ID when multiple numbers appear,
- evaluates relevance to the legal theory,
- drafts deposition questions grouped by document ID,
- includes a reason and supporting quote under each question.
## Skill Files
- `SKILL.md`
- `scripts/extract_relativity_pages.py`
- `references/deposition_output_template.md`
## Usage in Codex
Invoke directly:
```text
$relativity-deposition-question-builder analyze these PDFs for my legal theory and draft deposition questions.
```
Example prompt:
```text
Use $relativity-deposition-question-builder. My legal theory is that the defendant knew the defect before sale and concealed it. Analyze PDFs in C:\Cases\Production\RelativityExport and generate questions grouped by document ID with reason and denial quote under each question.
```
## Optional Pre-Extraction Command
```bash
python scripts/extract_relativity_pages.py \
--input <pdf-folder-or-file> \
--recurse \
--output relativity_pages.json \
--pretty
```
## Output Expectations
For each `Document ID <number>` section, each question includes:
1. `Reason why we ask this question`
2. `Quote from the document to use in deposition in case the opponent party denies`
Reference format: `references/deposition_output_template.md`
## Dependency
Install once if missing:
```bash
python -m pip install --user pdfplumber
```
FILE:scripts/extract_relativity_pages.py
#!/usr/bin/env python3
"""Extract page text and per-page Relativity IDs from produced PDFs.
The script inspects the bottom-right area of each page and extracts numeric ID
candidates. When more than one candidate appears, it selects the smallest value.
"""
import argparse
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Sequence
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Extract page text and bottom-right document IDs from Relativity-exported PDFs."
)
)
parser.add_argument(
"--input",
action="append",
required=True,
help="PDF file or directory. Repeat for multiple inputs.",
)
parser.add_argument(
"--pattern",
default="*.pdf",
help="File pattern when an --input value is a directory (default: *.pdf).",
)
parser.add_argument(
"--recurse",
action="store_true",
help="Search directories recursively.",
)
parser.add_argument(
"--output",
required=True,
help="Output JSON path.",
)
parser.add_argument(
"--corner-width-ratio",
type=float,
default=0.35,
help="Fraction of page width to treat as bottom-right search area.",
)
parser.add_argument(
"--corner-height-ratio",
type=float,
default=0.15,
help="Fraction of page height to treat as bottom-right search area.",
)
parser.add_argument(
"--id-min-digits",
type=int,
default=3,
help="Minimum digit length for numeric ID candidates.",
)
parser.add_argument(
"--max-pages",
type=int,
default=0,
help="Optional page limit per PDF (0 means no limit).",
)
parser.add_argument(
"--pretty",
action="store_true",
help="Pretty-print output JSON.",
)
return parser.parse_args()
def resolve_pdf_files(inputs: Sequence[str], pattern: str, recurse: bool) -> List[Path]:
files: List[Path] = []
seen = set()
for raw in inputs:
path = Path(raw)
if path.is_file():
if path.suffix.lower() != ".pdf":
raise ValueError(f"Input is not a PDF: {path}")
resolved = path.resolve()
if resolved not in seen:
seen.add(resolved)
files.append(resolved)
continue
if path.is_dir():
iterator = path.rglob(pattern) if recurse else path.glob(pattern)
for candidate in iterator:
if not candidate.is_file() or candidate.suffix.lower() != ".pdf":
continue
resolved = candidate.resolve()
if resolved not in seen:
seen.add(resolved)
files.append(resolved)
continue
raise FileNotFoundError(f"Input path not found: {path}")
files.sort()
return files
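def extract_bottom_right_text(page: Any, width_ratio: float, height_ratio: float) -> str:
    # Work from the page's own bounding box: on pages whose origin is not (0, 0),
    # a box built from width/height alone can fall outside the page, which crop() rejects.
    page_x0, page_top, page_x1, page_bottom = (float(value) for value in page.bbox)
    x0 = max(page_x0, page_x1 - (page_x1 - page_x0) * width_ratio)
    top = max(page_top, page_bottom - (page_bottom - page_top) * height_ratio)
    corner_page = page.crop((x0, top, page_x1, page_bottom))
    text = (corner_page.extract_text() or "").strip()
    if text:
        return text
    # Fall back to word-level filtering when the cropped region yields no text.
    words = page.extract_words() or []
    tokens = [
        word.get("text", "")
        for word in words
        if float(word.get("x0", 0.0)) >= x0 and float(word.get("top", 0.0)) >= top
    ]
    return " ".join(token for token in tokens if token).strip()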
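def extract_numeric_candidates(text: str, min_digits: int) -> List[int]:
    if not text:
        return []
    # Require at least one digit so a zero or negative --id-min-digits value
    # cannot produce empty matches that int() would reject.
    pattern = re.compile(rf"\d{{{max(1, min_digits)},}}")
    return [int(match) for match in pattern.findall(text)]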
def process_pdf(
pdf_path: Path,
pdfplumber_module: Any,
width_ratio: float,
height_ratio: float,
min_digits: int,
max_pages: int,
) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
with pdfplumber_module.open(pdf_path) as pdf:
for index, page in enumerate(pdf.pages, start=1):
if max_pages > 0 and index > max_pages:
break
page_text = (page.extract_text() or "").strip()
corner_text = extract_bottom_right_text(page, width_ratio, height_ratio)
candidates = extract_numeric_candidates(corner_text, min_digits)
selected_document_id = min(candidates) if candidates else None
rows.append(
{
"pdf_file": str(pdf_path),
"pdf_name": pdf_path.name,
"page_number": index,
"selected_document_id": selected_document_id,
"document_id_candidates": candidates,
"bottom_right_text": corner_text,
"page_text": page_text,
}
)
return rows
def validate_ratios(width_ratio: float, height_ratio: float) -> None:
if width_ratio <= 0 or width_ratio > 1:
raise ValueError("--corner-width-ratio must be > 0 and <= 1")
if height_ratio <= 0 or height_ratio > 1:
raise ValueError("--corner-height-ratio must be > 0 and <= 1")
def main() -> int:
args = parse_args()
validate_ratios(args.corner_width_ratio, args.corner_height_ratio)
try:
import pdfplumber # type: ignore
except ImportError as exc:
raise SystemExit(
"Missing dependency: pdfplumber. Install with: python -m pip install --user pdfplumber"
) from exc
pdf_files = resolve_pdf_files(args.input, args.pattern, args.recurse)
if not pdf_files:
raise SystemExit("No PDF files found for the provided --input values.")
pages: List[Dict[str, Any]] = []
for pdf_file in pdf_files:
pages.extend(
process_pdf(
pdf_file,
pdfplumber,
args.corner_width_ratio,
args.corner_height_ratio,
args.id_min_digits,
args.max_pages,
)
)
missing_id_count = sum(1 for row in pages if row["selected_document_id"] is None)
result = {
"source_inputs": args.input,
"pdf_count": len(pdf_files),
"page_count": len(pages),
"missing_document_id_pages": missing_id_count,
"selection_rule": "Select the smaller numeric value when multiple IDs appear in the bottom-right corner.",
"pages": pages,
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as handle:
json.dump(result, handle, indent=2 if args.pretty else None)
print(f"Wrote {len(pages)} page records to {output_path}")
print(f"Pages without detected document ID: {missing_id_count}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
FILE:agents/openai.yaml
interface:
display_name: "Relativity Deposition Questions"
short_description: "Build deposition questions from Relativity PDFs"
default_prompt: "Use $relativity-deposition-question-builder to analyze Relativity-exported PDFs and draft deposition questions tied to my legal theory with supporting quotes."
FILE:references/deposition_output_template.md
# Deposition Output Template
Use this structure for final deliverables from this skill.
## Legal Theory
[State the user's theory exactly, then summarize it in 1-3 sentences.]
## Relevance Summary
- Most relevant document IDs: [list]
- Gaps or weak support: [list]
- Priority witness topics: [list]
## Document ID <ID_NUMBER>
### Question 1
- Question: <deposition question text>
- Reason why we ask this question: <why this question advances the theory>
- Quote from the document to use in deposition in case the opponent party denies: "<verbatim quote>"
- Source: <pdf file name>, page <page number>
### Question 2
- Question: <deposition question text>
- Reason why we ask this question: <why this question advances the theory>
- Quote from the document to use in deposition in case the opponent party denies: "<verbatim quote>"
- Source: <pdf file name>, page <page number>
## Document ID <NEXT_ID_NUMBER>
Repeat the same structure.
## Drafting Notes
- Keep document IDs in ascending numeric order.
- Keep questions narrow and fact-focused.
- Prefer quotes that directly prove one fact.
- If no reliable quote exists, mark:
  - `[No direct quote located - manual verification required]`
Review Business Requirements Documents in `.docx` format by reading the existing BRD, extracting paragraph-level context, drafting clarification questions fo...
---
name: brd-reviewer
description: Review Business Requirements Documents in `.docx` format by reading the existing BRD, extracting paragraph-level context, drafting clarification questions for unclear statements, and producing a final Word document that combines comments and tracked changes. Use when the user wants a BRD reviewed, challenged for ambiguity, or redlined with proposed wording improvements while preserving Word-native review features.
---
# BRD Reviewer
## Overview
Review a BRD paragraph by paragraph, capture clarification questions for ambiguous or incomplete requirements, and generate a single `.docx` with Word comments plus tracked revisions.
Prefer the bundled pipeline so the final deliverable is a Word-native review artifact instead of chat-only notes.
## Workflow
1. Confirm the source BRD `.docx` path.
2. Initialize the paragraph review JSON:
```bash
python scripts/brd_review_pipeline.py init-review \
--input <brd.docx> \
--output <brd.review.json>
```
3. Read the BRD and fill the review JSON.
   - For every `paragraphs[]` item, keep `paragraph_index`, `style_id`, `heading_path`, and `source_text` unchanged.
   - Set `needs_comment` to `true` when the paragraph is unclear, incomplete, internally inconsistent, or missing acceptance criteria, data definitions, ownership, dependencies, assumptions, or edge cases.
   - Write `comment_question` as a concise reviewer question suitable for a Word comment.
   - Set `needs_revision` to `true` when the paragraph should be rewritten for precision, completeness, grammar, or testability.
   - Write `proposed_replacement` as full replacement language, not fragments.
   - Use `issue_tags` to make the reason explicit. Prefer tags such as `ambiguity`, `scope`, `actor`, `data`, `workflow`, `exception`, `dependency`, `acceptance-criteria`, `nonfunctional`, `term-definition`, or `conflict`.
4. Materialize the final reviewed DOCX in the same folder as the source BRD:
```bash
python scripts/brd_review_pipeline.py materialize \
--input <brd.docx> \
--review-json <brd.review.json> \
--output <brd.reviewed.docx> \
--author "Codex BRD Reviewer"
```
5. Verify output quality.
   - Confirm the reviewed file exists beside the source BRD.
   - Confirm Word comments appear on paragraphs with `needs_comment=true`.
   - Confirm tracked changes are visible for paragraphs with `needs_revision=true`.
   - If the BRD relies heavily on tables or special layout, use the `$doc` skill workflow to render and visually inspect the result before delivery.
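Before running `materialize`, it may help to check that each flagged entry from step 3 is internally consistent. The bundled `apply_review` function skips a flagged paragraph whose `comment_question` or `proposed_replacement` is empty, so a check like this sketch surfaces those gaps early. The helper name `check_review_item` is hypothetical.

```python
def check_review_item(item):
    """Return problems for one paragraphs[] entry; an empty list means it is consistent."""
    problems = []
    if item.get("needs_comment") and not str(item.get("comment_question", "")).strip():
        problems.append("needs_comment is true but comment_question is empty")
    if item.get("needs_revision") and not str(item.get("proposed_replacement", "")).strip():
        problems.append("needs_revision is true but proposed_replacement is empty")
    flagged = item.get("needs_comment") or item.get("needs_revision")
    if flagged and not item.get("issue_tags"):
        problems.append("flagged paragraph has no issue_tags")
    return problems
```

Running it over every entry in `paragraphs` and printing the `paragraph_index` of each failure gives a quick list to fix before materializing.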
## Review standard
- Question every paragraph that leaves implementation choices unresolved.
- Favor reviewer comments for missing information and tracked changes for proposed wording.
- Rewrite requirements into concrete, testable statements.
- Flag undefined actors, systems, interfaces, calculations, timing, permissions, and exception handling.
- Do not silently invent business rules. If the BRD lacks a critical detail, ask for it in the comment instead of guessing.
- Keep comments short and specific enough to be actionable in a comment balloon.
- Keep proposed replacements professional and directly usable in the document.
## Output contract
- Always produce:
  - `<name>.review.json`
  - `<name>.reviewed.docx`
- Place both outputs in the same folder as the source BRD unless the user explicitly requests another path.
- Treat chat analysis as supplemental only. The `.docx` is the primary deliverable.
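The naming convention in this contract can be derived from the source path with `pathlib`. A minimal sketch, assuming `<name>` means the source file name without its `.docx` suffix; the helper name `review_output_paths` is hypothetical.

```python
from pathlib import Path


def review_output_paths(source_docx):
    """Return (review_json, reviewed_docx) paths beside the source BRD."""
    source = Path(source_docx)
    review_json = source.with_name(f"{source.stem}.review.json")
    reviewed_docx = source.with_name(f"{source.stem}.reviewed.docx")
    return review_json, reviewed_docx
```

The two results can be passed directly as the `--output` values of `init-review` and `materialize`.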
## Resources
- Paragraph review schema: `references/review-json-schema.md`
- Main tool: `scripts/brd_review_pipeline.py`
## Dependencies
Install once if missing:
```bash
python -m pip install python-docx lxml
```
FILE:README.md
# BRD Reviewer Skill
`brd-reviewer` is a Codex skill for reviewing Business Requirements Documents in Word format. It extracts paragraph-level review units from an existing BRD, lets Codex draft clarification questions for unclear points, and materializes a final `.docx` with Word comments and tracked changes in the same file.
## Repository layout
- `brd-reviewer/`: Installable skill folder.
- `brd-reviewer/scripts/brd_review_pipeline.py`: Paragraph extraction and DOCX materialization pipeline.
- `brd-reviewer/references/review-json-schema.md`: Review JSON field guide.
## Typical workflow
1. Run `init-review` on the source BRD `.docx`.
2. Fill the generated JSON with paragraph-level questions and proposed wording.
3. Run `materialize` to generate `<name>.reviewed.docx`.
4. Install `brd-reviewer/` into `$CODEX_HOME/skills` to use it locally.
## License
MIT. See [LICENSE](./LICENSE).
FILE:scripts/brd_review_pipeline.py
#!/usr/bin/env python3
"""
BRD review helper for DOCX workflows.
Commands:
- init-review: extract readable paragraphs from a BRD DOCX into a review JSON template.
- materialize: generate a reviewed DOCX with comments and tracked revisions.
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import zipfile
from pathlib import Path
from typing import Any
try:
from lxml import etree
except ImportError as exc: # pragma: no cover
raise SystemExit(
"Missing dependency: lxml. Install with `python -m pip install --user lxml python-docx`."
) from exc
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
R_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
CONTENT_TYPES_NS = "http://schemas.openxmlformats.org/package/2006/content-types"
PACKAGE_REL_NS = "http://schemas.openxmlformats.org/package/2006/relationships"
XML_NS = "http://www.w3.org/XML/1998/namespace"
NS = {"w": W_NS, "r": R_NS}
W = f"{{{W_NS}}}"
R = f"{{{R_NS}}}"
CT = f"{{{CONTENT_TYPES_NS}}}"
PR = f"{{{PACKAGE_REL_NS}}}"
XML_SPACE = f"{{{XML_NS}}}space"
COMMENTS_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml"
COMMENTS_REL_TYPE = "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments"
def utc_now_iso() -> str:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
    return dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def parse_xml(data: bytes) -> etree._Element:
return etree.fromstring(data)
def ensure_parent(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
def read_docx_files(docx_path: Path) -> dict[str, bytes]:
with zipfile.ZipFile(docx_path, "r") as archive:
return {item.filename: archive.read(item.filename) for item in archive.infolist()}
def write_docx_files(output_docx: Path, original_docx: Path, files: dict[str, bytes]) -> None:
    ensure_parent(output_docx)
    with zipfile.ZipFile(original_docx, "r") as source_archive, zipfile.ZipFile(
        output_docx, "w", compression=zipfile.ZIP_DEFLATED
    ) as output_archive:
        # Rewrite existing parts in their original order, keeping their ZipInfo metadata.
        for item in source_archive.infolist():
            payload = files.get(item.filename)
            if payload is not None:
                output_archive.writestr(item, payload)
        # Append parts that did not exist in the source package (for example comments.xml).
        existing_names = set(source_archive.namelist())
        for name in sorted(files):
            if name not in existing_names:
                output_archive.writestr(name, files[name])
def paragraph_text(paragraph: etree._Element) -> str:
parts: list[str] = []
for node in paragraph.xpath(".//w:t | .//w:delText", namespaces=NS):
if node.text:
parts.append(node.text)
return "".join(parts).strip()
def paragraph_style_id(paragraph: etree._Element) -> str:
style = paragraph.xpath("./w:pPr/w:pStyle/@w:val", namespaces=NS)
return style[0].strip() if style else ""
def is_heading_style(style_id: str) -> bool:
return style_id.lower().startswith("heading")
def get_body_paragraphs(document_root: etree._Element) -> list[etree._Element]:
return document_root.xpath(".//w:body//w:p", namespaces=NS)
def build_review_template(input_docx: Path) -> dict[str, Any]:
    files = read_docx_files(input_docx)
    document_root = parse_xml(files["word/document.xml"])
    paragraphs = get_body_paragraphs(document_root)
    extracted: list[dict[str, Any]] = []
    heading_stack: list[str] = []
    for index, paragraph in enumerate(paragraphs):
        text = paragraph_text(paragraph)
        if not text:
            continue
        style_id = paragraph_style_id(paragraph)
        if is_heading_style(style_id):
            level_text = "".join(ch for ch in style_id if ch.isdigit())
            # Clamp to 1 so a style id such as "Heading0" cannot pop an empty stack.
            level = max(1, int(level_text)) if level_text else 1
            while len(heading_stack) >= level:
                heading_stack.pop()
            heading_stack.append(text)
        extracted.append(
            {
                "paragraph_index": index,
                "style_id": style_id,
                "heading_path": " > ".join(heading_stack),
                "source_text": text,
                "issue_tags": [],
                "needs_comment": False,
                "comment_question": "",
                "needs_revision": False,
                "proposed_replacement": "",
            }
        )
    if not extracted:
        raise ValueError("No readable paragraphs were found in the BRD.")
    return {
        "source_docx": str(input_docx),
        "prepared_at_utc": utc_now_iso(),
        "reviewer": "",
        "overall_notes": "",
        "paragraphs": extracted,
    }
def write_json(path: Path, payload: dict[str, Any]) -> None:
ensure_parent(path)
path.write_text(json.dumps(payload, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
def load_json(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError("Review JSON must be a top-level object.")
paragraphs = payload.get("paragraphs")
if not isinstance(paragraphs, list):
raise ValueError("Review JSON must include a 'paragraphs' list.")
return payload
def set_preserve_space(node: etree._Element, text: str) -> None:
if text != text.strip():
node.set(XML_SPACE, "preserve")
def append_text_run(parent: etree._Element, text: str) -> None:
run = etree.SubElement(parent, f"{W}r")
text_node = etree.SubElement(run, f"{W}t")
text_node.text = text
set_preserve_space(text_node, text)
def ensure_track_revisions(settings_root: etree._Element) -> None:
existing = settings_root.xpath("./w:trackRevisions", namespaces=NS)
if existing:
return
settings_root.append(etree.Element(f"{W}trackRevisions"))
def replace_paragraph_with_revision(
paragraph: etree._Element,
original_text: str,
replacement_text: str,
revision_id: int,
author: str,
revision_date: str,
) -> None:
paragraph_properties = paragraph.find(f"{W}pPr")
for child in list(paragraph):
if child is not paragraph_properties:
paragraph.remove(child)
deleted = etree.Element(f"{W}del")
deleted.set(f"{W}id", str(revision_id))
deleted.set(f"{W}author", author)
deleted.set(f"{W}date", revision_date)
deleted_run = etree.SubElement(deleted, f"{W}r")
deleted_text = etree.SubElement(deleted_run, f"{W}delText")
deleted_text.text = original_text
set_preserve_space(deleted_text, original_text)
inserted = etree.Element(f"{W}ins")
inserted.set(f"{W}id", str(revision_id + 1))
inserted.set(f"{W}author", author)
inserted.set(f"{W}date", revision_date)
inserted_run = etree.SubElement(inserted, f"{W}r")
inserted_text = etree.SubElement(inserted_run, f"{W}t")
inserted_text.text = replacement_text
set_preserve_space(inserted_text, replacement_text)
paragraph.append(deleted)
paragraph.append(inserted)
def next_numeric_rid(rels_root: etree._Element) -> str:
numeric_ids: list[int] = []
for rel in rels_root.findall(f"{PR}Relationship"):
rid = rel.get("Id", "")
if rid.startswith("rId") and rid[3:].isdigit():
numeric_ids.append(int(rid[3:]))
return f"rId{max(numeric_ids, default=0) + 1}"
def ensure_comments_part(files: dict[str, bytes]) -> tuple[etree._Element, etree._Element, etree._Element]:
content_types_root = parse_xml(files["[Content_Types].xml"])
rels_root = parse_xml(files["word/_rels/document.xml.rels"])
if "word/comments.xml" in files:
comments_root = parse_xml(files["word/comments.xml"])
else:
comments_root = etree.Element(f"{W}comments", nsmap={"w": W_NS})
files["word/comments.xml"] = etree.tostring(
comments_root, xml_declaration=True, encoding="UTF-8", standalone=True
)
has_override = any(
node.get("PartName") == "/word/comments.xml"
for node in content_types_root.findall(f"{CT}Override")
)
if not has_override:
override = etree.SubElement(content_types_root, f"{CT}Override")
override.set("PartName", "/word/comments.xml")
override.set("ContentType", COMMENTS_CONTENT_TYPE)
has_relationship = any(
rel.get("Type") == COMMENTS_REL_TYPE and rel.get("Target") == "comments.xml"
for rel in rels_root.findall(f"{PR}Relationship")
)
if not has_relationship:
relationship = etree.SubElement(rels_root, f"{PR}Relationship")
relationship.set("Id", next_numeric_rid(rels_root))
relationship.set("Type", COMMENTS_REL_TYPE)
relationship.set("Target", "comments.xml")
return comments_root, content_types_root, rels_root
def next_comment_id(comments_root: etree._Element) -> int:
existing_ids: list[int] = []
for comment in comments_root.findall(f"{W}comment"):
raw_id = comment.get(f"{W}id", "")
if raw_id.isdigit():
existing_ids.append(int(raw_id))
return max(existing_ids, default=-1) + 1
def ensure_comment_anchor_runs(paragraph: etree._Element) -> list[etree._Element]:
candidate_runs = [
child
for child in paragraph
if child.tag == f"{W}r" and not child.xpath(".//w:commentReference", namespaces=NS)
]
if candidate_runs:
return candidate_runs
run = etree.Element(f"{W}r")
text_node = etree.SubElement(run, f"{W}t")
text_node.text = ""
paragraph.append(run)
return [run]
def build_comment_element(comment_id: int, author: str, comment_text: str, comment_date: str) -> etree._Element:
comment = etree.Element(f"{W}comment")
comment.set(f"{W}id", str(comment_id))
comment.set(f"{W}author", author)
comment.set(f"{W}date", comment_date)
paragraph = etree.SubElement(comment, f"{W}p")
append_text_run(paragraph, comment_text)
return comment
def attach_comment_to_paragraph(
paragraph: etree._Element,
comments_root: etree._Element,
author: str,
comment_text: str,
comment_date: str,
) -> None:
if not comment_text.strip():
return
runs = ensure_comment_anchor_runs(paragraph)
first_run = runs[0]
last_run = runs[-1]
comment_id = next_comment_id(comments_root)
comments_root.append(build_comment_element(comment_id, author, comment_text.strip(), comment_date))
start = etree.Element(f"{W}commentRangeStart")
start.set(f"{W}id", str(comment_id))
end = etree.Element(f"{W}commentRangeEnd")
end.set(f"{W}id", str(comment_id))
first_run.addprevious(start)
last_run.addnext(end)
reference_run = etree.Element(f"{W}r")
reference_props = etree.SubElement(reference_run, f"{W}rPr")
reference_style = etree.SubElement(reference_props, f"{W}rStyle")
reference_style.set(f"{W}val", "CommentReference")
reference = etree.SubElement(reference_run, f"{W}commentReference")
reference.set(f"{W}id", str(comment_id))
end.addnext(reference_run)
def serialize_xml(root: etree._Element) -> bytes:
return etree.tostring(root, xml_declaration=True, encoding="UTF-8", standalone=True)
def apply_review(
input_docx: Path,
review_payload: dict[str, Any],
output_docx: Path,
author: str,
) -> tuple[int, int]:
files = read_docx_files(input_docx)
document_root = parse_xml(files["word/document.xml"])
settings_root = parse_xml(files["word/settings.xml"])
comments_root, content_types_root, rels_root = ensure_comments_part(files)
paragraphs = get_body_paragraphs(document_root)
revision_date = utc_now_iso()
comment_date = revision_date
revisions_applied = 0
comments_added = 0
for item_index, item in enumerate(review_payload.get("paragraphs", []), start=1):
if not isinstance(item, dict):
continue
paragraph_index = item.get("paragraph_index")
if not isinstance(paragraph_index, int):
continue
if paragraph_index < 0 or paragraph_index >= len(paragraphs):
continue
paragraph = paragraphs[paragraph_index]
original_text = paragraph_text(paragraph)
if bool(item.get("needs_revision")):
replacement = str(item.get("proposed_replacement", "")).strip()
if replacement and original_text:
replace_paragraph_with_revision(
paragraph=paragraph,
original_text=original_text,
replacement_text=replacement,
revision_id=item_index * 10,
author=author,
revision_date=revision_date,
)
revisions_applied += 1
if bool(item.get("needs_comment")):
comment_question = str(item.get("comment_question", "")).strip()
if comment_question:
attach_comment_to_paragraph(
paragraph=paragraph,
comments_root=comments_root,
author=author,
comment_text=comment_question,
comment_date=comment_date,
)
comments_added += 1
ensure_track_revisions(settings_root)
files["word/document.xml"] = serialize_xml(document_root)
files["word/settings.xml"] = serialize_xml(settings_root)
files["word/comments.xml"] = serialize_xml(comments_root)
files["[Content_Types].xml"] = serialize_xml(content_types_root)
files["word/_rels/document.xml.rels"] = serialize_xml(rels_root)
write_docx_files(output_docx, input_docx, files)
return comments_added, revisions_applied
def assert_docx_written(path: Path) -> None:
if path.suffix.lower() != ".docx":
raise ValueError(f"Output must be a .docx file: {path}")
if not path.exists():
raise FileNotFoundError(f"Output was not created: {path}")
if path.stat().st_size == 0:
raise ValueError(f"Output is empty: {path}")
def command_init_review(args: argparse.Namespace) -> None:
payload = build_review_template(args.input)
write_json(args.output, payload)
print(f"[OK] Wrote review template: {args.output}")
print(f"[OK] Paragraphs detected: {len(payload['paragraphs'])}")
def command_materialize(args: argparse.Namespace) -> None:
payload = load_json(args.review_json)
comments_added, revisions_applied = apply_review(
input_docx=args.input,
review_payload=payload,
output_docx=args.output,
author=args.author,
)
assert_docx_written(args.output)
print(f"[OK] Wrote reviewed BRD: {args.output}")
print(f"[OK] Comments added: {comments_added}")
print(f"[OK] Tracked revisions applied: {revisions_applied}")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="BRD DOCX extraction and redline materialization tool.")
subparsers = parser.add_subparsers(dest="command", required=True)
init_review = subparsers.add_parser(
"init-review",
help="Extract readable BRD paragraphs into a review JSON template.",
)
init_review.add_argument("--input", type=Path, required=True, help="Source BRD .docx")
init_review.add_argument("--output", type=Path, required=True, help="Output review .json")
init_review.set_defaults(handler=command_init_review)
materialize = subparsers.add_parser(
"materialize",
help="Create a reviewed DOCX with comments and tracked changes.",
)
materialize.add_argument("--input", type=Path, required=True, help="Source BRD .docx")
materialize.add_argument(
"--review-json",
type=Path,
required=True,
help="Completed review JSON file.",
)
materialize.add_argument("--output", type=Path, required=True, help="Output reviewed .docx")
materialize.add_argument(
"--author",
default="Codex BRD Reviewer",
help="Author recorded in comments and tracked changes.",
)
materialize.set_defaults(handler=command_materialize)
return parser
def main() -> None:
parser = build_parser()
args = parser.parse_args()
args.handler(args)
if __name__ == "__main__":
main()
FILE:agents/openai.yaml
interface:
display_name: "BRD Reviewer"
short_description: "Review BRDs with comments and tracked edits"
default_prompt: "Use $brd-reviewer to review a BRD DOCX, ask paragraph-level clarification questions, and produce a redlined output document."
FILE:references/review-json-schema.md
# Review JSON Schema
Use this file only when populating the intermediate review JSON.
## Top-level fields
- `source_docx`: Source BRD path.
- `prepared_at_utc`: Timestamp from the initializer.
- `reviewer`: Optional reviewer name.
- `overall_notes`: Optional high-level review notes.
- `paragraphs`: Ordered list of paragraph review units.
## `paragraphs[]` fields
- `paragraph_index`: Zero-based index in `word/document.xml`. Do not change.
- `style_id`: Word paragraph style identifier. Do not change.
- `heading_path`: Closest heading hierarchy used for context. Do not change.
- `source_text`: Visible paragraph text. Do not change.
- `issue_tags`: Array of short reason tags.
- `needs_comment`: `true` when the paragraph needs a Word comment.
- `comment_question`: The exact clarification question for the comment balloon.
- `needs_revision`: `true` when the paragraph should receive tracked replacement text.
- `proposed_replacement`: Full replacement paragraph text used for the tracked change.
## Authoring rules
- If `needs_comment` is `true`, `comment_question` must be non-empty.
- If `needs_revision` is `true`, `proposed_replacement` must be non-empty.
- A paragraph may have both a comment and a revision.
- Leave `needs_comment` and `needs_revision` as `false` when no action is needed.
- Keep `issue_tags` short and machine-friendly.
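
The authoring rules above can be checked mechanically before running `materialize`. The sketch below is not part of the shipped tool: `validate_review_payload` is a hypothetical helper name, and it assumes the payload has the shape described in this file.

```python
from typing import Any


def validate_review_payload(payload: dict[str, Any]) -> list[str]:
    """Return human-readable rule violations; an empty list means the payload passes."""
    problems: list[str] = []
    for position, item in enumerate(payload.get("paragraphs", [])):
        if not isinstance(item, dict):
            problems.append(f"paragraphs[{position}]: entry is not an object")
            continue
        label = f"paragraph_index={item.get('paragraph_index')!r}"
        # Rule: a requested comment needs a non-empty question.
        question = str(item.get("comment_question") or "").strip()
        if item.get("needs_comment") and not question:
            problems.append(f"{label}: needs_comment is true but comment_question is empty")
        # Rule: a requested revision needs non-empty replacement text.
        replacement = str(item.get("proposed_replacement") or "").strip()
        if item.get("needs_revision") and not replacement:
            problems.append(f"{label}: needs_revision is true but proposed_replacement is empty")
    return problems
```

Running this on the completed review JSON and fixing every reported entry before materializing helps avoid paragraphs that are flagged for action but silently skipped.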
Review U.S. individual income tax returns (Form 1040/1040-SR) for the most recent tax year, compare major return items against current-year tax rules, check...
---
name: form-1040-review
description: Review U.S. individual income tax returns (Form 1040/1040-SR) for the most recent tax year, compare major return items against current-year tax rules, check consistency across historical returns when multiple years are provided, generate a standalone DOCX risk register, and estimate audit likelihood from return content. Use when tasks involve 1040 compliance review, multi-year consistency analysis, tax-law validation, or audit-risk assessment.
---
# Form 1040 Review
## Overview
Run a structured review of normalized Form 1040 data for the latest tax year in the provided set. Produce three artifacts: a detailed findings JSON file, a Markdown summary, and a separate DOCX risk report listing major items and related risks.
## Quick Start
1. Prepare normalized input JSON using [references/input_schema.json](references/input_schema.json).
2. Confirm current-law parameters in [references/current_tax_law_2025.json](references/current_tax_law_2025.json) before use.
3. Run:
```bash
python scripts/review_1040.py --input <normalized_returns.json> --output-dir output/form-1040-review
```
4. Review outputs:
- `review_summary.md`
- `review_findings.json`
- `form-1040-risk-report.docx`
## Workflow
### 1. Identify the current return
- Select the highest `tax_year` in the input as the current return, unless `--current-year` forces a specific year.
- Treat all prior years as historical comparison returns.
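
As a rough illustration of this selection step, the sketch below picks the current return and the closest earlier year from a list of normalized entries. `split_current_and_prior` is a hypothetical name used only here, and the sketch assumes each entry carries an integer-like `tax_year`.

```python
from typing import Any, Optional


def split_current_and_prior(
    returns: list[dict[str, Any]],
) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
    """Pick the highest tax_year as current and the closest earlier year as prior."""
    if not returns:
        raise ValueError("Input contains no returns.")
    ordered = sorted(returns, key=lambda entry: int(entry["tax_year"]))
    current = ordered[-1]
    # Only strictly earlier years count as historical comparison returns.
    earlier = [entry for entry in ordered if int(entry["tax_year"]) < int(current["tax_year"])]
    prior = earlier[-1] if earlier else None
    return current, prior
```

When only one year is supplied, `prior` is `None` and the consistency checks have nothing to compare against.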
### 2. Run current-year law checks
- Validate internal arithmetic and line-to-line relationships.
- Compare major current-year items to law parameters:
- Standard deduction by filing status and age/blind additions.
- Regular-rate tax computation when no preferential income is present.
- Child Tax Credit and ACTC limits.
- Self-employment tax and Additional Medicare tax thresholds.
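
The regular-rate computation is a standard marginal-bracket calculation. The sketch below shows the idea with an `up_to`/`rate` bracket list; the function name `progressive_tax` and the bracket values are made up for illustration and are not IRS figures.

```python
def progressive_tax(taxable_income: float, brackets: list[dict]) -> float:
    """Apply marginal rates slice by slice; the last bracket uses up_to=None."""
    if taxable_income <= 0:
        return 0.0
    tax = 0.0
    lower_bound = 0.0
    for bracket in brackets:
        upper_bound = bracket["up_to"]
        rate = bracket["rate"]
        if upper_bound is None or taxable_income <= upper_bound:
            # Income ends inside this bracket: tax only the part above lower_bound.
            return tax + (taxable_income - lower_bound) * rate
        # Income exceeds this bracket: tax the full bracket width and move on.
        tax += (upper_bound - lower_bound) * rate
        lower_bound = upper_bound
    return tax


# Made-up brackets for illustration only; these are NOT IRS figures.
EXAMPLE_BRACKETS = [
    {"up_to": 10_000.0, "rate": 0.10},
    {"up_to": 40_000.0, "rate": 0.20},
    {"up_to": None, "rate": 0.30},
]
```

Under these illustrative brackets, 50,000 of taxable income is taxed as 10,000 at 10%, 30,000 at 20%, and the remaining 10,000 at 30%. A recomputation like this only applies when no preferential income is present; qualified dividends and capital gains need the dedicated worksheet.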
### 3. Run multi-year consistency checks
- Compare current return against the most recent prior year.
- Flag large year-over-year movement in wages, AGI, taxable income, credits, payments, and refund/amount owed.
- Flag filing-status and dependent-count shifts for explanation.
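
One way to decide what counts as "large" movement is to require both a minimum absolute change and a minimum percentage change. The sketch below assumes that approach; `is_large_movement` is a hypothetical name, and the threshold values in the usage note are placeholders rather than the configured ones.

```python
from typing import Optional


def pct_change(prior: float, current: float) -> Optional[float]:
    """Relative change from prior to current, or None when prior is effectively zero."""
    if abs(prior) < 1e-9:
        return None
    return (current - prior) / abs(prior)


def is_large_movement(prior: float, current: float, absolute_min: float, pct_limit: float) -> bool:
    """Flag a field when the change clears the absolute floor and the percentage limit."""
    if abs(current - prior) < absolute_min:
        return False  # Too small in dollar terms to matter.
    change = pct_change(prior, current)
    if change is None:
        # No meaningful percentage exists; fall back to the absolute size alone.
        return abs(current) >= absolute_min
    return abs(change) >= pct_limit
```

With placeholder thresholds of `absolute_min=5000` and `pct_limit=0.25`, wages moving from 80,000 to 120,000 would be flagged, while a move from 80,000 to 90,000 would not.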
### 4. Produce risk outputs
- Generate a structured findings file (`review_findings.json`).
- Generate a human-readable summary (`review_summary.md`).
- Generate a standalone DOCX risk register (`form-1040-risk-report.docx`) that lists each major item, severity, observations, and recommended documentation.
- Produce an audit-likelihood estimate based on weighted findings and return complexity.
## Inputs
Use the normalized schema in [references/input_schema.json](references/input_schema.json). At minimum, include:
- `tax_year`
- `filing_status`
- `major_items` for core 1040 lines (AGI, deduction, taxable income, tax, payments, refund/amount owed)
Use [references/major_items_reference.md](references/major_items_reference.md) for canonical key mapping.
## Law Source Discipline
- Update [references/current_tax_law_2025.json](references/current_tax_law_2025.json) when the filing year changes or IRS issues revisions.
- Use only official IRS/SSA sources for numeric thresholds.
- If law data is older than the analyzed return year, flag the result as stale and require manual update before final sign-off.
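
The staleness rule can be expressed as a small guard. This sketch assumes the law JSON exposes a top-level `tax_year` field; `law_is_stale` is a hypothetical helper, and treating a missing year as stale is an assumption made here for safety.

```python
from typing import Any


def law_is_stale(law: dict[str, Any], return_tax_year: int) -> bool:
    """True when the law parameters predate the analyzed return year."""
    law_year = law.get("tax_year")
    if law_year is None:
        # Assumption: without a stated year the parameters cannot be trusted.
        return True
    return int(law_year) < return_tax_year
```

A caller could refuse final sign-off, or attach a prominent warning to every output, whenever this returns `True`.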
## Script
`scripts/review_1040.py` performs:
- Current-year arithmetic and law checks.
- Prior-year consistency checks.
- Weighted audit-risk scoring.
- DOCX risk report generation with `python-docx`.
If `python-docx` is missing, install it:
```bash
python -m pip install --user python-docx
```
## Output Interpretation
- Treat findings as risk signals, not final legal determinations.
- Require CPA/EA review for filing decisions.
- Present audit likelihood as a heuristic estimate derived from return patterns and detected issues, not a guarantee.
## Example Command
```bash
python scripts/review_1040.py \
--input references/example_returns.json \
--output-dir output/form-1040-review
```
FILE:scripts/review_1040.py
#!/usr/bin/env python
"""Analyze Form 1040 returns for current-year compliance, cross-year consistency, and audit risk."""
from __future__ import annotations
import argparse
import json
from collections import defaultdict
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
try:
from docx import Document # type: ignore
except ImportError: # pragma: no cover - runtime dependency
Document = None
SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}
STATUS_ALIASES_DEFAULT = {
"single": "single",
"s": "single",
"married filing jointly": "married_filing_jointly",
"mfj": "married_filing_jointly",
"married filing separately": "married_filing_separately",
"mfs": "married_filing_separately",
"head of household": "head_of_household",
"hoh": "head_of_household",
"qualifying surviving spouse": "qualifying_surviving_spouse",
"qss": "qualifying_surviving_spouse",
}
NUMERIC_MAJOR_FIELDS = [
"wages",
"wages_subject_to_social_security",
"taxable_interest",
"ordinary_dividends",
"qualified_dividends",
"capital_gain_or_loss",
"business_income",
"self_employment_net_earnings",
"schedule_se_tax",
"unemployment_compensation",
"other_income",
"total_income",
"adjustments_to_income",
"agi",
"deduction_amount",
"qbi_deduction",
"taxable_income",
"tax_before_credits",
"nonrefundable_credits",
"ctc_claimed",
"actc_claimed",
"other_taxes",
"additional_medicare_tax",
"total_tax",
"withholding",
"estimated_tax_payments",
"refundable_credits",
"other_payments",
"total_payments",
"refund",
"amount_owed",
"earned_income",
]
REPORT_ITEMS = [
("wages", "Wages"),
("total_income", "Total income"),
("agi", "Adjusted gross income (AGI)"),
("deduction_amount", "Deduction amount"),
("taxable_income", "Taxable income"),
("tax_before_credits", "Tax before credits"),
("ctc_claimed", "Child Tax Credit (CTC)"),
("actc_claimed", "Additional CTC (ACTC)"),
("schedule_se_tax", "Schedule SE tax"),
("additional_medicare_tax", "Additional Medicare tax"),
("total_tax", "Total tax"),
("withholding", "Withholding"),
("total_payments", "Total payments"),
("refund", "Refund"),
("amount_owed", "Amount owed"),
]
@dataclass
class Finding:
severity: str
category: str
item: str
title: str
detail: str
recommendation: str
def to_dict(self) -> Dict[str, str]:
return {
"severity": self.severity,
"category": self.category,
"item": self.item,
"title": self.title,
"detail": self.detail,
"recommendation": self.recommendation,
}
@dataclass
class ReturnRecord:
tax_year: int
filing_status: str
deduction_type: str
dependents_count: int
qualifying_children_ctc: int
additional_standard_count: int
has_preferential_income: bool
major_items: Dict[str, float]
provided_fields: set[str]
metadata: Dict[str, Any]
@dataclass
class RiskAssessment:
score: int
level: str
probability_range: str
rationale: str
def to_dict(self) -> Dict[str, str | int]:
return {
"score": self.score,
"level": self.level,
"probability_range": self.probability_range,
"rationale": self.rationale,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Review normalized Form 1040 return data.")
parser.add_argument("--input", required=True, help="Path to normalized returns JSON.")
parser.add_argument(
"--law",
default=str(Path(__file__).resolve().parents[1] / "references" / "current_tax_law_2025.json"),
help="Path to tax law parameter JSON.",
)
parser.add_argument(
"--output-dir",
default="output/form-1040-review",
help="Directory for review artifacts.",
)
parser.add_argument("--current-year", type=int, help="Force current tax year; default is max year in input.")
parser.add_argument("--docx-name", default="form-1040-risk-report.docx", help="DOCX output filename.")
parser.add_argument("--skip-docx", action="store_true", help="Skip DOCX generation.")
return parser.parse_args()
def read_json(path: Path) -> Any:
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def write_json(path: Path, payload: Any) -> None:
with path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2)
handle.write("\n")
def to_float(raw_value: Any, field: str) -> float:
    if raw_value in (None, ""):
        return 0.0
    # bool is a subclass of int, so it must be tested before the numeric branch.
    if isinstance(raw_value, bool):
        return 1.0 if raw_value else 0.0
    if isinstance(raw_value, (int, float)):
        return float(raw_value)
    # Strip thousands separators and currency symbols from string input.
    cleaned = str(raw_value).strip().replace(",", "").replace("$", "")
    if not cleaned:
        return 0.0
    try:
        return float(cleaned)
    except ValueError as exc:
        raise ValueError(f"Could not parse numeric field '{field}' from value '{raw_value}'.") from exc
def to_int(raw_value: Any, field: str) -> int:
if raw_value in (None, ""):
return 0
if isinstance(raw_value, bool):
return 1 if raw_value else 0
if isinstance(raw_value, int):
return raw_value
if isinstance(raw_value, float):
return int(round(raw_value))
cleaned = str(raw_value).strip().replace(",", "")
try:
return int(float(cleaned))
except ValueError as exc:
raise ValueError(f"Could not parse integer field '{field}' from value '{raw_value}'.") from exc
def normalize_filing_status(value: str, aliases: Dict[str, str]) -> str:
key = " ".join(str(value).strip().lower().replace("_", " ").replace("-", " ").split())
if key in aliases:
return aliases[key]
raise ValueError(
f"Unsupported filing_status '{value}'. Expected one of: {', '.join(sorted(set(aliases.values())))}"
)
def parse_return(raw: Dict[str, Any], aliases: Dict[str, str]) -> ReturnRecord:
if "tax_year" not in raw:
raise ValueError("Each return entry must include 'tax_year'.")
if "filing_status" not in raw:
raise ValueError("Each return entry must include 'filing_status'.")
tax_year = to_int(raw["tax_year"], "tax_year")
filing_status = normalize_filing_status(str(raw["filing_status"]), aliases)
deduction_type = str(raw.get("deduction_type", "standard")).strip().lower()
if deduction_type not in {"standard", "itemized"}:
raise ValueError("deduction_type must be 'standard' or 'itemized'.")
major_blob: Dict[str, Any] = dict(raw.get("major_items", {}))
for key in NUMERIC_MAJOR_FIELDS:
if key in raw and key not in major_blob:
major_blob[key] = raw[key]
major_items: Dict[str, float] = {}
for key in NUMERIC_MAJOR_FIELDS:
major_items[key] = to_float(major_blob.get(key, 0.0), key)
provided_fields = {key for key in major_blob.keys() if key in NUMERIC_MAJOR_FIELDS}
has_preferential_income = bool(raw.get("has_preferential_income", False))
if (
not has_preferential_income
and (major_items.get("qualified_dividends", 0.0) > 0.0 or major_items.get("capital_gain_or_loss", 0.0) > 0.0)
):
has_preferential_income = True
additional_standard_count = to_int(raw.get("additional_standard_deduction_count", 0), "additional_standard_deduction_count")
dependents_count = to_int(raw.get("dependents_count", 0), "dependents_count")
qualifying_children_ctc = to_int(raw.get("qualifying_children_ctc", 0), "qualifying_children_ctc")
metadata = raw.get("metadata", {})
if metadata is None:
metadata = {}
return ReturnRecord(
tax_year=tax_year,
filing_status=filing_status,
deduction_type=deduction_type,
dependents_count=dependents_count,
qualifying_children_ctc=qualifying_children_ctc,
additional_standard_count=additional_standard_count,
has_preferential_income=has_preferential_income,
major_items=major_items,
provided_fields=provided_fields,
metadata=metadata,
)
def parse_returns(payload: Any, aliases: Dict[str, str]) -> List[ReturnRecord]:
entries: List[Dict[str, Any]]
if isinstance(payload, list):
entries = payload
elif isinstance(payload, dict):
entries = payload.get("returns", [])
else:
raise ValueError("Input JSON must be an object with 'returns' or a list of returns.")
if not entries:
raise ValueError("Input contains no returns.")
returns = [parse_return(entry, aliases) for entry in entries]
returns.sort(key=lambda item: item.tax_year)
return returns
def has_fields(record: ReturnRecord, *fields: str) -> bool:
return all(field in record.provided_fields for field in fields)
def fmt_money(value: float) -> str:
    # Show whole-dollar amounts without cents; otherwise keep two decimals.
    if abs(value - round(value)) < 0.01:
        return f"${value:,.0f}"
    return f"${value:,.2f}"
def pct_change(prior: float, current: float) -> Optional[float]:
if abs(prior) < 1e-9:
return None
return (current - prior) / abs(prior)
def highest_severity(findings: Iterable[Finding]) -> str:
best = "none"
for finding in findings:
if best == "none" or SEVERITY_ORDER[finding.severity] < SEVERITY_ORDER[best]:
best = finding.severity
return best
def get_tax_brackets(law: Dict[str, Any], filing_status: str) -> Optional[List[Dict[str, Any]]]:
tax_brackets = law.get("tax_brackets", {})
brackets = tax_brackets.get(filing_status)
if isinstance(brackets, str):
brackets = tax_brackets.get(brackets)
if not isinstance(brackets, list):
return None
return brackets
def compute_progressive_tax(taxable_income: float, brackets: List[Dict[str, Any]]) -> float:
if taxable_income <= 0:
return 0.0
remaining_income = taxable_income
lower_bound = 0.0
tax = 0.0
for bracket in brackets:
upper_bound = bracket.get("up_to")
rate = float(bracket["rate"])
if upper_bound is None:
taxable_slice = max(0.0, remaining_income)
tax += taxable_slice * rate
return tax
upper_bound = float(upper_bound)
slice_width = max(0.0, upper_bound - lower_bound)
taxable_slice = min(remaining_income, slice_width)
tax += taxable_slice * rate
remaining_income -= taxable_slice
lower_bound = upper_bound
if remaining_income <= 0:
return tax
return tax
def check_arithmetic(record: ReturnRecord, tolerance: float) -> List[Finding]:
findings: List[Finding] = []
m = record.major_items
if has_fields(record, "total_income", "adjustments_to_income", "agi"):
expected = m["total_income"] - m["adjustments_to_income"]
if abs(m["agi"] - expected) > tolerance:
findings.append(
Finding(
severity="high",
category="arithmetic",
item="agi",
title="AGI mismatch",
detail=f"AGI is {fmt_money(m['agi'])}, but total income minus adjustments is {fmt_money(expected)}.",
recommendation="Reconcile Form 1040 lines for total income, adjustments, and AGI.",
)
)
if has_fields(record, "agi", "deduction_amount", "qbi_deduction", "taxable_income"):
expected = max(0.0, m["agi"] - m["deduction_amount"] - m["qbi_deduction"])
if abs(m["taxable_income"] - expected) > tolerance:
findings.append(
Finding(
severity="high",
category="arithmetic",
item="taxable_income",
title="Taxable income mismatch",
detail=(
f"Taxable income is {fmt_money(m['taxable_income'])}, but AGI minus deductions and QBI is "
f"{fmt_money(expected)}."
),
recommendation="Verify deduction, QBI deduction, and taxable-income worksheet entries.",
)
)
if has_fields(record, "tax_before_credits", "nonrefundable_credits", "other_taxes", "total_tax"):
expected = max(0.0, m["tax_before_credits"] - m["nonrefundable_credits"]) + m["other_taxes"]
if abs(m["total_tax"] - expected) > tolerance:
findings.append(
Finding(
severity="high",
category="arithmetic",
item="total_tax",
title="Total tax mismatch",
detail=(
f"Total tax is {fmt_money(m['total_tax'])}, but expected from tax, credits, and other taxes is "
f"{fmt_money(expected)}."
),
recommendation="Recheck nonrefundable-credit offsets and other-tax carryovers.",
)
)
if has_fields(record, "withholding", "estimated_tax_payments", "refundable_credits", "other_payments", "total_payments"):
expected = (
m["withholding"] + m["estimated_tax_payments"] + m["refundable_credits"] + m["other_payments"]
)
if abs(m["total_payments"] - expected) > tolerance:
findings.append(
Finding(
severity="high",
category="arithmetic",
item="total_payments",
title="Total payments mismatch",
detail=(
f"Total payments is {fmt_money(m['total_payments'])}, but component payments sum to "
f"{fmt_money(expected)}."
),
recommendation="Reconcile withholding, estimated payments, refundable credits, and other payments.",
)
)
if has_fields(record, "total_payments", "total_tax", "refund", "amount_owed"):
net = m["total_payments"] - m["total_tax"]
expected_refund = max(0.0, net)
expected_owed = max(0.0, -net)
if abs(m["refund"] - expected_refund) > tolerance or abs(m["amount_owed"] - expected_owed) > tolerance:
findings.append(
Finding(
severity="high",
category="arithmetic",
item="refund",
title="Refund/amount owed mismatch",
detail=(
f"Net payments minus total tax is {fmt_money(net)}. Reported refund is {fmt_money(m['refund'])} "
f"and amount owed is {fmt_money(m['amount_owed'])}."
),
recommendation="Recalculate net balance due and ensure refund/amount owed lines are mutually consistent.",
)
)
return findings
def check_law(record: ReturnRecord, law: Dict[str, Any], tolerance: float, tax_tolerance: float) -> List[Finding]:
findings: List[Finding] = []
m = record.major_items
standard_table = law.get("standard_deduction", {})
if record.deduction_type == "standard" and has_fields(record, "deduction_amount"):
if record.filing_status in standard_table:
expected = float(standard_table[record.filing_status])
addl_table = law.get("additional_standard_deduction", {})
if record.additional_standard_count > 0:
if record.filing_status in {"married_filing_jointly", "married_filing_separately", "qualifying_surviving_spouse"}:
expected += float(addl_table.get("married_or_qss", 0.0)) * record.additional_standard_count
else:
expected += float(addl_table.get("single_or_hoh_or_mfs", 0.0)) * record.additional_standard_count
if abs(m["deduction_amount"] - expected) > tolerance:
findings.append(
Finding(
severity="high",
category="law",
item="deduction_amount",
title="Standard deduction amount appears incorrect",
detail=(
f"Reported standard deduction is {fmt_money(m['deduction_amount'])}, expected {fmt_money(expected)} "
f"for filing status {record.filing_status}."
),
recommendation="Recheck standard deduction amount against current-year IRS guidance.",
)
)
ctc = law.get("ctc", {})
ctc_max = float(ctc.get("max_per_qualifying_child", 0.0))
actc_max = float(ctc.get("actc_max_per_qualifying_child", 0.0))
if has_fields(record, "ctc_claimed") and ctc_max > 0:
max_credit = ctc_max * max(0, record.qualifying_children_ctc)
if m["ctc_claimed"] > max_credit + tolerance:
findings.append(
Finding(
severity="high",
category="law",
item="ctc_claimed",
title="CTC exceeds configured limit",
detail=(
f"Claimed CTC is {fmt_money(m['ctc_claimed'])}, but configured maximum for "
f"{record.qualifying_children_ctc} qualifying children is {fmt_money(max_credit)}."
),
recommendation="Validate qualifying-child count and CTC worksheet limits.",
)
)
if has_fields(record, "actc_claimed") and actc_max > 0:
max_actc = actc_max * max(0, record.qualifying_children_ctc)
if m["actc_claimed"] > max_actc + tolerance:
findings.append(
Finding(
severity="high",
category="law",
item="actc_claimed",
title="ACTC exceeds configured limit",
detail=(
f"Claimed ACTC is {fmt_money(m['actc_claimed'])}, but configured maximum for "
f"{record.qualifying_children_ctc} qualifying children is {fmt_money(max_actc)}."
),
recommendation="Review Schedule 8812 ACTC computation and dependent eligibility.",
)
)
min_earned_income = float(ctc.get("actc_min_earned_income", 0.0))
if m["actc_claimed"] > tolerance and has_fields(record, "earned_income") and m["earned_income"] < min_earned_income:
findings.append(
Finding(
severity="high",
category="law",
item="actc_claimed",
title="ACTC claimed with low earned income",
detail=(
f"ACTC is claimed ({fmt_money(m['actc_claimed'])}) but earned income is {fmt_money(m['earned_income'])}, "
f"below the configured minimum {fmt_money(min_earned_income)}."
),
recommendation="Confirm earned-income eligibility for ACTC.",
)
)
if has_fields(record, "taxable_income", "tax_before_credits"):
if not record.has_preferential_income:
brackets = get_tax_brackets(law, record.filing_status)
if brackets:
expected_tax = compute_progressive_tax(m["taxable_income"], brackets)
delta = abs(m["tax_before_credits"] - expected_tax)
dynamic_tolerance = max(tax_tolerance, expected_tax * 0.05)
if delta > dynamic_tolerance:
findings.append(
Finding(
severity="medium",
category="law",
item="tax_before_credits",
title="Tax-before-credits differs from rate schedule",
detail=(
f"Reported tax before credits is {fmt_money(m['tax_before_credits'])}; simple rate-schedule "
f"result is {fmt_money(expected_tax)}."
),
recommendation=(
"Check Tax Table/Tax Computation Worksheet inputs and confirm no special computation applies."
),
)
)
else:
findings.append(
Finding(
severity="low",
category="law",
item="tax_before_credits",
title="Preferential-income tax calculation requires manual worksheet review",
detail=(
"Qualified dividends or capital gains are present, so simple ordinary-bracket recomputation may not "
"match Form 1040 tax."
),
recommendation="Review Qualified Dividends and Capital Gain Tax Worksheet outputs.",
)
)
se_cfg = law.get("self_employment", {})
ss_wage_base = float(se_cfg.get("social_security_wage_base", 0.0))
ss_rate = float(se_cfg.get("social_security_rate", 0.124))
medicare_rate = float(se_cfg.get("medicare_rate", 0.029))
if has_fields(record, "self_employment_net_earnings", "schedule_se_tax") and ss_wage_base > 0:
net_earnings = max(0.0, m["self_employment_net_earnings"])
se_base = net_earnings * 0.9235
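# major_items always holds every key (defaulting to 0.0), so fall back to wages only when the SS-wage field was not provided.
wages_for_ss = m["wages_subject_to_social_security"] if "wages_subject_to_social_security" in record.provided_fields else m.get("wages", 0.0)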
remaining_ss_base = max(0.0, ss_wage_base - max(0.0, wages_for_ss))
ss_taxable = min(se_base, remaining_ss_base)
expected_se_tax = (ss_taxable * ss_rate) + (se_base * medicare_rate)
if abs(m["schedule_se_tax"] - expected_se_tax) > max(75.0, tax_tolerance):
findings.append(
Finding(
severity="medium",
category="law",
item="schedule_se_tax",
title="Schedule SE tax differs from approximation",
detail=(
f"Reported Schedule SE tax is {fmt_money(m['schedule_se_tax'])}; approximate computation is "
f"{fmt_money(expected_se_tax)} using the configured SS wage base."
),
recommendation="Recompute Schedule SE, especially SS wage-base interaction and deductible half-SE tax.",
)
)
thresholds = law.get("additional_medicare_threshold", {})
threshold_value = thresholds.get(record.filing_status)
if threshold_value is not None and has_fields(record, "additional_medicare_tax"):
threshold = float(threshold_value)
medicare_base = max(0.0, m.get("wages", 0.0)) + max(0.0, m.get("self_employment_net_earnings", 0.0) * 0.9235)
expected_addl = max(0.0, medicare_base - threshold) * 0.009
if expected_addl - m["additional_medicare_tax"] > max(25.0, tolerance):
findings.append(
Finding(
severity="medium",
category="law",
item="additional_medicare_tax",
title="Additional Medicare tax may be understated",
detail=(
f"Approximate Additional Medicare tax is {fmt_money(expected_addl)} based on income {fmt_money(medicare_base)}, "
f"but reported amount is {fmt_money(m['additional_medicare_tax'])}."
),
recommendation="Review Form 8959 and withholding credit application.",
)
)
if record.qualifying_children_ctc > record.dependents_count and record.dependents_count > 0:
findings.append(
Finding(
severity="high",
category="law",
item="qualifying_children_ctc",
title="Qualifying-child count exceeds dependent count",
detail=(
f"Qualifying children for CTC is {record.qualifying_children_ctc}, but dependents count is "
f"{record.dependents_count}."
),
recommendation="Reconcile dependent roster and CTC qualifying-child eligibility.",
)
)
return findings
def check_consistency(current: ReturnRecord, prior: Optional[ReturnRecord], law: Dict[str, Any]) -> List[Finding]:
findings: List[Finding] = []
if prior is None:
return findings
if current.filing_status != prior.filing_status:
findings.append(
Finding(
severity="medium",
category="consistency",
item="filing_status",
title="Filing status changed from prior year",
detail=f"Prior year status: {prior.filing_status}; current year status: {current.filing_status}.",
recommendation="Document life-event support for filing-status change.",
)
)
dependent_delta = current.dependents_count - prior.dependents_count
if abs(dependent_delta) >= 2:
findings.append(
Finding(
severity="medium",
category="consistency",
item="dependents_count",
title="Large dependent-count change",
detail=(
f"Dependent count changed by {dependent_delta:+d} (from {prior.dependents_count} to "
f"{current.dependents_count})."
),
recommendation="Retain support for dependent additions/removals.",
)
)
thresholds = law.get("consistency_thresholds", {})
for field, cfg in thresholds.items():
if field not in current.provided_fields or field not in prior.provided_fields:
continue
curr = current.major_items[field]
prev = prior.major_items[field]
delta = curr - prev
pct = pct_change(prev, curr)
min_abs = float(cfg.get("absolute_min", 0.0))
pct_limit = float(cfg.get("pct_change", 0.0))
if abs(delta) < min_abs:
continue
exceeded = False
if pct is None:
exceeded = abs(curr) >= min_abs
else:
exceeded = abs(pct) >= pct_limit
if not exceeded:
continue
severity = str(cfg.get("severity", "low"))
if pct is None:
delta_text = f"changed from {fmt_money(prev)} to {fmt_money(curr)}"
else:
delta_text = f"changed by {pct * 100:+.1f}% ({fmt_money(prev)} to {fmt_money(curr)})"
findings.append(
Finding(
severity=severity,
category="consistency",
item=field,
title=f"Year-over-year variance in {field.replace('_', ' ')}",
detail=(
f"Compared with {prior.tax_year}, {field.replace('_', ' ')} {delta_text}."
),
recommendation="Confirm supporting documents and explanation for this variance.",
)
)
return findings
def assess_audit_risk(findings: List[Finding], current: ReturnRecord) -> RiskAssessment:
weights = {"high": 12, "medium": 6, "low": 2}
score = sum(weights.get(finding.severity, 0) for finding in findings)
# Complexity adjustments
if current.major_items.get("self_employment_net_earnings", 0.0) > 0:
score += 4
if current.deduction_type == "itemized":
score += 2
if current.has_preferential_income:
score += 1
high_count = sum(1 for finding in findings if finding.severity == "high")
if high_count >= 3:
score += 10
score = int(min(100, max(0, round(score))))
if score <= 15:
level = "Low"
probability = "5-10%"
elif score <= 35:
level = "Low-Moderate"
probability = "10-20%"
elif score <= 55:
level = "Moderate"
probability = "20-35%"
elif score <= 75:
level = "Moderate-High"
probability = "35-55%"
else:
level = "High"
probability = "55%+"
rationale = (
"Heuristic score based on arithmetic mismatches, law-check findings, and multi-year consistency variances. "
"Not an IRS prediction model."
)
return RiskAssessment(score=score, level=level, probability_range=probability, rationale=rationale)
def render_markdown(
    path: Path,
    current: ReturnRecord,
    prior: Optional[ReturnRecord],
    findings: List[Finding],
    risk: RiskAssessment,
    law: Dict[str, Any],
) -> None:
    lines: List[str] = []
    lines.append(f"# Form 1040 Review Summary - Tax Year {current.tax_year}")
    lines.append("")
    lines.append(f"Generated: {date.today().isoformat()}")
    lines.append(f"Law reference tax year: {law.get('tax_year', 'unknown')} (as of {law.get('as_of_date', 'unknown')})")
    lines.append("")
    lines.append("## Overall Audit Risk")
    lines.append("")
    lines.append(
        f"Estimated risk: **{risk.level}** (score **{risk.score}/100**, indicative probability range **{risk.probability_range}**)."
    )
    lines.append("")
    lines.append(risk.rationale)
    lines.append("")
    lines.append("## Return Scope")
    lines.append("")
    lines.append(f"- Current return year: {current.tax_year}")
    lines.append(f"- Filing status: {current.filing_status}")
    lines.append(f"- Deduction type: {current.deduction_type}")
    if prior is not None:
        lines.append(f"- Compared to prior return year: {prior.tax_year}")
    else:
        lines.append("- No historical return provided for consistency comparison")
    lines.append("")
    lines.append("## Findings")
    lines.append("")
    if not findings:
        lines.append("No findings were generated by configured checks.")
    else:
        lines.append("| Severity | Category | Item | Finding | Recommendation |")
        lines.append("|---|---|---|---|---|")
        for finding in findings:
            lines.append(
                "| {severity} | {category} | {item} | {title}: {detail} | {recommendation} |".format(
                    severity=finding.severity.upper(),
                    category=finding.category,
                    item=finding.item.replace("|", "\\|"),
                    title=finding.title.replace("|", "\\|"),
                    detail=finding.detail.replace("|", "\\|"),
                    recommendation=finding.recommendation.replace("|", "\\|"),
                )
            )
    lines.append("")
    lines.append("## Note")
    lines.append("")
    lines.append("This report is a technical consistency and rules check, not legal or tax filing advice.")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def render_docx(
    path: Path,
    current: ReturnRecord,
    prior: Optional[ReturnRecord],
    findings: List[Finding],
    risk: RiskAssessment,
) -> None:
    if Document is None:
        raise RuntimeError(
            "python-docx is not installed. Install with: "
            "C:\\windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe -Command \"$env:PYTHONUSERBASE='C:\\Codex\\.pyuser'; python -m pip install --user python-docx\""
        )
    findings_by_item: Dict[str, List[Finding]] = defaultdict(list)
    for finding in findings:
        findings_by_item[finding.item].append(finding)
    document = Document()
    document.add_heading(f"Form 1040 Risk Review - Tax Year {current.tax_year}", level=0)
    document.add_paragraph(f"Generated: {date.today().isoformat()}")
    document.add_paragraph(
        "This report lists major return items and risk observations from arithmetic checks, law checks, and "
        "cross-year consistency checks."
    )
    p = document.add_paragraph()
    p.add_run("Overall audit-risk estimate: ").bold = True
    p.add_run(f"{risk.level} (score {risk.score}/100; indicative probability {risk.probability_range}).")
    if prior is not None:
        document.add_paragraph(f"Historical consistency comparison performed against tax year {prior.tax_year}.")
    else:
        document.add_paragraph("No historical return was provided for year-over-year consistency comparison.")
    table = document.add_table(rows=1, cols=5)
    header = table.rows[0].cells
    header[0].text = "Major item"
    header[1].text = "Current value"
    header[2].text = "Highest severity"
    header[3].text = "Observations"
    header[4].text = "Recommended documentation"
    for item_key, item_label in REPORT_ITEMS:
        row_cells = table.add_row().cells
        value_text = fmt_money(current.major_items[item_key]) if item_key in current.provided_fields else "Not provided"
        relevant = findings_by_item.get(item_key, [])
        severity = highest_severity(relevant)
        if relevant:
            observations = " ".join(f"{f.title}." for f in relevant[:2])
            recommendation = relevant[0].recommendation
        else:
            observations = "No material issue detected by configured checks."
            recommendation = "Retain normal support documents for this line item."
        row_cells[0].text = item_label
        row_cells[1].text = value_text
        row_cells[2].text = severity.upper() if severity != "none" else "NONE"
        row_cells[3].text = observations
        row_cells[4].text = recommendation
    consistency_findings = [f for f in findings if f.category == "consistency"]
    document.add_heading("Cross-Year Consistency Notes", level=1)
    if consistency_findings:
        for finding in consistency_findings:
            document.add_paragraph(
                f"[{finding.severity.upper()}] {finding.title}: {finding.detail} Recommendation: {finding.recommendation}",
                style="List Bullet",
            )
    else:
        document.add_paragraph("No multi-year inconsistency findings were generated.")
    document.add_heading("All Findings", level=1)
    if findings:
        for finding in findings:
            document.add_paragraph(
                f"[{finding.severity.upper()}] {finding.category} - {finding.title}: {finding.detail}",
                style="List Bullet",
            )
    else:
        document.add_paragraph("No findings were generated by configured checks.")
    document.add_paragraph("This report is a heuristic review aid and not legal or tax advice.")
    path.parent.mkdir(parents=True, exist_ok=True)
    document.save(path)
def select_current_and_prior(returns: List[ReturnRecord], forced_year: Optional[int]) -> Tuple[ReturnRecord, Optional[ReturnRecord]]:
    if forced_year is None:
        current = max(returns, key=lambda item: item.tax_year)
    else:
        matches = [record for record in returns if record.tax_year == forced_year]
        if not matches:
            raise ValueError(f"No return found for --current-year {forced_year}.")
        current = matches[0]
    prior_candidates = [record for record in returns if record.tax_year < current.tax_year]
    prior = max(prior_candidates, key=lambda item: item.tax_year) if prior_candidates else None
    return current, prior
def sort_findings(findings: List[Finding]) -> List[Finding]:
    return sorted(
        findings,
        key=lambda item: (
            SEVERITY_ORDER.get(item.severity, 99),
            item.category,
            item.item,
            item.title,
        ),
    )
def main() -> None:
    args = parse_args()
    input_path = Path(args.input).resolve()
    law_path = Path(args.law).resolve()
    output_dir = Path(args.output_dir).resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    law = read_json(law_path)
    aliases = dict(STATUS_ALIASES_DEFAULT)
    aliases.update({k.lower(): v for k, v in law.get("filing_status_aliases", {}).items()})
    returns = parse_returns(read_json(input_path), aliases)
    current, prior = select_current_and_prior(returns, args.current_year)
    tolerance = float(law.get("tolerance", {}).get("dollar", 2.0))
    tax_tolerance = float(law.get("tolerance", {}).get("tax_compare_dollar", 150.0))
    findings: List[Finding] = []
    law_year = law.get("tax_year")
    if isinstance(law_year, int) and law_year < current.tax_year:
        findings.append(
            Finding(
                severity="high",
                category="law",
                item="law_reference",
                title="Law reference year is stale",
                detail=f"Law file tax_year is {law_year}, but current return year is {current.tax_year}.",
                recommendation="Update law parameters before finalizing review conclusions.",
            )
        )
    findings.extend(check_arithmetic(current, tolerance))
    findings.extend(check_law(current, law, tolerance, tax_tolerance))
    findings.extend(check_consistency(current, prior, law))
    findings = sort_findings(findings)
    risk = assess_audit_risk(findings, current)
    summary_path = output_dir / "review_summary.md"
    findings_path = output_dir / "review_findings.json"
    docx_path = output_dir / args.docx_name
    render_markdown(summary_path, current, prior, findings, risk, law)
    write_json(
        findings_path,
        {
            "current_tax_year": current.tax_year,
            "law_reference": {
                "path": str(law_path),
                "tax_year": law.get("tax_year"),
                "as_of_date": law.get("as_of_date"),
                "sources": law.get("sources", []),
            },
            "audit_risk": risk.to_dict(),
            "current_return": {
                "tax_year": current.tax_year,
                "filing_status": current.filing_status,
                "deduction_type": current.deduction_type,
                "dependents_count": current.dependents_count,
                "qualifying_children_ctc": current.qualifying_children_ctc,
                "major_items": current.major_items,
            },
            "prior_return_year": prior.tax_year if prior else None,
            "findings": [finding.to_dict() for finding in findings],
        },
    )
    if not args.skip_docx:
        render_docx(docx_path, current, prior, findings, risk)
    print(f"Current return year: {current.tax_year}")
    print(f"Findings: {len(findings)}")
    print(f"Audit risk: {risk.level} ({risk.score}/100, {risk.probability_range})")
    print(f"Summary: {summary_path}")
    print(f"Findings JSON: {findings_path}")
    if not args.skip_docx:
        print(f"DOCX risk report: {docx_path}")
if __name__ == "__main__":
    main()
FILE:agents/openai.yaml
interface:
display_name: "Form 1040 Review"
short_description: "Review 1040 consistency, compliance, and audit risk"
default_prompt: "Use $form-1040-review to analyze the latest Form 1040, compare current-year law checks, assess multi-year consistency, generate a DOCX risk register, and estimate audit likelihood."
FILE:references/major_items_reference.md
# Major Items Reference
Use these canonical `major_items` keys when normalizing returns before running the script.
| Key | Typical Source | Notes |
|---|---|---|
| `wages` | Form 1040 line 1z | Wages, salaries, tips from Forms W-2. |
| `wages_subject_to_social_security` | W-2 box 3 total | Optional; improves Schedule SE cross-check. |
| `taxable_interest` | Form 1040 line 2b | Taxable interest only. |
| `ordinary_dividends` | Form 1040 line 3b | Ordinary dividends. |
| `qualified_dividends` | Form 1040 line 3a | Presence implies preferential tax computation. |
| `capital_gain_or_loss` | Form 1040 line 7 | Net capital gain/loss. |
| `business_income` | Schedule 1 line 3 | Optional high-level business-income marker. |
| `self_employment_net_earnings` | Schedule SE net earnings | Use positive net earnings for tax approximation. |
| `schedule_se_tax` | Schedule 2 / Schedule SE result | SE tax amount carried to 1040 total tax flow. |
| `unemployment_compensation` | Schedule 1 line 7 | Taxable unemployment income. |
| `other_income` | Schedule 1, other items | Any other taxable income not split elsewhere. |
| `total_income` | Form 1040 line 9 | Total income. |
| `adjustments_to_income` | Form 1040 line 10 | Schedule 1 adjustments. |
| `agi` | Form 1040 line 11 | Adjusted gross income. |
| `deduction_amount` | Form 1040 line 12 | Standard or itemized deduction amount. |
| `qbi_deduction` | Form 1040 line 13 | Qualified business income deduction. |
| `taxable_income` | Form 1040 line 15 | Taxable income. |
| `tax_before_credits` | Form 1040 line 16 | Computed tax before credits. |
| `nonrefundable_credits` | Form 1040 line 20 | Nonrefundable credits total. |
| `ctc_claimed` | Schedule 8812 nonrefundable portion | Claimed CTC used as nonrefundable credit. |
| `actc_claimed` | Schedule 8812 refundable portion | Additional child tax credit. |
| `other_taxes` | Form 1040 line 23 | Other taxes. |
| `additional_medicare_tax` | Form 8959 result | Additional Medicare tax component. |
| `total_tax` | Form 1040 line 24 | Total tax. |
| `withholding` | Form 1040 line 25d | Federal withholding total. |
| `estimated_tax_payments` | Form 1040 line 26 | Estimated payments and prior-year overpayment applied. |
| `refundable_credits` | Form 1040 line 31 total | Refundable credits total. |
| `other_payments` | Payment line items not above | Optional catch-all payment bucket. |
| `total_payments` | Form 1040 line 33 | Total payments. |
| `refund` | Form 1040 line 35a | Refund amount. |
| `amount_owed` | Form 1040 line 37 | Amount you owe. |
| `earned_income` | Earned-income figure for ACTC tests | Usually wage + SE earned income. |
## Normalization Tips
- Keep currency values as numbers (no `$` signs) when possible.
- Preserve one record per tax year.
- Include historical years only if they are complete enough for year-over-year comparison.
- If a key is unknown, omit it; the checker skips missing-key tests rather than forcing zero.
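The tips above can be sketched as a small normalizer. This is a minimal illustration and not part of the skill's script: `normalize_major_items` and the shape of the `raw` input are hypothetical, and the sketch assumes currency values arrive either as numbers or as strings such as `"$62,000"` or `"(1,200)"`.

```python
from typing import Any, Dict


def normalize_major_items(raw: Dict[str, Any]) -> Dict[str, float]:
    """Return a copy of `raw` with currency strings converted to floats.

    Hypothetical helper: keys whose value is missing or blank are omitted
    rather than forced to zero, mirroring the last tip above.
    """
    cleaned: Dict[str, float] = {}
    for key, value in raw.items():
        if value is None or isinstance(value, bool):
            continue  # unknown or non-currency value: omit the key entirely
        if isinstance(value, (int, float)):
            cleaned[key] = float(value)
            continue
        text = str(value).strip().replace("$", "").replace(",", "")
        if not text:
            continue  # blank string: treat as unknown
        # Accounting-style negatives such as "(1,200)" become -1200.0.
        if text.startswith("(") and text.endswith(")"):
            text = "-" + text[1:-1]
        cleaned[key] = float(text)
    return cleaned


example = normalize_major_items(
    {"wages": "$62,000", "taxable_interest": 200, "refund": None}
)
print(example)
```

Omitting unknown keys, instead of writing `0`, keeps the checker able to tell "not provided" apart from a true zero.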
FILE:references/input_schema.json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Form 1040 Review Input",
"type": "object",
"required": ["returns"],
"properties": {
"returns": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["tax_year", "filing_status", "major_items"],
"properties": {
"tax_year": {"type": "integer"},
"filing_status": {
"type": "string",
"description": "single | married_filing_jointly | married_filing_separately | head_of_household | qualifying_surviving_spouse"
},
"deduction_type": {
"type": "string",
"enum": ["standard", "itemized"],
"default": "standard"
},
"dependents_count": {"type": "integer", "minimum": 0, "default": 0},
"qualifying_children_ctc": {"type": "integer", "minimum": 0, "default": 0},
"additional_standard_deduction_count": {"type": "integer", "minimum": 0, "default": 0},
"has_preferential_income": {"type": "boolean", "default": false},
"major_items": {
"type": "object",
"description": "Use canonical keys from references/major_items_reference.md",
"properties": {
"wages": {"type": "number"},
"wages_subject_to_social_security": {"type": "number"},
"taxable_interest": {"type": "number"},
"ordinary_dividends": {"type": "number"},
"qualified_dividends": {"type": "number"},
"capital_gain_or_loss": {"type": "number"},
"business_income": {"type": "number"},
"self_employment_net_earnings": {"type": "number"},
"schedule_se_tax": {"type": "number"},
"unemployment_compensation": {"type": "number"},
"other_income": {"type": "number"},
"total_income": {"type": "number"},
"adjustments_to_income": {"type": "number"},
"agi": {"type": "number"},
"deduction_amount": {"type": "number"},
"qbi_deduction": {"type": "number"},
"taxable_income": {"type": "number"},
"tax_before_credits": {"type": "number"},
"nonrefundable_credits": {"type": "number"},
"ctc_claimed": {"type": "number"},
"actc_claimed": {"type": "number"},
"other_taxes": {"type": "number"},
"additional_medicare_tax": {"type": "number"},
"total_tax": {"type": "number"},
"withholding": {"type": "number"},
"estimated_tax_payments": {"type": "number"},
"refundable_credits": {"type": "number"},
"other_payments": {"type": "number"},
"total_payments": {"type": "number"},
"refund": {"type": "number"},
"amount_owed": {"type": "number"},
"earned_income": {"type": "number"}
},
"additionalProperties": true
},
"metadata": {
"type": "object",
"additionalProperties": true,
"description": "Optional source metadata such as source_file or extraction notes"
}
},
"additionalProperties": true
}
}
},
"additionalProperties": false
}
FILE:references/current_tax_law_2025.json
{
"tax_year": 2025,
"as_of_date": "2026-03-07",
"sources": [
"https://www.irs.gov/filing/federal-income-tax-rates-and-brackets",
"https://www.irs.gov/newsroom/new-and-enhanced-deductions-for-individuals",
"https://www.irs.gov/publications/p501",
"https://www.irs.gov/credits-deductions/individuals/child-tax-credit",
"https://www.irs.gov/instructions/i1040s8",
"https://www.irs.gov/instructions/i8959",
"https://www.irs.gov/taxtopics/tc554",
"https://www.ssa.gov/oact/COLA/cbbdet.html"
],
"filing_status_aliases": {
"married filing joint": "married_filing_jointly",
"married filing separately": "married_filing_separately",
"married filing separate": "married_filing_separately",
"head household": "head_of_household",
"qualifying widow": "qualifying_surviving_spouse",
"qualifying widow(er)": "qualifying_surviving_spouse",
"qualifying surviving spouse": "qualifying_surviving_spouse"
},
"standard_deduction": {
"single": 15750,
"married_filing_jointly": 31500,
"married_filing_separately": 15750,
"head_of_household": 23625,
"qualifying_surviving_spouse": 31500
},
"additional_standard_deduction": {
"single_or_hoh_or_mfs": 2000,
"married_or_qss": 1600
},
"tax_brackets": {
"single": [
{"up_to": 11925, "rate": 0.10},
{"up_to": 48475, "rate": 0.12},
{"up_to": 103350, "rate": 0.22},
{"up_to": 197300, "rate": 0.24},
{"up_to": 250525, "rate": 0.32},
{"up_to": 626350, "rate": 0.35},
{"up_to": null, "rate": 0.37}
],
"married_filing_jointly": [
{"up_to": 23850, "rate": 0.10},
{"up_to": 96950, "rate": 0.12},
{"up_to": 206700, "rate": 0.22},
{"up_to": 394600, "rate": 0.24},
{"up_to": 501050, "rate": 0.32},
{"up_to": 751600, "rate": 0.35},
{"up_to": null, "rate": 0.37}
],
"married_filing_separately": [
{"up_to": 11925, "rate": 0.10},
{"up_to": 48475, "rate": 0.12},
{"up_to": 103350, "rate": 0.22},
{"up_to": 197300, "rate": 0.24},
{"up_to": 250525, "rate": 0.32},
{"up_to": 375800, "rate": 0.35},
{"up_to": null, "rate": 0.37}
],
"head_of_household": [
{"up_to": 17000, "rate": 0.10},
{"up_to": 64850, "rate": 0.12},
{"up_to": 103350, "rate": 0.22},
{"up_to": 197300, "rate": 0.24},
{"up_to": 250500, "rate": 0.32},
{"up_to": 626350, "rate": 0.35},
{"up_to": null, "rate": 0.37}
],
"qualifying_surviving_spouse": "married_filing_jointly"
},
"ctc": {
"max_per_qualifying_child": 2200,
"actc_max_per_qualifying_child": 1700,
"actc_min_earned_income": 2500,
"phaseout_start": {
"single": 200000,
"head_of_household": 200000,
"married_filing_jointly": 400000,
"married_filing_separately": 200000,
"qualifying_surviving_spouse": 200000
}
},
"self_employment": {
"social_security_wage_base": 176100,
"social_security_rate": 0.124,
"medicare_rate": 0.029
},
"additional_medicare_threshold": {
"single": 200000,
"head_of_household": 200000,
"married_filing_jointly": 250000,
"married_filing_separately": 125000,
"qualifying_surviving_spouse": 200000
},
"tolerance": {
"dollar": 2.0,
"tax_compare_dollar": 150.0
},
"consistency_thresholds": {
"wages": {"pct_change": 0.35, "absolute_min": 7500, "severity": "medium"},
"agi": {"pct_change": 0.30, "absolute_min": 7500, "severity": "medium"},
"taxable_income": {"pct_change": 0.35, "absolute_min": 7500, "severity": "medium"},
"tax_before_credits": {"pct_change": 0.35, "absolute_min": 2500, "severity": "low"},
"ctc_claimed": {"pct_change": 0.50, "absolute_min": 500, "severity": "medium"},
"total_tax": {"pct_change": 0.35, "absolute_min": 2500, "severity": "low"},
"withholding": {"pct_change": 0.45, "absolute_min": 2500, "severity": "low"},
"refund": {"pct_change": 0.50, "absolute_min": 1500, "severity": "low"},
"amount_owed": {"pct_change": 0.50, "absolute_min": 1500, "severity": "low"}
}
}
FILE:references/example_returns.json
{
"returns": [
{
"tax_year": 2024,
"filing_status": "single",
"deduction_type": "standard",
"dependents_count": 1,
"qualifying_children_ctc": 1,
"additional_standard_deduction_count": 0,
"major_items": {
"wages": 62000,
"taxable_interest": 200,
"ordinary_dividends": 300,
"qualified_dividends": 0,
"capital_gain_or_loss": 0,
"total_income": 62500,
"adjustments_to_income": 2500,
"agi": 60000,
"deduction_amount": 14600,
"qbi_deduction": 0,
"taxable_income": 45400,
"tax_before_credits": 5260,
"nonrefundable_credits": 2000,
"ctc_claimed": 2000,
"actc_claimed": 0,
"other_taxes": 0,
"total_tax": 3260,
"withholding": 5800,
"estimated_tax_payments": 500,
"refundable_credits": 0,
"other_payments": 0,
"total_payments": 6300,
"refund": 3040,
"amount_owed": 0,
"earned_income": 62000
},
"metadata": {
"source_file": "form1040_2024.pdf"
}
},
{
"tax_year": 2025,
"filing_status": "single",
"deduction_type": "standard",
"dependents_count": 1,
"qualifying_children_ctc": 1,
"additional_standard_deduction_count": 0,
"has_preferential_income": false,
"major_items": {
"wages": 98000,
"taxable_interest": 450,
"ordinary_dividends": 300,
"qualified_dividends": 0,
"capital_gain_or_loss": 0,
"total_income": 98750,
"adjustments_to_income": 3000,
"agi": 95750,
"deduction_amount": 15750,
"qbi_deduction": 0,
"taxable_income": 80000,
"tax_before_credits": 12514,
"nonrefundable_credits": 2200,
"ctc_claimed": 2200,
"actc_claimed": 0,
"other_taxes": 0,
"additional_medicare_tax": 0,
"total_tax": 10314,
"withholding": 11500,
"estimated_tax_payments": 0,
"refundable_credits": 0,
"other_payments": 0,
"total_payments": 11500,
"refund": 1186,
"amount_owed": 0,
"earned_income": 98000
},
"metadata": {
"source_file": "form1040_2025.pdf"
}
}
]
}
---
name: technical-accounting-research
description: Research technical accounting treatment and financial statement disclosure for specific transactions using U.S. GAAP and SEC-focused sources. Use when a user asks how to account for a transaction, what journal entries, presentation, or disclosures are required, or needs accounting-position documentation in memo, email, or Q-and-A DOCX format.
---
# Technical Accounting Research
## Overview
Handle transaction-specific accounting questions through a fixed sequence: gather facts, confirm output format, ensure the local FinResearchClaw repo is installed and runnable, route the task through that wrapped repo/workflow, research guidance online, apply standards, and deliver the requested output.
For all tasks under this skill — memo, email, and q-and-a alike — the workflow must wrap and leverage the local FinResearchClaw repo/workflow as a required research-and-drafting execution layer. This is mandatory, not optional, and applies even when the accounting issue appears straightforward. The skill may create and use a task-local virtual environment when needed to run the repo or supporting document-generation dependencies. FinResearchClaw is a required support engine for research depth and drafting quality across all output modes; authoritative accounting conclusions must still be grounded in ASC / SEC / AICPA / clearly labeled interpretive guidance.
## Prerequisites
Before using this skill for substantive completion, install and make available the local FinResearchClaw repo:
1. Clone or install `https://github.com/ChipmunkRPA/FinResearchClaw`
2. Ensure the local repo is available at `~/.openclaw/workspace/AutoResearchClaw` unless a different local path is intentionally configured
3. Ensure Python is available and create a task-local virtual environment when needed
4. Install memo-generation dependencies such as `python-docx` when DOCX output is required
If FinResearchClaw is not installed locally and runnable, do not treat the full research workflow as ready. Fix that prerequisite first.
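A minimal sketch of a readiness check for these prerequisites follows. It assumes the default path listed above and only confirms that the directory exists and that `python-docx` is importable; it does not prove the FinResearchClaw workflow actually runs. The function name `check_prerequisites` is hypothetical.

```python
import importlib.util
from pathlib import Path
from typing import List

# Default path taken from the prerequisites above; override if configured differently.
DEFAULT_REPO_PATH = "~/.openclaw/workspace/AutoResearchClaw"


def check_prerequisites(repo_path: str = DEFAULT_REPO_PATH, need_docx: bool = True) -> List[str]:
    """Return a list of human-readable problems; an empty list means ready."""
    problems: List[str] = []
    repo = Path(repo_path).expanduser()
    if not repo.is_dir():
        problems.append(f"FinResearchClaw repo not found at {repo}")
    # python-docx is imported as the `docx` package.
    if need_docx and importlib.util.find_spec("docx") is None:
        problems.append("python-docx is not installed")
    return problems


for problem in check_prerequisites():
    print("NOT READY:", problem)
```

Returning a list of problems rather than raising on the first one lets the caller report every missing prerequisite in a single pass.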
## Required Behavior
- Ask clarification questions before analysis.
- Confirm requested output format: `memo`, `email`, or `q-and-a`.
- If the user asks for a `memo`, default the deliverable to a `.docx` file saved in the user's `~/Downloads` folder unless the user explicitly requests a different location or format.
- For `memo` requests, do not post the full memo body directly into chat by default. Instead, generate the DOCX deliverable and reply with a short status note that the file was created and where it was saved.
- The final memo DOCX must be client-ready and must not expose Markdown syntax (for example `#`, `##`, `###`, `####`, bullet markers used as raw source notation, or fenced code blocks) anywhere in the rendered document.
- **Avoid bullet points** in the final memo; use separate, meaningful, and structured narrative paragraphs for all sections.
- **Avoid numbered sections**; use proper professional formatting for section headings without prepended numbers.
- The memo and any delivered document must use **Times New Roman, 12pt, black font**.
- **Avoid Markdown symbols** like `***` or `**` (for bold) if they might be rendered literally; use native DOCX formatting.
- **Mandatory Real-Life Example**: Always perform a web search to find and include a real-world accounting example (public SEC filings preferred) or a secondary reference material example to demonstrate the practical application of the accounting treatment.
- Research the internet before final conclusions, even if guidance seems familiar.
- For all requests under this skill, use FinResearchClaw as a required wrapped execution and drafting layer.
- Do not present, summarize, or imply a substantive accounting conclusion to the user before the required FinResearchClaw-backed flow has been run, except to restate assumptions, ask clarifying questions, or explain that the analysis is still in progress.
- The skill may create and activate a task-local virtual environment to run FinResearchClaw, supporting scripts, and output-generation dependencies.
- Distinguish authoritative guidance from interpretive guidance.
- Cite sources with links and accessed date in the deliverable.
- Include at least **5 authoritative references** supporting the accounting conclusion unless fewer are genuinely available for the issue; if fewer than 5 authoritative references can be identified, disclose that constraint explicitly and supplement with clearly labeled interpretive guidance.
- If the user does not provide a company name for a memo or memo-style deliverable, default to the neutral defined party name **"Company"** throughout the document. Do not substitute other role labels such as "vendor," "customer," or similar labels as the primary unnamed entity reference unless the fact pattern itself requires naming multiple counterparties distinctly.
- State assumptions explicitly when facts remain unknown.
- Do not let FinResearchClaw-style output replace authoritative accounting analysis; use it to strengthen organization and completeness, not to dilute source hierarchy.
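The citation and reference-count rules above could be tracked with a small record type, sketched below. The `Source` class, its field names, and the placeholder URLs are assumptions made for illustration; the skill's actual payload format is defined in `references/report-json-schema.md`.

```python
from dataclasses import dataclass
from datetime import date
from typing import List

MIN_AUTHORITATIVE = 5  # minimum stated in the Required Behavior list


@dataclass
class Source:
    """Hypothetical record for one cited source."""
    label: str
    url: str
    authoritative: bool  # False means interpretive guidance
    accessed: date

    def citation(self) -> str:
        kind = "Authoritative" if self.authoritative else "Interpretive"
        return f"{self.label} ({kind}). {self.url} (accessed {self.accessed.isoformat()})"


def authoritative_shortfall(sources: List[Source]) -> int:
    """Number of additional authoritative references needed to reach the minimum."""
    count = sum(1 for source in sources if source.authoritative)
    return max(0, MIN_AUTHORITATIVE - count)


# Placeholder entries only; these are not real references.
sources = [
    Source("Placeholder standard", "https://example.com/a", True, date(2026, 1, 15)),
    Source("Placeholder firm guide", "https://example.com/b", False, date(2026, 1, 15)),
]
print(sources[0].citation())
print("Authoritative references still needed:", authoritative_shortfall(sources))
```

A non-zero shortfall is the signal to disclose the constraint explicitly and supplement with labeled interpretive guidance.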
## Workflow
### 1. Intake and Scope
- Capture the user issue in one sentence.
- Confirm reporting basis and jurisdiction (`US GAAP`, `SEC filer status`, and whether disclosures are public-company or private-company focused).
- Confirm reporting period and materiality context.
### 2. Clarification Questions (Mandatory)
- Use [references/clarification-question-bank.md](references/clarification-question-bank.md).
- Ask only the questions needed for the fact pattern; do not skip critical facts.
- Pause analysis until enough facts are available to form a defensible conclusion.
- If facts stay incomplete, proceed with explicit assumptions and sensitivity notes.
### 3. Output Format Confirmation (Mandatory)
- Ask which format is required (`memo` for formal documentation, `email` for concise communication, `q-and-a` for direct question and answer support).
- If no preference is provided, default to `memo`.
- For `memo` outputs, default to generating a `.docx` file in `~/Downloads`.
- Unless the user explicitly asks to see the full memo inline, do not paste the memo text into the channel; provide the saved file path instead.
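The defaulting rules in this step can be expressed as a short helper. This is a sketch under assumptions: `resolve_output` and its parameters are hypothetical names, and it returns no path for `email` and `q-and-a` because this step states a default save location only for memos.

```python
from pathlib import Path
from typing import Optional, Tuple

VALID_FORMATS = ("memo", "email", "q-and-a")


def resolve_output(
    requested_format: Optional[str],
    file_stem: str,
    output_dir: Optional[str] = None,
) -> Tuple[str, Optional[Path]]:
    """Return (format, docx_path); docx_path is None when no memo default applies."""
    # No stated preference falls back to "memo".
    fmt = (requested_format or "memo").strip().lower()
    if fmt not in VALID_FORMATS:
        raise ValueError(f"Unsupported format {requested_format!r}; expected one of {VALID_FORMATS}")
    if fmt != "memo":
        return fmt, None
    # Memos default to ~/Downloads unless the user named another folder.
    folder = Path(output_dir).expanduser() if output_dir else Path.home() / "Downloads"
    return fmt, folder / f"{file_stem}.docx"


fmt, docx_path = resolve_output(None, "revenue-recognition-memo")
print(fmt, docx_path)
```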
### 4. Research Guidance
- Research sources using the priority and reliability rules in [references/source-priority.md](references/source-priority.md).
- Prefer primary and authoritative sources first (FASB/SEC/AICPA standard-setting materials).
- Use Big 4 publications as interpretive support, not sole authority.
- For a memo conclusion, target at least **5 authoritative references** across the conclusion, disclosure direction, and key technical positions wherever the issue reasonably permits.
- Invoke FinResearchClaw for every task handled by this skill, regardless of output format.
- Execution for memo, email, and q-and-a outputs should all wrap the FinResearchClaw repo/workflow even if the accounting issue is straightforward.
- The FinResearchClaw-backed run must occur before any user-facing technical conclusion is given. Do not short-circuit from clarified facts directly to an accounting answer.
- The skill may create a task-local virtual environment for the run if needed.
- Do not skip or bypass the FinResearchClaw path. If the repo is unavailable, dependencies are missing, or the workflow fails to run, stop and troubleshoot the FinResearchClaw environment first. The task should not proceed to substantive completion until the wrapped FinResearchClaw path is functioning.
- Do not surface an accounting conclusion before this stage is completed successfully.
- At the start of a live task, explicitly verify that FinResearchClaw is installed locally and that the configured repo path exists before relying on the wrapped workflow.
- Treat FinResearchClaw as a research accelerator and drafting assistant only; independently verify accounting conclusions against authoritative and clearly labeled interpretive sources before finalizing.
- Capture citation labels and URLs for each source used.
### 5. Technical Analysis and Illustration
- Frame the accounting issue.
- Map facts to recognition, measurement, presentation, and disclosure guidance.
- **Real-Life Illustration (Mandatory)**: Find and include a descriptive summary of a real-world case (e.g., an SEC filer's 10-K/Q disclosure) that mirrors the fact pattern. Explain how they applied the standard.
- Evaluate reasonable alternatives and explain rejection rationale.
- Conclude with recommended accounting treatment, disclosure direction, and key risks.
- Include journal entry examples when useful for implementation.
- For formal memo output, produce a polished memorandum style with a complete professional header and well-formed section headings (no numbers).
- If no company name is specified by the user, define the reporting entity as **"Company"** and use that term consistently throughout the memo.
- Analysis and all other sections must use descriptive, narrative paragraphs instead of bullet points or numbered lists.
- Use FinResearchClaw to improve thoroughness and professional drafting quality while preserving accounting-source hierarchy.
- Do not allow raw JSON structures, Python dictionary renderings, placeholder header fields, unformatted source dumps, or visible Markdown markers to appear in the final memo.
- If a drafting pass begins in Markdown, the final DOCX generation step must fully convert that draft into native memorandum formatting rather than packaging the Markdown text as-is.
- Preferred execution order for all outputs under this skill: (1) authoritative accounting research and fact development first, (2) create/use a task-local virtual environment if needed, (3) run the FinResearchClaw repo/workflow as the required drafting/research wrapper, (4) final manual verification against ASC/SEC/AICPA and labeled interpretive guidance, and (5) generate the requested deliverable format.
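When a journal entry example is included, a quick balance check helps catch transcription errors before the entry reaches the memo. The sketch below is illustrative only: `EntryLine` and `is_balanced` are hypothetical names, and the amounts are invented rather than drawn from any real transaction.

```python
from decimal import Decimal
from typing import List, NamedTuple


class EntryLine(NamedTuple):
    """One line of a journal entry; exactly one of debit/credit is non-zero."""
    account: str
    debit: Decimal
    credit: Decimal


def is_balanced(lines: List[EntryLine]) -> bool:
    """True when total debits equal total credits."""
    total_debits = sum((line.debit for line in lines), Decimal("0"))
    total_credits = sum((line.credit for line in lines), Decimal("0"))
    return total_debits == total_credits


# Illustrative entry: cash received in advance of performance.
entry = [
    EntryLine("Cash", Decimal("1200.00"), Decimal("0")),
    EntryLine("Deferred revenue", Decimal("0"), Decimal("1200.00")),
]
print(is_balanced(entry))
```

`Decimal` is used instead of `float` so that cent-level amounts compare exactly.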
### 6. Draft and Materialize DOCX
- Build a JSON payload using [references/report-json-schema.md](references/report-json-schema.md).
- For `memo` requests, save the output DOCX to `~/Downloads` by default (for example, `~/Downloads/<descriptive-file-name>.docx`).
- Standard wrapped workflow for all outputs:
1. gather facts and clarifications;
2. perform authoritative and interpretive accounting research;
3. create/use a task-local virtual environment if needed for the run;
4. run the FinResearchClaw-supported drafting pass for every task under this skill;
5. during memo-mode execution, send a short in-progress status update to the user at least once per minute until the memo workflow finishes or fails; each update should briefly state the current stage (for example fact gathering, authoritative research, FinResearchClaw run, memo drafting, citation verification, or DOCX generation);
6. manually validate the final analysis, formatting, section structure, entity naming consistency, and citations;
7. generate the requested deliverable format (DOCX for memo by default, or the requested email/q-and-a output) using a conversion path that renders headings, paragraphs, bullets, and tables as native document elements rather than leaving Markdown markers visible;
8. review the finished output for presentation quality before responding;
9. reply in chat with the appropriate completion note for the chosen format.
- Do not use a Markdown-to-DOCX path that can preserve raw Markdown notation in the final document. If the chosen conversion method leaves visible Markdown markers, treat that as a failed output and regenerate the memo with a native DOCX creation path.
- **Do not pass Markdown syntax through as final DOCX prose.** Convert section labels into schema fields and provide plain text content for paragraphs, bullets, and conclusions. The generated Word document must use native Word headings/lists rather than visible Markdown markers such as `#`, `##`, `###`, or `####`.
- Run:
```bash
python scripts/build_accounting_report_docx.py \
--input-json <analysis.json> \
--output-docx ~/Downloads/<technical-accounting-report>.docx \
--format <memo|email|q-and-a>
```
- The script produces a DOCX with:
- Title and metadata.
- Facts and issue statement.
- Guidance table with links.
- Analysis and conclusion.
- Disclosure considerations.
- Open items and assumptions.
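If the generator is launched from Python rather than a shell, the command shown in this step can be assembled as an argument list. This sketch uses only the three flags that appear in the command above; `build_docx_command` is a hypothetical helper, and the file names are placeholders.

```python
import shlex
import sys
from pathlib import Path
from typing import List

VALID_FORMATS = ("memo", "email", "q-and-a")


def build_docx_command(input_json: str, output_docx: str, report_format: str = "memo") -> List[str]:
    """Assemble the argv list for scripts/build_accounting_report_docx.py."""
    if report_format not in VALID_FORMATS:
        raise ValueError(f"Unsupported format {report_format!r}; expected one of {VALID_FORMATS}")
    return [
        sys.executable,
        "scripts/build_accounting_report_docx.py",
        "--input-json", str(Path(input_json).expanduser()),
        # Expand "~" here because no shell will do it for an argv list.
        "--output-docx", str(Path(output_docx).expanduser()),
        "--format", report_format,
    ]


command = build_docx_command("analysis.json", "~/Downloads/technical-accounting-report.docx")
print(shlex.join(command))
# To execute: subprocess.run(command, check=True)
```

Passing a list to `subprocess.run` avoids shell quoting problems with file names that contain spaces.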
### 7. Quality Check
- Confirm the conclusion is consistent with cited guidance.
- Confirm all significant assumptions are disclosed.
- Confirm the output format matches user request.
- Confirm every external source in the analysis has a URL listed in the report.
- Confirm the final memo reads like a professional memorandum rather than a raw data export.
- Confirm headers are fully populated (for example To / From / Date / Subject) and that analysis sections render as proper prose, not serialized objects.
- Confirm the memo uses **"Company"** consistently when no specific company name was provided.
- Confirm the conclusion is supported by at least **5 authoritative references**, or that any shortfall is explicitly disclosed and justified.
- Confirm the rendered DOCX contains no visible Markdown syntax such as heading hashes, fenced code blocks, or raw list notation.
- No task under this skill may close without using the wrapped FinResearchClaw path successfully. If FinResearchClaw is not yet working, the required next step is to fix that environment rather than continue without it.
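
The Markdown check in the list above can be partly automated. The following is a minimal sketch, not part of this skill's scripts: `find_markdown_residue` is a hypothetical helper that scans paragraph text for common Markdown markers. It assumes the paragraph strings have already been read from the generated DOCX (for example with python-docx), and the patterns are illustrative, so they may miss some markers or flag legitimate text.

```python
import re

# Hypothetical patterns for Markdown markers that should not survive in the final document.
_MARKDOWN_PATTERNS = {
    "heading": re.compile(r"^\s*#{1,6}\s+\S"),
    "bullet": re.compile(r"^\s*[-*+]\s+\S"),
    "fence": re.compile(r"^\s*```"),
    "bold": re.compile(r"\*\*[^*]+\*\*"),
}


def find_markdown_residue(paragraphs: list[str]) -> list[tuple[int, str, str]]:
    """Return (paragraph index, marker kind, text) for each suspicious paragraph."""
    findings = []
    for index, text in enumerate(paragraphs):
        for kind, pattern in _MARKDOWN_PATTERNS.items():
            if pattern.search(text):
                findings.append((index, kind, text))
                break  # one finding per paragraph is enough for a manual review
    return findings


# Sample paragraph texts standing in for the content of a rendered memo.
sample = [
    "Technical Accounting Memorandum",
    "## Analysis",
    "The fee is deferred over the expected period of benefit.",
    "- Confirm the period of benefit.",
    "The conclusion is **preliminary**.",
]
for index, kind, text in find_markdown_residue(sample):
    print(f"paragraph {index}: {kind}: {text}")
```

Any finding would be treated as a failed output under the rules above and the memo regenerated; an empty result does not replace the visual review of the document.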
## Resources
- Clarifying question checklist: [references/clarification-question-bank.md](references/clarification-question-bank.md)
- Source hierarchy and citation rules: [references/source-priority.md](references/source-priority.md)
- JSON format for DOCX generation: [references/report-json-schema.md](references/report-json-schema.md)
- Example report payload: [references/example_report_input.json](references/example_report_input.json)
- DOCX generator: `scripts/build_accounting_report_docx.py`
- FinResearchClaw repo: `https://github.com/ChipmunkRPA/FinResearchClaw`
- FinResearchClaw local skill reference: `~/.openclaw/skills/finresearchclaw/SKILL.md`
## Dependency
Install once if needed:
```bash
python -m pip install --user python-docx
```
## Runtime / Environment Expectations
- Memo-mode runs are allowed to create and use a task-local Python virtual environment.
- FinResearchClaw default repo path: `~/.openclaw/workspace/AutoResearchClaw`
- If the repo or its dependencies are not ready, initialize a local venv for the task and install only the dependencies needed for the memo workflow.
- If FinResearchClaw cannot be executed after reasonable setup attempts, disclose that explicitly and fall back to a non-FinResearchClaw path only as an exception, not the default.
FILE:README.md
# Technical Accounting Research Skill
A Codex skill for researching and documenting technical accounting issues in U.S. GAAP and SEC reporting contexts.
## What It Does
- Forces clarification questions before analysis.
- Confirms output format (`memo`, `email`, or `q-and-a`) before drafting.
- Performs authoritative and interpretive accounting research (for example FASB, AICPA, SEC, and Big 4 sources).
- Requires the local FinResearchClaw repo/workflow as a wrapped research-and-drafting layer for substantive runs.
- Applies standards to the fact pattern and produces a deliverable, typically a DOCX memo.
## FinResearchClaw Dependency
This skill is designed to work with the local FinResearchClaw repo and should not be treated as fully operational until that repo is installed and runnable.
Required repo:
- `https://github.com/ChipmunkRPA/FinResearchClaw`
Expected local path by default:
- `~/.openclaw/workspace/AutoResearchClaw`
FinResearchClaw is used as a required wrapped execution layer for memo, email, and q-and-a workflows. It is intended to improve research depth and drafting quality, but final accounting conclusions must still be verified against authoritative and clearly labeled interpretive guidance.
## Requirements
- Python 3.11+
- Local clone/install of `FinResearchClaw`
- `python-docx` for DOCX memo generation
Install the supporting dependency if needed:
```bash
python -m pip install --user python-docx
```
## Recommended Local Setup
Install FinResearchClaw locally first:
```bash
git clone https://github.com/ChipmunkRPA/FinResearchClaw.git ~/.openclaw/workspace/AutoResearchClaw
cd ~/.openclaw/workspace/AutoResearchClaw
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```
After that, this skill can wrap the local FinResearchClaw workflow during live technical-accounting tasks.
## Skill Layout
- `SKILL.md`: Core workflow and behavior rules.
- `agents/openai.yaml`: UI metadata for invocation.
- `references/`: Clarification checklist, source hierarchy, JSON schema, sample payload.
- `scripts/build_accounting_report_docx.py`: DOCX report generator.
## Generate a DOCX Report
```bash
python scripts/build_accounting_report_docx.py \
--input-json references/example_report_input.json \
--output-docx output/technical-accounting-report.docx \
--format memo
```
Allowed formats: `memo`, `email`, `q-and-a`.
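
When the generator is called from another Python process rather than from a shell, the same command can be assembled programmatically. This is a minimal sketch under stated assumptions: `build_report_command` is a hypothetical helper, the script path is taken relative to the skill root, and only the three flags documented above are used.

```python
import sys
from pathlib import Path

ALLOWED_FORMATS = ("memo", "email", "q-and-a")
# Assumed location of the generator relative to the skill root.
SCRIPT_PATH = Path("scripts/build_accounting_report_docx.py")


def build_report_command(
    input_json: str, output_docx: str, report_format: str = "memo"
) -> list[str]:
    """Return the argv list for the DOCX generator, rejecting unknown formats early."""
    if report_format not in ALLOWED_FORMATS:
        raise ValueError(
            f"unsupported format {report_format!r}; expected one of {ALLOWED_FORMATS}"
        )
    return [
        sys.executable,
        str(SCRIPT_PATH),
        "--input-json", input_json,
        "--output-docx", output_docx,
        "--format", report_format,
    ]


command = build_report_command(
    "references/example_report_input.json",
    "output/technical-accounting-report.docx",
)
print(" ".join(command))
```

The returned list can be passed to `subprocess.run(command, check=True)`, which raises if the generator exits with a non-zero status.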
## Operational Notes
- The skill should verify that the local FinResearchClaw repo exists before relying on the wrapped workflow.
- If FinResearchClaw is missing or broken, fix that prerequisite before treating the skill as ready for substantive completion.
- FinResearchClaw does not replace ASC / SEC / AICPA source validation; it supports the research and drafting process.
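
The existence check described in the first note can be sketched as follows. `check_finresearchclaw_repo` is a hypothetical helper, not something shipped with this skill; it assumes the default path given earlier and only confirms that the directory exists and is not empty, which says nothing about whether the workflow actually runs.

```python
from pathlib import Path

# Default location stated in this README; adjust if the repo lives elsewhere.
DEFAULT_REPO_PATH = Path("~/.openclaw/workspace/AutoResearchClaw")


def check_finresearchclaw_repo(repo_path: Path = DEFAULT_REPO_PATH) -> tuple[bool, str]:
    """Return (ok, message) describing whether the repo directory looks present."""
    resolved = repo_path.expanduser()
    if not resolved.is_dir():
        return False, f"FinResearchClaw repo not found at {resolved}"
    if not any(resolved.iterdir()):
        return False, f"FinResearchClaw repo directory is empty: {resolved}"
    return True, f"FinResearchClaw repo found at {resolved}"


ok, message = check_finresearchclaw_repo()
print(("OK: " if ok else "NOT READY: ") + message)
```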
## License
This project is licensed under the MIT License. See `LICENSE`.
FILE:agents/openai.yaml
interface:
  display_name: "Technical Accounting Research"
  short_description: "Research accounting treatment and disclosure guidance"
  default_prompt: "Use $technical-accounting-research to analyze this transaction and produce a DOCX memo, email, or Q&A response."
FILE:references/clarification-question-bank.md
# Clarification Question Bank
Use this checklist to ask only the questions needed to understand the transaction before analysis.
## 1. Reporting Context
- What reporting framework applies (US GAAP, IFRS, or another basis)?
- Is the reporting entity an SEC registrant? If yes, what filer status applies?
- What reporting period and close date does this conclusion affect?
- Is the transaction material quantitatively or qualitatively?
## 2. Transaction Facts
- What exactly happened, and what is the business purpose?
- What legal form does the arrangement take (contract, amendment, side letter, etc.)?
- What are the key dates (execution, delivery, acceptance, effective date)?
- What are the significant payment terms and contingencies?
## 3. Scope and Unit of Account
- What is the proposed unit of account?
- Are there multiple components that may need separation or combination?
- Are related-party relationships involved?
## 4. Recognition and Measurement Inputs
- What amounts are known vs estimated?
- What assumptions are management currently using?
- Are fair value, impairment, expected credit loss, or probability assessments required?
- Are there alternative accounting policy elections available?
## 5. Presentation and Disclosure Inputs
- Where does management expect to present the item in the financial statements?
- What note disclosures are currently drafted, if any?
- Are non-GAAP, segment, or MD&A impacts expected?
- Are debt covenant, going concern, or subsequent-events disclosures implicated?
## 6. Process and Evidence
- What source documents are available (contracts, board minutes, valuation reports)?
- Is there prior-year treatment for similar transactions?
- Is there a target filing date or audit committee deadline?
- Are there auditor-preferred formats or documentation standards to follow?
## 7. Output Preferences (Mandatory)
- Should the response be prepared as a `memo`, `email`, or `q-and-a`?
- Who is the audience (controller, CFO, audit committee, external auditors)?
- Should journal entries be included?
- Should draft disclosure language be included?
FILE:references/example_report_input.json
{
  "title": "Technical Accounting Memorandum - SaaS Implementation Fee",
  "prepared_for": "Controller",
  "prepared_by": "Accounting Policy Team",
  "date": "2026-03-07",
  "subject": "Revenue recognition for nonrefundable implementation fee",
  "issue": "Whether a nonrefundable implementation fee in a SaaS arrangement should be recognized upfront or deferred.",
  "facts": [
    "Customer signs a 3-year hosted software contract.",
    "Customer pays a nonrefundable implementation fee at contract inception.",
    "No distinct software license or transfer of control occurs at inception."
  ],
  "guidance": [
    {
      "citation": "ASC 606-10-25",
      "source_type": "authoritative",
      "key_point": "Promised goods and services must be evaluated for distinct performance obligations.",
      "url": "https://www.fasb.org/page/PageContent?pageId=/standards/accounting-standards-updates.html",
      "accessed_on": "2026-03-07"
    },
    {
      "citation": "SEC FRM Topic 13",
      "source_type": "authoritative",
      "key_point": "Revenue recognition disclosures should clearly describe judgments and timing.",
      "url": "https://www.sec.gov/divisions/corpfin/cffinancialreportingmanual.shtml",
      "accessed_on": "2026-03-07"
    },
    {
      "citation": "Big 4 Revenue Guide",
      "source_type": "interpretive",
      "key_point": "Upfront setup fees often represent advance payment for future hosted services.",
      "url": "https://viewpoint.pwc.com/",
      "accessed_on": "2026-03-07"
    }
  ],
  "analysis": [
    "The implementation fee does not transfer a distinct good or service at inception.",
    "The fee is economically linked to ongoing hosted services and should be recognized over the expected period of benefit.",
    "Judgment is required to determine period of benefit and associated disclosure language."
  ],
  "conclusion": "Defer the implementation fee and recognize it as revenue over the expected benefit period, consistent with the hosting service pattern.",
  "assumptions": [
    "No separately priced implementation deliverable exists.",
    "Customer cannot benefit from setup activities independent of hosted service."
  ],
  "disclosure_considerations": [
    "Describe nature of implementation fees and timing of revenue recognition.",
    "Disclose significant judgments used to determine period of benefit."
  ],
  "journal_entries": [
    {
      "description": "Record upfront fee at inception",
      "debit": "Cash",
      "credit": "Deferred Revenue",
      "amount": "$120,000"
    },
    {
      "description": "Recognize monthly revenue",
      "debit": "Deferred Revenue",
      "credit": "Revenue",
      "amount": "$3,333"
    }
  ],
  "open_items": [
    "Validate expected customer relationship period with historical churn data.",
    "Confirm draft disclosure wording with SEC reporting counsel."
  ],
  "next_steps": [
    "Align conclusion with external auditors before quarter-end close.",
    "Update revenue disclosure note draft."
  ],
  "qa": [
    {
      "question": "Can we recognize the implementation fee upfront?",
      "answer": "Not under the stated facts; no distinct good or service transfers at inception."
    }
  ]
}
FILE:references/report-json-schema.md
# Report JSON Schema
Use this structure as input to `scripts/build_accounting_report_docx.py`.
## Required top-level fields
- `issue` (string)
- `facts` (array of strings)
- `guidance` (array of objects)
- `analysis` (array of strings)
- `conclusion` (string)
## Formatting rule
All string fields should contain final prose, not Markdown control syntax. Do not include raw heading markers (`#`, `##`, `###`, `####`), Markdown bullet prefixes, or other Markdown formatting tokens unless they are intended to appear literally in the final document.
## Recommended fields
- `title` (string)
- `prepared_for` (string)
- `prepared_by` (string)
- `date` (string, ISO `YYYY-MM-DD` preferred)
- `subject` (string)
- `assumptions` (array of strings)
- `disclosure_considerations` (array of strings)
- `open_items` (array of strings)
- `next_steps` (array of strings)
- `journal_entries` (array of objects)
- `qa` (array of objects, especially for `q-and-a` format)
- `to` / `cc` / `from` (strings for `email` format)
## Guidance object
Each entry in `guidance` should include:
- `citation` (string)
- `source_type` (`authoritative` or `interpretive`)
- `key_point` (string)
- `url` (string)
- `accessed_on` (string date)
## Journal entry object
- `description` (string)
- `debit` (string)
- `credit` (string)
- `amount` (string)
## Q-and-A object
- `question` (string)
- `answer` (string)
## Minimal example
```json
{
  "issue": "How should implementation fees in a SaaS arrangement be recognized?",
  "facts": [
    "Customer pays a nonrefundable upfront fee.",
    "The fee does not transfer a distinct good or service at contract inception."
  ],
  "guidance": [
    {
      "citation": "ASC 606-10-25",
      "source_type": "authoritative",
      "key_point": "Identify performance obligations and assess whether promised goods/services are distinct.",
      "url": "https://www.fasb.org",
      "accessed_on": "2026-03-07"
    }
  ],
  "analysis": [
    "The upfront fee is an advance payment for future services.",
    "Revenue should generally be recognized over the expected period of benefit."
  ],
  "conclusion": "Defer upfront fee revenue and recognize over the expected customer relationship period."
}
```
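
The generator script in this skill skips empty sections rather than rejecting a payload, so a missing required field would only show up as a gap in the document. A pre-flight check along the following lines could catch that earlier. This is a minimal sketch: `validate_payload` and its message wording are hypothetical, and the type checks cover only the top level and the guidance entries described above.

```python
from typing import Any

# Required top-level fields and the JSON types listed in this schema.
REQUIRED_FIELDS = {
    "issue": str,
    "facts": list,
    "guidance": list,
    "analysis": list,
    "conclusion": str,
}
GUIDANCE_KEYS = ("citation", "source_type", "key_point", "url", "accessed_on")
SOURCE_TYPES = ("authoritative", "interpretive")


def validate_payload(payload: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems: list[str] = []
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in payload:
            problems.append(f"missing required field: {name}")
        elif not isinstance(payload[name], expected_type):
            problems.append(f"{name} should be a {expected_type.__name__}")
    guidance = payload.get("guidance")
    if isinstance(guidance, list):
        for index, entry in enumerate(guidance):
            if not isinstance(entry, dict):
                problems.append(f"guidance[{index}] should be an object")
                continue
            for key in GUIDANCE_KEYS:
                if not entry.get(key):
                    problems.append(f"guidance[{index}] is missing {key}")
            source_type = entry.get("source_type")
            if source_type and source_type not in SOURCE_TYPES:
                problems.append(f"guidance[{index}] has unknown source_type: {source_type}")
    return problems


# Deliberately incomplete payload: no conclusion, and a thin guidance entry.
payload = {
    "issue": "How should implementation fees in a SaaS arrangement be recognized?",
    "facts": ["Customer pays a nonrefundable upfront fee."],
    "guidance": [{"citation": "ASC 606-10-25", "source_type": "authoritative"}],
    "analysis": ["The upfront fee is an advance payment for future services."],
}
for problem in validate_payload(payload):
    print(problem)
```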
FILE:references/source-priority.md
# Source Priority and Citation Rules
## Source Hierarchy
Use sources in this order and label each source accordingly in the report.
1. Authoritative standards and rules.
   - FASB ASC and Accounting Standards Updates (ASUs).
   - SEC rules and forms (Regulation S-X, Regulation S-K, Staff Accounting Bulletins, Financial Reporting Manual, relevant releases).
2. Standard-setting and interpretive publications.
   - AICPA technical guidance (Audit and Accounting Guides, TQAs when relevant).
3. Secondary interpretive guidance.
   - Big 4 interpretive publications, roadmaps, manuals, and technical alerts.
4. Supplemental sources.
   - Other reputable accounting publications used only to support, not replace, authoritative citations.
## Research Protocol
- Identify the accounting question first (recognition, measurement, presentation, disclosure).
- Collect at least one authoritative source for each major conclusion.
- For memo-mode conclusions, target at least 5 authoritative references in total across the core accounting positions, disclosure implications, and key judgment points wherever the issue reasonably permits.
- Use Big 4 or other secondary sources to interpret complex application questions.
- If authoritative text is inaccessible due to licensing, disclose that limitation and cite publicly available ASU/SEC/AICPA material plus clearly labeled interpretive sources.
- If fewer than 5 authoritative references are genuinely available for the issue, state that explicitly in the memo and explain why the authoritative source set is narrower than usual.
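
The five-reference target can be checked mechanically against the `guidance` array before drafting. The sketch below is illustrative only: `authoritative_shortfall` is a hypothetical helper, and it relies on each entry carrying a `source_type` label of `authoritative` or `interpretive`.

```python
from typing import Any

MIN_AUTHORITATIVE = 5  # memo-mode target stated in the protocol above


def authoritative_shortfall(
    guidance: list[dict[str, Any]], minimum: int = MIN_AUTHORITATIVE
) -> int:
    """Return how many more authoritative references are needed (0 if the target is met)."""
    count = sum(
        1
        for item in guidance
        if str(item.get("source_type", "")).strip().lower() == "authoritative"
    )
    return max(0, minimum - count)


guidance = [
    {"citation": "ASC 606-10-25", "source_type": "authoritative"},
    {"citation": "Reg S-X Rule 5-03", "source_type": "authoritative"},
    {"citation": "PwC Revenue Guide 2025", "source_type": "interpretive"},
]
shortfall = authoritative_shortfall(guidance)
if shortfall:
    print(f"Add {shortfall} more authoritative reference(s) or disclose the shortfall in the memo.")
```

A non-zero result does not block the memo by itself; per the protocol, the shortfall is either closed with further research or stated and explained in the memo.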
## Citation Standards
For each source, capture:
- `citation`: concise citation label (for example, `ASC 606-10-25`, `Reg S-X Rule 5-03`, `PwC Revenue Guide 2025`).
- `source_type`: `authoritative` or `interpretive`.
- `url`: direct link used during research.
- `key_point`: one-sentence summary of what the source supports.
- `accessed_on`: date reviewed.
## Conflict Resolution
- If sources conflict, follow hierarchy: authoritative over interpretive.
- Explain why one interpretation was accepted and why alternatives were rejected.
- Record unresolved ambiguity under open items and assumptions.
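
One way to apply the first rule when assembling the guidance list is to order sources so authoritative entries come before interpretive ones. This is a minimal sketch: `order_by_hierarchy` is a hypothetical helper, and because `source_type` carries only two labels, the ranking is coarser than the four-level hierarchy described earlier.

```python
from typing import Any

# Lower rank means higher priority; unknown labels sort last.
SOURCE_RANK = {"authoritative": 0, "interpretive": 1}


def order_by_hierarchy(guidance: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return guidance sorted authoritative-first, preserving the original order within each type."""
    return sorted(
        guidance,
        key=lambda item: SOURCE_RANK.get(item.get("source_type"), len(SOURCE_RANK)),
    )


guidance = [
    {"citation": "PwC Revenue Guide 2025", "source_type": "interpretive"},
    {"citation": "ASC 606-10-25", "source_type": "authoritative"},
    {"citation": "Reg S-X Rule 5-03", "source_type": "authoritative"},
]
for source in order_by_hierarchy(guidance):
    print(f"{source['source_type']}: {source['citation']}")
```

Ordering only surfaces which source governs; the explanation of why an alternative reading was rejected still has to be written into the analysis.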
FILE:scripts/build_accounting_report_docx.py
#!/usr/bin/env python
"""Build a technical accounting report DOCX from JSON input."""

from __future__ import annotations

import argparse
import json
import re
from datetime import date
from pathlib import Path
from typing import Any

try:
    from docx import Document
    from docx.shared import Pt, RGBColor
except ImportError as exc:  # pragma: no cover
    raise SystemExit(
        "python-docx is required. Install with: python -m pip install --user python-docx"
    ) from exc


def _as_text(value: Any, default: str = "") -> str:
    if value is None:
        return default
    if isinstance(value, str):
        return value.strip()
    return str(value).strip()


def _as_list_of_str(value: Any) -> list[str]:
    if isinstance(value, list):
        return [_as_text(item) for item in value if _as_text(item)]
    if isinstance(value, str) and value.strip():
        return [value.strip()]
    return []


def _as_list_of_dict(value: Any) -> list[dict[str, Any]]:
    if not isinstance(value, list):
        return []
    out: list[dict[str, Any]] = []
    for item in value:
        if isinstance(item, dict):
            out.append(item)
    return out


def _strip_markdown_prefixes(text: str) -> str:
    cleaned = _as_text(text)
    if not cleaned:
        return ""
    cleaned = re.sub(r"^#{1,6}\s*", "", cleaned)
    cleaned = re.sub(r"^[-*+]\s+", "", cleaned)
    cleaned = re.sub(r"^\d+\.\s+", "", cleaned)
    return cleaned.strip()


def _set_default_style(doc: Document) -> None:
    style = doc.styles["Normal"]
    style.font.name = "Times New Roman"
    style.font.size = Pt(12)


def _add_metadata_table(doc: Document, rows: list[tuple[str, str]]) -> None:
    table = doc.add_table(rows=len(rows), cols=2)
    # No table style is set here, so the document's default table style applies.
    for index, (label, value) in enumerate(rows):
        table.cell(index, 0).text = f"{label}:"
        table.cell(index, 1).text = value


def _add_heading(doc: Document, text: str, level: int) -> None:
    h = doc.add_heading(text, level=level)
    for run in h.runs:
        run.font.color.rgb = RGBColor(0, 0, 0)
        run.font.name = "Times New Roman"


def _add_rich_text_block(doc: Document, text: str) -> None:
    cleaned = _as_text(text)
    if not cleaned:
        return

    for raw_line in cleaned.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        heading_match = re.match(r"^(#{1,6})\s*(.+)$", line)
        if heading_match:
            level = min(len(heading_match.group(1)), 4)
            _add_heading(doc, _strip_markdown_prefixes(heading_match.group(2)), level=level)
            continue

        bullet_match = re.match(r"^[-*+]\s+(.+)$", line)
        if bullet_match:
            doc.add_paragraph(_strip_markdown_prefixes(bullet_match.group(1)), style="List Bullet")
            continue

        number_match = re.match(r"^\d+\.\s+(.+)$", line)
        if number_match:
            doc.add_paragraph(_strip_markdown_prefixes(number_match.group(1)), style="List Number")
            continue

        doc.add_paragraph(line)


def _add_heading_and_text(doc: Document, heading: str, text: str) -> None:
    if not text:
        return
    _add_heading(doc, heading, level=1)
    _add_rich_text_block(doc, text)


def _add_heading_and_paragraphs(doc: Document, heading: str, items: list[str]) -> None:
    if not items:
        return
    _add_heading(doc, heading, level=1)
    for item in items:
        cleaned_item = _strip_markdown_prefixes(item)
        if cleaned_item:
            doc.add_paragraph(cleaned_item, style="List Bullet")


def _sanitize_text(text: str) -> str:
    """Basic cleanup of markdown symbols to avoid literal rendering."""
    s = text.replace("####", "").replace("###", "").replace("##", "").replace("#", "")
    s = s.replace("***", "").replace("**", "").replace("__", "").replace("*", "").replace("_", "")
    return s.strip()


def _add_guidance_table(doc: Document, guidance: list[dict[str, Any]]) -> None:
    if not guidance:
        return

    _add_heading(doc, "Guidance Reviewed", level=1)
    table = doc.add_table(rows=1, cols=5)
    table.style = "Table Grid"
    header = table.rows[0].cells
    header[0].text = "Citation"
    header[1].text = "Type"
    header[2].text = "Key Point"
    header[3].text = "URL"
    header[4].text = "Accessed"

    for item in guidance:
        row = table.add_row().cells
        row[0].text = _sanitize_text(_as_text(item.get("citation")))
        row[1].text = _sanitize_text(_as_text(item.get("source_type")))
        row[2].text = _sanitize_text(_as_text(item.get("key_point")))
        row[3].text = _as_text(item.get("url"))
        row[4].text = _as_text(item.get("accessed_on"))


def _add_journal_entries_table(doc: Document, entries: list[dict[str, Any]]) -> None:
    if not entries:
        return

    _add_heading(doc, "Journal Entry Examples", level=1)
    table = doc.add_table(rows=1, cols=4)
    table.style = "Table Grid"
    header = table.rows[0].cells
    header[0].text = "Description"
    header[1].text = "Debit"
    header[2].text = "Credit"
    header[3].text = "Amount"

    for item in entries:
        row = table.add_row().cells
        row[0].text = _sanitize_text(_as_text(item.get("description")))
        row[1].text = _sanitize_text(_as_text(item.get("debit")))
        row[2].text = _sanitize_text(_as_text(item.get("credit")))
        row[3].text = _sanitize_text(_as_text(item.get("amount")))


def _add_qa(doc: Document, qa_items: list[dict[str, Any]]) -> None:
    if not qa_items:
        return

    _add_heading(doc, "Questions and Answers", level=1)
    for item in qa_items:
        question = _as_text(item.get("question"))
        answer = _as_text(item.get("answer"))
        if question:
            p = doc.add_paragraph()
            p.add_run("Question: ").bold = True
            p.add_run(_sanitize_text(question))
        if answer:
            p = doc.add_paragraph()
            p.add_run("Answer: ").bold = True
            p.add_run(_sanitize_text(answer))


def _build_document(payload: dict[str, Any], report_format: str) -> Document:
    doc = Document()
    _set_default_style(doc)

    default_title_map = {
        "memo": "Technical Accounting Memorandum",
        "email": "Technical Accounting Email Draft",
        "q-and-a": "Technical Accounting Questions and Answers",
    }
    title = _as_text(payload.get("title"), default_title_map[report_format])
    _add_heading(doc, title, level=0)

    report_date = _as_text(payload.get("date"), date.today().isoformat())

    if report_format == "memo":
        _add_metadata_table(
            doc,
            [
                ("To", _as_text(payload.get("prepared_for"))),
                ("From", _as_text(payload.get("prepared_by"))),
                ("Date", report_date),
                ("Subject", _as_text(payload.get("subject"))),
            ],
        )
    elif report_format == "email":
        _add_metadata_table(
            doc,
            [
                ("To", _as_text(payload.get("to"), _as_text(payload.get("prepared_for")))),
                ("Cc", _as_text(payload.get("cc"))),
                ("From", _as_text(payload.get("from"), _as_text(payload.get("prepared_by")))),
                ("Date", report_date),
                ("Subject", _as_text(payload.get("subject"))),
            ],
        )
    else:
        _add_metadata_table(
            doc,
            [
                ("Prepared For", _as_text(payload.get("prepared_for"))),
                ("Prepared By", _as_text(payload.get("prepared_by"))),
                ("Date", report_date),
                ("Topic", _as_text(payload.get("subject"))),
            ],
        )

    _add_heading_and_text(doc, "Issue", _as_text(payload.get("issue")))
    _add_heading_and_paragraphs(doc, "Facts", _as_list_of_str(payload.get("facts")))

    guidance = _as_list_of_dict(payload.get("guidance"))
    _add_guidance_table(doc, guidance)

    _add_heading_and_paragraphs(doc, "Analysis", _as_list_of_str(payload.get("analysis")))
    _add_heading_and_text(doc, "Conclusion", _as_text(payload.get("conclusion")))
    _add_heading_and_paragraphs(
        doc,
        "Disclosure Considerations",
        _as_list_of_str(payload.get("disclosure_considerations")),
    )
    _add_journal_entries_table(doc, _as_list_of_dict(payload.get("journal_entries")))
    _add_heading_and_paragraphs(doc, "Assumptions", _as_list_of_str(payload.get("assumptions")))
    _add_heading_and_paragraphs(doc, "Open Items", _as_list_of_str(payload.get("open_items")))
    _add_heading_and_paragraphs(doc, "Next Steps", _as_list_of_str(payload.get("next_steps")))
    _add_qa(doc, _as_list_of_dict(payload.get("qa")))

    if guidance:
        _add_heading(doc, "Source List", level=1)
        for source in guidance:
            citation = _as_text(source.get("citation"), "Unlabeled source")
            source_type = _as_text(source.get("source_type"), "unknown type")
            accessed_on = _as_text(source.get("accessed_on"), "unknown date")
            url = _as_text(source.get("url"), "(no URL)")
            doc.add_paragraph(
                f"{citation} ({source_type}), accessed {accessed_on}: {url}"
            )

    return doc


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Generate a technical accounting DOCX report from JSON input."
    )
    parser.add_argument("--input-json", required=True, help="Path to JSON input payload")
    parser.add_argument("--output-docx", required=True, help="Path for generated DOCX")
    parser.add_argument(
        "--format",
        default="memo",
        choices=["memo", "email", "q-and-a"],
        help="Output report style",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    input_path = Path(args.input_json)
    output_path = Path(args.output_docx)

    with input_path.open("r", encoding="utf-8") as f:
        payload = json.load(f)

    if not isinstance(payload, dict):
        raise SystemExit("Input JSON must contain an object at top level.")

    doc = _build_document(payload, args.format)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    doc.save(str(output_path))
    print(f"[OK] Generated {output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())