smon/datapoint.go
2024-06-27 16:21:16 +02:00

387 lines
8.3 KiB
Go

package main
import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
"github.com/jmoiron/sqlx"
// Standard
"database/sql"
"errors"
"strings"
"time"
)
// DatapointType names the kind of value a datapoint stores. DatapointAdd
// switches on it to decide which value_* column of datapoint_value to write.
type DatapointType string
// Datapoint types handled by DatapointAdd.
//
// NOTE(review): only INT is declared with the DatapointType type. STRING and
// DATETIME are untyped string constants that convert implicitly wherever a
// DatapointType is expected — confirm whether they should be typed as well.
const (
INT DatapointType = "INT"
STRING = "STRING"
DATETIME = "DATETIME"
)
// Datapoint describes one named, typed datapoint together with its most
// recently stored value. The `db` tags name the database columns the fields
// are scanned from (the retrieve functions in this file use StructScan).
type Datapoint struct {
ID int
Group string
Name string
Datatype DatapointType // decides which value_* column DatapointAdd writes
Comment string
LastValue time.Time `db:"last_value"` // set to NOW() by DatapointAdd when a value is stored
DatapointValueJSON []byte `db:"datapoint_value_json"`
LastDatapointValue DatapointValue // newest datapoint_value row, filled in by the retrieve functions when one exists
Found bool // true when the datapoint was loaded from the database; false for the placeholder DatapointRetrieve returns on a miss
NodataProblemSeconds int `db:"nodata_problem_seconds"`
NodataIsProblem bool `db:"nodata_is_problem"` // reset to false by DatapointAdd, and by Update when NodataProblemSeconds changes
}
// DatapointValue is one stored value of a datapoint. The value lives in one
// of the three nullable Value* fields; DatapointAdd fills exactly one of them,
// chosen by the datatype of the owning datapoint.
type DatapointValue struct {
	ID            int
	DatapointID   int `db:"datapoint_id"`
	Ts            time.Time
	ValueInt      sql.NullInt64  `db:"value_int"`
	ValueString   sql.NullString `db:"value_string"`
	ValueDateTime sql.NullTime   `db:"value_datetime"`
	TemplateValue any
}

// Value returns the first valid value, checked in the order integer, string,
// datetime, as an int64, string or time.Time respectively. It returns nil
// when none of the three is valid.
func (dpv DatapointValue) Value() any { // {{{
	switch {
	case dpv.ValueInt.Valid:
		return dpv.ValueInt.Int64
	case dpv.ValueString.Valid:
		return dpv.ValueString.String
	case dpv.ValueDateTime.Valid:
		return dpv.ValueDateTime.Time
	default:
		return nil
	}
} // }}}

// FormattedTime renders the datetime value as "YYYY-MM-DD hh:mm:ss". When the
// datetime value is not valid the literal text "invalid time" is returned.
func (dpv DatapointValue) FormattedTime() string { // {{{
	if !dpv.ValueDateTime.Valid {
		return "invalid time"
	}

	const layout = "2006-01-02 15:04:05"
	return dpv.ValueDateTime.Time.Format(layout)
} // }}}
// Update persists the datapoint. A datapoint with ID 0 is inserted as a new
// row; any other ID updates the existing row with that id. The name is stored
// with surrounding whitespace trimmed, and an empty name is rejected before
// the database is touched. Database errors are returned wrapped.
func (dp Datapoint) Update() (err error) { // {{{
	const insertQuery = `INSERT INTO datapoint("group", name, datatype, nodata_problem_seconds, comment) VALUES($1, $2, $3, $4, $5)`

	/* Keep nodata_is_problem as is unless the nodata_problem_seconds is changed.
	 * Otherwise unnecessary nodata problems could be notified when updating unrelated
	 * datapoint properties. */
	const updateQuery = `
UPDATE datapoint
SET
"group"=$2,
name=$3,
datatype=$4,
comment=$5,
nodata_problem_seconds=$6,
nodata_is_problem = (
CASE
WHEN $6 != nodata_problem_seconds THEN false
ELSE
nodata_is_problem
END
)
WHERE
id=$1
`

	trimmedName := strings.TrimSpace(dp.Name)
	if trimmedName == "" {
		return errors.New("Name can't be empty")
	}

	switch dp.ID {
	case 0:
		// No id yet: this is a new datapoint.
		_, err = service.Db.Conn.Exec(
			insertQuery,
			dp.Group,
			trimmedName,
			dp.Datatype,
			dp.NodataProblemSeconds,
			dp.Comment,
		)
	default:
		_, err = service.Db.Conn.Exec(
			updateQuery,
			dp.ID,
			dp.Group,
			trimmedName,
			dp.Datatype,
			dp.Comment,
			dp.NodataProblemSeconds,
		)
	}

	if err == nil {
		return nil
	}
	return werr.Wrap(err)
} // }}}
// DatapointAdd stores value as a new datapoint_value row for the datapoint
// called name, then stamps the datapoint with last_value = NOW() and clears
// its nodata_is_problem flag.
//
// The datatype of the datapoint decides where the value goes:
//   - INT:      value is written to value_int
//   - STRING:   value is written to value_string
//   - DATETIME: value must be a string or []byte that stringToTime can parse
//     (RFC 3339); the parsed time is written to value_datetime
//
// An error is returned when the datapoint can't be looked up, the datatype is
// not one of the above, a DATETIME value is not textual or doesn't parse, or
// any of the database statements fail.
func DatapointAdd[T any](name string, value T) (err error) { // {{{
	// Context attached to returned errors. The fields are exported so that
	// reflection based consumers of the error data (presumably JSON logging —
	// confirm against werr) can see the value, like the struct used for the
	// lookup error below.
	type dpRequest = struct {
		ID    int
		Value any
	}

	row := service.Db.Conn.QueryRow(`SELECT id, datatype FROM datapoint WHERE name=$1`, name)

	var dpID int
	var dpType DatapointType
	err = row.Scan(&dpID, &dpType)
	if err != nil {
		err = werr.Wrap(err).WithData(struct {
			Name  string
			Value any
		}{name, value})
		return
	}

	switch dpType {
	case INT:
		_, err = service.Db.Conn.Exec(`INSERT INTO datapoint_value(datapoint_id, value_int) VALUES($1, $2)`, dpID, value)

	case STRING:
		_, err = service.Db.Conn.Exec(`INSERT INTO datapoint_value(datapoint_id, value_string) VALUES($1, $2)`, dpID, value)

	case DATETIME:
		// Time value is required to be a RFC 3339 formatted time string,
		// handed over either as a string or as raw bytes.
		var valueStr string
		switch v := any(value).(type) {
		case string:
			valueStr = v
		case []byte:
			valueStr = string(v)
		default:
			return werr.New("DATETIME value not a string").WithData(dpRequest{dpID, value})
		}

		var t time.Time
		t, err = stringToTime(valueStr)
		if err != nil {
			return werr.Wrap(err).WithData(dpRequest{dpID, value}).Log()
		}
		_, err = service.Db.Conn.Exec(`INSERT INTO datapoint_value(datapoint_id, value_datetime) VALUES($1, $2)`, dpID, t)

	default:
		// Nothing would be stored for an unknown datatype, so don't pretend
		// the datapoint received a value.
		return werr.New("Unknown datapoint datatype '%s'", dpType).WithData(dpRequest{dpID, value})
	}
	if err != nil {
		err = werr.Wrap(err).WithData(dpRequest{dpID, value})
		return
	}

	// The value is stored; a failure here leaves last_value and the nodata
	// flag stale, which the caller needs to know about.
	_, err = service.Db.Conn.Exec(`UPDATE datapoint SET last_value = NOW(), nodata_is_problem = false WHERE id=$1`, dpID)
	if err != nil {
		err = werr.Wrap(err).WithData(dpRequest{dpID, value})
	}
	return
} // }}}
// DatapointsRetrieve returns every datapoint ordered by group and name. Each
// datapoint carries its newest datapoint_value row (highest ts) in
// LastDatapointValue when it has one, and has Found set to true.
//
// The query does not select nodata_is_problem or datapoint_id, so
// NodataIsProblem and LastDatapointValue.DatapointID keep their zero values.
//
// dps is never nil; with no datapoints an empty slice is returned. On error
// the datapoints scanned so far are returned together with the wrapped error.
func DatapointsRetrieve() (dps []Datapoint, err error) { // {{{
	dps = []Datapoint{}
	var rows *sqlx.Rows
	rows, err = service.Db.Conn.Queryx(`
SELECT
dp.id,
dp.name,
dp.datatype,
dp.last_value,
dp.group,
dp.comment,
dp.nodata_problem_seconds,
dpv.id AS v_id,
dpv.ts,
dpv.value_int,
dpv.value_string,
dpv.value_datetime
FROM public.datapoint dp
LEFT JOIN (
SELECT
*,
row_number() OVER (PARTITION BY "datapoint_id" ORDER BY ts DESC) AS rn
FROM datapoint_value
) dpv ON dpv.datapoint_id = dp.id AND rn = 1
ORDER BY
dp.group ASC,
dp.name ASC
`)
	if err != nil {
		// rows is nil when the query fails; bail out before touching it.
		err = werr.Wrap(err)
		return
	}
	defer rows.Close()

	// One row of the joined result. The value columns are nullable because
	// the LEFT JOIN yields NULLs for datapoints without any stored value.
	type DbRes struct {
		ID                   int
		Group                string
		Name                 string
		Datatype             DatapointType
		Comment              string
		LastValue            time.Time     `db:"last_value"`
		NodataProblemSeconds int           `db:"nodata_problem_seconds"`
		VID                  sql.NullInt64 `db:"v_id"`
		Ts                   sql.NullTime
		ValueInt             sql.NullInt64  `db:"value_int"`
		ValueString          sql.NullString `db:"value_string"`
		ValueDateTime        sql.NullTime   `db:"value_datetime"`
	}

	for rows.Next() {
		res := DbRes{}
		err = rows.StructScan(&res)
		if err != nil {
			err = werr.Wrap(err)
			return
		}

		dp := Datapoint{
			ID:                   res.ID,
			Name:                 res.Name,
			Group:                res.Group,
			Datatype:             res.Datatype,
			Comment:              res.Comment,
			LastValue:            res.LastValue,
			Found:                true,
			NodataProblemSeconds: res.NodataProblemSeconds,
		}

		// A valid value id means the join found a stored value.
		if res.VID.Valid {
			dp.LastDatapointValue = DatapointValue{
				ID:            int(res.VID.Int64),
				Ts:            res.Ts.Time,
				ValueInt:      res.ValueInt,
				ValueString:   res.ValueString,
				ValueDateTime: res.ValueDateTime,
			}
		}

		dps = append(dps, dp)
	}

	// Next returns false both at the end of the result and on failure; only
	// Err tells the two apart.
	err = rows.Err()
	if err != nil {
		err = werr.Wrap(err)
	}
	return
} // }}}
// DatapointRetrieve loads a single datapoint together with its newest value.
// The lookup is done by id when id > 0, otherwise by name.
//
// When no datapoint matches, no error is returned; the result is a
// placeholder carrying only the given name, with Found left false. When the
// datapoint exists but has no values, LastDatapointValue stays at its zero
// value. Other database errors are returned wrapped, with the lookup key
// (or the datapoint id for the value query) attached as error data.
func DatapointRetrieve(id int, name string) (dp Datapoint, err error) { // {{{
	var query string
	var param any
	if id > 0 {
		query = `SELECT *, true AS found FROM datapoint WHERE id = $1`
		param = id
		dp.ID = id
	} else {
		query = `SELECT *, true AS found FROM datapoint WHERE name = $1`
		param = name
	}

	row := service.Db.Conn.QueryRowx(query, param)
	err = row.StructScan(&dp)
	if errors.Is(err, sql.ErrNoRows) {
		dp = Datapoint{
			Name: name,
		}
		err = nil
		return
	}
	if err != nil {
		// Report the key that was actually searched for, id or name.
		err = werr.Wrap(err).WithData(param)
		return
	}

	row = service.Db.Conn.QueryRowx(`
SELECT *
FROM datapoint_value
WHERE datapoint_id = $1
ORDER BY ts DESC
LIMIT 1
`,
		dp.ID,
	)
	err = row.StructScan(&dp.LastDatapointValue)
	if errors.Is(err, sql.ErrNoRows) {
		// A datapoint without values is not an error.
		err = nil
		return
	}
	if err != nil {
		err = werr.Wrap(err).WithData(dp.ID)
		return
	}

	return
} // }}}
// DatapointDelete removes the datapoint with the given id, unless a trigger
// still refers to it.
//
// The datapoint's name is looked up first; a missing datapoint therefore
// results in an error. Triggers are then searched for ones whose datapoints
// column contains that name (the `?` operator — presumably a jsonb existence
// test; confirm against the schema). If any are found the datapoint is kept
// and an error listing the trigger names is returned.
func DatapointDelete(id int) (err error) { // {{{
	var dpName string
	row := service.Db.Conn.QueryRow(`SELECT name FROM public.datapoint WHERE id = $1`, id)
	err = row.Scan(&dpName)
	if err != nil {
		err = werr.Wrap(err).WithData(id)
		return
	}

	var rows *sql.Rows
	rows, err = service.Db.Conn.Query(`SELECT name FROM public.trigger WHERE datapoints ? $1`, dpName)
	if err != nil {
		err = werr.Wrap(err).WithData(dpName)
		return
	}
	defer rows.Close()

	var triggerNames []string
	var name string
	for rows.Next() {
		err = rows.Scan(&name)
		if err != nil {
			err = werr.Wrap(err)
			return
		}
		triggerNames = append(triggerNames, name)
	}

	// An iteration that was cut short must not be mistaken for "no triggers
	// use this datapoint", or a datapoint still in use could be deleted.
	err = rows.Err()
	if err != nil {
		err = werr.Wrap(err).WithData(dpName)
		return
	}

	if len(triggerNames) > 0 {
		return werr.New("Datapoint '%s' used in the following triggers: %s", dpName, strings.Join(triggerNames, ", "))
	}

	_, err = service.Db.Conn.Exec(`DELETE FROM datapoint WHERE id=$1`, id)
	if err != nil {
		err = werr.Wrap(err).WithData(id)
	}
	return
} // }}}
// DatapointValues returns the values stored for datapoint id with a timestamp
// between from and to, both inclusive, newest first. values is nil when
// nothing matches. On error the values scanned so far are returned together
// with the wrapped error.
//
// Before querying, the database timezone setting is set to the configured
// smon timezone.
//
// NOTE(review): set_config(..., false) applies to the session of whichever
// connection executes it. If service.Db.Conn is a connection pool, the SELECT
// below is not guaranteed to run on that same connection — confirm, and
// consider running both statements in one transaction.
func DatapointValues(id int, from, to time.Time) (values []DatapointValue, err error) { // {{{
	timezone := smonConfig.Timezone().String()
	_, err = service.Db.Conn.Exec(`SELECT set_config('timezone', $1, false)`, timezone)
	if err != nil {
		err = werr.Wrap(err).WithData(timezone)
		return
	}

	rows, err := service.Db.Conn.Queryx(
		`
SELECT
id,
datapoint_id,
ts,
value_int,
value_string,
value_datetime
FROM datapoint_value
WHERE
datapoint_id=$1 AND
ts >= $2 AND
ts <= $3
ORDER BY
ts DESC
`,
		id,
		from,
		to,
	)
	if err != nil {
		err = werr.Wrap(err).WithData(id)
		return
	}
	defer rows.Close()

	for rows.Next() {
		dpv := DatapointValue{}
		err = rows.StructScan(&dpv)
		if err != nil {
			err = werr.Wrap(err).WithData(id)
			return
		}
		values = append(values, dpv)
	}

	// Next also returns false when iteration fails; Err reports whether the
	// result set was read to the end.
	err = rows.Err()
	if err != nil {
		err = werr.Wrap(err).WithData(id)
	}
	return
} // }}}