diff options
| author | dev | 2026-06-26 03:26:07 +0200 |
|---|---|---|
| committer | dev | 2026-06-26 03:26:07 +0200 |
| commit | 8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45 (patch) | |
| tree | 9cbcab194f21431255222c7785e15795e494a728 | |
| parent | f3ed9396113de4d81ae0fadabc10ae59f7076862 (diff) | |
| download | hnimdbbot-8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45.tar.gz | |
refactor: pipeline SPARQL and wiki data in parallel
- Merge fetchWikiArticles + fetchWikiArticlesData into one pipeline
- SPARQL producer fetches batches, commits each to DB, forwards resolved articles
- Wiki data consumer runs concurrently, issuing one request every 2 s
- Each SPARQL batch commits independently (no global transaction)
- Rate limits respected for both Wikidata SPARQL and wiki server
- No parallel requests to either endpoint
| -rw-r--r-- | src/main.go | 4 |
| -rw-r--r-- | src/wikiarticle.go | 122 |
| -rw-r--r-- | src/wikidata.go | 209 |
3 files changed, 169 insertions, 166 deletions
diff --git a/src/main.go b/src/main.go index 9fd53db..272e4b6 100644 --- a/src/main.go +++ b/src/main.go @@ -198,8 +198,4 @@ func main() { if err = app.fetchWikiArticles(); err != nil { log.Fatalf("fetchWikiArticles: %v", err) } - - if err = app.fetchWikiArticlesData(); err != nil { - log.Fatalf("fetchWikiArticlesData: %v", err) - } } diff --git a/src/wikiarticle.go b/src/wikiarticle.go index 85bf28a..5b891b6 100644 --- a/src/wikiarticle.go +++ b/src/wikiarticle.go @@ -25,128 +25,6 @@ type wikiArticleEntry struct { NumAccolades int } -// fetchWikiArticlesData queries the custom wiki server for all entries -// that have a wiki_article and updates the imdb table with extracted fields. -func (a *App) fetchWikiArticlesData() error { - rows, err := a.DB.Query(` - SELECT id, imdb_id, wiki_article FROM imdb - WHERE wiki_article IS NOT NULL - AND wiki_status_code != 404 - AND (synopsis IS NULL OR description IS NULL OR year IS NULL - OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL) - `) - if err != nil { - return fmt.Errorf("query wiki articles: %w", err) - } - defer rows.Close() - - type dbRow struct { - id int - imdbID string - wikiArticle string - } - var entries []dbRow - for rows.Next() { - var r dbRow - if err := rows.Scan(&r.id, &r.imdbID, &r.wikiArticle); err != nil { - return fmt.Errorf("scan row: %w", err) - } - entries = append(entries, r) - } - if err := rows.Err(); err != nil { - return fmt.Errorf("rows iteration: %w", err) - } - - if len(entries) == 0 { - log.Println("fetchWikiArticlesData: all entries complete, skipping") - return nil - } - log.Printf("fetchWikiArticlesData: %d entries need wiki data", len(entries)) - - tx, err := a.DB.Begin() - if err != nil { - return fmt.Errorf("begin tx: %w", err) - } - - stmt, err := tx.Prepare(` - UPDATE imdb SET - synopsis = ?, description = ?, year = ?, poster_url = ?, - license = ?, license_url = ?, num_accolades = ? - WHERE id = ? 
- `) - if err != nil { - tx.Rollback() - return fmt.Errorf("prepare wiki update: %w", err) - } - defer stmt.Close() - - statusStmt, err := tx.Prepare(` - UPDATE imdb SET wiki_status_code = ? WHERE id = ? - `) - if err != nil { - tx.Rollback() - return fmt.Errorf("prepare wiki status update: %w", err) - } - defer statusStmt.Close() - - type result struct { - id int - entry wikiArticleEntry - statusCode int - } - ch := make(chan result, 1) - - // Serial processing with 2s between requests - go func() { - for i, item := range entries { - if i > 0 { - time.Sleep(2 * time.Second) - } - entry, statusCode, err := a.queryWikiArticle(item.wikiArticle) - ch <- result{id: item.id, entry: entry, statusCode: statusCode} - if err != nil { - log.Printf("wiki error %d/%d %s (%s): HTTP %d - %v", i+1, len(entries), item.imdbID, item.wikiArticle, statusCode, err) - } - } - close(ch) - }() - - updated := 0 - skipped := 0 - for r := range ch { - // Always record status code - if r.statusCode > 0 { - if _, err := statusStmt.Exec(r.statusCode, r.id); err != nil { - tx.Rollback() - return fmt.Errorf("update wiki_status_code for id %d: %w", r.id, err) - } - } - - // Only update data fields on success - if r.statusCode == 200 { - e := r.entry - _, err := stmt.Exec( - e.Synopsis, e.Description, e.Year, e.PosterURL, - e.License, e.LicenseURL, e.NumAccolades, r.id, - ) - if err != nil { - tx.Rollback() - return fmt.Errorf("update wiki data for id %d: %w", r.id, err) - } - updated++ - } else { - skipped++ - } - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit wiki data: %w", err) - } - - log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped) - return nil -} - func (a *App) queryWikiArticle(name string) (wikiArticleEntry, int, error) { // Build URL — name is decoded from DB, encode it for the request reqURL := fmt.Sprintf("%s?username=%s&name=%s", diff --git a/src/wikidata.go b/src/wikidata.go index c775f1f..9dacb7b 100644 --- 
a/src/wikidata.go +++ b/src/wikidata.go @@ -62,7 +62,15 @@ func (a *App) getMissingWikiArticles() ([]string, error) { return ids, nil } -// fetchWikiArticles queries Wikidata SPARQL in batches and updates wiki_article in the DB. +// wikiArticleFetch carries a resolved wiki article to the data consumer. +type wikiArticleFetch struct { + imdbID string + name string // decoded wiki article name +} + +// fetchWikiArticles pipelines SPARQL batches with wiki data extraction. +// SPARQL producer feeds batches into a channel; DB commits per batch. +// Resolved articles are forwarded to a wiki data consumer running in parallel. func (a *App) fetchWikiArticles() error { ids, err := a.getMissingWikiArticles() if err != nil { @@ -72,78 +80,199 @@ func (a *App) fetchWikiArticles() error { log.Println("fetchWikiArticles: all entries have wiki_article, skipping") return nil } - log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids)) - tx, err := a.DB.Begin() + // Also pre-load existing entries that need wiki data extraction + existing, err := a.getExistingWikiArticles() if err != nil { - return fmt.Errorf("begin tx: %w", err) + return err } - wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? 
WHERE imdb_id = ?`) - if err != nil { - tx.Rollback() - return fmt.Errorf("prepare wiki update: %w", err) + // Channel for wiki article fetch tasks (buffered for pipelining) + artCh := make(chan wikiArticleFetch, wikiBatchSize*2) + + // Consumer processes wiki data extraction in background + consumerDone := make(chan struct{}) + go a.wikiDataConsumer(artCh, consumerDone) + + // Feed existing entries to consumer first + for _, e := range existing { + select { + case artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle}: + default: + // Channel full, will pick up on next run + log.Printf("wiki data channel full, deferring %s", e.imdbID) + } } - defer wikiStmt.Close() - noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`) + // SPARQL producer: fetches batches, commits to DB, forwards resolved + sparqlDone := make(chan struct{}) + go a.sparqlPipeline(ids, artCh, sparqlDone) + + // Wait for SPARQL to finish + <-sparqlDone + + // Close channel and wait for consumer to drain + close(artCh) + <-consumerDone + + log.Printf("fetchWikiArticles: pipeline complete") + return nil +} + +type existingWikiArticle struct { + imdbID string + wikiArticle string +} + +// getExistingWikiArticles returns entries that have a wiki_article but need data extraction. 
+func (a *App) getExistingWikiArticles() ([]existingWikiArticle, error) { + rows, err := a.DB.Query(` + SELECT imdb_id, wiki_article FROM imdb + WHERE wiki_article IS NOT NULL + AND wiki_status_code != 404 + AND (synopsis IS NULL OR description IS NULL OR year IS NULL + OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL) + `) if err != nil { - tx.Rollback() - return fmt.Errorf("prepare no_wiki update: %w", err) + return nil, fmt.Errorf("query existing wiki articles: %w", err) } - defer noWikiStmt.Close() + defer rows.Close() + + var articles []existingWikiArticle + for rows.Next() { + var e existingWikiArticle + if err := rows.Scan(&e.imdbID, &e.wikiArticle); err != nil { + return nil, fmt.Errorf("scan existing wiki: %w", err) + } + articles = append(articles, e) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows iteration: %w", err) + } + return articles, nil +} + +// sparqlPipeline fetches SPARQL batches, commits each to DB, forwards resolved articles. 
+func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done chan<- struct{}) { + defer close(done) + + log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids)) + batchNum := 0 + totalResolved := 0 + totalNoWiki := 0 - updated := 0 - noWiki := 0 for i := 0; i < len(ids); i += wikiBatchSize { + batchNum++ chunk := ids[i:min(i+wikiBatchSize, len(ids))] results, err := a.queryWikidataBatch(chunk) if err != nil { - log.Printf("wikidata batch error at offset %d: %v", i, err) - // mark all in skipped batch as no-wiki + log.Printf("sparql batch %d error: %v, marking all as no-wiki", batchNum, err) + // Mark all in failed batch as no-wiki + tx, _ := a.DB.Begin() + stmt, _ := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`) for _, id := range chunk { - if _, err := noWikiStmt.Exec(id); err != nil { - tx.Rollback() - return fmt.Errorf("mark no_wiki for %s: %w", id, err) - } - noWiki++ + stmt.Exec(id) + totalNoWiki++ } + stmt.Close() + tx.Commit() + continue + } + + // Commit batch to DB + tx, err := a.DB.Begin() + if err != nil { + log.Printf("sparql batch %d begin tx error: %v", batchNum, err) continue } - for _, id := range chunk { - acc, found := results[id] + wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? 
WHERE imdb_id = ?`) + if err != nil { + tx.Rollback() + log.Printf("sparql batch %d prepare error: %v", batchNum, err) + continue + } + + noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`) + if err != nil { + wikiStmt.Close() + tx.Rollback() + log.Printf("sparql batch %d prepare no_wiki error: %v", batchNum, err) + continue + } + + resolved := 0 + for _, imdbID := range chunk { + acc, found := results[imdbID] if found && acc.title != "" { - if _, err := wikiStmt.Exec(acc.title, id); err != nil { - tx.Rollback() - return fmt.Errorf("update wiki_article for %s: %w", id, err) + wikiStmt.Exec(acc.title, imdbID) + resolved++ + // Forward to consumer + select { + case artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title}: + default: + log.Printf("wiki data channel full, deferring %s", imdbID) } - updated++ } else { - if _, err := noWikiStmt.Exec(id); err != nil { - tx.Rollback() - return fmt.Errorf("mark no_wiki for %s: %w", id, err) - } - noWiki++ + noWikiStmt.Exec(imdbID) + totalNoWiki++ } } + totalResolved += resolved + + wikiStmt.Close() + noWikiStmt.Close() - done := i + len(chunk) - log.Printf("fetchWikiArticles: [%d/%d]", done, len(ids)) + tx.Commit() - // rate limit between batches + log.Printf("fetchWikiArticles: sparql batch %d/%d - %d/%d resolved", + batchNum, (len(ids)+wikiBatchSize-1)/wikiBatchSize, resolved, len(chunk)) + + // Rate limit between SPARQL requests if i+wikiBatchSize < len(ids) { time.Sleep(wikiDelay) } } - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit wiki articles: %w", err) + log.Printf("fetchWikiArticles: SPARQL done - %d resolved, %d no-wiki", totalResolved, totalNoWiki) +} + +// wikiDataConsumer fetches wiki article data from the custom server. 
+func (a *App) wikiDataConsumer(artCh <-chan wikiArticleFetch, done chan<- struct{}) { + defer close(done) + + updated := 0 + skipped := 0 + + for art := range artCh { + // Wait between requests (rate limit) + time.Sleep(2 * time.Second) + + entry, statusCode, err := a.queryWikiArticle(art.name) + if err != nil { + log.Printf("wiki error %s (%s): HTTP %d - %v", art.imdbID, art.name, statusCode, err) + skipped++ + } + + // Record status code + if statusCode > 0 { + a.DB.Exec(`UPDATE imdb SET wiki_status_code = ? WHERE imdb_id = ?`, statusCode, art.imdbID) + } + + // Update data fields on success + if statusCode == 200 { + a.DB.Exec(` + UPDATE imdb SET synopsis = ?, description = ?, year = ?, poster_url = ?, + license = ?, license_url = ?, num_accolades = ? + WHERE imdb_id = ?`, + entry.Synopsis, entry.Description, entry.Year, entry.PosterURL, + entry.License, entry.LicenseURL, entry.NumAccolades, art.imdbID) + updated++ + } } - log.Printf("fetchWikiArticles: %d wiki articles updated, %d marked as no wiki", updated, noWiki) - return nil + log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped) } // queryWikidataBatch sends a SPARQL query for the given IDs and returns a map of id -> wikiAcc. |
