diff options
Diffstat (limited to 'resources/mq/server/message_queue.go')
| -rw-r--r-- | resources/mq/server/message_queue.go | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/resources/mq/server/message_queue.go b/resources/mq/server/message_queue.go new file mode 100644 index 0000000..a4fcf14 --- /dev/null +++ b/resources/mq/server/message_queue.go @@ -0,0 +1,149 @@ +package main + +import ( + "os" + "os/exec" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/kr/beanstalk" +) + +// beanstalkd, the message queue +type Queue struct { + Conn *beanstalk.Conn + Tube *beanstalk.Tube +} + +func (app *Queue) newBeanstalkd() error { + conn, err := beanstalk.Dial("tcp", "192.168.122.1:11300") + //conn, err := beanstalk.Dial("tcp", "127.0.0.1:11300") + if err != nil { + return err + } + app.Conn = conn + app.Tube = &beanstalk.Tube{conn, "gospelchor"} + + return nil +} + +func main() { + app := Queue{} + err := app.newBeanstalkd() + if err != nil { + log.WithFields( + log.Fields{ + "error": err, + }, + ).Fatal("cannot connect to beanstalkd") + } + + // closes the connection to the beanstalkd server on exit + defer func() { + app.Conn.Close() + }() + + cntGoRoutine := 0 + + // we will ensure that we wait for the same number + // of routines to complete by using a waitGroup + waitGroup := &sync.WaitGroup{} + waitGroup.Add(cntGoRoutine + 1) + + shutdownChan := make(chan bool) + + log.Println("app started. beginning consuming jobs...") + app.consumeJob(shutdownChan, waitGroup, 1) + + // waits for shutting down + term := make(chan os.Signal, 1) + signal.Notify(term, os.Interrupt) + signal.Notify(term, syscall.SIGTERM) + + <-term + + // sends the signal to close to the workers + // +1 because we have a separate goroutine + // to check for buried jobs + for i := 0; i < cntGoRoutine+1; i++ { + go func() { + shutdownChan <- true + }() + } + // we wait, until they complete + log.Info("waiting for goroutine to finish") + waitGroup.Wait() + +} + +func (app *Queue) consumeJob(shutdownChannel chan bool, waitGroup *sync.WaitGroup, number int) { + + tubeset := beanstalk.NewTubeSet(app.Conn, "gospelchor") + // we tell when we return + defer waitGroup.Done() + + // Looping to consume all Jobs + for { + + select { + case _ = <-shutdownChannel: + log.WithFields( + log.Fields{ + "goroutine_id": number + 1, + }, + ).Info("getting message to return: ") + return + default: + id, body, err := tubeset.Reserve(60 * time.Second) + if err != nil { + //log.Println(err) + continue + } + log.WithFields( + log.Fields{ + "job_id": id, + }, + ).Info("recieving job") + + /* + err = json.Unmarshal(body) + if err != nil { + app.Conn.Bury(id, 1) + log.WithFields( + log.Fields{ + "job_id": id, + "error": err, + }, + ).Warn("decoding json failed") + continue + } + */ + if !strings.EqualFold(string(body), "true") { + app.Conn.Bury(id, 1) + continue + } + + // rsync den ordner rüber + cmdArgs := []string{"--delete", "-avz", "-e ssh", "applications:/var/www/gospeladlershof.de/gospeladlershof.de/", "/var/www/gospeladlershof.de/gospeladlershof.de/"} + if _, err = exec.Command("rsync", cmdArgs...).Output(); err != nil { + log.WithFields( + log.Fields{ + "error": err, + }, + ).Warn("fatal error using rsync") + app.Conn.Bury(id, 1) + } else { + log.Println("seite synchronisiert") + + // Deleting the job from beanstalkd + app.Conn.Delete(id) + } + + } + } + +} |
