package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
const (
	// wikidataSparql is the Wikidata Query Service SPARQL endpoint.
	wikidataSparql = "https://query.wikidata.org/sparql"
	// wikiBatchSize is the number of IMDb IDs resolved per SPARQL request.
	wikiBatchSize = 30
	// wikiDelay is slept twice per SPARQL batch: once between batches in
	// sparqlPipeline and once before each request in queryWikidataBatch. That
	// normally spaces requests >=4s apart (~15 req/min), which keeps them under
	// the 20 req/min limit this code targets. A single 2s delay alone would
	// allow ~30 req/min.
	wikiDelay = 2 * time.Second
	// wikiMaxRetries is the total number of attempts doGETWithRetry makes,
	// including the first one (so at most wikiMaxRetries-1 retries).
	wikiMaxRetries = 3
	// wikiRetryBackoff is the wait before the first retry; it doubles for each
	// further retry.
	wikiRetryBackoff = 15 * time.Second
)

// wikiClient is the HTTP client doGET uses for every wiki/SPARQL request, with
// a 120s overall per-request timeout.
var wikiClient = &http.Client{Timeout: 120 * time.Second}
// wikiAcc is what one SPARQL batch resolved for a single IMDb ID: the full
// English Wikipedia article URL (wikiArticle) and its percent-decoded title,
// i.e. the path segment after "/wiki/" (title).
type wikiAcc struct {
	wikiArticle, title string
}
// sparqlResponse is the subset of a SPARQL 1.1 JSON results document that
// this file reads: results.bindings, one map per solution row keyed by
// variable name. A variable left unbound in a row (for example an unmatched
// OPTIONAL) is absent from that row's map.
type sparqlResponse struct {
	Results struct {
		BindingList []map[string]jsonNode `json:"bindings"`
	} `json:"results"`
}

// jsonNode is one RDF term in a SPARQL JSON binding. It carries the term
// Type ("uri", "literal", ...), its lexical Value, and the language tag
// (xml:lang), which is set only for language-tagged literals.
type jsonNode struct {
	Type  string `json:"type"`
	Value string `json:"value"`
	Lang  string `json:"xml:lang,omitempty"`
}
// getMissingWikiArticles lists imdb_ids that still need a Wikipedia article
// lookup. These are rows whose wiki_article is NULL, whose has_no_wiki_article
// is 0, and whose imdb_id matches LIKE 'tt%'. It returns a nil slice when no
// row qualifies.
func (a *App) getMissingWikiArticles() ([]string, error) {
	const query = `SELECT imdb_id FROM imdb WHERE wiki_article IS NULL AND has_no_wiki_article = 0 AND imdb_id LIKE 'tt%'`
	rows, err := a.DB.Query(query)
	if err != nil {
		return nil, fmt.Errorf("query missing wiki articles: %w", err)
	}
	defer rows.Close()

	var missing []string
	for rows.Next() {
		var imdbID string
		if scanErr := rows.Scan(&imdbID); scanErr != nil {
			return nil, fmt.Errorf("scan imdb_id: %w", scanErr)
		}
		missing = append(missing, imdbID)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("rows iteration: %w", iterErr)
	}
	return missing, nil
}
// wikiArticleFetch is one unit of work for the wiki data consumer. imdbID is
// the imdb row to update; name is the decoded Wikipedia article title to fetch
// for it.
type wikiArticleFetch struct {
	imdbID, name string
}
// fetchWikiArticles resolves wiki_article for imdb rows that lack one (via
// sparqlPipeline). It also extracts wiki data for those rows and for rows that
// already have an article but are incomplete (getExistingWikiArticles).
//
// A single wikiDataConsumer goroutine does the extraction. Existing rows are
// queued first; sparqlPipeline then queues the articles it resolves. When no
// imdb_id is missing, the function returns immediately and does not touch the
// existing rows; fetchWikiArticlesData handles that case on its own.
func (a *App) fetchWikiArticles() error {
	missing, err := a.getMissingWikiArticles()
	if err != nil {
		return err
	}
	if len(missing) == 0 {
		logInfo("fetchWikiArticles: all entries have wiki_article, skipping")
		return nil
	}

	pending, err := a.getExistingWikiArticles()
	if err != nil {
		return err
	}

	// Buffered for every task that can be produced, so no send ever blocks.
	tasks := make(chan wikiArticleFetch, len(pending)+len(missing)+1)
	for _, p := range pending {
		tasks <- wikiArticleFetch{imdbID: p.imdbID, name: p.wikiArticle}
	}

	consumed := make(chan struct{})
	go a.wikiDataConsumer(tasks, consumed)

	produced := make(chan struct{})
	go a.sparqlPipeline(missing, tasks, produced)
	<-produced

	// The SPARQL side is the last sender; closing lets the consumer drain and exit.
	close(tasks)
	<-consumed

	logInfo("fetchWikiArticles: pipeline complete")
	return nil
}
// existingWikiArticle is a row that already has a wiki_article but still
// needs wiki data extraction: its imdb_id and the stored article name.
type existingWikiArticle struct {
	imdbID, wikiArticle string
}
// getExistingWikiArticles returns rows that already have a wiki_article but
// still lack extracted wiki data: any data column is NULL, or has_people = 0.
// Rows whose last wiki fetch recorded HTTP 404 are excluded.
//
// wiki_status_code is compared NULL-safely. A plain `wiki_status_code != 404`
// evaluates to NULL for rows that were never fetched, which would silently
// exclude exactly the rows that most need extraction.
func (a *App) getExistingWikiArticles() ([]existingWikiArticle, error) {
	rows, err := a.DB.Query(`
SELECT imdb_id, wiki_article FROM imdb
WHERE wiki_article IS NOT NULL
AND (wiki_status_code IS NULL OR wiki_status_code != 404)
AND (synopsis IS NULL OR description IS NULL OR year IS NULL
OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL OR has_people = 0)
`)
	if err != nil {
		return nil, fmt.Errorf("query existing wiki articles: %w", err)
	}
	defer rows.Close()
	var articles []existingWikiArticle
	for rows.Next() {
		var e existingWikiArticle
		if err := rows.Scan(&e.imdbID, &e.wikiArticle); err != nil {
			return nil, fmt.Errorf("scan existing wiki: %w", err)
		}
		articles = append(articles, e)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows iteration: %w", err)
	}
	return articles, nil
}
// sparqlPipeline resolves ids to English Wikipedia article titles, wikiBatchSize
// ids per queryWikidataBatch call, and persists each batch in its own
// transaction (commitSparqlBatch). Ids with a non-empty resolved title get
// wiki_article set; the rest get has_no_wiki_article = 1.
//
// Resolved articles are sent on artCh only after their batch has committed, so
// the consumer never works on a row whose wiki_article write was rolled back.
// A send blocks only if artCh's buffer is full.
//
// done is closed when the pipeline returns. artCh is left open; the caller
// closes it.
func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done chan<- struct{}) {
	defer close(done)
	logInfo("fetchWikiArticles: %d entries missing wiki_article", len(ids))
	totalBatches := (len(ids) + wikiBatchSize - 1) / wikiBatchSize
	batchNum := 0
	totalResolved := 0
	totalNoWiki := 0
	for i := 0; i < len(ids); i += wikiBatchSize {
		batchNum++
		// Rate limit between SPARQL requests. Sleeping at the top of the loop
		// also spaces out requests that follow a failed batch.
		if i > 0 {
			time.Sleep(wikiDelay)
		}
		chunk := ids[i:min(i+wikiBatchSize, len(ids))]

		results, err := a.queryWikidataBatch(chunk)
		if err != nil {
			// NOTE(review): this permanently excludes the whole batch from
			// getMissingWikiArticles, even when the failure was transient.
			logWarn("sparql batch %d error: %v, marking all as no-wiki", batchNum, err)
			if markErr := a.commitSparqlBatch(nil, chunk); markErr != nil {
				logWarn("sparql batch %d mark no-wiki %v", batchNum, markErr)
				continue
			}
			totalNoWiki += len(chunk)
			continue
		}

		var resolved []wikiArticleFetch
		var unresolved []string
		for _, imdbID := range chunk {
			if acc, found := results[imdbID]; found && acc.title != "" {
				resolved = append(resolved, wikiArticleFetch{imdbID: imdbID, name: acc.title})
			} else {
				unresolved = append(unresolved, imdbID)
			}
		}

		if err := a.commitSparqlBatch(resolved, unresolved); err != nil {
			// Rolled back: these ids keep wiki_article NULL and are retried next run.
			logWarn("sparql batch %d %v", batchNum, err)
			continue
		}
		totalResolved += len(resolved)
		totalNoWiki += len(unresolved)
		logInfo("fetchWikiArticles: sparql batch %d/%d - %d/%d resolved",
			batchNum, totalBatches, len(resolved), len(chunk))

		// Forward only committed articles to the data consumer.
		for _, art := range resolved {
			artCh <- art
		}
	}
	logInfo("fetchWikiArticles: SPARQL done - %d resolved, %d no-wiki", totalResolved, totalNoWiki)
}

// commitSparqlBatch writes one SPARQL batch in a single transaction. It sets
// wiki_article = name for every resolved entry and has_no_wiki_article = 1 for
// every unresolved id. On any error the transaction is rolled back and the
// error is returned. With a transactional storage engine, this means either
// the whole batch is written or none of it is.
func (a *App) commitSparqlBatch(resolved []wikiArticleFetch, unresolved []string) error {
	tx, err := a.DB.Begin()
	if err != nil {
		return fmt.Errorf("begin tx error: %w", err)
	}
	// After a successful Commit, Rollback is a no-op returning sql.ErrTxDone.
	defer func() { _ = tx.Rollback() }()

	wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`)
	if err != nil {
		return fmt.Errorf("prepare error: %w", err)
	}
	defer wikiStmt.Close()
	noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
	if err != nil {
		return fmt.Errorf("prepare no_wiki error: %w", err)
	}
	defer noWikiStmt.Close()

	for _, r := range resolved {
		if _, err := wikiStmt.Exec(r.name, r.imdbID); err != nil {
			return fmt.Errorf("update wiki_article %s error: %w", r.imdbID, err)
		}
	}
	for _, imdbID := range unresolved {
		if _, err := noWikiStmt.Exec(imdbID); err != nil {
			return fmt.Errorf("mark no-wiki %s error: %w", imdbID, err)
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit error: %w", err)
	}
	return nil
}
// wikiDataConsumer fetches wiki data from the custom server (queryWikiArticle)
// for every task received on artCh and stores it on the matching imdb row.
// It sleeps 2s before each request as a rate limit. done is closed once artCh
// has been closed and fully drained.
//
// For each task:
//   - wiki_status_code is recorded whenever a positive status code is returned;
//   - the data columns are updated and people inserted only for HTTP 200 with a
//     nil error (the returned entry is not trusted otherwise);
//   - has_people is set only after insertWikiPeople succeeds.
//
// Database errors are logged, and processing continues with the next task.
func (a *App) wikiDataConsumer(artCh <-chan wikiArticleFetch, done chan<- struct{}) {
	defer close(done)
	const updateData = `
UPDATE imdb SET synopsis = ?, description = ?, year = ?, poster_url = ?,
license = ?, license_url = ?, num_accolades = ?
WHERE imdb_id = ?`
	updated := 0
	skipped := 0
	for art := range artCh {
		// Wait between requests (rate limit)
		time.Sleep(2 * time.Second)
		entry, statusCode, err := a.queryWikiArticle(art.name)
		if err != nil {
			logWarn("wiki error %s (%s): HTTP %d - %v", art.imdbID, art.name, statusCode, err)
			skipped++
		}

		// Record status code
		if statusCode > 0 {
			if _, dbErr := a.DB.Exec(`UPDATE imdb SET wiki_status_code = ? WHERE imdb_id = ?`,
				statusCode, art.imdbID); dbErr != nil {
				logWarn("update wiki_status_code %s: %v", art.imdbID, dbErr)
			}
		}

		// By Go convention a value returned with a non-nil error is not usable.
		if err != nil || statusCode != 200 {
			continue
		}

		if _, dbErr := a.DB.Exec(updateData,
			entry.Synopsis, entry.Description, entry.Year, entry.PosterURL,
			entry.License, entry.LicenseURL, entry.NumAccolades, art.imdbID); dbErr != nil {
			logWarn("update wiki data %s: %v", art.imdbID, dbErr)
		} else {
			updated++
		}

		// Insert people (actors, directors, screenwriters)
		if len(entry.People) == 0 {
			continue
		}
		if pErr := a.insertWikiPeople(art.imdbID, entry.People); pErr != nil {
			// Leave has_people = 0 so the row is picked up again next run.
			logWarn("insert people error %s: %v", art.imdbID, pErr)
			continue
		}
		if _, dbErr := a.DB.Exec(`UPDATE imdb SET has_people = 1 WHERE imdb_id = ?`, art.imdbID); dbErr != nil {
			logWarn("update has_people %s: %v", art.imdbID, dbErr)
		}
	}
	logInfo("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped)
}
// insertWikiPeople links each person in people to the imdb row whose imdb_id
// is imdbID, creating people rows as needed (see linkWikiPerson).
//
// Every person is attempted even if some fail. It returns an error when the
// imdb row cannot be looked up. It also returns a summary error (failure count
// plus the first failure) when any person could not be linked.
func (a *App) insertWikiPeople(imdbID string, people []wikiPerson) error {
	// Get the DB row id for this imdb_id
	var dbID int
	if err := a.DB.Get(&dbID, `SELECT id FROM imdb WHERE imdb_id = ?`, imdbID); err != nil {
		return fmt.Errorf("lookup imdb id for %s: %w", imdbID, err)
	}
	failed := 0
	var firstErr error
	for _, p := range people {
		if err := a.linkWikiPerson(dbID, p); err != nil {
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("%q: %w", p.Name, err)
			}
		}
	}
	if failed > 0 {
		return fmt.Errorf("link people for %s: %d of %d failed, first: %w",
			imdbID, failed, len(people), firstErr)
	}
	return nil
}

// linkWikiPerson ensures a people row exists for p.Name, then inserts the who
// row (dbID, person id, p.Profession) with INSERT IGNORE, so re-linking an
// existing triple is a no-op.
func (a *App) linkWikiPerson(dbID int, p wikiPerson) error {
	var personID int
	if err := a.DB.Get(&personID, `SELECT id FROM people WHERE name = ?`, p.Name); err != nil {
		// Not found (or lookup failed): insert the person.
		result, insErr := a.DB.Exec(`INSERT INTO people (name) VALUES (?)`, p.Name)
		if insErr != nil {
			// The insert can fail on a conflict with a row of the same name;
			// re-read its id instead of dropping the link.
			if getErr := a.DB.Get(&personID, `SELECT id FROM people WHERE name = ?`, p.Name); getErr != nil {
				return fmt.Errorf("insert person: %w", insErr)
			}
		} else {
			id, idErr := result.LastInsertId()
			if idErr != nil {
				// Without an id the who row would point at people_id 0.
				return fmt.Errorf("last insert id: %w", idErr)
			}
			personID = int(id)
		}
	}
	// Insert who relationship (unique on imdb_id, people_id, profession_id)
	if _, err := a.DB.Exec(`INSERT IGNORE INTO who (imdb_id, people_id, profession_id) VALUES (?, ?, ?)`,
		dbID, personID, p.Profession); err != nil {
		return fmt.Errorf("insert who: %w", err)
	}
	return nil
}
// fetchWikiArticlesData runs wikiDataConsumer over every row returned by
// getExistingWikiArticles and waits for it to finish. It performs no SPARQL
// lookups, so it can be called independently of fetchWikiArticles.
func (a *App) fetchWikiArticlesData() error {
	pending, err := a.getExistingWikiArticles()
	switch {
	case err != nil:
		return err
	case len(pending) == 0:
		logInfo("fetchWikiArticlesData: all entries complete, skipping")
		return nil
	}
	logInfo("fetchWikiArticlesData: %d entries need wiki data", len(pending))

	// Queue everything up front; the buffer holds every task, so this never blocks.
	tasks := make(chan wikiArticleFetch, len(pending))
	for _, p := range pending {
		tasks <- wikiArticleFetch{imdbID: p.imdbID, name: p.wikiArticle}
	}
	close(tasks)

	finished := make(chan struct{})
	go a.wikiDataConsumer(tasks, finished)
	<-finished
	return nil
}
// queryWikidataBatch resolves a batch of IMDb IDs to English Wikipedia
// articles with one SPARQL request (built by buildSparql). It returns a map
// from IMDb ID to the article URL and decoded title. IDs without an English
// article, or whose binding is not a "tt" ID, are absent from the map.
//
// It sleeps wikiDelay before sending the request as part of SPARQL rate
// limiting. Request and JSON-decoding failures are returned as errors.
func (a *App) queryWikidataBatch(ids []string) (map[string]wikiAcc, error) {
	const articlePrefix = "https://en.wikipedia.org/wiki/"
	sparql := buildSparql(ids)
	time.Sleep(wikiDelay)
	endpoint := wikidataSparql + "?" + url.Values{
		"query":  {sparql},
		"format": {"json"},
	}.Encode()
	start := time.Now()
	raw, err := doGETWithRetry(endpoint, a.Config.UserAgent)
	if err != nil {
		logHTTPRequest("GET", endpoint, 0, time.Since(start).Seconds())
		return nil, fmt.Errorf("SPARQL request: %w", err)
	}
	logHTTPRequest("GET", endpoint, 200, time.Since(start).Seconds())
	var data sparqlResponse
	if err := json.Unmarshal(raw, &data); err != nil {
		return nil, fmt.Errorf("SPARQL JSON parse: %w", err)
	}
	results := make(map[string]wikiAcc, len(ids))
	for _, b := range data.Results.BindingList {
		imdb := b["imdbVal"].Value
		if !strings.HasPrefix(imdb, "tt") {
			continue
		}
		n, ok := b["article"]
		if !ok || n.Value == "" {
			continue
		}
		// The path segment after /wiki/ is percent-encoded; decode it to the title.
		segment := strings.TrimPrefix(n.Value, articlePrefix)
		title, err := url.PathUnescape(segment)
		if err != nil {
			// PathUnescape returns "" on error, and sparqlPipeline treats an empty
			// title as "no article". Keep the raw segment rather than lose the match.
			title = segment
		}
		results[imdb] = wikiAcc{
			wikiArticle: n.Value,
			title:       title,
		}
	}
	return results, nil
}
// buildSparql builds a Wikidata Query Service SPARQL query that maps each IMDb
// ID (Wikidata property P345) to its English Wikipedia article URL. Each
// result row binds ?imdbVal for an item carrying that ID. ?article is bound
// only when the item has an English Wikipedia sitelink. Rows are ordered by
// ?imdbVal.
//
// IDs are emitted as quoted SPARQL string literals, with backslash, double
// quote and line breaks escaped, so an unexpected ID cannot break the query.
func buildSparql(ids []string) string {
	esc := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`, "\r", `\r`)
	var vals strings.Builder
	for _, id := range ids {
		vals.WriteString(`    "`)
		vals.WriteString(esc.Replace(id))
		vals.WriteString("\"\n")
	}
	return fmt.Sprintf(`PREFIX schema: <http://schema.org/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
SELECT ?imdbVal ?article
WHERE {
  VALUES ?imdbVal {
%s  }
  ?item wdt:P345 ?imdbVal .
  OPTIONAL {
    ?article schema:about ?item ;
             schema:isPartOf <https://en.wikipedia.org/> .
  }
}
ORDER BY ?imdbVal`, vals.String())
}
// doGET sends a GET request for uri through wikiClient with the given
// User-Agent header and returns the whole response body. Any status other
// than 200 becomes an error of the form "HTTP <code>: <first 2048 body bytes>".
func doGET(uri, userAgent string) ([]byte, error) {
	req, err := http.NewRequest(http.MethodGet, uri, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", userAgent)

	resp, err := wikiClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return io.ReadAll(resp.Body)
	}
	// Best effort: the snippet only decorates the error message.
	snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
	return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, snippet)
}
// doGETWithRetry calls doGET up to wikiMaxRetries times in total. It retries
// only when the server answered with a 5xx status or 429 Too Many Requests.
// Between attempts it waits wikiRetryBackoff, then double that, and so on.
// Any other error (4xx, transport failure, body read error) is returned
// immediately. If every attempt fails, the last error is returned.
func doGETWithRetry(uri, userAgent string) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt < wikiMaxRetries; attempt++ {
		if attempt > 0 {
			backoff := wikiRetryBackoff * time.Duration(1<<(attempt-1))
			logDebug(" retry %d/%d after %v", attempt, wikiMaxRetries-1, backoff)
			time.Sleep(backoff)
		}
		raw, err := doGET(uri, userAgent)
		if err == nil {
			return raw, nil
		}
		lastErr = err
		if !isRetryableStatusErr(err) {
			break
		}
	}
	return nil, lastErr
}

// isRetryableStatusErr reports whether err is a doGET status error
// ("HTTP <code>: <body>") for a 5xx or 429 response. It matches the prefix,
// not a substring anywhere in the message, so a 4xx whose body happens to
// contain "HTTP 5..." is not retried.
func isRetryableStatusErr(err error) bool {
	msg := err.Error()
	return strings.HasPrefix(msg, "HTTP 5") || strings.HasPrefix(msg, "HTTP 429")
}
// min reports the lesser of two ints. When they are equal, that shared value
// is returned.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}