Implemented rudimentary notification with NTFY and acknowledgement

This commit is contained in:
Magnus Åhall 2024-03-29 20:24:53 +01:00
parent c2555a1d35
commit afcadc8ae1
9 changed files with 346 additions and 69 deletions

View File

@ -28,6 +28,7 @@ type Config struct {
Static string Static string
Upload string Upload string
} }
NotificationBaseURL string
} }
Session struct { Session struct {

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.21.0
require ( require (
git.gibonuddevalla.se/go/webservice v0.2.2 git.gibonuddevalla.se/go/webservice v0.2.2
git.gibonuddevalla.se/go/wrappederror v0.3.3
github.com/google/uuid v1.5.0 github.com/google/uuid v1.5.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/jmoiron/sqlx v1.3.5 github.com/jmoiron/sqlx v1.3.5

2
go.sum
View File

@ -2,6 +2,8 @@ git.gibonuddevalla.se/go/dbschema v1.3.0 h1:HzFMR29tWfy/ibIjltTbIMI4inVktj/rh8bE
git.gibonuddevalla.se/go/dbschema v1.3.0/go.mod h1:BNw3q/574nXbGoeWyK+tLhRfggVkw2j2aXZzrBKC3ig= git.gibonuddevalla.se/go/dbschema v1.3.0/go.mod h1:BNw3q/574nXbGoeWyK+tLhRfggVkw2j2aXZzrBKC3ig=
git.gibonuddevalla.se/go/webservice v0.2.2 h1:pmfeLa7c9pSPbuu6TuzcJ6yuVwdMLJ8SSPm1IkusThk= git.gibonuddevalla.se/go/webservice v0.2.2 h1:pmfeLa7c9pSPbuu6TuzcJ6yuVwdMLJ8SSPm1IkusThk=
git.gibonuddevalla.se/go/webservice v0.2.2/go.mod h1:3uBS6nLbK9qbuGzDls8MZD5Xr9ORY1Srbj6v06BIhws= git.gibonuddevalla.se/go/webservice v0.2.2/go.mod h1:3uBS6nLbK9qbuGzDls8MZD5Xr9ORY1Srbj6v06BIhws=
git.gibonuddevalla.se/go/wrappederror v0.3.3 h1:pdIy3/daSY3zMmUr9PXW6ffIt8iYonOv64mgJBpKz+0=
git.gibonuddevalla.se/go/wrappederror v0.3.3/go.mod h1:j4w320Hk1wvhOPjUaK4GgLvmtnjUUM5yVu6JFO1OCSc=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=

93
main.go
View File

@ -3,14 +3,17 @@ package main
import ( import (
// External // External
"git.gibonuddevalla.se/go/webservice" "git.gibonuddevalla.se/go/webservice"
"git.gibonuddevalla.se/go/wrappederror"
// Internal // Internal
"git.gibonuddevalla.se/go/webservice/session" "git.gibonuddevalla.se/go/webservice/session"
"notes/notification"
// Standard // Standard
"crypto/md5" "crypto/md5"
"embed" "embed"
"encoding/hex" "encoding/hex"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -32,13 +35,14 @@ var (
flagCheckLocal bool flagCheckLocal bool
flagConfig string flagConfig string
service *webservice.Service service *webservice.Service
connectionManager ConnectionManager connectionManager ConnectionManager
static http.Handler notificationManager notification.Manager
config Config static http.Handler
logger *slog.Logger config Config
schedulers map[int]Schedule logger *slog.Logger
VERSION string schedulers map[int]Schedule
VERSION string
//go:embed version sql/* //go:embed version sql/*
embeddedSQL embed.FS embeddedSQL embed.FS
@ -47,7 +51,7 @@ var (
staticFS embed.FS staticFS embed.FS
) )
func sqlProvider(dbname string, version int) (sql []byte, found bool) { func sqlProvider(dbname string, version int) (sql []byte, found bool) { // {{{
var err error var err error
sql, err = embeddedSQL.ReadFile(fmt.Sprintf("sql/%05d.sql", version)) sql, err = embeddedSQL.ReadFile(fmt.Sprintf("sql/%05d.sql", version))
if err != nil { if err != nil {
@ -55,7 +59,27 @@ func sqlProvider(dbname string, version int) (sql []byte, found bool) {
} }
found = true found = true
return return
} } // }}}
func logCallback(e WrappedError.Error) { // {{{
now := time.Now()
out := struct {
Year int
Month int
Day int
Time string
Error any
}{now.Year(), int(now.Month()), now.Day(), now.Format("15:04:05"), e}
j, _ := json.Marshal(out)
file, err := os.OpenFile("/tmp/notes.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
logger.Error("log", "error", err)
return
}
file.Write(j)
file.Write([]byte("\n"))
file.Close()
} // }}}
func init() { // {{{ func init() { // {{{
version, _ := embeddedSQL.ReadFile("version") version, _ := embeddedSQL.ReadFile("version")
@ -76,6 +100,9 @@ func init() { // {{{
flag.Parse() flag.Parse()
} // }}} } // }}}
func main() { // {{{ func main() { // {{{
WrappedError.Init()
WrappedError.SetLogCallback(logCallback)
var err error var err error
if flagVersion { if flagVersion {
@ -119,6 +146,7 @@ func main() { // {{{
service.Register("/key/retrieve", true, true, keyRetrieve) service.Register("/key/retrieve", true, true, keyRetrieve)
service.Register("/key/create", true, true, keyCreate) service.Register("/key/create", true, true, keyCreate)
service.Register("/key/counter", true, true, keyCounter) service.Register("/key/counter", true, true, keyCounter)
service.Register("/notification/ack", false, false, notificationAcknowledge)
service.Register("/", false, false, service.StaticHandler) service.Register("/", false, false, service.StaticHandler)
if flagCreateUser { if flagCreateUser {
@ -128,6 +156,17 @@ func main() { // {{{
go scheduleHandler() go scheduleHandler()
if err = service.InitDatabaseConnection(); err != nil {
logger.Error("application", "error", err)
os.Exit(1)
}
err = InitNotificationManager()
if err != nil {
logger.Error("application", "error", err)
os.Exit(1)
}
err = service.Start() err = service.Start()
if err != nil { if err != nil {
logger.Error("webserver", "error", err) logger.Error("webserver", "error", err)
@ -136,13 +175,25 @@ func main() { // {{{
} // }}} } // }}}
func scheduleHandler() { // {{{ func scheduleHandler() { // {{{
// Wait for the approximate minute. // Wait for the approximate minute.
wait := 60000 - time.Now().Sub(time.Now().Truncate(time.Minute)).Milliseconds() //wait := 60000 - time.Now().Sub(time.Now().Truncate(time.Minute)).Milliseconds()
time.Sleep(time.Millisecond * time.Duration(wait)) //time.Sleep(time.Millisecond * time.Duration(wait))
tick := time.NewTicker(time.Minute) tick := time.NewTicker(time.Minute)
tick = time.NewTicker(time.Second*5)
for { for {
<-tick.C <-tick.C
for _, event := range ExpiredSchedules() {
notificationManager.Send(
event.UserID,
event.ScheduleUUID,
[]byte(
fmt.Sprintf(
"%s\n%s",
event.Time.Format("2006-01-02 15:04"),
event.Description,
),
),
)
}
} }
} // }}} } // }}}
@ -710,4 +761,20 @@ func keyCounter(w http.ResponseWriter, r *http.Request, sess *session.T) { // {{
}) })
} // }}} } // }}}
func notificationAcknowledge(w http.ResponseWriter, r *http.Request, sess *session.T) { // {{{
logger.Info("webserver", "request", "/notification/ack")
var err error
err = AcknowledgeNotification(r.URL.Query().Get("uuid"))
if err != nil {
responseError(w, err)
return
}
responseData(w, map[string]interface{}{
"OK": true,
})
} // }}}
// vim: foldmethod=marker // vim: foldmethod=marker

15
notification/factory.go Normal file
View File

@ -0,0 +1,15 @@
package notification
import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
)
func ServiceFactory(t string, config []byte, prio int, ackURL string) (Service, error) {
switch t {
case "NTFY":
return NewNTFY(config, prio, ackURL)
}
return nil, werr.New("Unknown notification service, '%s'", t).WithCode("002-0000")
}

View File

@ -1,26 +1,70 @@
package notification package notification
import ( import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
// Standard // Standard
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io"
"net/http" "net/http"
) )
type NTFY struct { type NTFY struct {
URL string URL string
Prio int
AcknowledgeURL string
} }
func NewNTFY(config []byte) (instance NTFY, err error) { func NewNTFY(config []byte, prio int, ackURL string) (instance NTFY, err error) {
err = json.Unmarshal(config, &instance) err = json.Unmarshal(config, &instance)
if err != nil { if err != nil {
return err = werr.Wrap(err).WithCode("002-0001").WithData(config)
return
} }
instance.Prio = prio
instance.AcknowledgeURL = ackURL
return instance, nil return instance, nil
} }
func (ntfy NTFY) Send(msg []byte) (err error) { func (ntfy NTFY) GetPrio() int {
http.NewRequest("POST", ntfy.URL, bytes.NewReader(msg)) return ntfy.Prio
}
func (ntfy NTFY) Send(uuid string, msg []byte) (err error) {
var req *http.Request
var res *http.Response
req, err = http.NewRequest("POST", ntfy.URL, bytes.NewReader(msg))
if err != nil {
err = werr.Wrap(err).WithCode("002-0002").WithData(ntfy.URL)
return
}
ackURL := fmt.Sprintf("http, OK, %s/notification/ack?uuid=%s", ntfy.AcknowledgeURL, uuid)
req.Header.Add("X-Actions", ackURL)
res, err = http.DefaultClient.Do(req)
if err != nil {
err = werr.Wrap(err).WithCode("002-0003")
return
}
body, _ := io.ReadAll(res.Body)
if res.StatusCode != 200 {
err = werr.New("Invalid NTFY response").WithCode("002-0004").WithData(body)
return
}
ntfyResp := struct {
ID string
}{}
err = json.Unmarshal(body, &ntfyResp)
if err != nil {
err = werr.Wrap(err).WithCode("002-0005").WithData(body)
return
}
return return
} }

View File

@ -1,5 +1,59 @@
package notification package notification
type Notification interface { import (
Send([]byte) error // External
werr "git.gibonuddevalla.se/go/wrappederror"
// Standard
_ "fmt"
"slices"
)
type Service interface {
GetPrio() int
Send(string, []byte) error
}
type Manager struct {
services map[int][]Service
}
func NewManager() (nm Manager) {
nm.services = make(map[int][]Service, 32)
return
}
func (nm *Manager) AddService(userID int, service Service) {
var services []Service
var found bool
if services, found = nm.services[userID]; !found {
services = []Service{}
}
services = append(services, service)
slices.SortFunc(services, func(a, b Service) int {
if a.GetPrio() < b.GetPrio() {
return -1
}
if a.GetPrio() > b.GetPrio() {
return 1
}
return 0
})
nm.services[userID] = services
}
func (nm *Manager) Send(userID int, uuid string, msg []byte) (err error) {
services, found := nm.services[userID]
if !found {
return werr.New("No notification services defined for user ID %d", userID).WithCode("002-0008")
}
for _, service := range services {
if err = service.Send(uuid, msg); err == nil {
break
}
}
return
} }

75
notification_manager.go Normal file
View File

@ -0,0 +1,75 @@
package main
import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
// Internal
"notes/notification"
// Standard
"database/sql"
"encoding/json"
)
type DbNotificationService struct {
ID int
UserID int `json:"user_id"`
Service string
Configuration string
Prio int
}
func InitNotificationManager() (err error) {// {{{
var dbServices []DbNotificationService
var row *sql.Row
row = service.Db.Conn.QueryRow(`
WITH services AS (
SELECT
id,
user_id,
prio,
service,
configuration::varchar
FROM notification n
ORDER BY
user_id ASC,
prio ASC
)
SELECT jsonb_agg(s.*)
FROM services s
`,
)
var dbData []byte
err = row.Scan(&dbData)
if err != nil {
err = werr.Wrap(err).WithCode("002-0006")
return
}
err = json.Unmarshal(dbData, &dbServices)
if err != nil {
err = werr.Wrap(err).WithCode("002-0007")
return
}
notificationManager = notification.NewManager()
var service notification.Service
for _, dbService := range dbServices {
service, err = notification.ServiceFactory(
dbService.Service,
[]byte(dbService.Configuration),
dbService.Prio,
config.Application.NotificationBaseURL,
)
notificationManager.AddService(dbService.UserID, service)
}
return
}// }}}
func AcknowledgeNotification(uuid string) (err error) {// {{{
_, err = service.Db.Conn.Exec(`UPDATE schedule SET acknowledged=true WHERE schedule_uuid=$1`, uuid)
return
}// }}}

View File

@ -1,6 +1,9 @@
package main package main
import ( import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
// Standard // Standard
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -15,7 +18,7 @@ func init() {
type Schedule struct { type Schedule struct {
ID int ID int
UserID int `json:"user_id"` UserID int `json:"user_id" db:"user_id"`
Node Node Node Node
ScheduleUUID string `db:"schedule_uuid"` ScheduleUUID string `db:"schedule_uuid"`
Time time.Time Time time.Time
@ -23,7 +26,7 @@ type Schedule struct {
Acknowledged bool Acknowledged bool
} }
func ScanForSchedules(content string) (schedules []Schedule) { func ScanForSchedules(content string) (schedules []Schedule) {// {{{
schedules = []Schedule{} schedules = []Schedule{}
rxp := regexp.MustCompile(`\{\s*([0-9]{4}-[0-9]{2}-[0-9]{2}\s+[0-9]{2}:[0-9]{2}(?::[0-9]{2})?)\s*\,\s*([^\]]+?)\s*\}`) rxp := regexp.MustCompile(`\{\s*([0-9]{4}-[0-9]{2}-[0-9]{2}\s+[0-9]{2}:[0-9]{2}(?::[0-9]{2})?)\s*\,\s*([^\]]+?)\s*\}`)
@ -48,44 +51,8 @@ func ScanForSchedules(content string) (schedules []Schedule) {
} }
return return
} }// }}}
func RetrieveSchedules(userID int, nodeID int) (schedules []Schedule, err error) {// {{{
func (a Schedule) IsEqual(b Schedule) bool {
return a.UserID == b.UserID &&
a.Node.ID == b.Node.ID &&
a.Time.Equal(b.Time) &&
a.Description == b.Description
}
func (s *Schedule) Insert(queryable Queryable) error {
res := queryable.QueryRow(`
INSERT INTO schedule(user_id, node_id, time, description)
VALUES($1, $2, $3, $4)
RETURNING id
`,
s.UserID,
s.Node.ID,
s.Time,
s.Description,
)
return res.Scan(&s.ID)
}
func (s *Schedule) Delete(queryable Queryable) error {
_, err := queryable.Exec(`
DELETE FROM schedule
WHERE
user_id = $1 AND
id = $2
`,
s.UserID,
s.ID,
)
return err
}
func RetrieveSchedules(userID int, nodeID int) (schedules []Schedule, err error) {
schedules = []Schedule{} schedules = []Schedule{}
res := service.Db.Conn.QueryRow(` res := service.Db.Conn.QueryRow(`
@ -122,14 +89,52 @@ func RetrieveSchedules(userID int, nodeID int) (schedules []Schedule, err error)
err = json.Unmarshal(data, &schedules) err = json.Unmarshal(data, &schedules)
return return
} }// }}}
func ExpiredSchedules() []Schedule { func (a Schedule) IsEqual(b Schedule) bool {// {{{
schedules := []Schedule{} return a.UserID == b.UserID &&
/* a.Node.ID == b.Node.ID &&
res, err := service.Db.Conn.Query(` a.Time.Equal(b.Time) &&
a.Description == b.Description
}// }}}
func (s *Schedule) Insert(queryable Queryable) error {// {{{
res := queryable.QueryRow(`
INSERT INTO schedule(user_id, node_id, time, description)
VALUES($1, $2, $3, $4)
RETURNING id
`,
s.UserID,
s.Node.ID,
s.Time,
s.Description,
)
return res.Scan(&s.ID)
}// }}}
func (s *Schedule) Delete(queryable Queryable) error {// {{{
_, err := queryable.Exec(`
DELETE FROM schedule
WHERE
user_id = $1 AND
id = $2
`,
s.UserID,
s.ID,
)
return err
}// }}}
func ExpiredSchedules() (schedules []Schedule) {// {{{
schedules = []Schedule{}
res, err := service.Db.Conn.Queryx(`
SELECT SELECT
* id,
user_id,
node_id,
schedule_uuid,
time,
description
FROM schedule FROM schedule
WHERE WHERE
time < NOW() AND time < NOW() AND
@ -137,6 +142,19 @@ func ExpiredSchedules() []Schedule {
ORDER BY ORDER BY
time ASC time ASC
`) `)
*/ if err != nil {
return schedules err = werr.Wrap(err).WithCode("002-0009")
} return
}
defer res.Close()
for res.Next() {
s := Schedule{}
if err = res.Scan(&s.ID, &s.UserID, &s.Node.ID, &s.ScheduleUUID, &s.Time, &s.Description); err != nil {
werr.Wrap(err).WithCode("002-000a")
continue
}
schedules = append(schedules, s)
}
return
}// }}}