From 60487a79dbbeff7340fc6e1a2686bd163897d930 Mon Sep 17 00:00:00 2001 From: Thomas Vogl Date: Fri, 16 Dec 2022 23:27:24 +0100 Subject: [PATCH] initial commit --- go.mod | 16 +++ go.sum | 26 +++++ internal/Config/Config.go | 34 ++++++ internal/MqttService/Helpers.go | 17 +++ internal/MqttService/MqttService.go | 102 ++++++++++++++++++ internal/MqttService/MqttServiceConfig.go | 21 ++++ internal/PinControlService/Pin.go | 101 +++++++++++++++++ .../PinControlService/PinControlConfig.go | 25 +++++ .../PinControlService/PinControlService.go | 94 ++++++++++++++++ internal/PinControlService/Types.go | 23 ++++ main.go | 56 ++++++++++ rpicontrol.config.example | 11 ++ rpicontrol.service | 24 +++++ 13 files changed, 550 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/Config/Config.go create mode 100644 internal/MqttService/Helpers.go create mode 100644 internal/MqttService/MqttService.go create mode 100644 internal/MqttService/MqttServiceConfig.go create mode 100644 internal/PinControlService/Pin.go create mode 100644 internal/PinControlService/PinControlConfig.go create mode 100644 internal/PinControlService/PinControlService.go create mode 100644 internal/PinControlService/Types.go create mode 100644 main.go create mode 100644 rpicontrol.config.example create mode 100644 rpicontrol.service diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..aea4869 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module rpiMqttControl + +go 1.17 + +require ( + github.com/eclipse/paho.mqtt.golang v1.3.5 + github.com/sirupsen/logrus v1.8.1 + github.com/stianeikeland/go-rpio v4.2.0+incompatible + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b +) + +require ( + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8ef3b20 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= +github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stianeikeland/go-rpio v4.2.0+incompatible h1:CUOlIxdJdT+H1obJPsmg8byu7jMSECLfAN9zynm5QGo= +github.com/stianeikeland/go-rpio v4.2.0+incompatible/go.mod h1:Sh81rdJwD96E2wja2Gd7rrKM+XZ9LrwvN2w4IXrqLR8= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/Config/Config.go b/internal/Config/Config.go new file mode 100644 index 0000000..68220ab --- /dev/null +++ b/internal/Config/Config.go @@ -0,0 +1,34 @@ +package Config + +import ( + "gopkg.in/yaml.v3" + "io/ioutil" + "rpiMqttControl/internal/MqttService" + "rpiMqttControl/internal/PinControlService" +) + +type Config struct { + PinControlConfig PinControlService.PinControlConfig `yaml:"pin-control-config"` + MqttConfig MqttService.MqttServiceConfig `yaml:"mqtt-config"` +} + +func NewConfig() Config { + return Config{ + PinControlConfig: PinControlService.NewPinControlConfig(), + MqttConfig: MqttService.NewMqttServiceConfig(), + } +} + +func NewConfigFromYamlFile(configFile string) (*Config, error) { + config := NewConfig() + if data, err := ioutil.ReadFile(configFile); err == nil { + if err := yaml.Unmarshal(data, &config); err == nil { + return &config, nil + } else { + return nil, err + } + } else { + return nil, err + } + +} diff --git a/internal/MqttService/Helpers.go b/internal/MqttService/Helpers.go new file mode 100644 index 0000000..5742305 --- /dev/null +++ b/internal/MqttService/Helpers.go @@ -0,0 +1,17 @@ +package MqttService + +import ( + "errors" + "strings" +) + +func PinNameFromTopic(topic string) (string,error) { + topicComponents := strings.Split(topic, "/") + if len(topicComponents) >= 2 { + gpioCh := topicComponents[len(topicComponents)-2] + return gpioCh, nil + + } else { + return "", errors.New("invalid topic") + } +} diff --git a/internal/MqttService/MqttService.go b/internal/MqttService/MqttService.go new file mode 100644 index 0000000..8e33b24 --- /dev/null +++ b/internal/MqttService/MqttService.go @@ -0,0 +1,102 @@ +package MqttService + +import ( + mqtt "github.com/eclipse/paho.mqtt.golang" + log "github.com/sirupsen/logrus" + "rpiMqttControl/internal/PinControlService" + "time" +) + +type OnCompletionHandler func() + +type MqttService struct { + mqttClient mqtt.Client + quit chan struct{} + config MqttServiceConfig + pinService *PinControlService.PinControlService +} + +func (m *MqttService) pinEventCallback(pinName string, pinState PinControlService.PinState) { + topic := m.config.TopicPrefix + "/gpio/" + pinName + "/event" + token := m.mqttClient.Publish(topic, 1, false, string(pinState)) + go m._asyncWait(token, nil) +} + +func (m *MqttService) pinCyclicCallback(pinName string, pinState PinControlService.PinState) { + topic := m.config.TopicPrefix + "/gpio/" + pinName + "/value" + token := m.mqttClient.Publish(topic, 1, false, string(pinState)) + go m._asyncWait(token, nil) +} +func (m *MqttService) controlCallback(client mqtt.Client, message mqtt.Message) { + if pinNo, err := PinNameFromTopic(message.Topic()); err == nil { + if err := m.pinService.Command(pinNo, PinControlService.PinCommand(message.Payload())); err != nil { + log.Error(err.Error()) + } + } else { + log.Error(err.Error()) + } +} +func (m *MqttService) Stop() { + m.quit <- struct{}{} + m.mqttClient.Disconnect(100) +} + +func (m *MqttService) _asyncWait(token mqtt.Token, onComplete OnCompletionHandler) { + for { + select { + case <-token.Done(): + if err := token.Error(); err != nil { + log.Fatal(err) + } else { + if onComplete != nil { + onComplete() + } + } + return + case <-m.quit: + return + } + } +} +func (m *MqttService) _subscribe() { + topic := m.config.TopicPrefix + "/gpio/+/control" + token := m.mqttClient.Subscribe(topic, 1, m.controlCallback) + + go m._asyncWait(token, nil) +} + +func (m *MqttService) Start() { + token := m.mqttClient.Connect() + go m._asyncWait(token, func() { + m._subscribe() + }) +} + +func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService { + clientOptions := mqtt.NewClientOptions() + clientOptions.AddBroker(config.BrokerAddress) + + if config.Username != "" { + clientOptions.Username = config.Username + clientOptions.Password = config.Password + } + + clientOptions.SetConnectRetry(true) + clientOptions.SetConnectRetryInterval(10 * time.Second) + clientOptions.SetAutoReconnect(config.AutoReconnect) + clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond) + + mqttClient := mqtt.NewClient(clientOptions) + + m := MqttService{ + mqttClient: mqttClient, + quit: make(chan struct{}, 1), + pinService: pinService, + config: config, + } + + pinService.OnCycleCallback = m.pinCyclicCallback + pinService.OnChangeCallback = m.pinEventCallback + + return m +} diff --git a/internal/MqttService/MqttServiceConfig.go b/internal/MqttService/MqttServiceConfig.go new file mode 100644 index 0000000..9ef279c --- /dev/null +++ b/internal/MqttService/MqttServiceConfig.go @@ -0,0 +1,21 @@ +package MqttService + +type MqttServiceConfig struct { + BrokerAddress string `yaml:"broker-address"` + TopicPrefix string `yaml:"topic-prefix"` + AutoReconnect bool `yaml:"auto-reconnect"` + ConnectTimeoutMs int `yaml:"connect-timeout-ms"` + Username string `yaml:"username"` + Password string `yaml:"password"` + +} + +func NewMqttServiceConfig() MqttServiceConfig { + return MqttServiceConfig{ + BrokerAddress: "tcp://localhost:1883", + TopicPrefix: "/rpicontrol", + AutoReconnect: true, + ConnectTimeoutMs: 10000, + + } +} diff --git a/internal/PinControlService/Pin.go b/internal/PinControlService/Pin.go new file mode 100644 index 0000000..3d2ec62 --- /dev/null +++ b/internal/PinControlService/Pin.go @@ -0,0 +1,101 @@ +package PinControlService + +import ( + "errors" + log "github.com/sirupsen/logrus" + "github.com/stianeikeland/go-rpio" +) + +type Pin struct { + Id int + Name string + Direction PinDirection + PullConfig PinPull + InitialState PinCommand + PinHandle rpio.Pin + SendPollingEvents bool + SendChangeEvents bool + +} + +func NewPin(config PinConfig) Pin { + p := Pin{ + Direction: config.Direction, + PullConfig: config.PullConfig, + Name: config.Name, + Id: config.PinNumber, + PinHandle: rpio.Pin(config.PinNumber), + } + + if config.SendPollingEvents != nil { + p.SendPollingEvents = *config.SendPollingEvents + } else { + p.SendPollingEvents = true + } + + if config.SendChangeEvents != nil { + p.SendChangeEvents = *config.SendChangeEvents + } else { + p.SendChangeEvents = true + } + + if config.InitialState != "" { + p.InitialState = PinCommand(config.InitialState) + } else { + p.InitialState = Off + } + + return p +} + +func (p *Pin) State() PinState { + if state := p.PinHandle.Read(); state == rpio.High { + return StateOn + } else { + return StateOff + } +} + +func (p *Pin) Command(cmd PinCommand) error{ + log.Debugf("try to send command %s for pin %s (pin no: %d)", cmd, p.Name, p.Id) + if p.Direction != Output { + return errors.New("pin is not an output") + } + + if cmd == On { + p.PinHandle.High() + } else if cmd == Off { + p.PinHandle.Low() + } else if cmd == Toggle { + p.PinHandle.Toggle() + } else { + return errors.New("unknown command") + } + + return nil +} + +func (p *Pin) Configure() { + if p.Direction == Input { + p.PinHandle.Input() + } else if p.Direction == Output { + p.PinHandle.Output() + _ = p.Command(p.InitialState) + } + + p.PinHandle.Detect(rpio.AnyEdge) + + if p.PullConfig == PullUp { + p.PinHandle.PullUp() + } else if p.PullConfig == PullDown { + p.PinHandle.PullDown() + } else if p.PullConfig == PullOff { + p.PinHandle.PullOff() + } + +} + +func (p *Pin) Changed() bool { + return p.PinHandle.EdgeDetected() +} + diff --git a/internal/PinControlService/PinControlConfig.go b/internal/PinControlService/PinControlConfig.go new file mode 100644 index 0000000..792bda1 --- /dev/null +++ b/internal/PinControlService/PinControlConfig.go @@ -0,0 +1,25 @@ +package PinControlService + +type PinConfig struct { + PinNumber int `yaml:"number"` + Name string `yaml:"name"` + Direction PinDirection `yaml:"direction"` + PullConfig PinPull `yaml:"pull-config"` + InitialState PinCommand `yaml:"initial-state"` + SendPollingEvents *bool `yaml:"send-polling-events"` + SendChangeEvents *bool `yaml:"send-change-events"` + +} + +type PinControlConfig struct { + GpioPins []PinConfig `yaml:"gpio-pins"` + PollingTimeMs int `yaml:"polling-time-ms"` + +} + +func NewPinControlConfig() PinControlConfig { + return PinControlConfig{ + GpioPins: make([]PinConfig, 0), + PollingTimeMs: 100, + } +} diff --git a/internal/PinControlService/PinControlService.go b/internal/PinControlService/PinControlService.go new file mode 100644 index 0000000..d9cd7c1 --- /dev/null +++ b/internal/PinControlService/PinControlService.go @@ -0,0 +1,94 @@ +package PinControlService + +import ( + "errors" + log "github.com/sirupsen/logrus" + "github.com/stianeikeland/go-rpio" + "time" +) + + + + + +type PinControlService struct { + Pins map[string]Pin + timer* time.Ticker + exit chan bool + OnChangeCallback PinCallback + OnCycleCallback PinCallback + +} + + +func (p*PinControlService) AddPin( + config PinConfig) { + pin := NewPin(config) + p.Pins[pin.Name] = pin + +} + +func (p*PinControlService) Command(pinName string, command PinCommand) error { + if pin, found := p.Pins[pinName]; found == false { + return errors.New("pin not configured") + } else { + return pin.Command(command) + } +} + +func (p*PinControlService) Start() { + if err := rpio.Open(); err != nil { + log.Fatal(err) + panic(err) + } + + for _,v := range p.Pins { + v.Configure() + } + go p._task() +} +func (p*PinControlService) Stop() { + p.exit <- true +} + +func (p*PinControlService) _task() { + for { + select { + case <- p.timer.C: + for pinName, pin := range p.Pins { + log.Debug("timer event") + if pin.Changed() { + log.Debugf("detected pin change for pin %s (pin no %d)", pin.Name, pin.Id) + if pin.SendChangeEvents && p.OnChangeCallback != nil { + p.OnChangeCallback(pinName, pin.State()) + } + + } + + if pin.SendPollingEvents && p.OnCycleCallback != nil { + p.OnCycleCallback(pinName, pin.State()) + } + } + case <- p.exit: + log.Debug("stop timer") + p.timer.Stop() + return + + } + } + +} + + +func NewPinControl(config *PinControlConfig) (*PinControlService, error) { + p := PinControlService{ + Pins: make(map[string]Pin), + exit: make(chan bool,1), + timer: time.NewTicker(time.Duration(config.PollingTimeMs) * time.Millisecond)} + + for _, pinConfig := range config.GpioPins { + p.AddPin(pinConfig) + } + + return &p, nil +} \ No newline at end of file diff --git a/internal/PinControlService/Types.go b/internal/PinControlService/Types.go new file mode 100644 index 0000000..10a855c --- /dev/null +++ b/internal/PinControlService/Types.go @@ -0,0 +1,23 @@ +package PinControlService + +type PinCommand string +type PinState string +type PinDirection string +type PinPull string +type PinCallback func(pinName string, state PinState) + +const ( + On PinCommand = "ON" + Off PinCommand = "OFF" + Toggle PinCommand = "TOGGLE" + + StateOn PinState = "ON" + StateOff PinState = "OFF" + + Input PinDirection = "Input" + Output PinDirection = "Output" + + PullUp PinPull = "PULL_UP" + PullDown PinPull = "PULL_DOWN" + PullOff PinPull = "PULL_OFF" +) diff --git a/main.go b/main.go new file mode 100644 index 0000000..e23c0f4 --- /dev/null +++ b/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + log "github.com/sirupsen/logrus" + "os" + "os/signal" + "rpiMqttControl/internal/Config" + "rpiMqttControl/internal/MqttService" + "rpiMqttControl/internal/PinControlService" +) + +func main() { + flagConfig := flag.String("config", "/etc/rpicontrol/rpicontrol.conf", "path to config file") + flagLogLevel := flag.String("log", "warn", "set log level for console output") + + flag.Parse() + + if level, err := log.ParseLevel(*flagLogLevel); err != nil { + log.SetLevel(log.WarnLevel) + log.Warnf("could not set log level. %s", err.Error()) + } else { + log.SetLevel(level) + } + + config, errConfig := Config.NewConfigFromYamlFile(*flagConfig) + if errConfig != nil { + log.Fatal(errConfig) + os.Exit(1) + } + + pinControl, err := PinControlService.NewPinControl(&config.PinControlConfig) + if err != nil { + log.Fatal(err) + os.Exit(1) + } + + mqttService := MqttService.NewMqttService(config.MqttConfig, pinControl) + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt) + + mqttService.Start() + pinControl.Start() + + for { + select { + case <-signalCh: + log.Info("attempting to shutdown...") + pinControl.Stop() + mqttService.Stop() + os.Exit(0) + } + } + +} diff --git a/rpicontrol.config.example b/rpicontrol.config.example new file mode 100644 index 0000000..9422932 --- /dev/null +++ b/rpicontrol.config.example @@ -0,0 +1,11 @@ +mqtt-config: + broker-address: "tcp://nuc.fritz.box:1883" + +pin-control-config: + gpio-pins: + - number: 17 + name: out1 + direction: Output + pull-config: PullOff + + polling-time-ms: 100 diff --git a/rpicontrol.service b/rpicontrol.service new file mode 100644 index 0000000..e239435 --- /dev/null +++ b/rpicontrol.service @@ -0,0 +1,24 @@ +[Unit] +Description=RPI MQTT Control Service +After=network.target + +[Service] +Type=simple +User=root +Group=root + +Restart=on-failure +RestartSec=10 +startLimitIntervalSec=60 + +WorkingDirectory=/usr/bin +ExecStart=/usr/bin/rpicontrol + +# make sure log directory exists and owned by syslog +PermissionsStartOnly=true +StandardOutput=syslog +StandardError=syslog +SyslogIdentifier=rpicontrol + +[Install] +WantedBy=multi-user.target \ No newline at end of file