summary | refs | log | tree | commit | diff
path: root/src/wikidata.go
diff options
context:
space:
mode:
author: dev, 2026-06-26 03:26:07 +0200
committer: dev, 2026-06-26 03:26:07 +0200
commit: 8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45 (patch)
tree: 9cbcab194f21431255222c7785e15795e494a728 /src/wikidata.go
parent: f3ed9396113de4d81ae0fadabc10ae59f7076862 (diff)
download: hnimdbbot-8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45.tar.gz
refactor: pipeline SPARQL and wiki data in parallel
- Merge fetchWikiArticles + fetchWikiArticlesData into one pipeline
- SPARQL producer fetches batches, commits each to DB, forwards resolved articles
- Wiki data consumer runs concurrently, fetching at 2s/request
- Each SPARQL batch commits independently (no global transaction)
- Rate limits respected for both Wikidata SPARQL and wiki server
- No parallel requests to either endpoint
Diffstat (limited to 'src/wikidata.go')
-rw-r--r-- src/wikidata.go 209
1 files changed, 169 insertions, 40 deletions
diff --git a/src/wikidata.go b/src/wikidata.go
index c775f1f..9dacb7b 100644
--- a/src/wikidata.go
+++ b/src/wikidata.go
@@ -62,7 +62,15 @@ func (a *App) getMissingWikiArticles() ([]string, error) {
return ids, nil
}
-// fetchWikiArticles queries Wikidata SPARQL in batches and updates wiki_article in the DB.
+// wikiArticleFetch carries a resolved wiki article to the data consumer.
+type wikiArticleFetch struct {
+ imdbID string
+ name string // decoded wiki article name
+}
+
+// fetchWikiArticles pipelines SPARQL batches with wiki data extraction.
+// SPARQL producer feeds batches into a channel; DB commits per batch.
+// Resolved articles are forwarded to a wiki data consumer running in parallel.
func (a *App) fetchWikiArticles() error {
ids, err := a.getMissingWikiArticles()
if err != nil {
@@ -72,78 +80,199 @@ func (a *App) fetchWikiArticles() error {
log.Println("fetchWikiArticles: all entries have wiki_article, skipping")
return nil
}
- log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids))
- tx, err := a.DB.Begin()
+ // Also pre-load existing entries that need wiki data extraction
+ existing, err := a.getExistingWikiArticles()
if err != nil {
- return fmt.Errorf("begin tx: %w", err)
+ return err
}
- wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`)
- if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare wiki update: %w", err)
+ // Channel for wiki article fetch tasks (buffered for pipelining)
+ artCh := make(chan wikiArticleFetch, wikiBatchSize*2)
+
+ // Consumer processes wiki data extraction in background
+ consumerDone := make(chan struct{})
+ go a.wikiDataConsumer(artCh, consumerDone)
+
+ // Feed existing entries to consumer first
+ for _, e := range existing {
+ select {
+ case artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle}:
+ default:
+ // Channel full, will pick up on next run
+ log.Printf("wiki data channel full, deferring %s", e.imdbID)
+ }
}
- defer wikiStmt.Close()
- noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
+ // SPARQL producer: fetches batches, commits to DB, forwards resolved
+ sparqlDone := make(chan struct{})
+ go a.sparqlPipeline(ids, artCh, sparqlDone)
+
+ // Wait for SPARQL to finish
+ <-sparqlDone
+
+ // Close channel and wait for consumer to drain
+ close(artCh)
+ <-consumerDone
+
+ log.Printf("fetchWikiArticles: pipeline complete")
+ return nil
+}
+
+type existingWikiArticle struct {
+ imdbID string
+ wikiArticle string
+}
+
+// getExistingWikiArticles returns entries that have a wiki_article but need data extraction.
+func (a *App) getExistingWikiArticles() ([]existingWikiArticle, error) {
+ rows, err := a.DB.Query(`
+ SELECT imdb_id, wiki_article FROM imdb
+ WHERE wiki_article IS NOT NULL
+ AND wiki_status_code != 404
+ AND (synopsis IS NULL OR description IS NULL OR year IS NULL
+ OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL)
+ `)
if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare no_wiki update: %w", err)
+ return nil, fmt.Errorf("query existing wiki articles: %w", err)
}
- defer noWikiStmt.Close()
+ defer rows.Close()
+
+ var articles []existingWikiArticle
+ for rows.Next() {
+ var e existingWikiArticle
+ if err := rows.Scan(&e.imdbID, &e.wikiArticle); err != nil {
+ return nil, fmt.Errorf("scan existing wiki: %w", err)
+ }
+ articles = append(articles, e)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows iteration: %w", err)
+ }
+ return articles, nil
+}
+
+// sparqlPipeline fetches SPARQL batches, commits each to DB, forwards resolved articles.
+func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done chan<- struct{}) {
+ defer close(done)
+
+ log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids))
+ batchNum := 0
+ totalResolved := 0
+ totalNoWiki := 0
- updated := 0
- noWiki := 0
for i := 0; i < len(ids); i += wikiBatchSize {
+ batchNum++
chunk := ids[i:min(i+wikiBatchSize, len(ids))]
results, err := a.queryWikidataBatch(chunk)
if err != nil {
- log.Printf("wikidata batch error at offset %d: %v", i, err)
- // mark all in skipped batch as no-wiki
+ log.Printf("sparql batch %d error: %v, marking all as no-wiki", batchNum, err)
+ // Mark all in failed batch as no-wiki
+ tx, _ := a.DB.Begin()
+ stmt, _ := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
for _, id := range chunk {
- if _, err := noWikiStmt.Exec(id); err != nil {
- tx.Rollback()
- return fmt.Errorf("mark no_wiki for %s: %w", id, err)
- }
- noWiki++
+ stmt.Exec(id)
+ totalNoWiki++
}
+ stmt.Close()
+ tx.Commit()
+ continue
+ }
+
+ // Commit batch to DB
+ tx, err := a.DB.Begin()
+ if err != nil {
+ log.Printf("sparql batch %d begin tx error: %v", batchNum, err)
continue
}
- for _, id := range chunk {
- acc, found := results[id]
+ wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`)
+ if err != nil {
+ tx.Rollback()
+ log.Printf("sparql batch %d prepare error: %v", batchNum, err)
+ continue
+ }
+
+ noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
+ if err != nil {
+ wikiStmt.Close()
+ tx.Rollback()
+ log.Printf("sparql batch %d prepare no_wiki error: %v", batchNum, err)
+ continue
+ }
+
+ resolved := 0
+ for _, imdbID := range chunk {
+ acc, found := results[imdbID]
if found && acc.title != "" {
- if _, err := wikiStmt.Exec(acc.title, id); err != nil {
- tx.Rollback()
- return fmt.Errorf("update wiki_article for %s: %w", id, err)
+ wikiStmt.Exec(acc.title, imdbID)
+ resolved++
+ // Forward to consumer
+ select {
+ case artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title}:
+ default:
+ log.Printf("wiki data channel full, deferring %s", imdbID)
}
- updated++
} else {
- if _, err := noWikiStmt.Exec(id); err != nil {
- tx.Rollback()
- return fmt.Errorf("mark no_wiki for %s: %w", id, err)
- }
- noWiki++
+ noWikiStmt.Exec(imdbID)
+ totalNoWiki++
}
}
+ totalResolved += resolved
+
+ wikiStmt.Close()
+ noWikiStmt.Close()
- done := i + len(chunk)
- log.Printf("fetchWikiArticles: [%d/%d]", done, len(ids))
+ tx.Commit()
- // rate limit between batches
+ log.Printf("fetchWikiArticles: sparql batch %d/%d - %d/%d resolved",
+ batchNum, (len(ids)+wikiBatchSize-1)/wikiBatchSize, resolved, len(chunk))
+
+ // Rate limit between SPARQL requests
if i+wikiBatchSize < len(ids) {
time.Sleep(wikiDelay)
}
}
- if err := tx.Commit(); err != nil {
- return fmt.Errorf("commit wiki articles: %w", err)
+ log.Printf("fetchWikiArticles: SPARQL done - %d resolved, %d no-wiki", totalResolved, totalNoWiki)
+}
+
+// wikiDataConsumer fetches wiki article data from the custom server.
+func (a *App) wikiDataConsumer(artCh <-chan wikiArticleFetch, done chan<- struct{}) {
+ defer close(done)
+
+ updated := 0
+ skipped := 0
+
+ for art := range artCh {
+ // Wait between requests (rate limit)
+ time.Sleep(2 * time.Second)
+
+ entry, statusCode, err := a.queryWikiArticle(art.name)
+ if err != nil {
+ log.Printf("wiki error %s (%s): HTTP %d - %v", art.imdbID, art.name, statusCode, err)
+ skipped++
+ }
+
+ // Record status code
+ if statusCode > 0 {
+ a.DB.Exec(`UPDATE imdb SET wiki_status_code = ? WHERE imdb_id = ?`, statusCode, art.imdbID)
+ }
+
+ // Update data fields on success
+ if statusCode == 200 {
+ a.DB.Exec(`
+ UPDATE imdb SET synopsis = ?, description = ?, year = ?, poster_url = ?,
+ license = ?, license_url = ?, num_accolades = ?
+ WHERE imdb_id = ?`,
+ entry.Synopsis, entry.Description, entry.Year, entry.PosterURL,
+ entry.License, entry.LicenseURL, entry.NumAccolades, art.imdbID)
+ updated++
+ }
}
- log.Printf("fetchWikiArticles: %d wiki articles updated, %d marked as no wiki", updated, noWiki)
- return nil
+ log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped)
}
// queryWikidataBatch sends a SPARQL query for the given IDs and returns a map of id -> wikiAcc.