package main import ( "encoding/json" "fmt" "io" "log" "net/http" "net/url" "strings" "time" ) const ( wikidataSparql = "https://query.wikidata.org/sparql" wikiBatchSize = 30 wikiDelay = 2 * time.Second // ~15 req/min, safe under 20 req/min limit wikiMaxRetries = 3 wikiRetryBackoff = 15 * time.Second ) var wikiClient = &http.Client{Timeout: 120 * time.Second} // wikiAcc accumulates per-ID results from a SPARQL batch. type wikiAcc struct { wikiArticle string title string } // type alias for SPARQL JSON response type sparqlResponse struct { Results struct { BindingList []map[string]jsonNode `json:"bindings"` } `json:"results"` } type jsonNode struct { Type string `json:"type"` Value string `json:"value"` Lang string `json:"xml:lang,omitempty"` } // getMissingWikiArticles returns imdb_ids where wiki_article IS NULL. func (a *App) getMissingWikiArticles() ([]string, error) { rows, err := a.DB.Query(`SELECT imdb_id FROM imdb WHERE wiki_article IS NULL AND has_no_wiki_article = 0 AND imdb_id LIKE 'tt%'`) if err != nil { return nil, fmt.Errorf("query missing wiki articles: %w", err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, fmt.Errorf("scan imdb_id: %w", err) } ids = append(ids, id) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("rows iteration: %w", err) } return ids, nil } // wikiArticleFetch carries a resolved wiki article to the data consumer. type wikiArticleFetch struct { imdbID string name string // decoded wiki article name } // fetchWikiArticles pipelines SPARQL batches with wiki data extraction. // SPARQL producer feeds batches into a channel; DB commits per batch. // Resolved articles are forwarded to a wiki data consumer running in parallel. func (a *App) fetchWikiArticles() error { ids, err := a.getMissingWikiArticles() if err != nil { return err } if len(ids) == 0 { log.Println("fetchWikiArticles: all entries have wiki_article, skipping") return nil } // Also pre-load existing entries that need wiki data extraction existing, err := a.getExistingWikiArticles() if err != nil { return err } // Channel for wiki article fetch tasks (buffered for pipelining) artCh := make(chan wikiArticleFetch, wikiBatchSize*2) // Consumer processes wiki data extraction in background consumerDone := make(chan struct{}) go a.wikiDataConsumer(artCh, consumerDone) // Feed existing entries to consumer first for _, e := range existing { select { case artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle}: default: // Channel full, will pick up on next run log.Printf("wiki data channel full, deferring %s", e.imdbID) } } // SPARQL producer: fetches batches, commits to DB, forwards resolved sparqlDone := make(chan struct{}) go a.sparqlPipeline(ids, artCh, sparqlDone) // Wait for SPARQL to finish <-sparqlDone // Close channel and wait for consumer to drain close(artCh) <-consumerDone log.Printf("fetchWikiArticles: pipeline complete") return nil } type existingWikiArticle struct { imdbID string wikiArticle string } // getExistingWikiArticles returns entries that have a wiki_article but need data extraction. func (a *App) getExistingWikiArticles() ([]existingWikiArticle, error) { rows, err := a.DB.Query(` SELECT imdb_id, wiki_article FROM imdb WHERE wiki_article IS NOT NULL AND wiki_status_code != 404 AND (synopsis IS NULL OR description IS NULL OR year IS NULL OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL) `) if err != nil { return nil, fmt.Errorf("query existing wiki articles: %w", err) } defer rows.Close() var articles []existingWikiArticle for rows.Next() { var e existingWikiArticle if err := rows.Scan(&e.imdbID, &e.wikiArticle); err != nil { return nil, fmt.Errorf("scan existing wiki: %w", err) } articles = append(articles, e) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("rows iteration: %w", err) } return articles, nil } // sparqlPipeline fetches SPARQL batches, commits each to DB, forwards resolved articles. func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done chan<- struct{}) { defer close(done) log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids)) batchNum := 0 totalResolved := 0 totalNoWiki := 0 for i := 0; i < len(ids); i += wikiBatchSize { batchNum++ chunk := ids[i:min(i+wikiBatchSize, len(ids))] results, err := a.queryWikidataBatch(chunk) if err != nil { log.Printf("sparql batch %d error: %v, marking all as no-wiki", batchNum, err) // Mark all in failed batch as no-wiki tx, _ := a.DB.Begin() stmt, _ := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`) for _, id := range chunk { stmt.Exec(id) totalNoWiki++ } stmt.Close() tx.Commit() continue } // Commit batch to DB tx, err := a.DB.Begin() if err != nil { log.Printf("sparql batch %d begin tx error: %v", batchNum, err) continue } wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`) if err != nil { tx.Rollback() log.Printf("sparql batch %d prepare error: %v", batchNum, err) continue } noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`) if err != nil { wikiStmt.Close() tx.Rollback() log.Printf("sparql batch %d prepare no_wiki error: %v", batchNum, err) continue } resolved := 0 for _, imdbID := range chunk { acc, found := results[imdbID] if found && acc.title != "" { wikiStmt.Exec(acc.title, imdbID) resolved++ // Forward to consumer select { case artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title}: default: log.Printf("wiki data channel full, deferring %s", imdbID) } } else { noWikiStmt.Exec(imdbID) totalNoWiki++ } } totalResolved += resolved wikiStmt.Close() noWikiStmt.Close() tx.Commit() log.Printf("fetchWikiArticles: sparql batch %d/%d - %d/%d resolved", batchNum, (len(ids)+wikiBatchSize-1)/wikiBatchSize, resolved, len(chunk)) // Rate limit between SPARQL requests if i+wikiBatchSize < len(ids) { time.Sleep(wikiDelay) } } log.Printf("fetchWikiArticles: SPARQL done - %d resolved, %d no-wiki", totalResolved, totalNoWiki) } // wikiDataConsumer fetches wiki article data from the custom server. func (a *App) wikiDataConsumer(artCh <-chan wikiArticleFetch, done chan<- struct{}) { defer close(done) updated := 0 skipped := 0 for art := range artCh { // Wait between requests (rate limit) time.Sleep(2 * time.Second) entry, statusCode, err := a.queryWikiArticle(art.name) if err != nil { log.Printf("wiki error %s (%s): HTTP %d - %v", art.imdbID, art.name, statusCode, err) skipped++ } // Record status code if statusCode > 0 { a.DB.Exec(`UPDATE imdb SET wiki_status_code = ? WHERE imdb_id = ?`, statusCode, art.imdbID) } // Update data fields on success if statusCode == 200 { a.DB.Exec(` UPDATE imdb SET synopsis = ?, description = ?, year = ?, poster_url = ?, license = ?, license_url = ?, num_accolades = ? WHERE imdb_id = ?`, entry.Synopsis, entry.Description, entry.Year, entry.PosterURL, entry.License, entry.LicenseURL, entry.NumAccolades, art.imdbID) updated++ } } log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped) } // queryWikidataBatch sends a SPARQL query for the given IDs and returns a map of id -> wikiAcc. func (a *App) queryWikidataBatch(ids []string) (map[string]wikiAcc, error) { sparql := buildSparql(ids) time.Sleep(wikiDelay) endpoint := wikidataSparql + "?" + url.Values{ "query": {sparql}, "format": {"json"}, }.Encode() raw, err := doGETWithRetry(endpoint, a.Config.UserAgent) if err != nil { return nil, fmt.Errorf("SPARQL request: %w", err) } var data sparqlResponse if err := json.Unmarshal(raw, &data); err != nil { return nil, fmt.Errorf("SPARQL JSON parse: %w", err) } results := make(map[string]wikiAcc) for _, b := range data.Results.BindingList { imdb := b["imdbVal"].Value if !strings.HasPrefix(imdb, "tt") { continue } if n, ok := b["article"]; ok && n.Value != "" { // Extract title from URL: strip "https://en.wikipedia.org/wiki/" title, _ := url.PathUnescape(strings.TrimPrefix(n.Value, "https://en.wikipedia.org/wiki/")) results[imdb] = wikiAcc{ wikiArticle: n.Value, title: title, } } } return results, nil } // buildSparql creates a SPARQL query that resolves IMDb IDs to English Wikipedia article URLs. func buildSparql(ids []string) string { var vals strings.Builder for _, id := range ids { vals.WriteString(fmt.Sprintf(" \"%s\"\n", id)) } return fmt.Sprintf(`PREFIX schema: PREFIX wdt: SELECT ?imdbVal ?article WHERE { VALUES ?imdbVal { %s } ?item wdt:P345 ?imdbVal . OPTIONAL { ?article schema:about ?item ; schema:isPartOf . } } ORDER BY ?imdbVal`, vals.String()) } // doGET performs a GET request with the configured User-Agent. func doGET(uri, userAgent string) ([]byte, error) { req, err := http.NewRequest("GET", uri, nil) if err != nil { return nil, err } req.Header.Set("User-Agent", userAgent) resp, err := wikiClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048)) return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, body) } return io.ReadAll(resp.Body) } // doGETWithRetry retries on 5xx errors. func doGETWithRetry(uri, userAgent string) ([]byte, error) { var lastErr error for attempt := 0; attempt < wikiMaxRetries; attempt++ { if attempt > 0 { backoff := wikiRetryBackoff * time.Duration(1<<(attempt-1)) log.Printf(" retry %d/%d after %v", attempt+1, wikiMaxRetries, backoff) time.Sleep(backoff) } raw, err := doGET(uri, userAgent) if err == nil { return raw, nil } lastErr = err if !strings.Contains(err.Error(), "HTTP 5") { break } } return nil, lastErr } // min returns the smaller of a and b. func min(a, b int) int { if a < b { return a } return b }