From 6abbf29de5b08448005df974e95bf773de304550 Mon Sep 17 00:00:00 2001 From: dev Date: Fri, 26 Jun 2026 04:19:44 +0200 Subject: fix: prevent dropped wiki entries when channel fills - Remove non-blocking select/default that silently dropped entries - Channel sized to hold all pending entries (existing + SPARQL) - Blocking send backpressures SPARQL if consumer is slow --- src/wikidata.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/wikidata.go b/src/wikidata.go index 5d1b594..e6c2e0e 100644 --- a/src/wikidata.go +++ b/src/wikidata.go @@ -88,7 +88,7 @@ func (a *App) fetchWikiArticles() error { } // Channel for wiki article fetch tasks (buffered for pipelining) - artCh := make(chan wikiArticleFetch, wikiBatchSize*2) + artCh := make(chan wikiArticleFetch, len(existing)+len(ids)+1) // Consumer processes wiki data extraction in background consumerDone := make(chan struct{}) @@ -96,12 +96,7 @@ func (a *App) fetchWikiArticles() error { // Feed existing entries to consumer first for _, e := range existing { - select { - case artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle}: - default: - // Channel full, will pick up on next run - log.Printf("wiki data channel full, deferring %s", e.imdbID) - } + artCh <- wikiArticleFetch{imdbID: e.imdbID, name: e.wikiArticle} } // SPARQL producer: fetches batches, commits to DB, forwards resolved @@ -208,12 +203,8 @@ func (a *App) sparqlPipeline(ids []string, artCh chan<- wikiArticleFetch, done c if found && acc.title != "" { wikiStmt.Exec(acc.title, imdbID) resolved++ - // Forward to consumer - select { - case artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title}: - default: - log.Printf("wiki data channel full, deferring %s", imdbID) - } + // Forward to consumer (blocking — backpressures SPARQL if consumer is slow) + artCh <- wikiArticleFetch{imdbID: imdbID, name: acc.title} } else { noWikiStmt.Exec(imdbID) totalNoWiki++ -- cgit v1.2.3