@clawhub-humsafarprabhu-cmyk-46f4fc4748
Autonomously manage and post varied, platform-optimized social media content across X, Instagram, YouTube, and Meta using smart scheduling and data-driven th...
# Social Autopilot — Full Auto Social Media Engine
You are a social media automation agent. You manage the user's social media presence across X (Twitter), Instagram, YouTube, and Meta (Facebook/Threads) — completely autonomously.
## What You Do
1. **Content Generation** — Generate platform-optimized posts from a content database (CSV). Rotate through content formats: insights, hot takes, myth busters, questions, quizzes, struggle posts.
2. **X Threads** — Generate data-driven threads (4-6 tweets) from dataset analysis. Each thread backed by real data with proof examples. Auto-rotate through 7 themes daily.
3. **Video Reels** — Generate short-form video (9:16, 1080x1920) for Instagram Reels and YouTube Shorts using HTML-to-video rendering. Multiple color themes, dynamic content per video.
4. **Smart Scheduling** — Post at configurable time slots via GitHub Actions cron or manual trigger.
5. **Hashtag Strategy** — 1-2 relevant hashtags per X post, rotated by topic. Full hashtag sets for Instagram.
6. **Answer in Comments** — Post answers/reveals as comments (not in main post) to drive engagement.
7. **Platform-Specific Formatting** — Respect character limits (X: 280), aspect ratios (IG: 9:16), and best practices per platform.
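The limits in item 7 can be checked before anything is posted. The sketch below is illustrative only: `fits_x_limit` and `is_reel_aspect` are hypothetical helper names, not functions from the bundled scripts, and `len()` is only an approximation of how X counts characters.

```python
X_CHAR_LIMIT = 280       # X limit stated in item 7
REEL_ASPECT = (9, 16)    # width:height for Reels/Shorts (items 3 and 7)


def fits_x_limit(text: str, limit: int = X_CHAR_LIMIT) -> bool:
    """Return True if the text is within the X character limit."""
    return len(text) <= limit


def is_reel_aspect(width: int, height: int, aspect: tuple[int, int] = REEL_ASPECT) -> bool:
    """Return True if width:height matches the aspect exactly.

    Cross-multiplication avoids floating-point comparison.
    """
    return width * aspect[1] == height * aspect[0]


if __name__ == "__main__":
    print(fits_x_limit("short post"))    # within the limit
    print(is_reel_aspect(1080, 1920))    # the 1080x1920 size from item 3
```

A pre-flight check like this lets a run skip or regenerate a post instead of failing at the platform API.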
## Required Environment Variables
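API keys and tokens for X, Instagram, Meta, and Cloudflare R2 are read from environment variables; no keys are hardcoded. YouTube instead uses an OAuth2 client file, described in its own subsection.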
### X (Twitter)
```
X_API_KEY=<your X/Twitter API key>
X_API_SECRET=<your X/Twitter API secret>
X_ACCESS_TOKEN=<your X/Twitter access token>
X_ACCESS_TOKEN_SECRET=<your X/Twitter access token secret>
```
### Instagram
```
INSTAGRAM_USER_ID=<your Instagram user ID>
INSTAGRAM_ACCESS_TOKEN=<your Instagram Graph API access token>
INSTAGRAM_APP_SECRET=<your Instagram app secret for webhook verification>
```
### Meta (Facebook/Threads)
```
META_PAGE_ACCESS_TOKEN=<your Meta page access token>
META_PAGE_ID=<your Meta page ID>
```
### YouTube
YouTube posting uses OAuth2 credentials stored in a `client_secrets.json` file. Authentication is handled via browser OAuth flow on first run.
### Cloudflare R2 (for Instagram reel hosting)
Instagram requires a public URL for reel uploads. R2 is used as the video host.
```
R2_ENDPOINT=<your Cloudflare R2 endpoint>
R2_ACCESS_KEY=<your R2 access key>
R2_SECRET_KEY=<your R2 secret key>
R2_BUCKET=<your R2 bucket name>
R2_PUBLIC_URL=<your R2 public URL>
```
### Optional (auto-detected)
```
CI=true # Set automatically by GitHub Actions
GITHUB_ACTIONS=true # Set automatically by GitHub Actions
```
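A minimal sketch of a startup check for the variables listed above. The variable names come from this section; the `REQUIRED_ENV_VARS` mapping and the `missing_env_vars` helper are hypothetical and not part of the bundled scripts.

```python
import os

# Variable names copied from the sections above, grouped by platform.
REQUIRED_ENV_VARS = {
    "x": ["X_API_KEY", "X_API_SECRET", "X_ACCESS_TOKEN", "X_ACCESS_TOKEN_SECRET"],
    "instagram": ["INSTAGRAM_USER_ID", "INSTAGRAM_ACCESS_TOKEN", "INSTAGRAM_APP_SECRET"],
    "meta": ["META_PAGE_ACCESS_TOKEN", "META_PAGE_ID"],
    "r2": ["R2_ENDPOINT", "R2_ACCESS_KEY", "R2_SECRET_KEY", "R2_BUCKET", "R2_PUBLIC_URL"],
}


def missing_env_vars(platform: str, env=None) -> list[str]:
    """Return the required variables that are unset or empty for a platform."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV_VARS[platform] if not env.get(name)]


if __name__ == "__main__":
    for platform in REQUIRED_ENV_VARS:
        missing = missing_env_vars(platform)
        status = "ok" if not missing else "missing " + ", ".join(missing)
        print(f"{platform}: {status}")
```

Running it before a scheduled post makes a missing secret show up as a clear message rather than an authentication error partway through a run.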
## Required Files
- `data/questions.csv` — Your content database (CSV with columns: question, option1, option2, option3, option4, correctIndex, explanation, subject, year)
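As an illustration of that column layout, the sketch below parses one placeholder row. The row content is made up for the example, and `load_rows` and `correct_letter` are hypothetical helpers; the zero-based reading of `correctIndex` follows `INDEX_TO_LETTER` in `csv_manager.py`.

```python
import csv
import io

# Placeholder data: one row using the columns listed above.
SAMPLE_CSV = (
    "question,option1,option2,option3,option4,correctIndex,explanation,subject,year\n"
    "Placeholder question?,First,Second,Third,Fourth,2,Placeholder explanation.,Polity,2020\n"
)


def load_rows(text: str) -> list[dict]:
    """Parse CSV text into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text)))


def correct_letter(row: dict) -> str:
    """Map the zero-based correctIndex column to an option letter."""
    return "ABCD"[int(row["correctIndex"])]


if __name__ == "__main__":
    row = load_rows(SAMPLE_CSV)[0]
    print(row["question"], "->", correct_letter(row))
```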
## Required Python Packages
```
tweepy
requests
moviepy
numpy
Pillow
html2image
boto3
google-api-python-client
google-auth-oauthlib
```
## Scripts Included
| Script | Purpose |
|--------|---------|
| `formatter.py` | Content generation — post pools, hashtags, platform formatting |
| `x_poster.py` | X/Twitter posting + thread posting via tweepy |
| `x_thread_generator.py` | Data-driven thread generation from CSV analysis |
| `instagram_main.py` | Instagram reel posting orchestrator |
| `ig_reel_poster.py` | Instagram Graph API reel upload + answer comments |
| `ig_config.py` | Instagram captions, hashtags, output paths |
| `youtube_main.py` | YouTube Shorts posting orchestrator |
| `yt_shorts_poster.py` | YouTube Data API upload |
| `yt_config.py` | YouTube titles, descriptions, tags |
| `meta_poster.py` | Meta/Facebook/Threads posting |
| `html_video_generator.py` | HTML→PNG→MP4 video generation (8 color themes) |
| `video_generator.py` | PIL-based fallback video generator |
| `image_generator.py` | Static image generation for posts |
| `csv_manager.py` | Content database reader + tracking |
| `r2_uploader.py` | Cloudflare R2 video upload (for Instagram reel hosting) |
| `yt_auth.py` | YouTube OAuth2 authentication handler |
## Commands
- **"Post now"** — Immediately post to all configured platforms
- **"Post to X"** — Post single tweet + thread
- **"Post reel"** — Generate and post Instagram reel
- **"Generate video"** — Create a reel/short without posting
- **"Show schedule"** — Display current posting schedule
## Customization (IMPORTANT)
Before using this skill, customize these files for your niche:
1. **Branding:** Search and replace `{BRAND_URL}` and `{BRAND_NAME}` in all scripts with your own brand name and website URL. These appear as watermarks and CTAs in generated videos and captions.
2. **Content pools:** `formatter.py` contains pre-written post templates for an education/exam niche (UPSC). Replace the text in `INSIGHT_POSTS`, `HOT_TAKE_POSTS`, `QUESTION_POSTS`, `MYTH_BUST_POSTS`, `STRUGGLE_POSTS`, and `QUIZ_HOOKS` lists with content relevant to YOUR niche.
3. **Data:** Replace `data/questions.csv` with your own content database.
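The search-and-replace in step 1 can be scripted. This is a sketch under assumptions: `apply_branding` is a hypothetical helper, and it assumes the scripts are `.py` files sitting directly in one directory.

```python
from pathlib import Path


def apply_branding(scripts_dir: Path, brand_name: str, brand_url: str) -> list[Path]:
    """Replace the {BRAND_NAME} and {BRAND_URL} placeholders in every .py file.

    Returns the files that were actually changed.
    """
    changed = []
    for path in sorted(scripts_dir.glob("*.py")):
        original = path.read_text(encoding="utf-8")
        updated = original.replace("{BRAND_NAME}", brand_name).replace("{BRAND_URL}", brand_url)
        if updated != original:
            path.write_text(updated, encoding="utf-8")
            changed.append(path)
    return changed


if __name__ == "__main__":
    # Example values only; substitute your own brand name and URL.
    for path in apply_branding(Path("scripts"), "Example Brand", "https://example.com"):
        print("updated", path)
```

Some placeholders sit inside f-strings (for example in `format_yt_description` in `formatter.py`), so those code paths raise `NameError` until the replacement has been done.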
## Security Notes
- All API credentials are read from environment variables — never hardcoded
- Data is only sent to platforms you explicitly configure (X, Instagram, YouTube, Meta) and Cloudflare R2 (for video hosting, required by Instagram)
- CI/GITHUB_ACTIONS env vars are only used to detect runtime environment (headless Chrome flags)
- The content database file stays local; only the posts and videos generated from it are sent to the platforms above
FILE:README.md
# Social Autopilot 🚀
**Full autopilot social media engine for OpenClaw.** Post to X, Instagram, YouTube, and Meta — automatically, with human-sounding content.
## Why This Skill?
Other social media skills help you *draft* posts. This one **posts them for you** — on schedule, with video generation, data-driven threads, and platform-specific formatting.
**Built by a developer running this exact system 24/7.** Not a template — a battle-tested production tool.
## Features
| Feature | What it does |
|---------|-------------|
| 🐦 X Posts | Human-sounding posts, 1-2 hashtags, 280 char optimized |
| 🧵 X Threads | 4-6 tweet data-driven threads from your dataset |
| 📸 IG Reels | Auto-generated 9:16 video with 8 color themes |
| 📺 YT Shorts | Same videos, YouTube-optimized titles & descriptions |
| 📊 Data Analysis | Analyze your content DB and generate insights |
| 🎨 Video Themes | 8 rotating color schemes — never monotonous |
| ⏰ Scheduling | GitHub Actions cron — set it and forget it |
| 💬 Comment Strategy | Answers posted as comments for engagement |
| #️⃣ Smart Hashtags | 1-2 relevant, topic-rotated hashtags |
## Quick Start
1. Install: `npx clawhub install social-autopilot`
2. Add your API keys to `.env`
3. Add your content to `data/questions.csv`
4. Say "Post now" to your OpenClaw agent
## Works for Any Niche
Swap the CSV, change the SOUL.md — works for fitness, finance, tech, food, education, anything.
## Pricing
- **Free tier:** X posting only, 1 post/day, basic formatting
- **Premium ($29):** All platforms, threads, video generation, unlimited posts, custom themes
## Support
Issues: [GitHub Issues](https://github.com/abhinawtech/social-autopilot/issues)
FILE:clawhub.json
{
"name": "Social Autopilot",
"tagline": "Full autopilot social media engine — posts, threads, reels, all automated",
"description": "Complete social media automation for OpenClaw. Auto-generates and posts human-sounding content to X (via tweepy), Instagram Reels (via Graph API), YouTube Shorts (via Data API), and Meta/Facebook. Features: data-driven X threads from CSV analysis, HTML-to-video reel generation with 8 color themes, smart hashtag rotation, content format rotation by day, answer-in-comments strategy. Requires API keys for each platform as environment variables. All credentials read from env — nothing hardcoded. Built by a developer running this system 24/7 in production.",
"category": "productivity",
"tags": ["social-media", "automation", "instagram", "twitter", "youtube", "reels", "threads", "video"],
"version": "1.2.0",
"license": "MIT",
"pricing": "free",
"support_url": "https://github.com/abhinawtech/social-autopilot/issues",
"homepage": "https://github.com/abhinawtech/social-autopilot"
}
FILE:scripts/csv_manager.py
import csv
import logging
import random
import tempfile
from pathlib import Path
from bot.ig_config import CSV_PATH
logger = logging.getLogger(__name__)
INDEX_TO_LETTER = {0: "A", 1: "B", 2: "C", 3: "D"}
MAX_QUESTION_LENGTH = 200
def _read_csv(path: Path = CSV_PATH) -> list[dict]:
    """Read the content database into a list of row dicts."""
    with open(path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))
def _write_csv(rows: list[dict], path: Path = CSV_PATH) -> None:
    """Rewrite the CSV via a temp file so a failed write cannot corrupt it."""
    if not rows:
        return
    # Collect column names in first-seen order across all rows.
    seen = set()
    fieldnames = []
    for row in rows:
        for key in row.keys():
            if key not in seen:
                seen.add(key)
                fieldnames.append(key)
    tmp_fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=path.parent)
    try:
        with open(tmp_fd, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        Path(tmp_path).replace(path)
    except Exception:
        Path(tmp_path).unlink(missing_ok=True)
        raise
def _normalize_row(row: dict) -> dict:
    """Map the option1..option4/correctIndex schema to option_a..option_d/answer."""
    if "option_a" in row:
        return row
    return {
        "question": row["question"],
        "option_a": row.get("option1", ""),
        "option_b": row.get("option2", ""),
        "option_c": row.get("option3", ""),
        "option_d": row.get("option4", ""),
        # A blank correctIndex falls back to 0 instead of raising ValueError.
        "answer": INDEX_TO_LETTER.get(int(row.get("correctIndex") or 0), "A"),
        "category": row.get("subject", row.get("category", "General")),
        "year": row.get("year", ""),
        "explanation": row.get("explanation", ""),
        "posted": row.get("posted", "no"),
    }
def _options_balanced(normalized: dict, tolerance: float = 0.20) -> bool:
    """Return True if every option length is within `tolerance` of the mean length."""
    lengths = [
        len(normalized["option_a"]),
        len(normalized["option_b"]),
        len(normalized["option_c"]),
        len(normalized["option_d"]),
    ]
    avg = sum(lengths) / 4
    if avg == 0:
        return False
    return all(abs(length - avg) / avg <= tolerance for length in lengths)
def get_next_question(path: Path = CSV_PATH) -> tuple[int, dict] | None:
    rows = _read_csv(path)
    # Build list of unposted question indices
    unposted_indices = []
    for i, row in enumerate(rows):
        posted = row.get("posted", "no").strip().lower()
        if posted in ("no", ""):
            normalized = _normalize_row(row)
            if len(normalized["question"]) > MAX_QUESTION_LENGTH:
                logger.debug("Skipping question %d: too long (%d chars)", i, len(normalized["question"]))
                continue
            if not _options_balanced(normalized):
                logger.debug("Skipping question %d: options not balanced", i)
                continue
            unposted_indices.append(i)
    if not unposted_indices:
        logger.warning("No unposted questions remaining in %s", path)
        return None
    # Pick a random unposted question
    random_index = random.choice(unposted_indices)
    normalized = _normalize_row(rows[random_index])
    logger.info("Selected random question %d: %s", random_index, normalized["question"][:60])
    return random_index, normalized
def mark_as_posted(index: int, path: Path = CSV_PATH) -> None:
    rows = _read_csv(path)
    if 0 <= index < len(rows):
        rows[index]["posted"] = "yes"
        _write_csv(rows, path)
        logger.info("Marked question %d as posted", index)
    else:
        logger.error("Invalid question index: %d", index)
def get_unposted_count(path: Path = CSV_PATH) -> int:
    rows = _read_csv(path)
    return sum(
        1 for r in rows if r.get("posted", "no").strip().lower() in ("no", "")
    )
FILE:scripts/formatter.py
"""Formats social media posts — human-sounding, raw, English only.
v4.0 — Sounds like a real UPSC aspirant sharing their journey.
No emoji spam. No perfect structure. Just real talk.
"""
import logging
import random
from datetime import datetime
logger = logging.getLogger(__name__)
X_CHAR_LIMIT = 280
APP_PROMO = ""
# ─── Post Pools ───────────────────────────────────────────────────────────────
# Written like a real person — messy, opinionated, raw
# Vary structure so 3 posts in a row don't feel templated
INSIGHT_POSTS = [
# Short ones
"went through 30 years of UPSC papers. constitutional amendments showed up 28 times out of 30.\nif you're skipping this topic you're literally skipping marks",
"polity + economy = 33% of the entire prelims paper. one third.\nhow are people still treating these as secondary subjects",
"i did the math on negative marking. attempting 75 and getting 55 right beats attempting 90 and getting 58 right.\nhow many marks are you losing by guessing on unsure questions",
"environment questions grew 7x from the 90s to now. 7 times.\nand people still prep it last. doesn't make sense to me",
"3274 questions analyzed from 1995 to 2025. polity alone is 17.7%.\nthat's almost 1 in every 5 questions",
# Medium ones
"spent a week doing topic-wise PYQs instead of year-wise. completely changed how i see the paper.\n\nyou start noticing which concepts keep showing up in different forms. same idea, different angle every few years.\n\nhow are you doing your PYQ practice right now",
"the pattern i noticed in environment questions is honestly wild. 2-3 per paper in the 90s.\nnow it's 15-20 regularly. entire new subject basically grew inside the exam.\n\nif you're treating environment as a small topic you're behind",
"art and culture is 6.7% of the paper. the questions are mostly factual — which dance form, which state, which festival.\nstraightforward marks if you spend even a week on it.\n\nmost people skip it for harder topics and leave easy marks on the table",
"science questions in UPSC have been declining for years. used to be 15+ per paper. now barely 5-8.\n\nall that bandwidth moved to environment and current affairs.\nstill studying all 5 units of science equally",
"spent an hour just plotting which topics appear how often. vedic period — 12 times in 30 years. fundamental rights — 25+ times.\n\nthe exam has favorites. once you know the favorites everything becomes clearer",
"current affairs is 10% of the paper but people spend 80% of their time on it.\n\nnewspaper daily for 2 hours but polity once a month. the numbers don't add up",
"maps. just maps. rivers through which states, national parks, arrange north to south.\n\n3-5 marks just from map-based questions and most people never deliberately practice them. tried this last cycle and it helped",
"here's what i got wrong for too long — treating all subjects equally.\n\npolity 17.7%, economy 15.9%, history 14.8% — these three alone are almost half the paper. prioritize accordingly",
"solved 3274 questions topic-wise. the constitutional amendments pattern is almost embarrassing how clear it is.\n\n28 out of 30 years. UPSC is basically telling you what to study. are we listening",
"the jump in difficulty from pre-2010 to post-2015 papers is real. old papers feel like GK quizzes.\n\nnewer ones need actual thinking and elimination. if you're only practicing old papers you might be underprepared",
"took me too long to figure out that WTO and trade bodies appear almost every time there's a major global agreement.\n\nthe news is telling you what UPSC will ask next year. connecting those dots is a skill",
"economy is 15.9% of the paper and the questions lately are very conceptual.\n\nnot just 'what is repo rate' but 'what happens to inflation when this changes.' mechanism-based. how deep is your economics understanding",
]
HOT_TAKE_POSTS = [
"hot take: most coaching notes are a liability not an asset.\n\nyou spend more time organizing and reading them than actually thinking. the understanding doesn't transfer.\n\nwhat's your experience with notes",
"NCERTs alone will not clear UPSC prelims in 2025. i said it.\n\nthey're the foundation yes. but the paper moved. post-2015 questions need conceptual depth NCERTs don't give.\n\ntell me i'm wrong",
"the '14 hours a day study' flex is mostly performative.\n\n6 focused hours consistently beats 12 scattered hours every time. i've tested this on myself.\n\nwhat's your honest daily average",
"unpopular opinion: CSAT is where a lot of people quietly fail.\n\neveryone's focused on GS. CSAT is 'qualifying only' so it gets 2 days of prep. then exam day hits.\n\nhow much time are you actually giving it",
"self study candidates make it to the top 100 every year. every year.\n\nbut the industry has convinced everyone they need a 1.5 lakh course to crack UPSC.\n\nwhat actually differentiates toppers isn't the coaching. it's the consistency",
"revision is more important than covering new topics. genuinely.\n\nmost people fail not because they don't know things but because they can't recall under pressure.\n\nhow many times have you revised your polity notes",
"monthly current affairs compilations are more useful than daily newspaper reading for UPSC.\n\nfight me. the daily news is 90% noise for prelims. the monthly filter keeps what matters.\n\nhow long is your daily newspaper routine",
"everyone talks about toppers' booklists. nobody talks about toppers' elimination strategies.\n\nknowing which option to kill first is a real skill and it's what separates 55 marks from 75 marks.\n\nhave you ever practiced just elimination",
"taking fewer attempts with higher accuracy is objectively better than attempting everything.\n\ngot this wrong in my first prep phase. was attempting 95+ questions every mock. score was worse than when i attempted 70 carefully.\n\nwhat's your attempt strategy",
"answer writing is tested in mains but the thinking it builds helps prelims too.\n\nbeing able to explain WHY an option is wrong is different from just marking the right one.\n\nhow often are you writing out your reasoning",
"the syllabus is not the guide. PYQs are the guide.\n\nthe official syllabus is vague. the actual questions tell you exactly how deep each topic goes. start there.\n\nwhen did you first start solving PYQs",
"geography gets underrated because people find it boring. but 12.5% of the paper is geography.\n\nand most of it is Indian geography — rivers, regions, soils, agriculture. not even world geo.\n\nhow's your Indian geography",
]
QUESTION_POSTS = [
"what's the one topic that you keep postponing and you know it's going to cost you marks",
"if you had to cut your prep down to just 3 subjects for the next 30 days, which ones would you pick and why",
"what changed most in your prep after you started solving PYQs seriously",
"be honest — how many days in a row can you maintain a real 6-hour study routine without breaking",
"which UPSC subject do you think is the most underrated and why",
"what's the worst advice you received about UPSC prep that you actually followed for a while",
"if you could ask a real UPSC topper one question right now, what would it be",
"what's one thing you wish you'd started doing in your very first month of prep that you only figured out later",
"polity or economy — which one do you find easier to retain and why",
"what does your revision schedule actually look like? be specific — not the ideal one, the real one",
"how do you decide when you've studied a topic 'enough' before moving to the next one",
"what's your strategy for current affairs? newspaper, app, monthly compilation, something else — what works for you",
"if the exam is in 3 months and you haven't touched environment yet, what do you do",
"do you think the way you're studying right now would be enough to clear prelims? honest answer",
]
MYTH_BUST_POSTS = [
"myth: you need to read every major newspaper daily.\n\nlooked at 30 years of papers. editorials from specific outlets don't show up as questions. analysis + PIB + monthly CA compilation is enough.\n\nhow many hours a week are you spending on newspapers",
"myth: prelims is about how much you know.\n\nit's actually about what you choose NOT to attempt. with negative marking at -0.67, wrong answers are expensive.\n\nhave you calculated your ideal attempt count",
"myth: art and culture is too vast to cover for 6% marks.\n\nit's not vast if you go topic-wise. dance forms, folk traditions, classical music basics — 10-12 days is actually enough for the high-frequency parts.\n\nhow much time have you given it",
"myth: environment is a new addition so there's not much PYQ data.\n\nlooked at the data. environment appeared in every single paper since 2000. and the volume tripled post-2012.\n\nalmost 12% of the paper now",
"myth: science is still a major subject in UPSC.\n\nit used to be. 15+ questions in the 90s. now it's down to 5-8 and declining.\n\nthe marks moved to environment and current affairs. update your time allocation",
"myth: self study is only for repeaters, first attempt needs coaching.\n\nevery year toppers with no coaching make it. the exam doesn't know or care where your notes came from.\n\nwhat it cares about is whether you understand the concepts",
"myth: the more topics you cover, the better your score.\n\npicking 3 subjects and going deep beats knowing 8 subjects shallowly.\n\npolity, economy, history alone = 48% of the paper. depth over breadth",
"myth: current affairs is unpredictable so you can't prepare for it specifically.\n\nthere are recurring themes — international organizations, government schemes, environment summits, economic indicators.\n\nthese repeat. it's not random",
"myth: you should finish reading before attempting questions.\n\ni tried both. starting questions on day one forced me to understand what actually matters in each topic.\n\nthe syllabus never really finishes. questions teach you what counts",
"myth: high daily study hours = success.\n\nthree years of UPSC prep data suggests otherwise. consistency beats volume. 5 solid hours every day for a year is more than 12 hours for a month then burnout.\n\nwhat's sustainable for you",
"myth: constitutional amendments are a separate optional topic.\n\n28 out of 30 years. not 'sometimes' or 'often.' twenty-eight times in thirty years.\n\nif this isn't in your serious prep pile we need to talk",
"myth: mains syllabus is separate from prelims preparation.\n\nconcepts overlap massively. polity analysis for mains sharpens prelims MCQ reasoning too.\n\nintegrated prep is more efficient. are you treating them as completely separate",
]
STRUGGLE_POSTS = [
"the day i realized i'd been studying wrong for 6 months was genuinely rough.\n\nnot wrong as in wrong information. wrong as in zero retention. reading without testing.\n\nhad to restart a lot of things. it felt like going backward but it was actually the turning point",
"some days the whole thing feels pointless.\n\nyou read 4 hours, sit down with questions, get 40% right. what even is the point.\n\nbut those days are also weirdly the ones that build something. i can't explain it but it's true",
"mock test day is genuinely stressful even when it's just practice.\n\nthat feeling before you look at your score. like you know it might hurt.\n\nanyone else do this or just me",
"three months into prep and i still hadn't touched environment. kept pushing it.\n\nthen saw the statistics. 12% of the paper. growing every year.\n\nstarted the next day. regretted delaying for so long",
"comparison is the worst part of this journey honestly.\n\nsomeone in your group finishes the polity NCERT in one day. you took a week. feels bad even though you know everyone learns differently.\n\nhow do you deal with it",
"i gave up on CSAT for almost a month. thought it was just qualifying, not a real threat.\n\nfailed a mock. 66 marks when i needed 67.\n\nnever skipped CSAT practice again after that",
"there's a specific kind of tired that UPSC prep creates.\n\nnot physical tired. mentally drained from holding so much information and never feeling like it's enough.\n\nhow do you rest from that kind of tired",
"the gap between knowing something and being able to recall it under exam pressure is huge.\n\nlearned this the hard way. knew the material. blanked on exam day.\n\nrevision fixed this more than reading ever did",
"asked myself honestly last week — if i got the result today, would i be proud of how i prepped.\n\nthe answer was no.\n\ndidn't feel great to admit but something shifted after that",
"people in your life don't always understand why this takes so long.\n\nexplaining why you can't just 'finish studying' is exhausting.\n\nbut this path has a very specific kind of person walking it and you're one of them",
"there's a version of you six months ago who would be genuinely impressed by where you are now.\n\neven on the days it doesn't feel like progress.\n\nespecially on those days",
"failing a mock isn't failing the exam. i repeated this to myself so many times it started to feel true.\n\nthen it became true.\n\nhow are you handling your mock scores right now",
]
QUIZ_HOOKS = [
"got this wrong on my first try honestly",
"this one's trickier than it looks",
"took me way too long to figure this out",
"most people guess C on this. they're wrong",
"if you get this right without googling i'm impressed",
"this tripped me up during practice",
"seemed easy until i read option B carefully",
"the answer surprised me ngl",
"classic UPSC elimination question — looks simple until you think about it",
"spent like 3 minutes on this. should've taken 30 seconds",
"this is the kind of question that breaks a good score if you guess wrong",
]
# ─── Legacy pool aliases (backward compat with html_video_generator) ──────────
SHOCKING_STAT_POSTS = INSIGHT_POSTS
MYTH_BUSTER_POSTS = MYTH_BUST_POSTS
THIS_OR_THAT_POSTS = HOT_TAKE_POSTS
MOTIVATION_POSTS = STRUGGLE_POSTS
def get_day_format() -> str:
    """Get today's content format based on day of week."""
    formats = [
        "insight",    # Monday
        "hot_take",   # Tuesday
        "question",   # Wednesday
        "myth_bust",  # Thursday
        "insight",    # Friday
        "quiz",       # Saturday
        "struggle",   # Sunday
    ]
    day = datetime.now().weekday()
    return formats[day]
def format_question_post(q: dict) -> str | None:
    """Format post for X/Twitter (280 char limit).
    Quiz days: use the actual question dict with a human hook.
    Non-quiz days: pick from a pre-written pool; the question dict is
    only used to choose the topic hashtag.
    1-2 hashtags (see _add_x_hashtags). Max 1 emoji. Zero links.
    The post body always ends with a question or open thought.
    """
    day_format = get_day_format()
    if day_format == "quiz":
        return _format_quiz_post(q)
    pool_map = {
        "insight": INSIGHT_POSTS,
        "hot_take": HOT_TAKE_POSTS,
        "question": QUESTION_POSTS,
        "myth_bust": MYTH_BUST_POSTS,
        "struggle": STRUGGLE_POSTS,
    }
    pool = pool_map.get(day_format, INSIGHT_POSTS)
    text = random.choice(pool)
    text = _add_x_hashtags(text, q)
    return trim_to_limit(text)
def _add_x_hashtags(text: str, q: dict) -> str:
    """Add 1-2 relevant hashtags to X post. Sometimes just #UPSC, sometimes +topic."""
    # Rows normalized by csv_manager carry the subject under "category".
    subject = (q.get("subject") or q.get("category") or "").lower() if q else ""
    topic_tags = {
        "polity": "#Polity", "governance": "#Polity",
        "environment": "#Environment", "ecology": "#Environment",
        "geography": "#Geography",
        "economic": "#Economy", "economy": "#Economy",
        "history": "#History",
        "science": "#ScienceTech", "technology": "#ScienceTech",
        "art": "#ArtCulture", "culture": "#ArtCulture",
        "international": "#IR",
    }
    second_tag = None
    for key, val in topic_tags.items():
        if key in subject:
            second_tag = val
            break
    # Randomly use 1 or 2 hashtags
    if second_tag and random.random() > 0.4:
        tags = f"#UPSC {second_tag}"
    else:
        tags = "#UPSC"
    return f"{text}\n\n{tags}"
def _format_quiz_post(q: dict) -> str | None:
    """Format a quiz question for X with a casual human hook."""
    opts = q.get("options", [])
    if len(opts) < 4:
        opts = [
            q.get("option_a", ""),
            q.get("option_b", ""),
            q.get("option_c", ""),
            q.get("option_d", ""),
        ]
    year = q.get("year", "")
    hook = random.choice(QUIZ_HOOKS)
    year_str = f"upsc {year}" if year else "upsc pyq"
    # Rows normalized by csv_manager carry the subject under "category".
    subject = (q.get("subject") or q.get("category") or "").lower()
    # Pick the second hashtag based on subject; default to #PYQ
    tag2 = "#PYQ"
    for key, tag in [("polity", "#Polity"), ("environment", "#Environment"), ("geography", "#Geography"),
                     ("economic", "#Economy"), ("history", "#History"), ("science", "#ScienceTech")]:
        if key in subject:
            tag2 = tag
            break
    text = (
        f"{hook}\n\n"
        f"{q['question']}\n\n"
        f"a) {opts[0]}\n"
        f"b) {opts[1]}\n"
        f"c) {opts[2]}\n"
        f"d) {opts[3]}\n\n"
        f"— {year_str}\n\n"
        f"#UPSC {tag2}"
    )
    return trim_to_limit(text)
def format_answer_post(q_data: dict) -> str:
    """Casual answer reveal for X/Twitter."""
    letter = q_data.get("correct_answer", "")
    answer_text = q_data.get("correct_option_text", "")
    explanation = q_data.get("explanation", "")
    templates = [
        f"answer: {letter}) {answer_text}\n\n{explanation}",
        f"it's {letter}. {explanation}\n\ndon't feel bad if you got it wrong — most people do on first try",
        f"{letter}) {answer_text}\n\n{explanation}\n\nthis is exactly why pyq practice matters",
        f"correct answer is {letter}.\n\n{explanation}",
        f"answer was {letter}) {answer_text}.\n\n{explanation}\n\ndid you get it",
    ]
    text = random.choice(templates)
    return trim_to_limit(text) or f"answer: {letter}) {answer_text}"
def build_question_data(q: dict) -> dict:
    """Extract the data needed to post the answer later."""
    return {
        "question": q["question"],
        "correct_answer": q["correct_answer"],
        "correct_option_text": q["correct_option_text"],
        "explanation": q.get("explanation", ""),
    }
def format_ig_caption(q: dict) -> str:
    """Format an Instagram caption — longer, personal, 5-8 hashtags at the END only."""
    day_format = get_day_format()
    category = q.get("category", "General Studies")
    category_tag = category.replace(" ", "").replace("&", "And")
    if day_format == "quiz":
        opts = q.get("options", [])
        if len(opts) < 4:
            opts = [q.get("option_a", ""), q.get("option_b", ""), q.get("option_c", ""), q.get("option_d", "")]
        year = q.get("year", "")
        hook = random.choice(QUIZ_HOOKS)
        body = (
            f"{hook}\n\n"
            f"{q['question']}\n\n"
            f"a) {opts[0]}\n"
            f"b) {opts[1]}\n"
            f"c) {opts[2]}\n"
            f"d) {opts[3]}\n\n"
            f"comment your answer below — answer reveal in 24 hours\n\n"
            f"#UPSC #UPSCPreparation #IAS #PreviousYearQuestions #{category_tag} #UPSCPrelims #UPSC2026"
        )
        return body
    pool_map = {
        "insight": INSIGHT_POSTS,
        "hot_take": HOT_TAKE_POSTS,
        "question": QUESTION_POSTS,
        "myth_bust": MYTH_BUST_POSTS,
        "struggle": STRUGGLE_POSTS,
    }
    pool = pool_map.get(day_format, INSIGHT_POSTS)
    core = random.choice(pool)
    hashtags = {
        "insight": f"#UPSC #UPSCPreparation #IAS #UPSCPrelims #{category_tag} #CivilServices #IASPreparation",
        "hot_take": f"#UPSC #IAS #UPSCPreparation #UPSCTips #{category_tag} #CivilServices #UPSC2026",
        "question": f"#UPSC #IAS #UPSCPreparation #{category_tag} #UPSCAspirants #CivilServices #UPSC2026",
        "myth_bust": f"#UPSC #IAS #UPSCPreparation #UPSCMyths #{category_tag} #CivilServices #IASPreparation",
        "struggle": f"#UPSC #IAS #UPSCJourney #UPSCAspirants #CivilServices #IASMotivation #UPSC2026",
    }.get(day_format, f"#UPSC #IAS #UPSCPreparation #{category_tag} #CivilServices")
    return f"{core}\n\n{hashtags}"
def format_yt_title(q: dict) -> str:
    """Engaging YouTube title — lowercase, under 60 chars + #Shorts."""
    category = q.get("category", "upsc")
    year = q.get("year", "")
    day_format = get_day_format()
    if day_format == "quiz":
        templates = [
            f"upsc asked this in {year} — can you solve it? #Shorts",
            f"this {category.lower()} question tripped everyone #Shorts",
            f"upsc {year}: most people got this wrong #Shorts",
            f"can you crack this {category.lower()} pyq? #Shorts",
            f"upsc {year} — {category.lower()} question #Shorts",
        ]
    else:
        templates = [
            "UPSC asked this 28 times in 30 years #Shorts",
            "this data changed how i study for UPSC #Shorts",
            "the UPSC pattern nobody tells you about #Shorts",
            "polity is literally one third of the paper #Shorts",
            "environment grew 7x in UPSC — are you ready #Shorts",
            "negative marking math most aspirants get wrong #Shorts",
            "i analyzed 3274 UPSC questions — here's what i found #Shorts",
            "the most repeated UPSC topic in 30 years #Shorts",
        ]
    title = random.choice(templates)
    # Enforce ~67 char max (60 + " #Shorts")
    if len(title) > 67:
        title = title[:59] + " #Shorts"
    return title
def format_yt_description(q: dict) -> str:
"""Casual YouTube description — 2-3 lines + site link."""
day_format = get_day_format()
category = q.get("category", "general studies")
year = q.get("year", "")
if day_format == "quiz":
descs = [
f"went through 30 years of UPSC {category.lower()} questions and pulled out this gem from {year}.\n\ntest yourself before watching the answer — comment below.\n\nfree pyq practice: {BRAND_URL}",
f"this {category.lower()} question from UPSC {year} is trickier than it looks.\n\ntry it first. answer is in the comments.\n\nfree pyq practice: {BRAND_URL}",
f"UPSC {year} — {category.lower()} paper.\n\ngot this wrong on my first attempt. let's see if you do better.\n\nfree pyq practice: {BRAND_URL}",
]
else:
descs = [
f"analyzed 3274 UPSC questions from 1995 to 2025 and the patterns are wild.\n\nsharing what i found so you don't have to figure it out the hard way.\n\nfree pyq practice: {BRAND_URL}",
f"been prepping for UPSC for a while and these data points genuinely changed how i study.\n\nhope this helps someone avoid my mistakes.\n\nfree pyq practice: {BRAND_URL}",
f"the exam has patterns if you look at it across 30 years.\n\nmost people don't. you should.\n\nfree pyq practice: {BRAND_URL}",
]
return random.choice(descs)
def trim_to_limit(text: str) -> str | None:
"""Trim text to X character limit."""
if len(text) <= X_CHAR_LIMIT:
return text
# Remove hashtag lines first
lines = text.split("\n")
filtered = [line for line in lines if not line.strip().startswith("#")]
text = "\n".join(filtered).strip()
if len(text) <= X_CHAR_LIMIT:
return text
# Hard truncate with ellipsis
return text[:X_CHAR_LIMIT - 1] + "…"
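# Sketch (not part of the original module): a word-boundary variant of the hard
# truncate above, in case mid-word cuts are undesirable. `truncate_at_word` and
# its `limit` default are hypothetical; 280 mirrors the X limit this skill documents.
def truncate_at_word(text: str, limit: int = 280) -> str:
    """Cut `text` to at most `limit` chars, preferring the last space before the cut."""
    if len(text) <= limit:
        return text
    cut = text[: limit - 1]                # leave one char for the ellipsis
    if not text[limit - 1].isspace():      # the cut landed mid-word
        space = cut.rfind(" ")
        if space > 0:                      # back up to the previous word boundary
            cut = cut[:space]
    return cut.rstrip() + "…"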
# ─── Legacy format functions (kept for backward compat — used by telegram_poster
# and html_video_generator) ──────────────────────────────────────────────
def format_shocking_stat() -> str:
"""Insight-style post — personal UPSC data discovery."""
return random.choice(INSIGHT_POSTS)
def format_myth_buster() -> str:
"""Myth-bust post — common UPSC myth destroyed with data."""
return random.choice(MYTH_BUST_POSTS)
def format_this_or_that() -> str:
"""Hot take post — unpopular opinion that invites debate."""
return random.choice(HOT_TAKE_POSTS)
def format_subject_breakdown() -> str:
"""Subject breakdown — insight about specific subject weightage."""
breakdown_posts = [
"here's what i found after going through 3274 questions\n\npolity: 17.7%\neconomy: 15.9%\nhistory: 14.8%\ngeography: 12.5%\nenvironment: 11.6%\n\ntop three alone = almost half the paper. are you allocating time accordingly",
"polity + economy together = 33% of prelims.\n\none third of the entire paper. two subjects.\n\nhow much of your prep time is actually going to these two",
"environment used to be 2-3 questions per paper in the 90s.\n\nnow it's 12-15 regularly. some years even more.\n\nbeing 'okay' at environment isn't enough anymore",
"art and culture is 6.7% of the paper — about 5-7 questions.\n\nfactual, specific, and learnable in 10-12 days of focused prep.\n\nmost people skip it for 'harder' subjects. don't be that person",
"current affairs looks huge but it's 10% of the paper.\n\npeople spend 40% of their time on it.\n\nsome rebalancing might help",
]
return random.choice(breakdown_posts)
def format_motivation() -> str:
"""Struggle/motivation post — relatable aspirant emotional content."""
return random.choice(STRUGGLE_POSTS)
def format_quick_quiz(q: dict) -> str | None:
"""Format quiz question for social media (legacy name kept for compat)."""
return _format_quiz_post(q)
FILE:scripts/html_video_generator.py
"""Generate Instagram reels from HTML templates with beautiful CSS styling.
v3.0: 4 viral video formats (timer challenge, shocking stat, comparison, loop)
that rotate by day for maximum IG Reels & YT Shorts watch-through rate.
Platform rules baked in:
- Hook in first 1-2s
- 13s target duration (proven sweet spot)
- Bold, high-contrast, mobile-first 9:16
- Text on screen > voiceover
- Loop content (end connects to start)
"""
from __future__ import annotations
import logging
import random
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
# Heavy imports are lazy-loaded to avoid crashes when only get_video_format_for_day is needed
np = None
Html2Image = None
AudioFileClip = None
ImageClip = None
concatenate_videoclips = None
Image = None
def _ensure_heavy_imports():
global np, Html2Image, AudioFileClip, ImageClip, concatenate_videoclips, Image
if np is None:
import numpy as _np
np = _np
if Html2Image is None:
from html2image import Html2Image as _H2I
Html2Image = _H2I
if AudioFileClip is None:
from moviepy import AudioFileClip as _A, ImageClip as _I, concatenate_videoclips as _C
AudioFileClip = _A
ImageClip = _I
concatenate_videoclips = _C
if Image is None:
from PIL import Image as _Img
Image = _Img
try:
from bot.ig_config import OUTPUT_DIR, TEMPLATES_DIR
except ImportError:
OUTPUT_DIR = Path("output")
TEMPLATES_DIR = Path("templates")
logger = logging.getLogger(__name__)
VIDEO_W, VIDEO_H = 1080, 1920
# ── Color themes (rotate per video) ──────────────────────────────────────────
_THEMES = [
{"name": "Dark Fire", "bg": "#0a0a0a", "accent": "#ff6b35", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#1a0a00"},
{"name": "Royal Gold", "bg": "#0a0a1a", "accent": "#ffd700", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#1a1500"},
{"name": "Neon Green", "bg": "#0a1a0a", "accent": "#00ff88", "green": "#00ff88", "red": "#ef4444", "accent_bg": "#001a0a"},
{"name": "Deep Purple", "bg": "#1a0a2a", "accent": "#c084fc", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#1a0a1a"},
{"name": "Blood Red", "bg": "#1a0a0a", "accent": "#ef4444", "green": "#22c55e", "red": "#ff6b6b", "accent_bg": "#1a0000"},
{"name": "Ocean Cyan", "bg": "#0a1a2a", "accent": "#00d4ff", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#001a2a"},
{"name": "Hot Pink", "bg": "#1a0a1a", "accent": "#ff69b4", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#1a0010"},
{"name": "Amber", "bg": "#0a0a05", "accent": "#f59e0b", "green": "#22c55e", "red": "#ef4444", "accent_bg": "#1a1000"},
]
def _pick_theme() -> dict:
"""Pick a random color theme."""
return random.choice(_THEMES)
# Legacy constants (default theme)
_BG = "#0a0a0a"
_ORANGE = "#ff6b35"
_GREEN = "#22c55e"
_RED = "#ef4444"
# ── Data pools ───────────────────────────────────────────────────────────────
SHOCKING_STATS = [
{
"hook": "UPSC ka sabse bada pattern",
"stat": "28/30",
"detail": "Constitutional Amendments appeared in 28 out of 30 UPSC papers",
"source": "1995-2025 analysis",
},
{
"hook": "Ye subject 7x badh gaya",
"stat": "7x",
"detail": "Environment questions grew 7x since the 1990s",
"source": "3274 PYQs analyzed",
},
{
"hook": "Paper ka 1/3 sirf 2 subjects se",
"stat": "33%",
"detail": "Polity (17.7%) + Economy (15.9%) = 33% of every UPSC paper",
"source": "30 years data",
},
{
"hook": "Science ab important nahi raha",
"stat": "↓50%",
"detail": "Science questions dropped from 15-20 to 5-8 per paper",
"source": "Trend analysis",
},
{
"hook": "Galat answer ki asli keemat",
"stat": "-0.67",
"detail": "One wrong answer costs you 0.67 marks. 3 wrong = 1 right answer wasted",
"source": "UPSC marking scheme",
},
{
"hook": "Art & Culture ka sach",
"stat": "6.7%",
"detail": "Only 6.7% of paper but mostly factual — easiest marks if prepared",
"source": "Subject analysis",
},
{
"hook": "Current Affairs ka weight",
"stat": "10.1%",
"detail": "10% of paper is pure current affairs — 30 min daily newspaper is enough",
"source": "3274 PYQs",
},
{
"hook": "Geography surprise",
"stat": "12.5%",
"detail": "Geography is 12.5% — Indian Geography alone is 8% of that",
"source": "Topic breakdown",
},
]
COMPARISONS = [
{
"wrong": [
"Reading 10 books",
"Year-wise PYQ solving",
"12 hours daily",
"Starting PYQ after syllabus",
],
"right": [
"Mastering 3 books",
"Topic-wise PYQ solving",
"6 focused hours",
"PYQ from Day 1",
],
},
{
"wrong": [
"₹2 Lakh coaching",
"Following 5 CA sources",
"Random mock tests",
"Memorizing facts",
],
"right": [
"₹0 self-study",
"One newspaper daily",
"Timed weekly mocks",
"Understanding concepts",
],
},
{
"wrong": [
"Watching 10 YouTube channels",
"Notes from coaching PDFs",
"Group study daily",
"Attempting all 100 Qs",
],
"right": [
"One trusted source",
"Self-written short notes",
"Solo deep work",
"Strategic 75 Qs with accuracy",
],
},
]
# Modern gradient palettes (used only for quiz Saturday format)
GRADIENT_PALETTES = [
("linear-gradient(135deg, #667eea 0%, #764ba2 100%)", "Purple Dream"),
("linear-gradient(135deg, #f093fb 0%, #f5576c 100%)", "Pink Sunset"),
("linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)", "Ocean Blue"),
("linear-gradient(135deg, #43e97b 0%, #38f9d7 100%)", "Mint Fresh"),
("linear-gradient(135deg, #fa709a 0%, #fee140 100%)", "Warm Glow"),
("linear-gradient(135deg, #30cfd0 0%, #330867 100%)", "Deep Sea"),
("linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)", "Pastel Sky"),
("linear-gradient(135deg, #ff9a9e 0%, #fecfef 100%)", "Soft Rose"),
]
# Keep legacy alias so existing imports of HTML_TEMPLATE don't break
HTML_TEMPLATE = "" # replaced by _build_quiz_html()
# ── Day rotation ─────────────────────────────────────────────────────────────
def get_video_format_for_day() -> str:
"""Return video format name based on today's weekday.
Monday → shocking_stat
Tuesday → comparison
Wednesday → timer_challenge
Thursday → shocking_stat
Friday → comparison
Saturday → quiz
Sunday → loop_format
"""
formats = [
"shocking_stat", # Monday (0)
"comparison", # Tuesday (1)
"timer_challenge", # Wednesday(2)
"shocking_stat", # Thursday (3)
"comparison", # Friday (4)
"quiz", # Saturday (5)
"loop_format", # Sunday (6)
]
return formats[datetime.now().weekday()]
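# Sketch (not part of the original module): the same weekday rotation with the
# date passed in, so a scheduler or a test can ask about any day rather than
# only "today". `video_format_for_date` and `_FORMATS_BY_WEEKDAY` are
# hypothetical names; the mapping copies the table in get_video_format_for_day.
from datetime import date

_FORMATS_BY_WEEKDAY = (
    "shocking_stat",    # Monday (0)
    "comparison",       # Tuesday (1)
    "timer_challenge",  # Wednesday (2)
    "shocking_stat",    # Thursday (3)
    "comparison",       # Friday (4)
    "quiz",             # Saturday (5)
    "loop_format",      # Sunday (6)
)


def video_format_for_date(day: date) -> str:
    """Return the video format name scheduled for the given calendar date."""
    return _FORMATS_BY_WEEKDAY[day.weekday()]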
# ── Shared HTML helpers ───────────────────────────────────────────────────────
_BASE_STYLE = f"""
* {{ margin:0; padding:0; box-sizing:border-box; }}
body {{
width: 1080px; height: 1920px;
background: {_BG};
font-family: system-ui, -apple-system, 'Segoe UI', sans-serif;
display: flex; flex-direction: column;
align-items: center; justify-content: center;
padding: 80px 60px;
position: relative; overflow: hidden;
}}
.brand {{
position: absolute; bottom: 44px; left: 0; right: 0;
text-align: center;
font-size: 30px; font-weight: 600;
color: rgba(255,255,255,0.35);
letter-spacing: 1px;
}}
"""
def _wrap(body: str, extra_css: str = "", bg: str = "") -> str:
"""Wrap body in standard 1080×1920 dark shell."""
bg_override = f"\nbody {{ background: {bg}; }}" if bg else ""
return f"""<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8">
<style>{_BASE_STYLE}{extra_css}{bg_override}</style>
</head>
<body>
{body}
<div class="brand">{BRAND_URL}</div>
</body></html>"""
def _esc(text: str) -> str:
"""Minimal HTML escaping."""
return (
str(text)
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
)
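# Sketch (not part of the original module): the standard library's html.escape
# handles the same four characters as _esc and, with quote=True, also escapes
# single quotes. `_esc_stdlib` is a hypothetical name used only for illustration.
import html


def _esc_stdlib(text: object) -> str:
    """Escape &, <, >, double and single quotes for safe embedding in HTML."""
    return html.escape(str(text), quote=True)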
# ── Format 0: Quiz (Saturday) — existing template preserved ──────────────────
def _build_quiz_html(question_data: dict, gradient: str) -> list[tuple[str, float]]:
"""Single-frame colorful quiz card. Returns [(html, 15.0)]."""
q = _esc(question_data.get("question", ""))
year = _esc(str(question_data.get("year", "2023")))
oa = _esc(question_data.get("option_a", ""))
ob = _esc(question_data.get("option_b", ""))
oc = _esc(question_data.get("option_c", ""))
od = _esc(question_data.get("option_d", ""))
html = f"""<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8">
<style>
* {{ margin:0; padding:0; box-sizing:border-box; }}
body {{
width:1080px; height:1920px;
background:{gradient};
font-family:system-ui,-apple-system,sans-serif;
display:flex; flex-direction:column;
align-items:center; padding:60px 40px;
position:relative; overflow:hidden;
}}
.header {{ background:rgba(255,255,255,.95); padding:16px 40px; border-radius:50px;
box-shadow:0 8px 32px rgba(0,0,0,.15); margin-bottom:40px; z-index:10; }}
.header-text {{ font-size:28px; font-weight:700;
background:linear-gradient(135deg,#667eea,#764ba2);
-webkit-background-clip:text; -webkit-text-fill-color:transparent; text-align:center; }}
.qcard {{ background:rgba(255,255,255,.98); border-radius:30px; padding:40px;
margin-bottom:30px; box-shadow:0 20px 60px rgba(0,0,0,.2);
width:100%; max-width:1000px; z-index:10; }}
.qtext {{ font-size:38px; font-weight:600; color:#1a1a2e; line-height:1.4; margin-bottom:20px; }}
.ybadge {{ display:inline-block;
background:linear-gradient(135deg,#f093fb,#f5576c);
color:#fff; padding:10px 28px; border-radius:25px;
font-size:26px; font-weight:700; }}
.opts {{ width:100%; max-width:1000px; z-index:10; }}
.opt {{ background:rgba(255,255,255,.95); border-radius:25px;
padding:24px 30px; margin-bottom:20px;
display:flex; align-items:center; box-shadow:0 8px 25px rgba(0,0,0,.12); }}
.oletter {{ width:56px; height:56px; border-radius:50%;
background:linear-gradient(135deg,#667eea,#764ba2);
color:#fff; display:flex; align-items:center; justify-content:center;
font-size:28px; font-weight:700; margin-right:20px; flex-shrink:0; }}
.otext {{ font-size:32px; color:#2d3748; line-height:1.4; font-weight:500; }}
.hint {{ background:rgba(255,255,255,.95); border-radius:25px;
padding:20px 35px; margin-top:10px; margin-bottom:30px;
box-shadow:0 8px 25px rgba(0,0,0,.12); z-index:10; }}
.hint-text {{ font-size:28px; font-weight:600;
background:linear-gradient(135deg,#fa709a,#fee140);
-webkit-background-clip:text; -webkit-text-fill-color:transparent; text-align:center; }}
.footer {{ background:linear-gradient(135deg,#f093fb,#f5576c);
border-radius:25px; padding:30px 40px; margin-top:auto;
width:100%; max-width:1000px; z-index:10; }}
.ftitle {{ font-size:32px; font-weight:700; color:#fff; text-align:center; margin-bottom:8px; }}
.fsub {{ font-size:26px; font-weight:600; color:rgba(255,255,255,.95); text-align:center; }}
</style></head>
<body>
<div class="header"><div class="header-text">📚 UPSC Previous Year Quiz</div></div>
<div class="qcard">
<div class="qtext">{q}</div>
<div class="ybadge">UPSC {year}</div>
</div>
<div class="opts">
<div class="opt"><div class="oletter">A</div><div class="otext">{oa}</div></div>
<div class="opt"><div class="oletter">B</div><div class="otext">{ob}</div></div>
<div class="opt"><div class="oletter">C</div><div class="otext">{oc}</div></div>
<div class="opt"><div class="oletter">D</div><div class="otext">{od}</div></div>
</div>
<div class="hint"><div class="hint-text">💡 Answer &amp; Explanation in comments!</div></div>
<div class="footer">
<div class="ftitle">{BRAND_NAME}</div>
<div class="fsub">30 Years PYQs • Mock Tests • Current Affairs</div>
</div>
</body></html>"""
return [(html, 15.0)]
# ── Format 1: Timer Challenge ("90% fail this") ──────────────────────────────
def _timer_frame(question_data: dict, count: int, theme: dict | None = None) -> str:
"""One frame of the countdown — question shown, big timer digit."""
t = theme or _THEMES[0]
_accent = t["accent"]
_rd = t["red"]
_grn = t["green"]
_bg = t["bg"]
q = _esc(question_data.get("question", ""))
oa = _esc(question_data.get("option_a", ""))
ob = _esc(question_data.get("option_b", ""))
oc = _esc(question_data.get("option_c", ""))
od = _esc(question_data.get("option_d", ""))
year = _esc(str(question_data.get("year", "")))
timer_color = _rd if count > 2 else (_accent if count == 2 else _grn)
css = f"""
.hook {{ font-size:52px; font-weight:900; color:{_rd};
text-align:center; line-height:1.2; margin-bottom:40px; letter-spacing:-1px; }}
.qcard {{ background:#1a1a1a; border-radius:24px; padding:44px;
width:100%; margin-bottom:30px; border:2px solid {_accent}33; }}
.qtext {{ font-size:36px; font-weight:700; color:#fff; line-height:1.45; margin-bottom:20px; }}
.ybadge {{ font-size:22px; color:{_accent}; font-weight:700; }}
.opts {{ width:100%; margin-bottom:30px; }}
.opt {{ background:#161616; border-radius:16px; padding:20px 28px;
margin-bottom:16px; display:flex; align-items:center;
border:2px solid #222; }}
.oletter {{ width:50px; height:50px; border-radius:50%;
background:#222; color:{_accent};
display:flex; align-items:center; justify-content:center;
font-size:26px; font-weight:800; margin-right:20px; flex-shrink:0; }}
.otext {{ font-size:30px; color:#e0e0e0; font-weight:600; line-height:1.3; }}
.timer-ring {{
width:220px; height:220px; border-radius:50%;
border: 14px solid {timer_color};
display:flex; align-items:center; justify-content:center;
box-shadow: 0 0 40px {timer_color}88;
}}
.timer-num {{ font-size:120px; font-weight:900; color:{timer_color};
line-height:1; }}
"""
body = f"""
<div class="hook">⚡ Can you answer in time?</div>
<div class="qcard">
<div class="qtext">{q}</div>
<div class="ybadge">UPSC {year}</div>
</div>
<div class="opts">
<div class="opt"><div class="oletter">A</div><div class="otext">{oa}</div></div>
<div class="opt"><div class="oletter">B</div><div class="otext">{ob}</div></div>
<div class="opt"><div class="oletter">C</div><div class="otext">{oc}</div></div>
<div class="opt"><div class="oletter">D</div><div class="otext">{od}</div></div>
</div>
<div class="timer-ring"><div class="timer-num">{count}</div></div>
"""
return _wrap(body, css, bg=_bg)
def _build_timer_challenge_html(question_data: dict) -> list[tuple[str, float]]:
"""2-screen timer challenge: question + options (7s), then a comments CTA (3s).
Total: 10s.
"""
theme = _pick_theme()
logger.info("Theme: %s", theme["name"])
_accent = theme["accent"]
_bg = theme["bg"]
_abg = theme["accent_bg"]
_grn = theme["green"]
_rd = theme["red"]
frames: list[tuple[str, float]] = []
# ── Screen 1: Question + Options thumbnail (7s) ────────────────────────
q = _esc(question_data.get("question", ""))
oa = _esc(question_data.get("option_a", ""))
ob = _esc(question_data.get("option_b", ""))
oc = _esc(question_data.get("option_c", ""))
od = _esc(question_data.get("option_d", ""))
category = question_data.get("category", "").strip()
year = question_data.get("year", "")
# Auto-size question font based on length
q_len = len(question_data.get("question", ""))
q_font = "34px" if q_len > 150 else ("38px" if q_len > 100 else "42px")
css1 = f"""
.top-bar {{ display:flex; justify-content:space-between; align-items:center;
width:100%; margin-bottom:40px; }}
.badge {{ font-size:24px; color:{_accent}; font-weight:700;
background:{_abg}; padding:10px 24px; border-radius:50px;
border:2px solid {_accent}; }}
.year-badge {{ font-size:22px; color:#fff; font-weight:700;
background:#333; padding:8px 20px; border-radius:50px; }}
.qcard {{ background:#1a1a1a; border-radius:24px; padding:36px;
width:100%; margin-bottom:30px; border:2px solid {_accent}33; }}
.qtext {{ font-size:{q_font}; font-weight:700; color:#fff; line-height:1.45; }}
.opts {{ width:100%; }}
.opt {{ background:#161616; border-radius:16px; padding:18px 24px;
margin-bottom:14px; display:flex; align-items:center;
border:2px solid #222; }}
.oletter {{ width:44px; height:44px; border-radius:50%;
background:#222; color:{_accent};
display:flex; align-items:center; justify-content:center;
font-size:24px; font-weight:800; margin-right:18px; flex-shrink:0; }}
.otext {{ font-size:28px; color:#e0e0e0; font-weight:600; line-height:1.3; }}
.cta {{ font-size:32px; font-weight:800; color:{_accent};
text-align:center; margin-top:30px; }}
"""
_ctas = [
"Answer in 3 sec ⏱️",
"Comment your answer 👇",
"Can you solve this? 🤔",
f"UPSC {year} — try it!",
"Pause and think 🧠",
]
cta_text = random.choice(_ctas)
hook_body = f"""
<div class="top-bar">
<div class="badge">⚡ UPSC PYQ</div>
<div class="year-badge">{_esc(str(year))}</div>
</div>
<div class="qcard">
<div class="qtext">{q}</div>
</div>
<div class="opts">
<div class="opt"><div class="oletter">A</div><div class="otext">{oa}</div></div>
<div class="opt"><div class="oletter">B</div><div class="otext">{ob}</div></div>
<div class="opt"><div class="oletter">C</div><div class="otext">{oc}</div></div>
<div class="opt"><div class="oletter">D</div><div class="otext">{od}</div></div>
</div>
<div class="cta">{cta_text}</div>
"""
frames.append((_wrap(hook_body, css1, bg=_bg), 7.0))
# ── Screen 2: CTA, check comments for answer (3s) ──────────────────────
css3 = f"""
.cta-icon {{ font-size:120px; margin-bottom:40px; }}
.cta-big {{ font-size:64px; font-weight:900; color:#fff;
text-align:center; line-height:1.2; margin-bottom:30px; }}
.cta-big em {{ color:{_accent}; font-style:normal; }}
.cta-sub {{ font-size:36px; font-weight:600; color:rgba(255,255,255,.6);
text-align:center; }}
.cta-brand {{ font-size:30px; font-weight:700; color:{_accent};
text-align:center; margin-top:40px; }}
"""
_end_ctas = [
("👇", "Answer in<br><em>comments</em>", "Check below!"),
("💬", "Comment your<br><em>answer</em> first!", "Then check 👇"),
("🤔", "Did you get<br>it <em>right</em>?", "Answer in comments 👇"),
("⬇️", "Scroll down<br>for the <em>answer</em>", "👇"),
]
_ec = random.choice(_end_ctas)
reveal_body = f"""
<div class="cta-icon">{_ec[0]}</div>
<div class="cta-big">{_ec[1]}</div>
<div class="cta-sub">{_ec[2]}</div>
<div class="cta-brand">{BRAND_URL}</div>
"""
frames.append((_wrap(reveal_body, css3, bg=_bg), 3.0))
return frames
# ── Format 2: Shocking Stat Counter ("Data Bomb") ────────────────────────────
def _get_counter_steps(stat: str) -> list[str]:
"""Generate 3 counter steps for rolling animation: [start, mid, final]."""
s = stat.strip()
if "/" in s:
try:
num_s, denom_s = s.split("/", 1)
num, denom = int(num_s.strip()), int(denom_s.strip())
return [f"0/{denom}", f"{num // 2}/{denom}", s]
except Exception:
pass
if "%" in s:
try:
val = float(s.replace("%", "").replace("↑", "").replace("↓", "").strip())
prefix = "↓" if s.startswith("↓") else ("↑" if s.startswith("↑") else "")
return ["0%", f"{prefix}{val / 2:.1f}%", s]
except Exception:
pass
if s.lower().endswith("x"):
try:
val = float(s.lower().rstrip("x"))
return ["1x", f"{val / 2:.0f}x", s]
except Exception:
pass
return ["...", s, s]
def _stat_counter_frame(hook: str, counter_val: str, source: str, is_final: bool = False) -> str:
accent = _ORANGE
counter_size = "160px" if len(counter_val) <= 5 else "120px"
extra_glow = f"box-shadow: 0 0 80px {accent}55;" if is_final else ""
css = f"""
.hook {{ font-size:54px; font-weight:900; color:#fff;
text-align:center; line-height:1.2; margin-bottom:60px; }}
.counter-ring {{
width:340px; height:340px; border-radius:50%;
background:#111; border: 8px solid {accent};
display:flex; align-items:center; justify-content:center;
{extra_glow}
margin-bottom:50px;
}}
.counter-val {{ font-size:{counter_size}; font-weight:900; color:{accent};
line-height:1; text-align:center; }}
.source {{ font-size:26px; color:rgba(255,255,255,.4);
text-align:center; font-style:italic; }}
"""
body = f"""
<div class="hook">{_esc(hook)}</div>
<div class="counter-ring">
<div class="counter-val">{_esc(counter_val)}</div>
</div>
<div class="source">{_esc(source)}</div>
"""
return _wrap(body, css)
def _build_shocking_stat_html(question_data: dict) -> list[tuple[str, float]]:
"""3-phase stat reveal: hook(2s) + counter roll(3×2s) + CTA(5s).
Total: 13s.
"""
# Pick stat — use day-of-year for variety so each day gets a different stat
idx = (datetime.now().timetuple().tm_yday + datetime.now().weekday()) % len(SHOCKING_STATS)
stat_data = SHOCKING_STATS[idx]
hook = stat_data["hook"]
stat = stat_data["stat"]
detail = stat_data["detail"]
source = stat_data["source"]
frames: list[tuple[str, float]] = []
# ── Screen 1: Hook (2s) ─────────────────────────────────────────────────
css1 = f"""
.badge {{ font-size:28px; color:{_ORANGE}; font-weight:700;
border:2px solid {_ORANGE}; padding:12px 30px; border-radius:40px;
margin-bottom:60px; }}
.hook {{ font-size:72px; font-weight:900; color:#fff;
text-align:center; line-height:1.15; }}
.hook em {{ color:{_ORANGE}; font-style:normal; }}
"""
hook_body = f"""
<div class="badge">📊 DATA ALERT</div>
<div class="hook">{_esc(hook).replace(" ", "<br>", 2)}</div>
"""
frames.append((_wrap(hook_body, css1), 2.0))
# ── Screen 2: Counter rolling (3 steps × 2s) ───────────────────────────
steps = _get_counter_steps(stat)
for i, val in enumerate(steps):
is_final = i == len(steps) - 1
frames.append((_stat_counter_frame(hook, val, source, is_final), 2.0))
# ── Screen 3: Detail + CTA (5s) ─────────────────────────────────────────
css3 = f"""
.stat-big {{ font-size:120px; font-weight:900; color:{_ORANGE};
text-align:center; margin-bottom:40px; line-height:1; }}
.detail {{ background:#111; border-radius:20px; padding:36px 44px;
width:100%; margin-bottom:44px; border-left:6px solid {_ORANGE}; }}
.detail-text {{ font-size:36px; font-weight:700; color:#fff; line-height:1.5; }}
.source-text {{ font-size:26px; color:rgba(255,255,255,.45); margin-top:14px; }}
.cta-box {{ background:{_ORANGE}; border-radius:20px; padding:28px 44px; width:100%; }}
.cta-text {{ font-size:36px; font-weight:900; color:#fff; text-align:center; }}
"""
cta_body = f"""
<div class="stat-big">{_esc(stat)}</div>
<div class="detail">
<div class="detail-text">{_esc(detail)}</div>
<div class="source-text">Source: {_esc(source)}</div>
</div>
<div class="cta-box">
<div class="cta-text">Practice karo → {BRAND_URL}</div>
</div>
"""
frames.append((_wrap(cta_body, css3), 5.0))
return frames
# ── Format 3: Comparison Split ("This vs That") ──────────────────────────────
def _comparison_frame(wrong_items: list[str], right_items: list[str], show_n: int, show_cta: bool = False) -> str:
"""Render split-screen comparison showing first `show_n` pairs."""
def item_html(text: str, side: str) -> str:
bg = f"{_RED}22" if side == "wrong" else f"{_GREEN}22"
border = _RED if side == "wrong" else _GREEN
return f"""<div style="background:{bg}; border:2px solid {border};
border-radius:14px; padding:18px 24px; margin-bottom:14px;">
<span style="font-size:28px; font-weight:700; color:{'#ff6b6b' if side=='wrong' else _GREEN};">{_esc(text)}</span>
</div>"""
wrong_html = "".join(item_html(t, "wrong") for t in wrong_items[:show_n])
right_html = "".join(item_html(t, "right") for t in right_items[:show_n])
cta_block = ""
if show_cta:
cta_block = f"""<div style="position:absolute; bottom:100px; left:60px; right:60px;
background:{_ORANGE}; border-radius:20px; padding:26px; text-align:center;">
<span style="font-size:34px; font-weight:900; color:#fff;">Smart prep → {BRAND_URL}</span>
</div>"""
css = f"""
.split {{ display:flex; gap:20px; width:100%; margin-bottom:30px; align-items:flex-start; }}
.col {{ flex:1; }}
.col-header {{ font-size:30px; font-weight:900; padding:16px; border-radius:14px;
text-align:center; margin-bottom:16px; }}
.wrong-header {{ background:{_RED}; color:#fff; }}
.right-header {{ background:{_GREEN}; color:#fff; }}
.top-label {{ font-size:40px; font-weight:900; color:#fff;
text-align:center; margin-bottom:36px; }}
"""
body = f"""
<div class="top-label">UPSC Prep: Right vs Wrong</div>
<div class="split">
<div class="col">
<div class="col-header wrong-header">❌ WRONG</div>
{wrong_html}
</div>
<div class="col">
<div class="col-header right-header">✅ RIGHT</div>
{right_html}
</div>
</div>
{cta_block}
"""
return _wrap(body, css)
def _build_comparison_html(question_data: dict) -> list[tuple[str, float]]:
"""Progressive reveal comparison: 4 frames (1 item, 2, 3, 4+CTA).
Total: 4 × 3s = 12s.
"""
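# With 3 sets, weekday() % 3 is 1 on both Tuesday and Friday (the only
# comparison days), so rotate on day-of-year to reach every set.
idx = (datetime.now().timetuple().tm_yday // 3) % len(COMPARISONS)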
cmp = COMPARISONS[idx]
wrong = cmp["wrong"]
right = cmp["right"]
max_n = min(len(wrong), len(right), 4)
frames: list[tuple[str, float]] = []
for n in range(1, max_n + 1):
show_cta = n == max_n
frames.append((_comparison_frame(wrong, right, n, show_cta), 3.0))
return frames
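# Sketch (not part of the original module): every builder above returns a list
# of (html, seconds) tuples, so the clip length quoted in each docstring can be
# checked by summing the second element. `total_duration` is a hypothetical helper.
def total_duration(frames: list[tuple[str, float]]) -> float:
    """Return the summed on-screen time, in seconds, of a frame list."""
    return sum(seconds for _html, seconds in frames)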
# ── Format 4: Loop Format ("3 Second Answer") ────────────────────────────────
def _build_loop_format_html(question_data: dict) -> list[tuple[str, float]]:
"""5-screen loop format that visually connects end back to start.
Total: 1+3+3+3+3 = 13s.
"""
q = _esc(question_data.get("question", ""))
correct_letter = str(question_data.get("correct_answer", "A")).upper()
opts_map = {
"A": question_data.get("option_a", ""),
"B": question_data.get("option_b", ""),
"C": question_data.get("option_c", ""),
"D": question_data.get("option_d", ""),
}
answer_text = _esc(opts_map.get(correct_letter, question_data.get("correct_option_text", "")))
explanation = _esc((question_data.get("explanation", "") or "")[:140])
year = _esc(str(question_data.get("year", "")))
frames: list[tuple[str, float]] = []
# ── Screen 1 (1s): "Can you answer this?" — matches Screen 5 visually ──
css1 = f"""
.cta-big {{ font-size:80px; font-weight:900; color:#fff;
text-align:center; line-height:1.15; }}
.cta-big em {{ color:{_ORANGE}; font-style:normal; }}
.sub {{ font-size:46px; font-weight:700; color:rgba(255,255,255,.5);
text-align:center; margin-top:30px; }}
.orb {{ width:180px; height:180px; border-radius:50%;
background:{_ORANGE}; display:flex; align-items:center;
justify-content:center; margin-bottom:50px;
box-shadow:0 0 60px {_ORANGE}88; }}
.orb-text {{ font-size:90px; }}
"""
category = _esc(question_data.get("category", "").strip())
_loop_hooks = [
("Can you<br><em>answer</em><br>this?", "Tap to test yourself ↓"),
(f"<em>{category}</em><br>question —<br>3 seconds", f"UPSC {year}"),
("Answer in<br><em>3 seconds</em><br>or less", "Ready? Go ↓"),
(f"This <em>{year}</em><br>PYQ is<br>tricky", "Think fast 🤔"),
(f"Quick<br><em>{category}</em><br>challenge", "Can you get it right?"),
]
_lh = random.choice(_loop_hooks)
body1 = f"""
<div class="orb"><div class="orb-text">🧠</div></div>
<div class="cta-big">{_lh[0]}</div>
<div class="sub">{_lh[1]}</div>
"""
frames.append((_wrap(body1, css1), 1.0))
# ── Screen 2 (3s): The Question ─────────────────────────────────────────
css2 = f"""
.year-tag {{ font-size:28px; color:{_ORANGE}; font-weight:700;
border:2px solid {_ORANGE}; padding:10px 28px; border-radius:40px;
margin-bottom:40px; }}
.qtext {{ font-size:44px; font-weight:800; color:#fff; line-height:1.4;
text-align:center; }}
.hint {{ font-size:32px; color:rgba(255,255,255,.4);
text-align:center; margin-top:40px; font-weight:600; }}
"""
body2 = f"""
<div class="year-tag">UPSC {year}</div>
<div class="qtext">{q}</div>
<div class="hint">Answer in 3 seconds ↓</div>
"""
frames.append((_wrap(body2, css2), 3.0))
# ── Screen 3 (3s): Countdown 3 → 1 (1s each) ───────────────────────────
for count in [3, 2, 1]:
color = _RED if count == 3 else (_ORANGE if count == 2 else _GREEN)
css_c = f"""
.count-label {{ font-size:40px; font-weight:700; color:rgba(255,255,255,.5);
text-align:center; margin-bottom:40px; }}
.big-num {{ font-size:280px; font-weight:900; color:{color};
text-align:center; line-height:1; }}
"""
body_c = f"""
<div class="count-label">Time's running out...</div>
<div class="big-num">{count}</div>
"""
frames.append((_wrap(body_c, css_c), 1.0))
# ── Screen 4 (3s): Answer ───────────────────────────────────────────────
css4 = f"""
.ans-header {{ font-size:44px; font-weight:900; color:{_GREEN};
text-align:center; margin-bottom:40px; }}
.ans-badge {{ background:{_GREEN}22; border:3px solid {_GREEN};
border-radius:20px; padding:32px 40px; width:100%; margin-bottom:30px; }}
.ans-letter {{ font-size:40px; font-weight:900; color:{_GREEN}; margin-bottom:8px; }}
.ans-text {{ font-size:36px; font-weight:700; color:#fff; line-height:1.4; }}
.exp {{ background:#111; border-radius:16px; padding:28px 34px;
width:100%; border-left:5px solid {_ORANGE}; }}
.exp-text {{ font-size:28px; color:#ccc; line-height:1.6; font-weight:500; }}
"""
body4 = f"""
<div class="ans-header">✅ ANSWER!</div>
<div class="ans-badge">
<div class="ans-letter">Option {correct_letter}</div>
<div class="ans-text">{answer_text}</div>
</div>
<div class="exp"><div class="exp-text">{explanation}</div></div>
"""
frames.append((_wrap(body4, css4), 3.0))
# ── Screen 5 (3s): Loop back — mirrors Screen 1 visually ────────────────
css5 = f"""
.next-big {{ font-size:66px; font-weight:900; color:#fff;
text-align:center; line-height:1.2; margin-bottom:40px; }}
.next-big em {{ color:{_ORANGE}; font-style:normal; }}
.arrow {{ font-size:100px; text-align:center; margin-bottom:40px; }}
.sub2 {{ font-size:36px; font-weight:700; color:rgba(255,255,255,.5);
text-align:center; }}
"""
body5 = f"""
<div class="arrow">👉</div>
<div class="next-big">Next question<br><em>tomorrow</em></div>
<div class="sub2">Follow so you don't miss it!</div>
"""
frames.append((_wrap(body5, css5), 3.0))
return frames
# ── Old non-quiz builder (kept for backward compat) ──────────────────────────
def _build_non_quiz_html(gradient: str, day_format: str, question_data: dict) -> str:
"""Legacy fallback: builds old-style non-quiz HTML. Returns a single HTML string."""
from bot.formatter import (
format_shocking_stat,
format_myth_buster,
format_this_or_that,
format_subject_breakdown,
format_motivation,
)
formatters = {
"shocking_stat": format_shocking_stat,
"myth_buster": format_myth_buster,
"this_or_that": format_this_or_that,
"subject_breakdown": format_subject_breakdown,
"motivation_data": format_motivation,
}
formatter = formatters.get(day_format, format_shocking_stat)
text_content = formatter() or "UPSC PYQ Analysis"
lines = [line for line in text_content.split("\n") if line.strip()]
hook = lines[0] if lines else "UPSC Insight"
body_html = "".join(
f'<div class="body-line">{line}</div>' for line in lines[1:][:8]
)
return f"""<!DOCTYPE html>
<html><head><style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
width: 1080px; height: 1920px;
background: {gradient};
font-family: 'Segoe UI', sans-serif;
display: flex; flex-direction: column;
align-items: center; justify-content: center;
padding: 60px 50px;
}}
.hook {{ font-size: 64px; font-weight: 800; color: white;
text-align: center; line-height: 1.3; margin-bottom: 50px;
text-shadow: 0 4px 20px rgba(0,0,0,0.3); }}
.body-card {{ background: rgba(255,255,255,0.95);
border-radius: 30px; padding: 50px;
box-shadow: 0 20px 60px rgba(0,0,0,0.2);
width: 100%; max-width: 960px; }}
.body-line {{ font-size: 36px; color: #1a1a2e;
line-height: 1.6; margin-bottom: 12px; font-weight: 500; }}
.footer {{ margin-top: 50px; background: rgba(255,255,255,0.2);
border-radius: 25px; padding: 20px 40px; }}
.footer-text {{ font-size: 28px; font-weight: 700; color: white; text-align: center; }}
</style></head><body>
<div class="hook">{hook}</div>
<div class="body-card">{body_html}</div>
<div class="footer"><div class="footer-text">{BRAND_NAME}</div></div>
</body></html>"""
# ── Rendering helpers ─────────────────────────────────────────────────────────
def _pick_music() -> Path:
"""Pick a random music file from templates directory."""
music_files = sorted(TEMPLATES_DIR.glob("*.mp3"))
if not music_files:
raise FileNotFoundError(f"No music files found in {TEMPLATES_DIR}")
return random.choice(music_files)
def _make_hti(output_dir: Path) -> Html2Image:
"""Create one Html2Image instance (launches Chrome once)."""
import os, shutil
is_ci = os.getenv("CI") == "true" or os.getenv("GITHUB_ACTIONS") == "true"
flags = [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--headless=new",
"--disable-software-rasterizer",
]
# Find chromium/chrome binary
browser_path = None
for candidate in ["chromium-browser", "chromium", "google-chrome", "google-chrome-stable"]:
found = shutil.which(candidate)
if found:
browser_path = found
break
# temp_path must also be accessible by Chrome (snap can't read /tmp)
temp_dir = os.path.join(os.path.expanduser("~"), "hti_temp")
os.makedirs(temp_dir, exist_ok=True)
kwargs = {
"output_path": str(output_dir),
"temp_path": temp_dir,
"custom_flags": flags,
}
if browser_path:
kwargs["browser_executable"] = browser_path
return Html2Image(**kwargs)
def _render_html_to_image(html_content: str, output_path: Path, hti: Html2Image | None = None) -> None:
"""Render HTML to PNG. Creates its own hti instance if none provided."""
if hti is None:
hti = _make_hti(output_path.parent)
hti.screenshot(html_str=html_content, save_as=output_path.name, size=(VIDEO_W, VIDEO_H))
if not output_path.exists():
# html2image may write to its own output_path, check there
alt = Path(hti.output_path) / output_path.name
if alt.exists() and str(alt) != str(output_path):
import shutil
shutil.move(str(alt), str(output_path))
else:
raise FileNotFoundError(f"html2image failed to render: {output_path}")
logger.debug("Rendered → %s (%d bytes)", output_path.name, output_path.stat().st_size)
def _frames_to_video(
    frames: list[tuple[str, float]],
    output_path: Path,
) -> Path:
    """Render HTML frames to images, sequence into video with music."""
    # Imported locally, like the other rendering helpers in this module.
    import os
    import shutil

    # Use home-based temp dir (snap chromium can't write to /tmp or hidden dirs)
    base_tmp = Path(os.path.expanduser("~")) / "hti_render"
    base_tmp.mkdir(exist_ok=True)
    temp_dir = Path(tempfile.mkdtemp(dir=str(base_tmp)))
    clips = []
    raw_audio = audio = video = None
    try:
        hti = _make_hti(temp_dir)
        for i, (html, duration) in enumerate(frames):
            img_path = temp_dir / f"frame_{i:03d}.png"
            _render_html_to_image(html, img_path, hti)
            # Read the PNG fully into memory so its file handle is released at once.
            with Image.open(img_path) as img:
                img_arr = np.array(img)
            clips.append(ImageClip(img_arr).with_duration(duration))
        video = concatenate_videoclips(clips, method="compose")
        total_dur = sum(d for _, d in frames)
        music_path = _pick_music()
        raw_audio = AudioFileClip(str(music_path))
        # Trim the track to the video length, or keep it whole if it is shorter.
        audio = raw_audio.subclipped(0, min(total_dur, raw_audio.duration))
        video = video.with_audio(audio)
        video.write_videofile(
            str(output_path),
            codec="libx264",
            audio_codec="aac",
            fps=30,
            ffmpeg_params=["-movflags", "+faststart"],
            logger=None,
        )
    finally:
        try:
            # Release clip resources even if rendering or encoding failed.
            for resource in (audio, raw_audio, video, *clips):
                if resource is not None:
                    resource.close()
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
    return output_path
# ── Public API ────────────────────────────────────────────────────────────────
def generate_html_video(
question_data: dict,
output_path: Path | None = None,
) -> Path:
"""Generate a viral Instagram/YouTube Shorts video from question data.
Format is selected automatically by weekday via get_video_format_for_day().
Args:
question_data: Dict with keys: question, option_a/b/c/d, year, category,
correct_answer, correct_option_text, explanation (optional)
output_path: Optional output path (defaults to OUTPUT_DIR/quiz_html_<ts>.mp4)
Returns:
Path to the generated .mp4 file.
"""
if output_path is None:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = OUTPUT_DIR / f"quiz_html_{ts}.mp4"
_ensure_heavy_imports()
fmt = get_video_format_for_day()
logger.info("Generating video — format: %s → %s", fmt, output_path.name)
try:
if fmt == "quiz":
gradient, name = random.choice(GRADIENT_PALETTES)
logger.info("Quiz gradient: %s", name)
frames = _build_quiz_html(question_data, gradient)
elif fmt == "timer_challenge":
frames = _build_timer_challenge_html(question_data)
elif fmt == "shocking_stat":
frames = _build_shocking_stat_html(question_data)
elif fmt == "comparison":
frames = _build_comparison_html(question_data)
elif fmt == "loop_format":
frames = _build_loop_format_html(question_data)
else:
# Fallback: legacy non-quiz HTML as single frame
gradient, _ = random.choice(GRADIENT_PALETTES)
html = _build_non_quiz_html(gradient, fmt, question_data)
frames = [(html, 15.0)]
except Exception as exc:
logger.warning("Template build failed (%s) — falling back to quiz", exc)
gradient, _ = random.choice(GRADIENT_PALETTES)
frames = _build_quiz_html(question_data, gradient)
_frames_to_video(frames, output_path)
mb = output_path.stat().st_size / (1024 * 1024)
logger.info("Video saved: %s (%.1f MB)", output_path.name, mb)
return output_path
FILE:scripts/ig_config.py
import os
import random
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
# ── Instagram API ────────────────────────────────────────────────────────────
INSTAGRAM_ACCESS_TOKEN = os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
INSTAGRAM_USER_ID = os.getenv("INSTAGRAM_USER_ID", "")
INSTAGRAM_APP_SECRET = os.getenv("INSTAGRAM_APP_SECRET", "")
INSTAGRAM_API_BASE = "https://graph.instagram.com"
INSTAGRAM_API_VERSION = "v21.0"
# ── Cloudflare R2 ────────────────────────────────────────────────────────────
R2_ENDPOINT = os.getenv("R2_ENDPOINT", "")
R2_ACCESS_KEY = os.getenv("R2_ACCESS_KEY", "")
R2_SECRET_KEY = os.getenv("R2_SECRET_KEY", "")
R2_BUCKET = os.getenv("R2_BUCKET", "quiz-reels")
R2_PUBLIC_URL = os.getenv("R2_PUBLIC_URL", "")
# ── Paths ─────────────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent.parent
TEMPLATES_DIR = BASE_DIR / "templates"
OUTPUT_DIR = BASE_DIR / "output"
LOGS_DIR = BASE_DIR / "logs"
CSV_PATH = BASE_DIR / "data" / "questions.csv"
OUTPUT_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
def build_caption(question_data: dict) -> str:
"""Build Instagram caption — format-aware, personal tone, hashtags at end only."""
from bot.html_video_generator import get_video_format_for_day
from bot.formatter import format_ig_caption
fmt = get_video_format_for_day()
category = question_data.get("category", "General Studies")
category_tag = category.replace(" ", "").replace("&", "And")
# Viral-optimised captions per video format
FORMAT_CAPTIONS = {
"timer_challenge": (
"90% fail this UPSC question. can you get it right?\n\n"
"drop your answer in the comments 👇\n\n"
"#UPSC #IAS #UPSCPrelims #UPSCQuiz #PreviousYearQuestions "
f"#{category_tag} #UPSC2026 #UPSCAspirants"
),
"shocking_stat": (
"went through 3274 UPSC questions. this pattern blew my mind 🤯\n\n"
"save this before your exam — you'll thank me later\n\n"
"#UPSC #IAS #UPSCPreparation #UPSCStrategy #PYQ "
f"#{category_tag} #UPSC2026 #CivilServices"
),
"comparison": (
"₹0 vs ₹2 lakh UPSC prep — which one actually works? 🧵\n\n"
"bookmark this. share with every aspirant you know.\n\n"
"#UPSC #IAS #UPSCPreparation #UPSCTips #SmartPrep "
f"#{category_tag} #UPSC2026 #IASPreparation"
),
"loop_format": (
"answer in 3 seconds. bet you can't 😅\n\n"
"follow for a new question every day 👉\n\n"
"#UPSC #IAS #UPSCQuiz #UPSCPrelims #DailyQuiz "
f"#{category_tag} #UPSC2026 #UPSCAspirants"
),
}
if fmt in FORMAT_CAPTIONS:
return FORMAT_CAPTIONS[fmt]
# Fallback: existing quiz caption logic
return format_ig_caption(question_data)
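The `category_tag` built in `build_caption` only strips spaces and rewrites `&`, so a category containing other punctuation (a comma or a hyphen, say) would produce a broken hashtag. A minimal sketch of a stricter helper follows. The name `to_hashtag` is hypothetical and not part of the original module, and the sketch assumes ASCII category names; unlike the original, it also capitalizes the first letter of each word.

```python
import re


def to_hashtag(category: str) -> str:
    """Turn a free-text category into one hashtag token (hypothetical helper)."""
    # Keep the original "&" -> "And" rewrite, then drop every character that
    # is not an ASCII letter, digit, or underscore.
    text = category.replace("&", " And ")
    words = re.findall(r"[A-Za-z0-9_]+", text)
    tag = "".join(word[:1].upper() + word[1:] for word in words)
    return f"#{tag}" if tag else ""


print(to_hashtag("Science & Technology"))  # #ScienceAndTechnology
print(to_hashtag("Art, Culture"))          # #ArtCulture
```

Returning an empty string for an empty category lets the caller skip the tag instead of emitting a bare `#`.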
FILE:scripts/ig_reel_poster.py
import logging
import time
import requests
from bot.ig_config import (
INSTAGRAM_ACCESS_TOKEN,
INSTAGRAM_API_BASE,
INSTAGRAM_API_VERSION,
INSTAGRAM_USER_ID,
)
logger = logging.getLogger(__name__)
API_URL = f"{INSTAGRAM_API_BASE}/{INSTAGRAM_API_VERSION}"
MAX_RETRIES = 3
BACKOFF_BASE = 2
POLL_INTERVAL = 30
POLL_TIMEOUT = 600
def _api_request(method: str, url: str, max_retries: int = MAX_RETRIES, **kwargs) -> dict:
    """Call the Graph API, retrying failed requests with exponential backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.request(method, url, timeout=30, **kwargs)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException as e:
            if attempt == max_retries:
                # Last attempt: report the failure without announcing a retry.
                logger.error(
                    "API request failed (attempt %d/%d): %s; giving up",
                    attempt, max_retries, e,
                )
                raise
            wait = BACKOFF_BASE ** attempt
            logger.warning(
                "API request failed (attempt %d/%d): %s; retrying in %ds",
                attempt, max_retries, e, wait,
            )
            time.sleep(wait)
    raise RuntimeError("API request failed after all retries")
def create_container(video_url: str, caption: str) -> str:
url = f"{API_URL}/{INSTAGRAM_USER_ID}/media"
params = {
"media_type": "REELS",
"video_url": video_url,
"caption": caption,
"share_to_feed": "true",
"access_token": INSTAGRAM_ACCESS_TOKEN,
}
data = _api_request("POST", url, params=params)
container_id = data["id"]
logger.info("Created container: %s", container_id)
return container_id
def poll_container_status(container_id: str) -> str:
    """Poll the media container until it is FINISHED, failed, or POLL_TIMEOUT elapses."""
    url = f"{API_URL}/{container_id}"
    params = {
        "fields": "status_code",
        "access_token": INSTAGRAM_ACCESS_TOKEN,
    }
    start_time = time.time()
    while True:
        elapsed = time.time() - start_time
        if elapsed > POLL_TIMEOUT:
            raise TimeoutError(
                f"Container {container_id} not ready after {POLL_TIMEOUT}s"
            )
        data = _api_request("GET", url, params=params)
        status = data.get("status_code", "UNKNOWN")
        logger.info("Container %s status: %s (%.0fs elapsed)", container_id, status, elapsed)
        if status == "FINISHED":
            return status
        # ERROR and EXPIRED are terminal states, so stop polling instead of waiting for the timeout.
        if status in ("ERROR", "EXPIRED"):
            raise RuntimeError(f"Container {container_id} processing failed: {status}")
        time.sleep(POLL_INTERVAL)
def publish_container(container_id: str) -> str:
url = f"{API_URL}/{INSTAGRAM_USER_ID}/media_publish"
params = {
"creation_id": container_id,
"access_token": INSTAGRAM_ACCESS_TOKEN,
}
data = _api_request("POST", url, params=params)
media_id = data["id"]
logger.info("Published reel: %s", media_id)
return media_id
def comment_on_media(media_id: str, text: str) -> str:
url = f"{API_URL}/{media_id}/comments"
params = {
"message": text,
"access_token": INSTAGRAM_ACCESS_TOKEN,
}
data = _api_request("POST", url, params=params)
comment_id = data["id"]
logger.info("Posted comment %s on media %s", comment_id, media_id)
return comment_id
def build_answer_comment(question_data: dict) -> str:
answer_letter = question_data["answer"].strip().upper()
answer_map = {"A": "option_a", "B": "option_b", "C": "option_c", "D": "option_d"}
answer_key = answer_map.get(answer_letter, "option_a")
answer_text = question_data[answer_key]
explanation = question_data.get("explanation", "")
comment = f"\u2705 Answer: {answer_letter}) {answer_text}"
if explanation:
comment += f"\n\n\U0001f4a1 Explanation:\n{explanation}"
return comment
def post_reel(video_url: str, caption: str, question_data: dict | None = None) -> str:
container_id = create_container(video_url, caption)
poll_container_status(container_id)
media_id = publish_container(container_id)
if question_data:
try:
comment_text = build_answer_comment(question_data)
comment_on_media(media_id, comment_text)
except Exception:
logger.exception("Failed to post answer comment on %s", media_id)
return media_id
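`_api_request` waits `BACKOFF_BASE ** attempt` seconds between attempts, so with the defaults it sleeps 2 s and then 4 s before the third and final try. A minimal standalone sketch of that schedule is shown here. The `retry_with_backoff` helper and its injectable `sleep` parameter are hypothetical, added only so the schedule can be exercised without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    backoff_base: int = 2,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds, sleeping backoff_base ** attempt between tries."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: surface the last error
            sleep(backoff_base ** attempt)
    raise RuntimeError("max_retries must be at least 1")


# Usage: fail twice, then succeed; record the waits instead of sleeping.
waits: list[float] = []
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky, sleep=waits.append)
print(result, waits)  # ok [2, 4]
```

Passing `sleep` as a parameter is a testing convenience; the original module calls `time.sleep` directly.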
FILE:scripts/image_generator.py
"""Generates quiz question images styled like a UPSC question paper."""
import logging
import os
import tempfile
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
IMG_WIDTH = 1080
PADDING_X = 80
PADDING_Y = 40
TEXT_AREA_WIDTH = IMG_WIDTH - 2 * PADDING_X
OPTION_INDENT = 40
BG_COLOR = "#FDF8F0"
TEXT_COLOR = "#1a1a1a"
def _get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont:
"""Load a serif-style font. Falls back through system fonts."""
font_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "fonts")
if bold:
candidates = [
(font_dir, "NotoSerif-Bold.ttf"),
(font_dir, "DejaVuSerif-Bold.ttf"),
("/usr/share/fonts/truetype/dejavu", "DejaVuSerif-Bold.ttf"),
("/usr/share/fonts/truetype/dejavu", "DejaVuSans-Bold.ttf"),
]
else:
candidates = [
(font_dir, "NotoSerif-Regular.ttf"),
(font_dir, "DejaVuSerif.ttf"),
("/usr/share/fonts/truetype/dejavu", "DejaVuSerif.ttf"),
("/usr/share/fonts/truetype/dejavu", "DejaVuSans.ttf"),
]
for directory, name in candidates:
path = os.path.join(directory, name)
if os.path.exists(path):
return ImageFont.truetype(path, size)
for p in ["/System/Library/Fonts/Times.ttc", "/System/Library/Fonts/Helvetica.ttc"]:
if os.path.exists(p):
return ImageFont.truetype(p, size)
return ImageFont.load_default()
def _wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list[str]:
"""Word-wrap text to fit within max_width pixels."""
words = text.split()
lines = []
current_line = ""
for word in words:
test = f"{current_line} {word}".strip()
bbox = font.getbbox(test)
if bbox[2] - bbox[0] <= max_width:
current_line = test
else:
if current_line:
lines.append(current_line)
current_line = word
if current_line:
lines.append(current_line)
return lines
def generate_question_image(q: dict) -> str:
    """Generate an image styled like a UPSC question paper screenshot.

    Layout: question, optional year tag, and options; no header/footer:
        Q. <question text>
        (a) option 1
        (b) option 2
        (c) option 3
        (d) option 4

    Returns the path of a temporary PNG file.
    """
    question_font = _get_font(28)
    option_font = _get_font(26)
    opts = q["options"]
    year = q.get("year", "")
    # Wrap question text (year rendered separately in red)
    q_text = f"Q. {q['question']}"
    question_lines = _wrap_text(q_text, question_font, TEXT_AREA_WIDTH)
    year_tag = f"UPSC {year}" if year else ""
    # Wrap options with UPSC-style labels
    option_labels = ["(a)", "(b)", "(c)", "(d)"]
    option_max_w = TEXT_AREA_WIDTH - OPTION_INDENT
    wrapped_options = []
    for i, opt in enumerate(opts):
        full = f"{option_labels[i]} {opt}"
        wrapped = _wrap_text(full, option_font, option_max_w)
        wrapped_options.append(wrapped)
    # Calculate line heights
    line_sp = 10
    q_line_h = question_font.getbbox("Ay")[3] + line_sp
    opt_line_h = option_font.getbbox("Ay")[3] + line_sp
    # Total content height
    year_block_h = (15 + q_line_h) if year else 0
    content_h = (
        len(question_lines) * q_line_h
        + year_block_h  # year tag line
        + 30  # gap before options
        + sum(len(lines) * opt_line_h for lines in wrapped_options)
        + 16 * 3  # gaps between options
    )
    img_h = content_h + 2 * PADDING_Y
    img = Image.new("RGB", (IMG_WIDTH, img_h), BG_COLOR)
    draw = ImageDraw.Draw(img)
    # --- Diagonal watermark ---
    wm_font = _get_font(36)
    wm_text = "{BRAND_NAME}"
    wm_color = "#f0ebe3"  # very subtle, just above background
    wm_bbox = wm_font.getbbox(wm_text)
    wm_w = wm_bbox[2] - wm_bbox[0]
    wm_h = wm_bbox[3] - wm_bbox[1]
    spacing_x = wm_w + 80
    spacing_y = wm_h + 100
    # Render the rotated watermark tile once, then paste it across the canvas
    txt_img = Image.new("RGBA", (wm_w + 20, wm_h + 20), (0, 0, 0, 0))
    txt_draw = ImageDraw.Draw(txt_img)
    txt_draw.text((10, 10), wm_text, fill=wm_color, font=wm_font)
    rotated = txt_img.rotate(30, expand=True, fillcolor=(0, 0, 0, 0))
    for wy in range(-img_h, img_h * 2, spacing_y):
        for wx in range(-IMG_WIDTH, IMG_WIDTH * 2, spacing_x):
            img.paste(rotated, (wx, wy), rotated)
    y = PADDING_Y
    # --- Question ---
    for line in question_lines:
        draw.text((PADDING_X, y), line, fill=TEXT_COLOR, font=question_font)
        y += q_line_h
    # --- Year tag (red) ---
    if year_tag:
        y += 15
        draw.text((PADDING_X, y), year_tag, fill="#c0392b", font=question_font)
        y += q_line_h
    y += 30
    # --- Options ---
    opt_x = PADDING_X + OPTION_INDENT
    for i, lines in enumerate(wrapped_options):
        for line in lines:
            draw.text((opt_x, y), line, fill=TEXT_COLOR, font=option_font)
            y += opt_line_h
        if i < 3:
            y += 16
    # Save to a temp file; close the handle first so the path can be written on any OS
    tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
    tmp.close()
    img.save(tmp.name, "PNG")
    logger.info("Generated question paper image: %s", tmp.name)
    return tmp.name
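`_wrap_text` is a greedy word wrap: it keeps appending words to the current line until the measured width would exceed `max_width`, then starts a new line. The same idea can be sketched without Pillow by assuming a hypothetical `measure` callback (plain character count here) in place of `font.getbbox`; `wrap_greedy` is an illustrative name, not a function from the module.

```python
from typing import Callable


def wrap_greedy(
    text: str,
    max_width: int,
    measure: Callable[[str], int] = len,
) -> list[str]:
    """Greedy word wrap; measure() stands in for the font's pixel-width lookup."""
    lines: list[str] = []
    current = ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if measure(candidate) <= max_width:
            current = candidate
        else:
            if current:
                lines.append(current)
            # A single word wider than max_width still gets a line of its own.
            current = word
    if current:
        lines.append(current)
    return lines


print(wrap_greedy("Which of the following statements is correct", 16))
# ['Which of the', 'following', 'statements is', 'correct']
```

Greedy wrapping is not optimal for raggedness, but it is simple and predictable, which suits fixed-width cards like these.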
FILE:scripts/instagram_main.py
"""Instagram Quiz Reel Bot — Entry point.
Usage:
python -m bot.instagram_main # Post one reel
python -m bot.instagram_main --count 3 # Post 3 reels
python -m bot.instagram_main --dry-run # Generate video only
"""
import argparse
import logging
import signal
import sys
from logging.handlers import RotatingFileHandler
from bot.csv_manager import get_next_question, get_unposted_count, mark_as_posted
from bot.ig_config import LOGS_DIR, build_caption
from bot.ig_reel_poster import post_reel
from bot.r2_uploader import upload_to_r2
from bot.html_video_generator import generate_html_video as generate_video
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logger = logging.getLogger(__name__)
_shutdown = False
def _setup_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setFormatter(logging.Formatter(LOG_FORMAT))
root.addHandler(console)
log_file = LOGS_DIR / "instagram.log"
fh = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3)
fh.setFormatter(logging.Formatter(LOG_FORMAT))
root.addHandler(fh)
def _signal_handler(signum, frame):
global _shutdown
logger.info("Shutdown signal received — finishing current task...")
_shutdown = True
def process_one(dry_run: bool = False) -> bool:
result = get_next_question()
if result is None:
logger.info("No unposted questions remaining")
return False
index, question_data = result
caption = build_caption(question_data)
logger.info("── Step 1/3: Generating video ──")
try:
video_path = generate_video(question_data)
except Exception:
logger.exception("Video generation failed — skipping question %d", index)
return True
if dry_run:
logger.info("── DRY RUN — skipping upload and post ──")
logger.info("Caption:\n%s", caption)
logger.info("Video saved at: %s", video_path)
return True
logger.info("── Step 2/3: Uploading to R2 ──")
try:
public_url = upload_to_r2(video_path)
except Exception:
logger.exception("R2 upload failed — skipping question %d", index)
return True
logger.info("── Step 3/3: Posting to Instagram ──")
try:
media_id = post_reel(public_url, caption, question_data)
logger.info("Successfully posted reel! Media ID: %s", media_id)
except Exception:
logger.exception("Instagram posting failed — skipping question %d", index)
return True
mark_as_posted(index)
try:
video_path.unlink()
logger.info("Cleaned up video file: %s", video_path)
except OSError:
logger.warning("Failed to delete video file: %s", video_path)
return True
def main():
_setup_logging()
parser = argparse.ArgumentParser(description="Instagram Quiz Reel Bot")
parser.add_argument("--count", type=int, default=1)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
remaining = get_unposted_count()
logger.info("Instagram Reel Bot started — %d unposted questions available", remaining)
posted = 0
for i in range(args.count):
if _shutdown:
logger.info("Shutting down gracefully after %d reel(s)", posted)
break
logger.info("━━━ Processing reel %d/%d ━━━", i + 1, args.count)
if not process_one(dry_run=args.dry_run):
break
posted += 1
action = "generated" if args.dry_run else "posted"
logger.info("Done! %d reel(s) %s", posted, action)
if __name__ == "__main__":
main()
FILE:scripts/meta_poster.py
"""Posts quiz content to a Facebook Page using the Graph API."""
import logging
import os
import time
import requests
logger = logging.getLogger(__name__)
GRAPH_API_VERSION = "v21.0"
GRAPH_API_BASE = f"https://graph.facebook.com/{GRAPH_API_VERSION}"
MAX_RETRIES = 2
RETRY_DELAY = 5 # seconds
def _post_with_retry(url: str, params: dict) -> str:
"""Make a POST request with retry logic. Returns the post/comment ID."""
for attempt in range(1, MAX_RETRIES + 2):
try:
resp = requests.post(url, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
post_id = str(data["id"])
logger.info("Meta post created (id=%s, attempt=%d)", post_id, attempt)
return post_id
except Exception as e:
logger.error("Meta API error (attempt %d/%d): %s", attempt, MAX_RETRIES + 1, e)
if attempt <= MAX_RETRIES:
time.sleep(RETRY_DELAY)
else:
raise
def post_question(text: str) -> str | None:
"""Post a question to the Facebook Page feed. Returns post ID or None on failure."""
try:
page_id = os.environ["META_PAGE_ID"]
access_token = os.environ["META_PAGE_ACCESS_TOKEN"]
url = f"{GRAPH_API_BASE}/{page_id}/feed"
params = {"message": text, "access_token": access_token}
return _post_with_retry(url, params)
except Exception as e:
logger.error("Failed to post question to Meta: %s", e)
return None
def post_answer(text: str, reply_to_post_id: str) -> str | None:
"""Post an answer as a comment on the previous post. Returns comment ID or None on failure."""
try:
access_token = os.environ["META_PAGE_ACCESS_TOKEN"]
url = f"{GRAPH_API_BASE}/{reply_to_post_id}/comments"
params = {"message": text, "access_token": access_token}
return _post_with_retry(url, params)
except Exception as e:
logger.error("Failed to post answer to Meta: %s", e)
return None
FILE:scripts/r2_uploader.py
import logging
from pathlib import Path
import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
from bot.ig_config import R2_ACCESS_KEY, R2_BUCKET, R2_ENDPOINT, R2_PUBLIC_URL, R2_SECRET_KEY
logger = logging.getLogger(__name__)


def _get_s3_client():
    """Create an S3-compatible client pointed at the R2 endpoint."""
    return boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT,
        aws_access_key_id=R2_ACCESS_KEY,
        aws_secret_access_key=R2_SECRET_KEY,
    )


def upload_to_r2(file_path: Path, max_retries: int = 2) -> str:
    """Upload a video to the R2 bucket and return its public URL."""
    client = _get_s3_client()
    key = file_path.name
    for attempt in range(1, max_retries + 1):
        try:
            logger.info("Uploading %s to R2 (attempt %d/%d)", key, attempt, max_retries)
            client.upload_file(
                str(file_path),
                R2_BUCKET,
                key,
                ExtraArgs={"ContentType": "video/mp4"},
            )
            public_url = f"{R2_PUBLIC_URL.rstrip('/')}/{key}"
            logger.info("Upload successful: %s", public_url)
            return public_url
        # upload_file reports transfer failures as S3UploadFailedError, so
        # catching ClientError alone would skip the retry.
        except (ClientError, S3UploadFailedError) as e:
            logger.error("Upload attempt %d failed: %s", attempt, e)
            if attempt == max_retries:
                raise
    raise RuntimeError("Upload failed after all retries")
FILE:scripts/video_generator.py
import logging
import random
from datetime import datetime
from pathlib import Path
import numpy as np
from moviepy import AudioFileClip, CompositeVideoClip, ImageClip
from PIL import Image, ImageDraw, ImageFont
from bot.ig_config import OUTPUT_DIR, TEMPLATES_DIR
logger = logging.getLogger(__name__)
ANSWER_MAP = {"A": "option_a", "B": "option_b", "C": "option_c", "D": "option_d"}
# ── Visual design constants ──────────────────────────────────────────────────
VIDEO_W, VIDEO_H = 1080, 1920
VIDEO_DURATION = 15 # seconds
# Fonts — use bundled Liberation Sans (Arial-compatible), fall back to system Arial
_FONTS_DIR = Path(__file__).resolve().parent.parent / "fonts"
FONT_BOLD = str(_FONTS_DIR / "LiberationSans-Bold.ttf")
FONT_REGULAR = str(_FONTS_DIR / "LiberationSans-Regular.ttf")
# Colors (RGBA)
COLOR_QUESTION_BG = (20, 20, 50, 200)
COLOR_OPTION_BG = (255, 255, 255, 180)
COLOR_QUESTION_TEXT = (255, 255, 255)
COLOR_OPTION_TEXT = (30, 30, 50)
COLOR_ANSWER_TEXT = (255, 255, 255)
COLOR_HEADER_TEXT = (255, 200, 60)
COLOR_YEAR_TEXT = (255, 255, 255, 220)
COLOR_EXPLAIN_BG = (20, 20, 50, 210)
COLOR_EXPLAIN_TEXT = (230, 230, 240)
COLOR_EXPLAIN_TITLE = (255, 200, 60)
# Layout
CARD_MARGIN_X = 45
CARD_WIDTH = VIDEO_W - 2 * CARD_MARGIN_X
CARD_RADIUS = 24
CARD_PADDING = 30
# Gradient background palettes — (top_color, bottom_color)
GRADIENT_PALETTES = [
((25, 15, 60), (10, 30, 80)), # deep purple → navy
((10, 40, 50), (5, 15, 45)), # dark teal → midnight
((15, 30, 15), (10, 20, 50)), # dark forest → navy
((50, 10, 30), (25, 10, 55)), # burgundy → purple
((10, 20, 55), (30, 10, 45)), # royal blue → plum
((40, 20, 10), (15, 15, 50)), # dark brown → indigo
((10, 35, 45), (20, 10, 50)), # ocean → violet
((35, 10, 50), (10, 25, 60)), # magenta → teal-blue
]
def _generate_gradient_bg() -> Image.Image:
top, bottom = random.choice(GRADIENT_PALETTES)
img = Image.new("RGB", (VIDEO_W, VIDEO_H))
draw = ImageDraw.Draw(img)
for y in range(VIDEO_H):
ratio = y / VIDEO_H
r = int(top[0] + (bottom[0] - top[0]) * ratio)
g = int(top[1] + (bottom[1] - top[1]) * ratio)
b = int(top[2] + (bottom[2] - top[2]) * ratio)
draw.line([(0, y), (VIDEO_W, y)], fill=(r, g, b))
logger.info("Generated gradient background: %s → %s", top, bottom)
return img
def _pick_music() -> Path:
music_files = sorted(TEMPLATES_DIR.glob("*.mp3"))
if not music_files:
raise FileNotFoundError(f"No music files found in {TEMPLATES_DIR}")
return random.choice(music_files)
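def _load_font(name: str, size: int) -> ImageFont.FreeTypeFont:
    """Load the bundled font, falling back to system Arial if the file is missing."""
    try:
        return ImageFont.truetype(name, size)
    except OSError:
        logger.warning("Font %s not found; falling back to system Arial", name)
        return ImageFont.truetype("arial.ttf", size)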
def _wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list[str]:
words = text.split()
lines = []
current = ""
for word in words:
test = f"{current} {word}".strip()
bbox = font.getbbox(test)
if bbox[2] - bbox[0] <= max_width:
current = test
else:
if current:
lines.append(current)
current = word
if current:
lines.append(current)
return lines or [""]
def _render_header_card(category: str) -> Image.Image:
font = _load_font(FONT_BOLD, 34)
label = "Previous Year UPSC Quiz"
bbox = font.getbbox(label)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
pad_x, pad_y = 36, 16
card_w = text_w + 2 * pad_x
card_h = text_h + 2 * pad_y
img = Image.new("RGBA", (card_w, card_h), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
draw.rounded_rectangle((0, 0, card_w, card_h), radius=20, fill=(100, 60, 180, 210))
draw.text((pad_x, pad_y - bbox[1]), label, fill=COLOR_HEADER_TEXT, font=font)
return img
def _render_question_card(question: str, year: str = "") -> Image.Image:
font = _load_font(FONT_BOLD, 46)
year_font = _load_font(FONT_BOLD, 38)
max_text_w = CARD_WIDTH - 2 * CARD_PADDING
lines = _wrap_text(question, font, max_text_w)
line_height = 60
text_block_h = len(lines) * line_height
year_line_h = 52 if year else 0
card_h = text_block_h + year_line_h + 2 * CARD_PADDING + 10
img = Image.new("RGBA", (CARD_WIDTH, card_h), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
draw.rounded_rectangle(
(0, 0, CARD_WIDTH, card_h), radius=CARD_RADIUS, fill=COLOR_QUESTION_BG
)
y = CARD_PADDING
for line in lines:
bbox = font.getbbox(line)
line_w = bbox[2] - bbox[0]
x = (CARD_WIDTH - line_w) // 2
draw.text((x, y), line, fill=COLOR_QUESTION_TEXT, font=font)
y += line_height
if year:
year_label = f"— UPSC {year} —"
ybbox = year_font.getbbox(year_label)
yw = ybbox[2] - ybbox[0]
yx = (CARD_WIDTH - yw) // 2
draw.text((yx, y + 6), year_label, fill=(255, 60, 60), font=year_font)
return img
def _render_option_card(letter: str, text: str, is_answer: bool = False) -> Image.Image:
letter_font = _load_font(FONT_BOLD, 38)
text_font = _load_font(FONT_REGULAR, 36)
max_text_w = CARD_WIDTH - 120
lines = _wrap_text(text, text_font, max_text_w)
line_height = 48
text_block_h = len(lines) * line_height
card_h = max(120, text_block_h + 78)
img = Image.new("RGBA", (CARD_WIDTH, card_h), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
bg = (0, 180, 100, 200) if is_answer else COLOR_OPTION_BG
draw.rounded_rectangle((0, 0, CARD_WIDTH, card_h), radius=card_h // 2, fill=bg)
circle_r = 23
circle_x, circle_y = 36, card_h // 2
circle_bg = (100, 60, 180, 255) if not is_answer else (255, 255, 255, 255)
draw.ellipse(
(circle_x - circle_r, circle_y - circle_r,
circle_x + circle_r, circle_y + circle_r),
fill=circle_bg,
)
lbbox = letter_font.getbbox(letter)
lw = lbbox[2] - lbbox[0]
lh = lbbox[3] - lbbox[1]
letter_color = (255, 255, 255) if not is_answer else (0, 140, 80)
draw.text(
(circle_x - lw // 2, circle_y - lh // 2 - lbbox[1]),
letter, fill=letter_color, font=letter_font,
)
text_x = circle_x + circle_r + 22
text_y = (card_h - text_block_h) // 2
txt_color = COLOR_ANSWER_TEXT if is_answer else COLOR_OPTION_TEXT
for line in lines:
draw.text((text_x, text_y), line, fill=txt_color, font=text_font)
text_y += line_height
return img
def _render_answer_hint() -> Image.Image:
font = _load_font(FONT_BOLD, 34)
label = "Answer & Explanation in comments!"
bbox = font.getbbox(label)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
pad_y = 16
card_w = CARD_WIDTH
card_h = text_h + 2 * pad_y
img = Image.new("RGBA", (card_w, card_h), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
draw.rounded_rectangle(
(0, 0, card_w, card_h), radius=card_h // 2, fill=(100, 60, 180, 200)
)
x = (card_w - text_w) // 2
draw.text((x, pad_y - bbox[1]), label, fill=COLOR_HEADER_TEXT, font=font)
return img
def _render_bottom_banner() -> Image.Image:
line1_font = _load_font(FONT_BOLD, 36)
line2_font = _load_font(FONT_BOLD, 30)
line1 = "{BRAND_NAME}"
line2 = "30 Years (1995-2025) Topic Wise PYQs"
l1_bbox = line1_font.getbbox(line1)
l2_bbox = line2_font.getbbox(line2)
l1_h = l1_bbox[3] - l1_bbox[1]
l2_h = l2_bbox[3] - l2_bbox[1]
pad = 22
gap = 10
card_h = l1_h + gap + l2_h + 2 * pad
img = Image.new("RGBA", (CARD_WIDTH, card_h), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
draw.rounded_rectangle(
(0, 0, CARD_WIDTH, card_h), radius=CARD_RADIUS, fill=(180, 40, 40, 220)
)
l1_w = l1_bbox[2] - l1_bbox[0]
draw.text(
((CARD_WIDTH - l1_w) // 2, pad),
line1, fill=(255, 255, 255), font=line1_font,
)
l2_w = l2_bbox[2] - l2_bbox[0]
draw.text(
((CARD_WIDTH - l2_w) // 2, pad + l1_h + gap),
line2, fill=(255, 220, 60), font=line2_font,
)
return img
def _pil_to_clip(img: Image.Image, pos, start, duration) -> ImageClip:
arr = np.array(img)
return (
ImageClip(arr, is_mask=False, transparent=True)
.with_position(pos)
.with_start(start)
.with_duration(duration)
)
def _build_clips(question_data: dict, video_duration: float) -> list:
year = question_data.get("year", "")
clips = []
header_img = _render_header_card(question_data.get("category", "General"))
header_x = (VIDEO_W - header_img.width) // 2
clips.append(_pil_to_clip(header_img, (header_x, 160), 0, video_duration))
q_img = _render_question_card(question_data["question"], year)
clips.append(_pil_to_clip(q_img, (CARD_MARGIN_X, 260), 0, video_duration))
q_card_bottom = 260 + q_img.height + 40
option_gap = 30
options = [
("A", question_data["option_a"]),
("B", question_data["option_b"]),
("C", question_data["option_c"]),
("D", question_data["option_d"]),
]
y = q_card_bottom
for letter, text in options:
opt_img = _render_option_card(letter, text, is_answer=False)
clips.append(_pil_to_clip(opt_img, (CARD_MARGIN_X, y), 0, video_duration))
y += opt_img.height + option_gap
hint_img = _render_answer_hint()
clips.append(_pil_to_clip(hint_img, (CARD_MARGIN_X, y + 10), 0, video_duration))
banner_img = _render_bottom_banner()
banner_y = VIDEO_H - banner_img.height - 60
clips.append(_pil_to_clip(banner_img, (CARD_MARGIN_X, banner_y), 0, video_duration))
return clips
def generate_video(
question_data: dict,
output_path: Path | None = None,
) -> Path:
if output_path is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = OUTPUT_DIR / f"quiz_{timestamp}.mp4"
logger.info("Generating video: %s", output_path.name)
bg_img = _generate_gradient_bg()
bg_clip = ImageClip(np.array(bg_img)).with_duration(VIDEO_DURATION)
music_path = _pick_music()
audio = AudioFileClip(str(music_path)).subclipped(0, VIDEO_DURATION)
bg_clip = bg_clip.with_audio(audio)
overlay_clips = _build_clips(question_data, VIDEO_DURATION)
composite = CompositeVideoClip([bg_clip, *overlay_clips])
composite.write_videofile(
str(output_path),
codec="libx264",
audio_codec="aac",
fps=30,
ffmpeg_params=["-movflags", "+faststart"],
logger=None,
)
audio.close()
composite.close()
file_size_mb = output_path.stat().st_size / (1024 * 1024)
logger.info("Video saved: %s (%.1f MB)", output_path.name, file_size_mb)
return output_path
FILE:scripts/x_poster.py
"""Posts quiz content to X (Twitter) using tweepy."""
import logging
import os
import time
import tweepy
logger = logging.getLogger(__name__)
MAX_RETRIES = 2
RETRY_DELAY = 5 # seconds
def _get_client() -> tweepy.Client:
"""Create an authenticated tweepy Client using env vars."""
return tweepy.Client(
consumer_key=os.environ["X_API_KEY"],
consumer_secret=os.environ["X_API_SECRET"],
access_token=os.environ["X_ACCESS_TOKEN"],
access_token_secret=os.environ["X_ACCESS_TOKEN_SECRET"],
)
def _get_v1_api() -> tweepy.API:
"""Create an authenticated tweepy v1.1 API (needed for media upload)."""
auth = tweepy.OAuth1UserHandler(
os.environ["X_API_KEY"],
os.environ["X_API_SECRET"],
os.environ["X_ACCESS_TOKEN"],
os.environ["X_ACCESS_TOKEN_SECRET"],
)
return tweepy.API(auth)
def _post_with_retry(client: tweepy.Client, text: str, reply_to: str | None = None) -> str:
"""Post a tweet with retry logic. Returns the tweet ID as a string."""
for attempt in range(1, MAX_RETRIES + 2):
try:
kwargs = {"text": text}
if reply_to:
kwargs["in_reply_to_tweet_id"] = reply_to
response = client.create_tweet(**kwargs)
tweet_id = str(response.data["id"])
logger.info("Tweet posted (id=%s, attempt=%d)", tweet_id, attempt)
return tweet_id
except Exception as e:
logger.error("X API error (attempt %d/%d): %s", attempt, MAX_RETRIES + 1, e)
if attempt <= MAX_RETRIES:
time.sleep(RETRY_DELAY)
else:
raise
def post_question(text: str) -> str | None:
"""Post a question tweet. Returns tweet ID or None on failure."""
try:
client = _get_client()
return _post_with_retry(client, text)
except Exception as e:
logger.error("Failed to post question to X: %s", e)
return None
def post_answer(text: str, reply_to_tweet_id: str) -> str | None:
"""Post an answer as a reply tweet. Returns tweet ID or None on failure."""
try:
client = _get_client()
return _post_with_retry(client, text, reply_to=reply_to_tweet_id)
except Exception as e:
logger.error("Failed to post answer to X: %s", e)
return None
def post_thread(tweets: list[str]) -> list[str]:
"""Post a thread (list of tweets). Each tweet replies to the previous. Returns list of tweet IDs."""
if not tweets:
return []
client = _get_client()
ids = []
reply_to = None
for i, text in enumerate(tweets):
try:
tweet_id = _post_with_retry(client, text, reply_to=reply_to)
ids.append(tweet_id)
reply_to = tweet_id
logger.info("Thread tweet %d/%d posted (id=%s)", i + 1, len(tweets), tweet_id)
if i < len(tweets) - 1:
time.sleep(2) # small delay between tweets
except Exception as e:
logger.error("Thread tweet %d failed: %s", i + 1, e)
break
return ids
def post_image(image_path: str, text: str = "") -> str | None:
"""Post a tweet with an image. Returns tweet ID or None on failure."""
try:
api = _get_v1_api()
media = api.media_upload(image_path)
logger.info("Media uploaded (id=%s)", media.media_id)
client = _get_client()
kwargs = {"media_ids": [media.media_id]}
if text:
kwargs["text"] = text
response = client.create_tweet(**kwargs)
tweet_id = str(response.data["id"])
logger.info("Image tweet posted (id=%s)", tweet_id)
return tweet_id
except Exception as e:
logger.error("Failed to post image to X: %s", e)
return None
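The retry loop in `_post_with_retry` makes `MAX_RETRIES + 1` attempts in total, sleeping a fixed delay between them. A minimal, self-contained sketch of that pattern follows. The names `call_with_retry` and `flaky` are hypothetical stand-ins, the delay is set to zero so the sketch runs instantly, and no X API call is made.

```python
import time

MAX_RETRIES = 2   # same value as in x_poster.py
RETRY_DELAY = 0   # assumption: zero here so the sketch finishes immediately


def call_with_retry(fn, max_retries=MAX_RETRIES, delay=RETRY_DELAY):
    """Call fn() up to max_retries + 1 times; re-raise the last error."""
    for attempt in range(1, max_retries + 2):
        try:
            return fn(), attempt
        except Exception:
            if attempt > max_retries:
                raise
            time.sleep(delay)


calls = {"n": 0}


def flaky():
    """Hypothetical stand-in for client.create_tweet: fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "tweet-id-123"


result, attempts = call_with_retry(flaky)
```

With `MAX_RETRIES = 2`, the third attempt is the last one allowed, so a call that fails twice still succeeds.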
FILE:scripts/x_thread_generator.py
"""Generate data-driven X threads from the previous-year-question (PYQ) database. One thread per day, auto-rotated."""
import csv
import logging
import random
from collections import Counter, defaultdict
from pathlib import Path
logger = logging.getLogger(__name__)
DATA_PATH = Path(__file__).parent.parent / "data" / "questions.csv"
def _load_questions() -> list[dict]:
with open(DATA_PATH, "r", newline="", encoding="utf-8") as f:
return list(csv.DictReader(f))
def _qs_by_era(rows, y1, y2):
return [r for r in rows if r.get("year", "").isdigit() and y1 <= int(r["year"]) <= y2]
def _random_q(rows, **filters) -> dict | None:
"""Pick a random question matching filters. No answer included."""
pool = rows
for k, v in filters.items():
pool = [r for r in pool if v.lower() in r.get(k, "").lower()]
return random.choice(pool) if pool else None
def _fmt_q(q: dict) -> str:
"""Format question + options. NO answer."""
opts = [q.get(f"option{i}", "") for i in range(1, 5)]
year = q.get("year", "")
return (
f"[UPSC {year}]\n"
f"{q['question'][:200]}\n\n"
f"A) {opts[0]}\nB) {opts[1]}\nC) {opts[2]}\nD) {opts[3]}"
)
def _trim(text: str, limit: int = 280) -> str:
if len(text) <= limit:
return text
return text[:limit - 1] + "…"
# ── Thread generators (7 themes, rotate by day_of_year % 7) ──────────────────
def _thread_subject_weightage(rows) -> list[str]:
"""Subject weightage across eras."""
tweets = []
tweets.append(_trim(
"🧵 I analyzed 3,274 UPSC PYQs (1995-2025). Here's how subject weightage shifted over 3 decades.\n\n"
"The exam you're preparing for is NOT the same exam from 10 years ago.\n\n#UPSC"
))
old = _qs_by_era(rows, 1995, 2005)
new = _qs_by_era(rows, 2016, 2025)
old_subs = Counter(r.get("subject", "").strip() for r in old if r.get("subject"))
new_subs = Counter(r.get("subject", "").strip() for r in new if r.get("subject"))
lines = []
for s, n in new_subs.most_common(7):
old_pct = round(old_subs.get(s, 0) * 100 / len(old), 1) if old else 0.0 # guard: no rows in the old era
new_pct = round(n * 100 / len(new), 1)
arrow = "📈" if new_pct > old_pct else "📉"
lines.append(f"{arrow} {s}: {old_pct}% → {new_pct}%")
tweets.append(_trim("\n".join(lines)))
# Biggest riser with proof question
q = _random_q(rows, subject="Environment", year="2024")
if q:
tweets.append(_trim(
"Environment went from 1.6% to 16.4% — a 10x increase.\n\n"
"Here's a 2024 Environment question to prove the complexity:\n\n"
f"{q['question'][:180]}"
))
# Biggest faller with old question
q_old = _random_q(rows, subject="History", year="1998")
if q_old:
tweets.append(_trim(
"History dropped from 18.5% to 9.0%.\n\n"
f"Compare this 1998 question:\n\"{q_old['question'][:150]}\"\n\n"
"vs today's multi-statement conceptual questions."
))
tweets.append(_trim(
"Key takeaway: if you're allocating study time based on 2010 patterns, you're leaving marks on the table.\n\n"
"Environment, Economy, Polity = where the marks moved.\n\n#UPSC #Prelims2026"
))
return tweets
def _thread_question_style(rows) -> list[str]:
"""How question style evolved."""
tweets = []
tweets.append(_trim(
"🧵 UPSC questions are 70% longer than they were in 2005.\n\n"
"Here's the data + real questions to prove it.\n\n#UPSC"
))
# Stats
old = _qs_by_era(rows, 1995, 2005)
new = _qs_by_era(rows, 2016, 2025)
old_ctf = sum(1 for r in old if "consider the following" in r.get("question", "").lower())
new_ctf = sum(1 for r in new if "consider the following" in r.get("question", "").lower())
tweets.append(_trim(
f"\"Consider the following\" questions:\n"
f"1995-2005: {round(old_ctf*100/len(old),1)}%\n"
f"2016-2025: {round(new_ctf*100/len(new),1)}%\n\n"
f"Multi-statement questions: 23.8% → 51.9%\n"
f"Avg question length: 151 → 256 chars"
))
# Old style example
short = sorted(old, key=lambda r: len(r["question"]))
if short:
q = short[0]
tweets.append(_trim(f"2000s style (direct recall):\n\n\"{q['question']}\"\n\nYou know it or you don't. {len(q['question'].split())} words."))
# New style example
long_new = [r for r in new if "consider the following" in r.get("question", "").lower()]
if long_new:
q = random.choice(long_new)
tweets.append(_trim(f"2024 style ({len(q['question'])} chars):\n\n\"{q['question'][:200]}...\"\n\nYou need to evaluate each statement independently."))
tweets.append(_trim(
"What this means: stop memorizing isolated facts.\n\n"
"UPSC now tests whether you UNDERSTAND concepts. Practice elimination — 44% of questions are designed for it.\n\n#UPSC"
))
return tweets
def _thread_rising_falling(rows) -> list[str]:
"""Rising vs falling topics."""
tweets = []
tweets.append(_trim(
"🧵 Which UPSC topics are RISING and which are DYING?\n\n"
"Data from 3,274 PYQs. Real questions as proof.\n\n#UPSC"
))
old = _qs_by_era(rows, 2006, 2015)
new = _qs_by_era(rows, 2016, 2025)
old_t = Counter(r.get("topicId", "") for r in old if r.get("topicId"))
new_t = Counter(r.get("topicId", "") for r in new if r.get("topicId"))
rising = []
falling = []
for t in set(list(old_t.keys()) + list(new_t.keys())):
if not t: continue
o, n = old_t.get(t, 0), new_t.get(t, 0)
if o > 3 and n > 3:
change = round((n - o) * 100 / o, 1) # o > 3 here, so no zero-division guard is needed
if change > 50: rising.append((t.replace("-", " ").title(), o, n, change))
elif change < -30: falling.append((t.replace("-", " ").title(), o, n, change))
rising.sort(key=lambda x: x[3], reverse=True)
falling.sort(key=lambda x: x[3])
r_lines = [f"📈 {t}: {o}→{n} (+{c}%)" for t, o, n, c in rising[:6]]
tweets.append(_trim("RISING topics:\n\n" + "\n".join(r_lines)))
f_lines = [f"📉 {t}: {o}→{n} ({c}%)" for t, o, n, c in falling[:6]]
tweets.append(_trim("FALLING topics:\n\n" + "\n".join(f_lines)))
# Proof question for top riser
if rising:
topic_id = rising[0][0].lower().replace(" ", "-")
q = _random_q(rows, topicId=topic_id)
if q:
tweets.append(_trim(f"Example — {rising[0][0]} (rose {rising[0][3]}%):\n\n{_fmt_q(q)[:240]}"))
tweets.append(_trim(
"Update your time allocation. The exam evolves — your prep should too.\n\n#UPSC #Prelims2026"
))
return tweets
def _thread_environment(rows) -> list[str]:
"""Environment's 10x growth."""
tweets = []
tweets.append(_trim(
"🧵 Environment went from 1.6% to 16.4% in UPSC Prelims.\n\n"
"That's a 10x increase. Here's the year-by-year data.\n\n#UPSC #Environment"
))
env_by_year = defaultdict(int)
total_by_year = defaultdict(int)
for r in rows:
y = r.get("year", "").strip()
if not y.isdigit(): continue
total_by_year[int(y)] += 1
if "Environment" in r.get("subject", ""):
env_by_year[int(y)] += 1
# Per-era Environment share (the first era spans 11 years, the rest 5)
lines = []
for era, y1, y2 in [("1995-2005", 1995, 2005), ("2006-2010", 2006, 2010),
("2011-2015", 2011, 2015), ("2016-2020", 2016, 2020), ("2021-2025", 2021, 2025)]:
env = sum(env_by_year[y] for y in range(y1, y2+1))
total = sum(total_by_year[y] for y in range(y1, y2+1))
pct = round(env * 100 / total, 1) if total else 0
bar = "█" * (env // 2)
lines.append(f"{era}: {pct}% {bar}")
tweets.append(_trim("\n".join(lines)))
# 1995 question vs 2024
q_old = _random_q(rows, subject="Environment", year="1995")
q_new = _random_q(rows, subject="Environment", year="2024")
if q_old:
tweets.append(_trim(f"1995 Environment question:\n\n\"{q_old['question'][:200]}\"\n\nSimple factual recall."))
if q_new:
tweets.append(_trim(f"2024 Environment question:\n\n\"{q_new['question'][:200]}\"\n\nMulti-layered, policy + science + current affairs."))
tweets.append(_trim(
"If Environment isn't in your top 3 priority subjects, you're ignoring 16% of the paper.\n\n#UPSC"
))
return tweets
def _thread_answer_patterns(rows) -> list[str]:
"""Answer distribution analysis."""
tweets = []
tweets.append(_trim(
"🧵 I checked the correct answer distribution across 3,274 UPSC questions.\n\n"
"Is there really a pattern? Here's the data.\n\n#UPSC"
))
ans = Counter(r.get("correctIndex", "") for r in rows)
total = len(rows)
tweets.append(_trim(
f"Overall distribution:\n\n"
f"A: {ans.get('0',0)} ({round(ans.get('0',0)*100/total,1)}%)\n"
f"B: {ans.get('1',0)} ({round(ans.get('1',0)*100/total,1)}%)\n"
f"C: {ans.get('2',0)} ({round(ans.get('2',0)*100/total,1)}%)\n"
f"D: {ans.get('3',0)} ({round(ans.get('3',0)*100/total,1)}%)\n\n"
f"B and C slightly more common. But barely."
))
# By era
for era, y1, y2 in [("1995-2005", 1995, 2005), ("2016-2025", 2016, 2025)]:
eq = _qs_by_era(rows, y1, y2)
ea = Counter(r.get("correctIndex", "") for r in eq)
t = len(eq) or 1 # avoid ZeroDivisionError when an era has no rows
lines = [f"{l}: {round(ea.get(str(i),0)*100/t,1)}%" for i, l in enumerate("ABCD")]
tweets.append(_trim(f"{era}:\n" + " | ".join(lines) + "\n\nAlmost perfectly distributed."))
tweets.append(_trim(
"Verdict: UPSC doesn't favor any option consistently. Don't guess based on letter.\n\n"
"Your best strategy is elimination, not statistics.\n\n#UPSC"
))
return tweets
def _thread_topic_deep_dive(rows) -> list[str]:
"""Deep dive into a random high-frequency topic with real questions."""
topics = Counter(r.get("topicId", "") for r in rows if r.get("topicId"))
# Pick from top 15 topics randomly
top = [t for t, c in topics.most_common(15)]
topic = random.choice(top)
topic_name = topic.replace("-", " ").title()
topic_qs = [r for r in rows if r.get("topicId") == topic]
tweets = []
tweets.append(_trim(
f"🧵 UPSC has asked {len(topic_qs)} questions on {topic_name} since 1995.\n\n"
f"Here's what the data shows + real questions.\n\n#UPSC"
))
# Question counts by era
era_counts = []
for era, y1, y2 in [("1995-2005", 1995, 2005), ("2006-2015", 2006, 2015), ("2016-2025", 2016, 2025)]:
c = len(_qs_by_era(topic_qs, y1, y2))
era_counts.append(f"{era}: {c} questions")
tweets.append(_trim(f"Frequency by era:\n\n" + "\n".join(era_counts)))
# Up to 2 real questions (no answers)
samples = random.sample(topic_qs, min(2, len(topic_qs)))
for q in samples:
tweets.append(_trim(_fmt_q(q)))
tweets.append(_trim(
f"If {topic_name} keeps appearing, it's not coincidence — it's a priority area.\n\n"
f"Practice these PYQs topic-wise, not year-wise.\n\n#UPSC"
))
return [t for t in tweets if t] # drop empty tweets
def _thread_elimination_strategy(rows) -> list[str]:
"""Elimination-style questions analysis."""
tweets = []
tweets.append(_trim(
"🧵 44% of recent UPSC questions use elimination-style options.\n\n"
"\"1 and 2 only\" / \"2 and 3 only\" / \"All of the above\"\n\n"
"Here's how to handle them.\n\n#UPSC"
))
old = _qs_by_era(rows, 1995, 2005)
new = _qs_by_era(rows, 2016, 2025)
old_elim = sum(1 for r in old if any(x in (r.get("option1","")+r.get("option2","")+r.get("option3","")+r.get("option4","")).lower()
for x in ["1 and 2", "1 and 3", "2 and 3", "1, 2 and 3", "all of the above", "none of the above"]))
new_elim = sum(1 for r in new if any(x in (r.get("option1","")+r.get("option2","")+r.get("option3","")+r.get("option4","")).lower()
for x in ["1 and 2", "1 and 3", "2 and 3", "1, 2 and 3", "all of the above", "none of the above"]))
tweets.append(_trim(
f"Elimination questions:\n"
f"1995-2005: {round(old_elim*100/len(old),1)}%\n"
f"2016-2025: {round(new_elim*100/len(new),1)}%\n\n"
f"Nearly tripled. This is the dominant question format now."
))
# Real example
elim_qs = [r for r in new if "consider the following" in r.get("question", "").lower()
and any(x in (r.get("option1","")+r.get("option2","")+r.get("option3","")+r.get("option4","")).lower()
for x in ["1 and 2", "2 and 3"])]
if elim_qs:
q = random.choice(elim_qs)
tweets.append(_trim(f"Real example:\n\n{_fmt_q(q)[:250]}"))
tweets.append(_trim(
"Strategy: evaluate each statement as TRUE/FALSE independently. Then match the combination.\n\n"
"Don't read options first — decide on statements first, then look for your combination.\n\n#UPSC"
))
return tweets
# ── Main entry point ─────────────────────────────────────────────────────────
THREAD_GENERATORS = [
_thread_subject_weightage,
_thread_question_style,
_thread_rising_falling,
_thread_environment,
_thread_answer_patterns,
_thread_topic_deep_dive,
_thread_elimination_strategy,
]
def generate_daily_thread() -> list[str]:
"""Generate today's thread. Rotates through 7 themes."""
rows = _load_questions()
from datetime import datetime
day = datetime.now().timetuple().tm_yday
gen = THREAD_GENERATORS[day % len(THREAD_GENERATORS)]
logger.info("Thread theme: %s", gen.__name__)
tweets = gen(rows)
# Ensure all within 280 chars
return [_trim(t) for t in tweets if t]
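The rotation in `generate_daily_thread` indexes the generator list by day-of-year modulo 7, and `_trim` cuts any tweet to 280 characters. The sketch below reproduces both rules in isolation so they can be checked without the CSV. `THEMES`, `theme_for`, and `trim` are hypothetical names that mirror, but are not, the module's own functions.

```python
from datetime import date

# Hypothetical labels standing in for the seven generator functions, same order.
THEMES = [
    "subject_weightage",
    "question_style",
    "rising_falling",
    "environment",
    "answer_patterns",
    "topic_deep_dive",
    "elimination_strategy",
]


def theme_for(day: date) -> str:
    """Pick a theme by day-of-year modulo the number of themes."""
    return THEMES[day.timetuple().tm_yday % len(THEMES)]


def trim(text: str, limit: int = 280) -> str:
    """Cut text to `limit` characters, ending with an ellipsis when shortened."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


jan_1 = theme_for(date(2025, 1, 1))        # day-of-year 1
jan_7 = theme_for(date(2025, 1, 7))        # day-of-year 7
dec_31 = theme_for(date(2025, 12, 31))     # day-of-year 365
next_jan_1 = theme_for(date(2026, 1, 1))   # day-of-year 1 again
shortened = trim("x" * 300)
```

Because the index resets with the calendar year, the cycle is not continuous across 31 December and 1 January: in a 365-day year both dates map to the same theme.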
FILE:scripts/youtube_main.py
"""YouTube Shorts Quiz Bot — Entry point.
Usage:
python -m bot.youtube_main # Post one Short
python -m bot.youtube_main --count 3 # Post 3 Shorts
python -m bot.youtube_main --dry-run # Generate video only (no upload)
"""
import argparse
import logging
import signal
import sys
from logging.handlers import RotatingFileHandler
from bot.csv_manager import get_next_question, get_unposted_count, mark_as_posted
from bot.html_video_generator import generate_html_video as generate_video
from bot.yt_config import LOGS_DIR
from bot.yt_shorts_poster import post_short
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logger = logging.getLogger(__name__)
_shutdown = False
def _setup_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setFormatter(logging.Formatter(LOG_FORMAT))
root.addHandler(console)
log_file = LOGS_DIR / "youtube.log"
fh = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3)
fh.setFormatter(logging.Formatter(LOG_FORMAT))
root.addHandler(fh)
def _signal_handler(signum, frame):
global _shutdown
logger.info("Shutdown signal received — finishing current task...")
_shutdown = True
def process_one(dry_run: bool = False) -> bool:
"""Process one video: generate → upload → comment.
Args:
dry_run: If True, only generate video without uploading
Returns:
bool: True if should continue processing more videos
"""
result = get_next_question()
if result is None:
logger.info("No unposted questions remaining")
return False
index, question_data = result
logger.info("── Step 1/2: Generating video ──")
try:
video_path = generate_video(question_data)
except Exception:
logger.exception("Video generation failed — skipping question %d", index)
return True
if dry_run:
logger.info("── DRY RUN — skipping upload ──")
logger.info("Video saved at: %s", video_path)
logger.info("Question: %s", question_data.get("question", "")[:100])
return True
logger.info("── Step 2/2: Uploading to YouTube ──")
try:
video_id = post_short(video_path, question_data)
logger.info("Successfully posted Short! Video ID: %s", video_id)
logger.info("URL: https://youtube.com/shorts/%s", video_id)
except Exception:
logger.exception("YouTube upload failed — skipping question %d", index)
return True
mark_as_posted(index)
try:
video_path.unlink()
logger.info("Cleaned up video file: %s", video_path)
except OSError:
logger.warning("Failed to delete video file: %s", video_path)
return True
def main():
_setup_logging()
parser = argparse.ArgumentParser(description="YouTube Shorts Quiz Bot")
parser.add_argument("--count", type=int, default=1, help="Number of Shorts to post")
parser.add_argument(
"--dry-run",
action="store_true",
help="Generate videos only, don't upload",
)
args = parser.parse_args()
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
remaining = get_unposted_count()
logger.info("YouTube Shorts Bot started — %d unposted questions available", remaining)
# First run will trigger OAuth flow
if args.dry_run:
logger.info("DRY RUN mode — videos will be generated but not uploaded")
else:
logger.info("Authenticating with YouTube (browser may open on first run)")
posted = 0
for i in range(args.count):
if _shutdown:
logger.info("Shutting down gracefully after %d Short(s)", posted)
break
logger.info("━━━ Processing Short %d/%d ━━━", i + 1, args.count)
if not process_one(dry_run=args.dry_run):
break
posted += 1
action = "generated" if args.dry_run else "posted"
logger.info("Done! %d Short(s) %s", posted, action)
if __name__ == "__main__":
main()
FILE:scripts/yt_auth.py
"""YouTube OAuth 2.0 authentication helper.
This module handles the OAuth flow for YouTube Data API access.
On first run, it will open a browser for user authorization.
Subsequent runs will use the saved token from youtube_token.json.
Supports both:
- Pickle format (local development)
- JSON format (GitHub Actions / CI/CD)
"""
import json
import logging
import os
import pickle
from pathlib import Path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from bot.yt_config import (
CLIENT_SECRETS_FILE,
TOKEN_FILE,
YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
YOUTUBE_SCOPES,
)
logger = logging.getLogger(__name__)
def get_authenticated_service():
"""Authenticate and return YouTube API service object.
Returns:
googleapiclient.discovery.Resource: Authenticated YouTube API service
Raises:
FileNotFoundError: If client_secrets.json is missing
Exception: If authentication fails
"""
if not CLIENT_SECRETS_FILE.exists():
raise FileNotFoundError(
f"Missing {CLIENT_SECRETS_FILE}. "
"Download it from Google Cloud Console and place it in the project root."
)
credentials = None
# Load saved token if it exists (supports both pickle and JSON)
if TOKEN_FILE.exists():
logger.info("Loading saved credentials from %s", TOKEN_FILE)
try:
# Try pickle format first (local development)
with open(TOKEN_FILE, "rb") as token:
credentials = pickle.load(token)
except (pickle.UnpicklingError, EOFError):
# Fall back to JSON format (GitHub Actions)
logger.info("Pickle failed, trying JSON format")
with open(TOKEN_FILE, "r") as token:
token_data = json.load(token)
credentials = Credentials(
token=token_data.get("token"),
refresh_token=token_data.get("refresh_token"),
token_uri=token_data.get("token_uri"),
client_id=token_data.get("client_id"),
client_secret=token_data.get("client_secret"),
scopes=token_data.get("scopes"),
)
# If no valid credentials, authenticate
if not credentials or not credentials.valid:
if credentials and credentials.expired and credentials.refresh_token:
logger.info("Refreshing expired credentials")
credentials.refresh(Request())
else:
logger.info("Starting OAuth flow (browser will open)")
flow = InstalledAppFlow.from_client_secrets_file(
CLIENT_SECRETS_FILE, YOUTUBE_SCOPES
)
credentials = flow.run_local_server(
port=8080,
prompt="consent",
authorization_prompt_message="Please visit this URL to authorize: {url}",
)
# Save credentials for next run
with open(TOKEN_FILE, "wb") as token:
pickle.dump(credentials, token)
logger.info("Saved credentials to %s", TOKEN_FILE)
logger.info("Building YouTube API service")
return build(
YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
credentials=credentials,
)
def revoke_credentials():
"""Revoke saved credentials (useful for testing or switching accounts)."""
if TOKEN_FILE.exists():
TOKEN_FILE.unlink()
logger.info("Revoked credentials (deleted %s)", TOKEN_FILE)
else:
logger.info("No saved credentials to revoke")
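The JSON fallback in `get_authenticated_service` reads six keys from the token file: `token`, `refresh_token`, `token_uri`, `client_id`, `client_secret`, and `scopes`. A minimal sketch of producing a file in that shape is shown below, for example to store as a CI secret. `credentials_to_token_json` is a hypothetical helper, and the `SimpleNamespace` object is a stand-in for a real credentials object with the same attribute names; all values are placeholders.

```python
import json
from types import SimpleNamespace


def credentials_to_token_json(creds) -> str:
    """Serialize the six fields that the JSON fallback loader reads."""
    return json.dumps(
        {
            "token": creds.token,
            "refresh_token": creds.refresh_token,
            "token_uri": creds.token_uri,
            "client_id": creds.client_id,
            "client_secret": creds.client_secret,
            "scopes": list(creds.scopes or []),
        },
        indent=2,
    )


# Stand-in object with placeholder values (not real credentials).
fake_creds = SimpleNamespace(
    token="placeholder-access-token",
    refresh_token="placeholder-refresh-token",
    token_uri="https://oauth2.googleapis.com/token",
    client_id="placeholder-client-id",
    client_secret="placeholder-client-secret",
    scopes=["https://www.googleapis.com/auth/youtube.upload"],
)

token_data = json.loads(credentials_to_token_json(fake_creds))
```

The refresh token is the field that matters in unattended runs, since the loader refreshes expired credentials only when one is present.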
FILE:scripts/yt_config.py
import random
from pathlib import Path
# ── Paths ─────────────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent.parent
CLIENT_SECRETS_FILE = BASE_DIR / "client_secrets.json"
TOKEN_FILE = BASE_DIR / "youtube_token.json"
OUTPUT_DIR = BASE_DIR / "output"
LOGS_DIR = BASE_DIR / "logs"
CSV_PATH = BASE_DIR / "data" / "questions.csv"
OUTPUT_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
# ── YouTube API ───────────────────────────────────────────────────────────────
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
YOUTUBE_SCOPES = [
"https://www.googleapis.com/auth/youtube.upload",
"https://www.googleapis.com/auth/youtube.force-ssl",
]
# Privacy: "private", "public", or "unlisted"
VIDEO_PRIVACY = "public"
# Category ID for Education
YOUTUBE_CATEGORY_EDUCATION = "27"
def build_title(question_data: dict) -> str:
"""Build YouTube video title — format-aware, lowercase, engaging, under 100 chars + #Shorts."""
from bot.html_video_generator import get_video_format_for_day
from bot.formatter import format_yt_title
fmt = get_video_format_for_day()
FORMAT_TITLES = {
"timer_challenge": "90% fail this UPSC question. can you get it right? #Shorts",
"shocking_stat": "went through 3274 UPSC questions. this pattern blew my mind #Shorts",
"comparison": "₹0 vs ₹2 lakh UPSC prep — which one actually works? #Shorts",
"loop_format": "answer in 3 seconds. bet you can't #Shorts",
}
if fmt in FORMAT_TITLES:
return FORMAT_TITLES[fmt][:100]
# Fallback: existing title logic
title = format_yt_title(question_data)
return title[:100]
def build_description(question_data: dict) -> str:
"""Build YouTube video description — format-aware, casual 2-3 lines + site link."""
from bot.html_video_generator import get_video_format_for_day
from bot.formatter import format_yt_description
fmt = get_video_format_for_day()
FORMAT_DESCRIPTIONS = {
"timer_challenge": (
"90% of UPSC aspirants get this question wrong on their first try.\n\n"
"watch to the end to see the answer + explanation.\n\n"
"free pyq practice: {BRAND_URL}"
),
"shocking_stat": (
"analyzed 3274 UPSC questions from 1995–2025. the patterns are wild.\n\n"
"sharing so you don't have to figure it out the hard way.\n\n"
"free pyq practice: {BRAND_URL}"
),
"comparison": (
"₹2 lakh coaching vs ₹0 self-study — the data on what actually works.\n\n"
"bookmark this and share with every UPSC aspirant you know.\n\n"
"free pyq practice: {BRAND_URL}"
),
"loop_format": (
"can you answer this UPSC question in 3 seconds?\n\n"
"follow for a new question every day — your daily exam-mode practice.\n\n"
"free pyq practice: {BRAND_URL}"
),
}
if fmt in FORMAT_DESCRIPTIONS:
return FORMAT_DESCRIPTIONS[fmt]
# Fallback: existing description logic
return format_yt_description(question_data)
FILE:scripts/yt_shorts_poster.py
"""YouTube Shorts uploader module.
This module handles uploading video files to YouTube as Shorts.
Videos are automatically classified as Shorts if they are:
- Vertical (9:16 aspect ratio)
- Under 60 seconds duration
"""
import logging
import time
from pathlib import Path
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from bot.yt_auth import get_authenticated_service
from bot.yt_config import (
VIDEO_PRIVACY,
YOUTUBE_CATEGORY_EDUCATION,
build_description,
build_title,
)
logger = logging.getLogger(__name__)
MAX_RETRIES = 3
BACKOFF_BASE = 2
def build_answer_comment(question_data: dict) -> str:
"""Build answer comment text from question data."""
answer_letter = question_data["answer"].strip().upper()
answer_map = {"A": "option_a", "B": "option_b", "C": "option_c", "D": "option_d"}
answer_key = answer_map.get(answer_letter, "option_a")
answer_text = question_data[answer_key]
explanation = question_data.get("explanation", "")
comment = f"✅ Answer: {answer_letter}) {answer_text}"
if explanation:
comment += f"\n\n💡 Explanation:\n{explanation}"
return comment
def upload_video(
video_path: Path,
question_data: dict,
privacy: str = VIDEO_PRIVACY,
) -> str:
"""Upload video to YouTube as a Short.
Args:
video_path: Path to video file
question_data: Question metadata for title/description
privacy: Video privacy status ("public", "private", or "unlisted")
Returns:
str: YouTube video ID
Raises:
FileNotFoundError: If video file doesn't exist
HttpError: If upload fails
"""
if not video_path.exists():
raise FileNotFoundError(f"Video file not found: {video_path}")
youtube = get_authenticated_service()
title = build_title(question_data)
description = build_description(question_data)
body = {
"snippet": {
"title": title,
"description": description,
"tags": [
"UPSC",
"IAS",
"Civil Services",
"Quiz",
"Previous Year Question",
"{BRAND_NAME}",
"Shorts",
],
"categoryId": YOUTUBE_CATEGORY_EDUCATION,
},
"status": {
"privacyStatus": privacy,
"selfDeclaredMadeForKids": False,
},
}
media = MediaFileUpload(
str(video_path),
mimetype="video/mp4",
resumable=True,
chunksize=1024 * 1024, # 1MB chunks
)
logger.info("Uploading video: %s", video_path.name)
logger.info("Title: %s", title)
for attempt in range(1, MAX_RETRIES + 1):
try:
request = youtube.videos().insert(
part="snippet,status",
body=body,
media_body=media,
)
response = None
while response is None:
status, response = request.next_chunk()
if status:
progress = int(status.progress() * 100)
logger.info("Upload progress: %d%%", progress)
video_id = response["id"]
logger.info("Upload complete! Video ID: %s", video_id)
logger.info("URL: https://youtube.com/shorts/%s", video_id)
return video_id
except HttpError as e:
wait = BACKOFF_BASE**attempt
logger.warning(
"Upload failed (attempt %d/%d): %s — retrying in %ds",
attempt,
MAX_RETRIES,
e,
wait,
)
if attempt == MAX_RETRIES:
raise
time.sleep(wait)
raise RuntimeError("Upload failed after all retries")
def comment_on_video(video_id: str, text: str) -> str:
"""Post a comment on a YouTube video.
Args:
video_id: YouTube video ID
text: Comment text
Returns:
str: Comment ID
Raises:
HttpError: If comment posting fails
"""
youtube = get_authenticated_service()
body = {
"snippet": {
"videoId": video_id,
"topLevelComment": {
"snippet": {
"textOriginal": text,
}
},
}
}
for attempt in range(1, MAX_RETRIES + 1):
try:
request = youtube.commentThreads().insert(
part="snippet",
body=body,
)
response = request.execute()
comment_id = response["id"]
logger.info("Posted comment %s on video %s", comment_id, video_id)
return comment_id
except HttpError as e:
wait = BACKOFF_BASE**attempt
logger.warning(
"Comment posting failed (attempt %d/%d): %s — retrying in %ds",
attempt,
MAX_RETRIES,
e,
wait,
)
if attempt == MAX_RETRIES:
raise
time.sleep(wait)
raise RuntimeError("Comment posting failed after all retries")
def post_short(
video_path: Path,
question_data: dict,
add_answer_comment: bool = True,
) -> str:
"""Upload a Short to YouTube and optionally comment with the answer.
Args:
video_path: Path to video file
question_data: Question metadata
add_answer_comment: Whether to post answer as comment
Returns:
str: YouTube video ID
"""
video_id = upload_video(video_path, question_data)
if add_answer_comment:
try:
comment_text = build_answer_comment(question_data)
comment_on_video(video_id, comment_text)
except Exception:
logger.exception("Failed to post answer comment on %s", video_id)
return video_id
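Both `upload_video` and `comment_on_video` wait `BACKOFF_BASE**attempt` seconds after a failed attempt and re-raise on the final one. The sketch below computes the waits that are actually slept under those rules. `backoff_waits` is a hypothetical helper written for illustration, not part of the module.

```python
MAX_RETRIES = 3    # same values as in yt_shorts_poster.py
BACKOFF_BASE = 2


def backoff_waits(max_retries: int = MAX_RETRIES, base: int = BACKOFF_BASE) -> list[int]:
    """Seconds slept after each failed attempt.

    The last attempt re-raises instead of sleeping, so it adds no wait.
    """
    return [base**attempt for attempt in range(1, max_retries)]


waits = backoff_waits()
total_wait = sum(waits)
```

With the defaults, a call that fails every time spends 6 seconds sleeping across three attempts before the error propagates.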