package MqttService import ( "crypto/tls" mqtt "github.com/eclipse/paho.mqtt.golang" log "github.com/sirupsen/logrus" "net/url" "rpiMqttControl/internal/PinControlService" "time" ) type OnCompletionHandler func() type MqttService struct { mqttClient mqtt.Client quit chan struct{} config MqttServiceConfig pinService *PinControlService.PinControlService } func (m *MqttService) pinEventCallback(pinName string, pinState PinControlService.PinState) { topic := m.config.TopicPrefix + "/gpio/" + pinName + "/event" token := m.mqttClient.Publish(topic, 1, false, string(pinState)) go m._asyncWait(token, nil) } func (m *MqttService) pinCyclicCallback(pinName string, pinState PinControlService.PinState) { topic := m.config.TopicPrefix + "/gpio/" + pinName + "/value" token := m.mqttClient.Publish(topic, 1, false, string(pinState)) go m._asyncWait(token, nil) } func (m *MqttService) controlCallback(_ mqtt.Client, message mqtt.Message) { if pinNo, err := PinNameFromTopic(message.Topic()); err == nil { if err := m.pinService.Command(pinNo, PinControlService.PinCommand(message.Payload())); err != nil { log.Error(err.Error()) } } else { log.Error(err.Error()) } } func (m *MqttService) Stop() { m.quit <- struct{}{} m.mqttClient.Disconnect(100) } func (m *MqttService) _asyncWait(token mqtt.Token, onComplete OnCompletionHandler) { for { select { case <-token.Done(): if err := token.Error(); err != nil { log.Fatal(err) } else { if onComplete != nil { onComplete() } } return case <-m.quit: return } } } func (m *MqttService) _subscribe() { topic := m.config.TopicPrefix + "/gpio/+/control" token := m.mqttClient.Subscribe(topic, 1, m.controlCallback) go m._asyncWait(token, nil) } func (m *MqttService) Start() { m.mqttClient.Connect() } func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService { clientOptions := mqtt.NewClientOptions() clientOptions.AddBroker(config.BrokerAddress) if config.Username != "" { clientOptions.Username = config.Username clientOptions.Password = config.Password } clientOptions.SetConnectRetry(true) clientOptions.SetConnectRetryInterval(time.Duration(config.ConnectRetryIntervalMs) * time.Millisecond) clientOptions.SetMaxReconnectInterval(time.Duration(config.MaxReconnectIntervalMs) * time.Millisecond) clientOptions.SetAutoReconnect(config.AutoReconnect) clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond) clientOptions.SetConnectionLostHandler(func(client mqtt.Client, err error) { log.Warn("connection lost to broker") }) clientOptions.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { log.Infof("connection attempt to %s", broker.String()) return tlsCfg }) var mqttService *MqttService clientOptions.SetOnConnectHandler(func(client mqtt.Client) { log.Info("successfully connected to broker") mqttService._subscribe() }) mqttClient := mqtt.NewClient(clientOptions) m := MqttService{ mqttClient: mqttClient, quit: make(chan struct{}, 1), pinService: pinService, config: config, } mqttService = &m pinService.OnCycleCallback = m.pinCyclicCallback pinService.OnChangeCallback = m.pinEventCallback return m }