package main import ( "os" "os/exec" "os/signal" "strings" "sync" "syscall" "time" log "github.com/Sirupsen/logrus" "github.com/kr/beanstalk" ) // beanstalkd, the message queue type Queue struct { Conn *beanstalk.Conn Tube *beanstalk.Tube } func (app *Queue) newBeanstalkd() error { conn, err := beanstalk.Dial("tcp", "192.168.122.1:11300") //conn, err := beanstalk.Dial("tcp", "127.0.0.1:11300") if err != nil { return err } app.Conn = conn app.Tube = &beanstalk.Tube{conn, "gospelchor"} return nil } func main() { app := Queue{} err := app.newBeanstalkd() if err != nil { log.WithFields( log.Fields{ "error": err, }, ).Fatal("cannot connect to beanstalkd") } // closes the connection to the beanstalkd server on exit defer func() { app.Conn.Close() }() cntGoRoutine := 0 // we will ensure that we wait for the same number // of routines to complete by using a waitGroup waitGroup := &sync.WaitGroup{} waitGroup.Add(cntGoRoutine + 1) shutdownChan := make(chan bool) log.Println("app started. beginning consuming jobs...") app.consumeJob(shutdownChan, waitGroup, 1) // waits for shutting down term := make(chan os.Signal, 1) signal.Notify(term, os.Interrupt) signal.Notify(term, syscall.SIGTERM) <-term // sends the signal to close to the workers // +1 because we have a separate goroutine // to check for buried jobs for i := 0; i < cntGoRoutine+1; i++ { go func() { shutdownChan <- true }() } // we wait, until they complete log.Info("waiting for goroutine to finish") waitGroup.Wait() } func (app *Queue) consumeJob(shutdownChannel chan bool, waitGroup *sync.WaitGroup, number int) { tubeset := beanstalk.NewTubeSet(app.Conn, "gospelchor") // we tell when we return defer waitGroup.Done() // Looping to consume all Jobs for { select { case _ = <-shutdownChannel: log.WithFields( log.Fields{ "goroutine_id": number + 1, }, ).Info("getting message to return: ") return default: id, body, err := tubeset.Reserve(60 * time.Second) if err != nil { //log.Println(err) continue } log.WithFields( log.Fields{ "job_id": id, }, ).Info("recieving job") /* err = json.Unmarshal(body) if err != nil { app.Conn.Bury(id, 1) log.WithFields( log.Fields{ "job_id": id, "error": err, }, ).Warn("decoding json failed") continue } */ if !strings.EqualFold(string(body), "true") { app.Conn.Bury(id, 1) continue } // rsync den ordner rĂ¼ber cmdArgs := []string{"--delete", "-avz", "-e ssh", "applications:/var/www/gospeladlershof.de/gospeladlershof.de/", "/var/www/gospeladlershof.de/gospeladlershof.de/"} if _, err = exec.Command("rsync", cmdArgs...).Output(); err != nil { log.WithFields( log.Fields{ "error": err, }, ).Warn("fatal error using rsync") app.Conn.Bury(id, 1) } else { log.Println("seite synchronisiert") // Deleting the job from beanstalkd app.Conn.Delete(id) } } } }