summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordev2026-06-26 03:26:07 +0200
committerdev2026-06-26 03:26:07 +0200
commit8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45 (patch)
tree9cbcab194f21431255222c7785e15795e494a728
parentf3ed9396113de4d81ae0fadabc10ae59f7076862 (diff)
downloadhnimdbbot-8e2d742e59b3923852e1ef6e7a5e2ee1de14ce45.tar.gz
refactor: pipeline SPARQL and wiki data in parallel
- Merge fetchWikiArticles + fetchWikiArticlesData into one pipeline - SPARQL producer fetches batches, commits each to DB, forwards resolved articles - Wiki data consumer runs concurrently, fetching at 2s/request - Each SPARQL batch commits independently (no global transaction) - Rate limits respected for both Wikidata SPARQL and wiki server - No parallel requests to either endpoint
-rw-r--r--src/main.go4
-rw-r--r--src/wikiarticle.go122
-rw-r--r--src/wikidata.go209
3 files changed, 169 insertions, 166 deletions
diff --git a/src/main.go b/src/main.go
index 9fd53db..272e4b6 100644
--- a/src/main.go
+++ b/src/main.go
@@ -198,8 +198,4 @@ func main() {
if err = app.fetchWikiArticles(); err != nil {
log.Fatalf("fetchWikiArticles: %v", err)
}
-
- if err = app.fetchWikiArticlesData(); err != nil {
- log.Fatalf("fetchWikiArticlesData: %v", err)
- }
}
diff --git a/src/wikiarticle.go b/src/wikiarticle.go
index 85bf28a..5b891b6 100644
--- a/src/wikiarticle.go
+++ b/src/wikiarticle.go
@@ -25,128 +25,6 @@ type wikiArticleEntry struct {
NumAccolades int
}
-// fetchWikiArticlesData queries the custom wiki server for all entries
-// that have a wiki_article and updates the imdb table with extracted fields.
-func (a *App) fetchWikiArticlesData() error {
- rows, err := a.DB.Query(`
- SELECT id, imdb_id, wiki_article FROM imdb
- WHERE wiki_article IS NOT NULL
- AND wiki_status_code != 404
- AND (synopsis IS NULL OR description IS NULL OR year IS NULL
- OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL)
- `)
- if err != nil {
- return fmt.Errorf("query wiki articles: %w", err)
- }
- defer rows.Close()
-
- type dbRow struct {
- id int
- imdbID string
- wikiArticle string
- }
- var entries []dbRow
- for rows.Next() {
- var r dbRow
- if err := rows.Scan(&r.id, &r.imdbID, &r.wikiArticle); err != nil {
- return fmt.Errorf("scan row: %w", err)
- }
- entries = append(entries, r)
- }
- if err := rows.Err(); err != nil {
- return fmt.Errorf("rows iteration: %w", err)
- }
-
- if len(entries) == 0 {
- log.Println("fetchWikiArticlesData: all entries complete, skipping")
- return nil
- }
- log.Printf("fetchWikiArticlesData: %d entries need wiki data", len(entries))
-
- tx, err := a.DB.Begin()
- if err != nil {
- return fmt.Errorf("begin tx: %w", err)
- }
-
- stmt, err := tx.Prepare(`
- UPDATE imdb SET
- synopsis = ?, description = ?, year = ?, poster_url = ?,
- license = ?, license_url = ?, num_accolades = ?
- WHERE id = ?
- `)
- if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare wiki update: %w", err)
- }
- defer stmt.Close()
-
- statusStmt, err := tx.Prepare(`
- UPDATE imdb SET wiki_status_code = ? WHERE id = ?
- `)
- if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare wiki status update: %w", err)
- }
- defer statusStmt.Close()
-
- type result struct {
- id int
- entry wikiArticleEntry
- statusCode int
- }
- ch := make(chan result, 1)
-
- // Serial processing with 2s between requests
- go func() {
- for i, item := range entries {
- if i > 0 {
- time.Sleep(2 * time.Second)
- }
- entry, statusCode, err := a.queryWikiArticle(item.wikiArticle)
- ch <- result{id: item.id, entry: entry, statusCode: statusCode}
- if err != nil {
- log.Printf("wiki error %d/%d %s (%s): HTTP %d - %v", i+1, len(entries), item.imdbID, item.wikiArticle, statusCode, err)
- }
- }
- close(ch)
- }()
-
- updated := 0
- skipped := 0
- for r := range ch {
- // Always record status code
- if r.statusCode > 0 {
- if _, err := statusStmt.Exec(r.statusCode, r.id); err != nil {
- tx.Rollback()
- return fmt.Errorf("update wiki_status_code for id %d: %w", r.id, err)
- }
- }
-
- // Only update data fields on success
- if r.statusCode == 200 {
- e := r.entry
- _, err := stmt.Exec(
- e.Synopsis, e.Description, e.Year, e.PosterURL,
- e.License, e.LicenseURL, e.NumAccolades, r.id,
- )
- if err != nil {
- tx.Rollback()
- return fmt.Errorf("update wiki data for id %d: %w", r.id, err)
- }
- updated++
- } else {
- skipped++
- }
- }
-
- if err := tx.Commit(); err != nil {
- return fmt.Errorf("commit wiki data: %w", err)
- }
-
- log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped)
- return nil
-}
-
func (a *App) queryWikiArticle(name string) (wikiArticleEntry, int, error) {
// Build URL — name is decoded from DB, encode it for the request
reqURL := fmt.Sprintf("%s?username=%s&name=%s",
diff --git a/src/wikidata.go b/src/wikidata.go
index c775f1f..9dacb7b 100644
--- a/src/wikidata.go
+++ b/src/wikidata.go
@@ -62,7 +62,15 @@ func (a *App) getMissingWikiArticles() ([]string, error) {
return ids, nil
}
-// fetchWikiArticles queries Wikidata SPARQL in batches and updates wiki_article in the DB.
+// wikiArticleFetch carries a resolved wiki article to the data consumer.
+type wikiArticleFetch struct {
+ imdbID string
+ name string // decoded wiki article name
+}
+
+// fetchWikiArticles pipelines SPARQL batches with wiki data extraction.
+// SPARQL producer feeds batches into a channel; DB commits per batch.
+// Resolved articles are forwarded to a wiki data consumer running in parallel.
func (a *App) fetchWikiArticles() error {
ids, err := a.getMissingWikiArticles()
if err != nil {
@@ -72,78 +80,199 @@ func (a *App) fetchWikiArticles() error {
log.Println("fetchWikiArticles: all entries have wiki_article, skipping")
return nil
}
- log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids))
- tx, err := a.DB.Begin()
+ // Also pre-load existing entries that need wiki data extraction
+ existing, err := a.getExistingWikiArticles()
if err != nil {
- return fmt.Errorf("begin tx: %w", err)
+ return err
}
- wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`)
- if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare wiki update: %w", err)
+ // Channel for wiki article fetch tasks (buffered for pipelining)
+ artCh := make(chan wikiArticleFetch, wikiBatchSize*2)
+
+ // Consumer processes wiki data extraction in background
+ consumerDone := make(chan struct{})
+ go a.wikiDataConsumer(artCh, consumerDone)
+
+ // Feed existing entries to consumer first
+ for _, e := range existing {
+ select {
+ case artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle}:
+ default:
+ // Channel full, will pick up on next run
+ log.Printf("wiki data channel full, deferring %s", e.imdbID)
+ }
}
- defer wikiStmt.Close()
- noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
+ // SPARQL producer: fetches batches, commits to DB, forwards resolved
+ sparqlDone := make(chan struct{})
+ go a.sparqlPipeline(ids, artCh, sparqlDone)
+
+ // Wait for SPARQL to finish
+ <-sparqlDone
+
+ // Close channel and wait for consumer to drain
+ close(artCh)
+ <-consumerDone
+
+ log.Printf("fetchWikiArticles: pipeline complete")
+ return nil
+}
+
+type existingWikiArticle struct {
+ imdbID string
+ wikiArticle string
+}
+
+// getExistingWikiArticles returns entries that have a wiki_article but need data extraction.
+func (a *App) getExistingWikiArticles() ([]existingWikiArticle, error) {
+ rows, err := a.DB.Query(`
+ SELECT imdb_id, wiki_article FROM imdb
+ WHERE wiki_article IS NOT NULL
+ AND wiki_status_code != 404
+ AND (synopsis IS NULL OR description IS NULL OR year IS NULL
+ OR poster_url IS NULL OR license IS NULL OR license_url IS NULL OR num_accolades IS NULL)
+ `)
if err != nil {
- tx.Rollback()
- return fmt.Errorf("prepare no_wiki update: %w", err)
+ return nil, fmt.Errorf("query existing wiki articles: %w", err)
}
- defer noWikiStmt.Close()
+ defer rows.Close()
+
+ var articles []existingWikiArticle
+ for rows.Next() {
+ var e existingWikiArticle
+ if err := rows.Scan(&e.imdbID, &e.wikiArticle); err != nil {
+ return nil, fmt.Errorf("scan existing wiki: %w", err)
+ }
+ articles = append(articles, e)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows iteration: %w", err)
+ }
+ return articles, nil
+}
+
+// sparqlPipeline fetches SPARQL batches, commits each to DB, forwards resolved articles.
+func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done chan<- struct{}) {
+ defer close(done)
+
+ log.Printf("fetchWikiArticles: %d entries missing wiki_article", len(ids))
+ batchNum := 0
+ totalResolved := 0
+ totalNoWiki := 0
- updated := 0
- noWiki := 0
for i := 0; i < len(ids); i += wikiBatchSize {
+ batchNum++
chunk := ids[i:min(i+wikiBatchSize, len(ids))]
results, err := a.queryWikidataBatch(chunk)
if err != nil {
- log.Printf("wikidata batch error at offset %d: %v", i, err)
- // mark all in skipped batch as no-wiki
+ log.Printf("sparql batch %d error: %v, marking all as no-wiki", batchNum, err)
+ // Mark all in failed batch as no-wiki
+ tx, _ := a.DB.Begin()
+ stmt, _ := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
for _, id := range chunk {
- if _, err := noWikiStmt.Exec(id); err != nil {
- tx.Rollback()
- return fmt.Errorf("mark no_wiki for %s: %w", id, err)
- }
- noWiki++
+ stmt.Exec(id)
+ totalNoWiki++
}
+ stmt.Close()
+ tx.Commit()
+ continue
+ }
+
+ // Commit batch to DB
+ tx, err := a.DB.Begin()
+ if err != nil {
+ log.Printf("sparql batch %d begin tx error: %v", batchNum, err)
continue
}
- for _, id := range chunk {
- acc, found := results[id]
+ wikiStmt, err := tx.Prepare(`UPDATE imdb SET wiki_article = ? WHERE imdb_id = ?`)
+ if err != nil {
+ tx.Rollback()
+ log.Printf("sparql batch %d prepare error: %v", batchNum, err)
+ continue
+ }
+
+ noWikiStmt, err := tx.Prepare(`UPDATE imdb SET has_no_wiki_article = 1 WHERE imdb_id = ?`)
+ if err != nil {
+ wikiStmt.Close()
+ tx.Rollback()
+ log.Printf("sparql batch %d prepare no_wiki error: %v", batchNum, err)
+ continue
+ }
+
+ resolved := 0
+ for _, imdbID := range chunk {
+ acc, found := results[imdbID]
if found && acc.title != "" {
- if _, err := wikiStmt.Exec(acc.title, id); err != nil {
- tx.Rollback()
- return fmt.Errorf("update wiki_article for %s: %w", id, err)
+ wikiStmt.Exec(acc.title, imdbID)
+ resolved++
+ // Forward to consumer
+ select {
+ case artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title}:
+ default:
+ log.Printf("wiki data channel full, deferring %s", imdbID)
}
- updated++
} else {
- if _, err := noWikiStmt.Exec(id); err != nil {
- tx.Rollback()
- return fmt.Errorf("mark no_wiki for %s: %w", id, err)
- }
- noWiki++
+ noWikiStmt.Exec(imdbID)
+ totalNoWiki++
}
}
+ totalResolved += resolved
+
+ wikiStmt.Close()
+ noWikiStmt.Close()
- done := i + len(chunk)
- log.Printf("fetchWikiArticles: [%d/%d]", done, len(ids))
+ tx.Commit()
- // rate limit between batches
+ log.Printf("fetchWikiArticles: sparql batch %d/%d - %d/%d resolved",
+ batchNum, (len(ids)+wikiBatchSize-1)/wikiBatchSize, resolved, len(chunk))
+
+ // Rate limit between SPARQL requests
if i+wikiBatchSize < len(ids) {
time.Sleep(wikiDelay)
}
}
- if err := tx.Commit(); err != nil {
- return fmt.Errorf("commit wiki articles: %w", err)
+ log.Printf("fetchWikiArticles: SPARQL done - %d resolved, %d no-wiki", totalResolved, totalNoWiki)
+}
+
+// wikiDataConsumer fetches wiki article data from the custom server.
+func (a *App) wikiDataConsumer(artCh <-chan wikiArticleFetch, done chan<- struct{}) {
+ defer close(done)
+
+ updated := 0
+ skipped := 0
+
+ for art := range artCh {
+ // Wait between requests (rate limit)
+ time.Sleep(2 * time.Second)
+
+ entry, statusCode, err := a.queryWikiArticle(art.name)
+ if err != nil {
+ log.Printf("wiki error %s (%s): HTTP %d - %v", art.imdbID, art.name, statusCode, err)
+ skipped++
+ }
+
+ // Record status code
+ if statusCode > 0 {
+ a.DB.Exec(`UPDATE imdb SET wiki_status_code = ? WHERE imdb_id = ?`, statusCode, art.imdbID)
+ }
+
+ // Update data fields on success
+ if statusCode == 200 {
+ a.DB.Exec(`
+ UPDATE imdb SET synopsis = ?, description = ?, year = ?, poster_url = ?,
+ license = ?, license_url = ?, num_accolades = ?
+ WHERE imdb_id = ?`,
+ entry.Synopsis, entry.Description, entry.Year, entry.PosterURL,
+ entry.License, entry.LicenseURL, entry.NumAccolades, art.imdbID)
+ updated++
+ }
}
- log.Printf("fetchWikiArticles: %d wiki articles updated, %d marked as no wiki", updated, noWiki)
- return nil
+ log.Printf("fetchWikiArticlesData: %d updated, %d skipped (non-200)", updated, skipped)
}
// queryWikidataBatch sends a SPARQL query for the given IDs and returns a map of id -> wikiAcc.