From 49af9dc33c3ff569fbe4a7ccf3d8b335ca32664f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20=C3=85hall?= Date: Sun, 5 May 2024 10:10:04 +0200 Subject: [PATCH] Added notification manager --- main.go | 112 ++++++++++++++++++++++------------------ notification/factory.go | 23 +++++++++ notification/ntfy.go | 83 +++++++++++++++++++++++++++++ notification/pkg.go | 61 ++++++++++++++++++++++ notification_manager.go | 80 ++++++++++++++++++++++++++++ sql/00011.sql | 10 ++++ 6 files changed, 318 insertions(+), 51 deletions(-) create mode 100644 notification/factory.go create mode 100644 notification/ntfy.go create mode 100644 notification/pkg.go create mode 100644 notification_manager.go create mode 100644 sql/00011.sql diff --git a/main.go b/main.go index 230f034..ec8b8cb 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,10 @@ import ( // External ws "git.gibonuddevalla.se/go/webservice" "git.gibonuddevalla.se/go/webservice/session" - we "git.gibonuddevalla.se/go/wrappederror" + werr "git.gibonuddevalla.se/go/wrappederror" + + // Internal + "smon/notification" // Standard "embed" @@ -26,13 +29,14 @@ import ( const VERSION = "v6" var ( - logger *slog.Logger - flagConfigFile string - flagDev bool - service *ws.Service - logFile *os.File - parsedTemplates map[string]*template.Template - componentFilenames []string + logger *slog.Logger + flagConfigFile string + flagDev bool + service *ws.Service + logFile *os.File + parsedTemplates map[string]*template.Template + componentFilenames []string + notificationManager notification.Manager //go:embed sql sqlFS embed.FS @@ -50,7 +54,7 @@ func init() { // {{{ confDir, err := os.UserConfigDir() if err != nil { - logger.Error("application", "error", we.Wrap(err)) + logger.Error("application", "error", werr.Wrap(err)) } cfgPath := path.Join(confDir, "smon.yaml") flag.StringVar(&flagConfigFile, "config", cfgPath, "Path and filename of the YAML configuration file") @@ -68,8 +72,8 @@ func init() { // {{{ func main() { // {{{ var err error - we.Init() - we.SetLogCallback(logHandler) + werr.Init() + werr.SetLogCallback(logHandler) service, err = ws.New(flagConfigFile, VERSION, logger) if err != nil { @@ -96,6 +100,12 @@ func main() { // {{{ return } + err = InitNotificationManager() + if err != nil { + err = werr.Wrap(err).Log() + logger.Error("notification", "error", err) + } + service.Register("/", false, false, staticHandler) service.Register("/area/new/{name}", false, false, areaNew) @@ -124,7 +134,7 @@ func main() { // {{{ err = service.Start() if err != nil { - logger.Error("webserver", "error", we.Wrap(err)) + logger.Error("webserver", "error", werr.Wrap(err)) os.Exit(1) } @@ -139,7 +149,7 @@ func sqlProvider(dbname string, version int) (sql []byte, found bool) { // {{{ found = true return } // }}} -func logHandler(err we.Error) { // {{{ +func logHandler(err werr.Error) { // {{{ j, _ := json.Marshal(err) logFile.Write(j) logFile.Write([]byte("\n")) @@ -175,7 +185,7 @@ func entryDatapoint(w http.ResponseWriter, r *http.Request, sess *session.T) { / err := DatapointAdd(dpoint, value) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -188,7 +198,7 @@ func entryDatapoint(w http.ResponseWriter, r *http.Request, sess *session.T) { / var out any out, err = trigger.Run() if err != nil { - err = we.Wrap(err).Log() + err = werr.Wrap(err).Log() logger.Error("entry", "error", err) } @@ -200,13 +210,13 @@ func entryDatapoint(w http.ResponseWriter, r *http.Request, sess *session.T) { / if v { err = ProblemStart(trigger) if err != nil { - err = we.Wrap(err).Log() + err = werr.Wrap(err).Log() logger.Error("entry", "error", err) } } else { err = ProblemClose(trigger) if err != nil { - err = we.Wrap(err).Log() + err = werr.Wrap(err).Log() logger.Error("entry", "error", err) } } @@ -214,7 +224,7 @@ func entryDatapoint(w http.ResponseWriter, r *http.Request, sess *session.T) { / default: err := fmt.Errorf(`Expression for trigger %s not returning bool (%T)`, trigger.Name, v) logger.Info("entry", "error", err) - we.Wrap(err).WithData(v).Log() + werr.Wrap(err).WithData(v).Log() } } @@ -268,7 +278,7 @@ func getPage(layout, page string) (tmpl *template.Template, err error) { // {{{ tmpl, err = template.New("main.gotmpl").Funcs(funcMap).ParseFS(viewFS, filenames...) } if err != nil { - err = we.Wrap(err).Log() + err = werr.Wrap(err).Log() return } @@ -288,7 +298,7 @@ func areaNew(w http.ResponseWriter, r *http.Request, _ *session.T) { // {{{ name := r.PathValue("name") err := AreaCreate(name) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -300,14 +310,14 @@ func areaRename(w http.ResponseWriter, r *http.Request, _ *session.T) { // {{{ idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } name := r.PathValue("name") err = AreaRename(id, name) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -320,14 +330,14 @@ func sectionNew(w http.ResponseWriter, r *http.Request, _ *session.T) { // {{{ idStr := r.PathValue("areaID") areaID, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } name := r.PathValue("name") err = SectionCreate(areaID, name) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -339,14 +349,14 @@ func sectionRename(w http.ResponseWriter, r *http.Request, _ *session.T) { // {{ idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } name := r.PathValue("name") err = SectionRename(id, name) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -363,7 +373,7 @@ func pageProblems(w http.ResponseWriter, _ *http.Request, _ *session.T) { // {{{ problems, err := ProblemsRetrieve() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -377,13 +387,13 @@ func pageProblemAcknowledge(w http.ResponseWriter, r *http.Request, _ *session.T idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } err = ProblemAcknowledge(id, true) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -396,13 +406,13 @@ func pageProblemUnacknowledge(w http.ResponseWriter, r *http.Request, _ *session idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } err = ProblemAcknowledge(id, false) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -420,7 +430,7 @@ func pageDatapoints(w http.ResponseWriter, r *http.Request, _ *session.T) { // { datapoints, err := DatapointsRetrieve() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -441,7 +451,7 @@ func pageDatapointEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { / idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -452,7 +462,7 @@ func pageDatapointEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { / } else { datapoint, err = DatapointRetrieve(id, "") if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } } @@ -475,7 +485,7 @@ func pageDatapointUpdate(w http.ResponseWriter, r *http.Request, _ *session.T) { idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -485,7 +495,7 @@ func pageDatapointUpdate(w http.ResponseWriter, r *http.Request, _ *session.T) { dp.Datatype = DatapointType(r.FormValue("datatype")) err = dp.Update() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -496,13 +506,13 @@ func pageDatapointDelete(w http.ResponseWriter, r *http.Request, _ *session.T) { idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } err = DatapointDelete(id) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -513,7 +523,7 @@ func pageDatapointDelete(w http.ResponseWriter, r *http.Request, _ *session.T) { func pageTriggers(w http.ResponseWriter, _ *http.Request, _ *session.T) { // {{{ areas, err := TriggersRetrieve() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -535,7 +545,7 @@ func pageTriggerEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { // idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -546,7 +556,7 @@ func pageTriggerEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { // if id > 0 { trigger, err = TriggerRetrieve(id) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } } else { @@ -555,7 +565,7 @@ func pageTriggerEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { // if sectionIDStr != "" { trigger.SectionID, err = strconv.Atoi(sectionIDStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } } @@ -565,7 +575,7 @@ func pageTriggerEdit(w http.ResponseWriter, r *http.Request, _ *session.T) { // for _, dpname := range trigger.Datapoints { dp, err := DatapointRetrieve(0, dpname) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } dp.LastDatapointValue.TemplateValue = dp.LastDatapointValue.Value() @@ -590,7 +600,7 @@ func pageTriggerUpdate(w http.ResponseWriter, r *http.Request, _ *session.T) { / idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -598,13 +608,13 @@ func pageTriggerUpdate(w http.ResponseWriter, r *http.Request, _ *session.T) { / if id > 0 { trigger, err = TriggerRetrieve(id) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } } else { trigger.SectionID, err = strconv.Atoi(r.FormValue("sectionID")) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } } @@ -614,7 +624,7 @@ func pageTriggerUpdate(w http.ResponseWriter, r *http.Request, _ *session.T) { / trigger.Datapoints = r.Form["datapoints[]"] err = trigger.Update() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -625,14 +635,14 @@ func pageTriggerRun(w http.ResponseWriter, r *http.Request, _ *session.T) { // { idStr := r.PathValue("id") id, err := strconv.Atoi(idStr) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } var trigger Trigger trigger, err = TriggerRetrieve(id) if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } @@ -647,7 +657,7 @@ func pageTriggerRun(w http.ResponseWriter, r *http.Request, _ *session.T) { // { } resp.Output, err = trigger.Run() if err != nil { - we.Wrap(err).Log() + werr.Wrap(err).Log() httpError(w, err) return } @@ -660,7 +670,7 @@ func pageTriggerRun(w http.ResponseWriter, r *http.Request, _ *session.T) { // { func pageConfiguration(w http.ResponseWriter, _ *http.Request, _ *session.T) { // {{{ areas, err := AreaRetrieve() if err != nil { - httpError(w, we.Wrap(err).Log()) + httpError(w, werr.Wrap(err).Log()) return } diff --git a/notification/factory.go b/notification/factory.go new file mode 100644 index 0000000..db0a4b7 --- /dev/null +++ b/notification/factory.go @@ -0,0 +1,23 @@ +package notification + +import ( + // External + werr "git.gibonuddevalla.se/go/wrappederror" + + // Standard + "log/slog" +) + +func ServiceFactory(t string, config []byte, prio int, ackURL string, logger *slog.Logger) (Service, error) { + switch t { + case "NTFY": + ntfy, err := NewNTFY(config, prio, ackURL) + if err != nil { + err = werr.Wrap(err).WithData(config) + return nil, err + } + return ntfy, nil + } + + return nil, werr.New("Unknown notification service, '%s'", t).WithCode("002-0000") +} diff --git a/notification/ntfy.go b/notification/ntfy.go new file mode 100644 index 0000000..4cae808 --- /dev/null +++ b/notification/ntfy.go @@ -0,0 +1,83 @@ +package notification + +import ( + // External + werr "git.gibonuddevalla.se/go/wrappederror" + + // Standard + "bytes" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" +) + +type NTFY struct { + URL string + Prio int + AcknowledgeURL string + logger *slog.Logger +} + +func NewNTFY(config []byte, prio int, ackURL string) (instance *NTFY, err error) { + instance = new(NTFY) + err = json.Unmarshal(config, &instance) + if err != nil { + err = werr.Wrap(err).WithCode("002-0001").WithData(config) + return + } + instance.Prio = prio + instance.AcknowledgeURL = ackURL + return instance, nil +} + +func (ntfy *NTFY) SetLogger(l *slog.Logger) { + ntfy.logger = l +} + +func (ntfy *NTFY) GetType() string { + return "NTFY" +} + +func (ntfy *NTFY) GetPrio() int { + return ntfy.Prio +} + +func (ntfy NTFY) Send(uuid string, msg []byte) (err error) { + var req *http.Request + var res *http.Response + req, err = http.NewRequest("POST", ntfy.URL, bytes.NewReader(msg)) + if err != nil { + err = werr.Wrap(err).WithCode("002-0002").WithData(ntfy.URL) + return + } + + ackURL := fmt.Sprintf("http, OK, %s/notification/ack?uuid=%s", ntfy.AcknowledgeURL, uuid) + req.Header.Add("X-Actions", ackURL) + req.Header.Add("X-Priority", "3") // XXX: should be 5 + req.Header.Add("X-Tags", "calendar") + + res, err = http.DefaultClient.Do(req) + if err != nil { + err = werr.Wrap(err).WithCode("002-0003") + return + } + + body, _ := io.ReadAll(res.Body) + if res.StatusCode != 200 { + err = werr.New("Invalid NTFY response").WithCode("002-0004").WithData(body) + return + } + + ntfyResp := struct { + ID string + }{} + err = json.Unmarshal(body, &ntfyResp) + if err != nil { + err = werr.Wrap(err).WithCode("002-0005").WithData(body) + return + } + + return +} diff --git a/notification/pkg.go b/notification/pkg.go new file mode 100644 index 0000000..2e01519 --- /dev/null +++ b/notification/pkg.go @@ -0,0 +1,61 @@ +package notification + +import ( + // External + werr "git.gibonuddevalla.se/go/wrappederror" + + // Standard + "log/slog" + "slices" +) + +type Service interface { + SetLogger(*slog.Logger) + GetPrio() int + GetType() string + Send(string, []byte) error +} + +type Manager struct { + services []Service + logger *slog.Logger +} + +func NewManager(logger *slog.Logger) (nm Manager) { + nm.services = []Service{} + nm.logger = logger + return +} + +func (nm *Manager) AddService(service Service) { + nm.services = append(nm.services, service) + slices.SortFunc(nm.services, func(a, b Service) int { + if a.GetPrio() < b.GetPrio() { + return -1 + } + if a.GetPrio() > b.GetPrio() { + return 1 + } + return 0 + }) +} + +func (nm *Manager) Send(uuid string, msg []byte) (err error) { + for _, service := range nm.services { + nm.logger.Info("notification", "service", service.GetType(), "prio", service.GetPrio()) + if err = service.Send(uuid, msg); err == nil { + break + } else { + data := struct { + UUID string + Msg []byte + }{ + uuid, + msg, + } + werr.Wrap(err).WithData(data).Log() + } + } + + return +} diff --git a/notification_manager.go b/notification_manager.go new file mode 100644 index 0000000..70460f3 --- /dev/null +++ b/notification_manager.go @@ -0,0 +1,80 @@ +package main + +import ( + // External + werr "git.gibonuddevalla.se/go/wrappederror" + + // Internal + "smon/notification" + + // Standard + "database/sql" + "encoding/json" +) + +type DbNotificationService struct { + ID int + Service string + Configuration string + Prio int +} + +func InitNotificationManager() (err error) { // {{{ + var dbServices []DbNotificationService + var row *sql.Row + + row = service.Db.Conn.QueryRow(` + WITH services AS ( + SELECT + id, + prio, + service, + configuration::varchar + FROM notification n + ORDER BY + prio ASC + ) + SELECT COALESCE(jsonb_agg(s.*), '[]') + FROM services s + `, + ) + var dbData []byte + err = row.Scan(&dbData) + if err != nil { + err = werr.Wrap(err).WithCode("002-0006") + return + } + + err = json.Unmarshal(dbData, &dbServices) + if err != nil { + err = werr.Wrap(err).WithCode("002-0007") + return + } + + notificationManager = notification.NewManager(logger) + var service notification.Service + for _, dbService := range dbServices { + service, err = notification.ServiceFactory( + dbService.Service, + []byte(dbService.Configuration), + dbService.Prio, + "blah", + //config.Application.NotificationBaseURL, + logger, + ) + if err != nil { + err = werr.Wrap(err).WithData(dbService.Service) + } + notificationManager.AddService(service) + } + + return +} // }}} + +func AcknowledgeNotification(uuid string) (err error) { // {{{ + _, err = service.Db.Conn.Exec(`UPDATE schedule SET acknowledged=true WHERE schedule_uuid=$1`, uuid) + if err != nil { + err = werr.Wrap(err).WithData(uuid) + } + return +} // }}} diff --git a/sql/00011.sql b/sql/00011.sql new file mode 100644 index 0000000..4888fa0 --- /dev/null +++ b/sql/00011.sql @@ -0,0 +1,10 @@ + CREATE TYPE notification_type AS ENUM('NTFY', 'PUSHOVER', 'HTTP', 'EMAIL'); + +CREATE TABLE public.notification ( + id serial4 NOT NULL, + service notification_type DEFAULT 'NTFY' NOT NULL, + "configuration" jsonb DEFAULT '{}'::jsonb NOT NULL, + prio int4 DEFAULT 0 NOT NULL, + CONSTRAINT notification_pk PRIMARY KEY (id), + CONSTRAINT notification_unique UNIQUE (prio) +);