Compare commits

..

No commits in common. "main" and "v0.1.0" have entirely different histories.
main ... v0.1.0

3 changed files with 135 additions and 161 deletions

View file

@ -1,11 +1,8 @@
package dbschema package dbschema
import ( import (
// External
"github.com/jackc/pgx/v5/pgxpool"
// Standard // Standard
"context" "database/sql"
"fmt" "fmt"
) )
@ -16,21 +13,17 @@ func newDatabase(host string, port int, dbName, user, pass string) (dbase Databa
dbase.Username = user dbase.Username = user
dbase.Password = pass dbase.Password = pass
dbase.db, err = pgxpool.New(context.Background(), dbase.sqlConnString()) dbase.db, err = sql.Open("postgres", dbase.sqlConnString())
return return
}// }}} }// }}}
func databaseFromInstance(db *pgxpool.Pool) (dbase Database, err error) {
dbase.db = db
return
}
func (dbase Database) sqlConnString() string {// {{{ func (dbase Database) sqlConnString() string {// {{{
return fmt.Sprintf( return fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "postgresql://%s:%s@%s:%d/%s?sslmode=disable",
dbase.Host,
dbase.Port,
dbase.Username, dbase.Username,
dbase.Password, dbase.Password,
dbase.Host,
dbase.Port,
dbase.DbName, dbase.DbName,
) )
}// }}} }// }}}

View file

@ -1,33 +1,16 @@
/*
Package dbschema is used to keep the SQL schema up to date.
func sqlProvider(dbName string, version int) (sql []byte, found bool) {
// read an SQL file and return the contents
return
}
upgrader := dbschema.NewUpgrader()
upgrader.SetSqlCallback(sqlProvider)
if err = upgrader.AddDatabase("127.0.0.1", 5432, "foo", "postgres", "password"); err != nil {
panic(err)
}
if err = upgrader.Run(); err != nil {
panic(err)
}
*/
package dbschema package dbschema
import ( import (
// External // External
"github.com/jackc/pgx/v5/pgxpool" _ "github.com/lib/pq"
// Standard
"database/sql"
) )
// An upgrader verifies the schema for one or more databases and upgrades them if possible.
type Upgrader struct { type Upgrader struct {
schema string schemaDb Database
databases map[string]Database databases map[string]Database
logCallback func(string, string) logCallback func(string, string)
sqlCallback func(string, int) ([]byte, bool) sqlCallback func(string, int) ([]byte, bool)
} }
@ -39,8 +22,50 @@ type Database struct {
Username string Username string
Password string Password string
db *pgxpool.Pool db *sql.DB
upgrader *Upgrader schemaDb *Database
} }
/*
func dbUpdate() (err error) {// {{{
var rows *sqlx.Rows
var schemaStr string
var schema int
rows, err = db.Queryx(`SELECT value FROM _internal.db WHERE "key"='schema'`)
if err != nil { return }
defer rows.Close()
if !rows.Next() {
return errors.New("Table _interval.db missing schema row")
}
if err = rows.Scan(&schemaStr); err != nil {
return
}
// Run updates
schema, err = strconv.Atoi(schemaStr)
if err != nil {
return err
}
for i := (schema+1); i <= DB_SCHEMA; i++ {
log.Printf("\x1b[32mNotes\x1b[0m Upgrading SQL schema to revision %d...", i)
sql, _ := embedded.ReadFile(
fmt.Sprintf("sql/%04d.sql", i),
)
_, err = db.Exec(string(sql))
if err != nil {
return
}
_, err = db.Exec(`UPDATE _internal.db SET "value"=$1 WHERE "key"='schema'`, i)
if err != nil {
return
}
log.Printf("\x1b[32mNotes\x1b[0m OK: %d", i)
}
return
}// }}}
*/
// vim: foldmethod=marker // vim: foldmethod=marker

View file

@ -1,89 +1,41 @@
package dbschema package dbschema
import ( import (
// External
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
// Standard // Standard
"context" "database/sql"
"fmt" "fmt"
) )
func defaultCallback(topic, msg string) { // {{{ func defaultCallback(topic, msg string) {// {{{
fmt.Printf("[%s] %s\n", topic, msg) fmt.Printf("[%s] %s\n", topic, msg)
} // }}} }// }}}
func NewUpgrader(host string, port int, dbName, user, pass string) (upgrader Upgrader, err error) {// {{{
// NewUpgrader creates an upgrader with an empty list of databases.
func NewUpgrader(schema ...string) (upgrader Upgrader) { // {{{
// Using a variadic function for backward compatibility.
if len(schema) > 0 {
upgrader.schema = schema[0]
} else {
upgrader.schema = "_db"
}
upgrader.logCallback = defaultCallback upgrader.logCallback = defaultCallback
upgrader.databases = map[string]Database{} upgrader.databases = map[string]Database{}
return upgrader.schemaDb, err = newDatabase(
} // }}} host,
port,
// SetLogCallback allows to set a callback for custom logging. dbName,
func (upgrader *Upgrader) SetLogCallback(callback func(string, string)) { // {{{ user,
upgrader.logCallback = callback pass,
} // }}}
// SetSqlCallback is required for providing the SQL schema updates.
func (upgrader *Upgrader) SetSqlCallback(callback func(string, int) ([]byte, bool)) { // {{{
upgrader.sqlCallback = callback
} // }}}
// Version returns the current dbschema version for the given database name.
func (upgrader *Upgrader) Version(dbName string) (version int, err error) { // {{{
dbase, found := upgrader.databases[dbName]
if !found {
err = fmt.Errorf("Database %s not previously added to the upgrader", dbName)
return
}
version, err = dbase.Version()
return
} // }}}
func (dbase Database) createSchemaTable() (err error) { // {{{
dbase.upgrader.logCallback("create", fmt.Sprintf("%s, %s.schema", dbase.DbName, dbase.upgrader.schema))
_, err = dbase.db.Exec(context.Background(), `CREATE SCHEMA "` + dbase.upgrader.schema + `"`)
// Error code 42P06 "duplicate_schema" is an OK error,
// table can still be missing and created.
pqErr, _ := err.(*pgconn.PgError)
if pqErr != nil && pqErr.Code != "42P06" {
return
}
_, err = dbase.db.Exec(
context.Background(),
`CREATE TABLE "` + dbase.upgrader.schema + `"."schema" (
version int4 NOT NULL,
updated timestamp NOT NULL DEFAULT NOW(),
CONSTRAINT schema_pk PRIMARY KEY (version)
)`,
) )
err = upgrader.verifySchemaTable()
return return
} // }}} }// }}}
func (dbase Database) appendSchemaVersion(version int) (err error) { // {{{
_, err = dbase.db.Exec(context.Background(), `INSERT INTO `+dbase.upgrader.schema+`.schema(version) VALUES($1)`, version)
return
} // }}}
func (dbase Database) verifySchemaTable() (err error) { // {{{ func (upgrader *Upgrader) SetLogCallback(callback func(string, string)) {// {{{
var rows pgx.Rows upgrader.logCallback = callback
if rows, err = dbase.db.Query( }// }}}
context.Background(), func (upgrader *Upgrader) SetSqlCallback(callback func(string, int) ([]byte, bool)) {// {{{
upgrader.sqlCallback = callback
}// }}}
func (upgrader Upgrader) verifySchemaTable() (err error) {// {{{
var rows *sql.Rows
if rows, err = upgrader.schemaDb.db.Query(
`SELECT EXISTS ( `SELECT EXISTS (
SELECT FROM pg_catalog.pg_class c SELECT FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = '` + dbase.upgrader.schema + `' WHERE n.nspname = '_db'
AND c.relname = 'schema' AND c.relname = 'schema'
)`, )`,
); err != nil { ); err != nil {
@ -96,30 +48,48 @@ func (dbase Database) verifySchemaTable() (err error) { // {{{
return return
} }
if exists { if !exists {
upgrader.logCallback("create", "_db.schema")
upgrader.schemaDb.db.Exec(`CREATE SCHEMA "_db"`)
if _, err = upgrader.schemaDb.db.Exec(`
CREATE TABLE "_db"."schema" (
database varchar NOT NULL,
version int4 NOT NULL,
updated timestamp NOT NULL DEFAULT NOW(),
CONSTRAINT schema_pk PRIMARY KEY (database)
);
`,
); err != nil {
return
}
}
return
}// }}}
func (upgrader Upgrader) verifySchemaEntry(dbase Database) (err error) {// {{{
var rows *sql.Rows
rows, err = upgrader.schemaDb.db.Query(`SELECT version FROM _db.schema WHERE database=$1`, dbase.DbName)
if err != nil {
return return
} }
err = dbase.createSchemaTable() defer rows.Close()
return
} // }}}
func (dbase Database) verifySchemaEntry() (err error) { // {{{
var version int
var row pgx.Row
row = dbase.db.QueryRow(context.Background(), `SELECT version FROM `+dbase.upgrader.schema+`.schema LIMIT 1`)
err = row.Scan(&version) if !rows.Next() {
if err == pgx.ErrNoRows { upgrader.logCallback("initiate version", dbase.DbName)
dbase.upgrader.logCallback("initiate version", dbase.DbName) _, err = upgrader.schemaDb.db.Exec(`INSERT INTO _db.schema(database, version) VALUES($1, 0)`, dbase.DbName)
err = dbase.appendSchemaVersion(0) if err != nil {
return
}
} }
return return
} // }}} }// }}}
func (dbase Database) Version() (version int, err error) { // {{{ func (upgrader Upgrader) version(dbName string) (version int, err error) {// {{{
var rows pgx.Rows var rows *sql.Rows
rows, err = dbase.db.Query( rows, err = upgrader.schemaDb.db.Query(
context.Background(), `SELECT version FROM _db.schema WHERE database=$1`,
`SELECT version FROM ` + dbase.upgrader.schema + `.schema ORDER BY version DESC LIMIT 1`, dbName,
) )
if err != nil { if err != nil {
return return
@ -129,52 +99,31 @@ func (dbase Database) Version() (version int, err error) { // {{{
if rows.Next() { if rows.Next() {
err = rows.Scan(&version) err = rows.Scan(&version)
} else { } else {
err = fmt.Errorf(`Database "%s" is missing an entry in `+dbase.upgrader.schema+`.schema`, dbase.DbName) err = fmt.Errorf(`Database "%s" is missing an entry in _db.schema`, dbName)
} }
return return
} // }}} }// }}}
// AddDatabase sets a database up for the Run() function with verifying/creating the _db.schema table. func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass string) (err error) {// {{{
func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass string) (db Database, err error) { // {{{ var db Database
if db, err = newDatabase(host, port, dbName, user, pass); err != nil { if db, err = newDatabase(host, port, dbName, user, pass); err != nil {
return return
} }
db.upgrader = &upgrader
upgrader.databases[dbName] = db upgrader.databases[dbName] = db
if err = db.verifySchemaTable(); err != nil { err = upgrader.verifySchemaEntry(db)
return
}
err = db.verifySchemaEntry()
return return
} // }}} }// }}}
func (upgrader Upgrader) AddDatabaseInstance(sqlDB *pgxpool.Pool, dbName string) (db Database, err error) { // {{{ func (upgrader Upgrader) Run() (err error) {// {{{
db, err = databaseFromInstance(sqlDB)
db.upgrader = &upgrader
upgrader.databases[dbName] = db
if err = db.verifySchemaTable(); err != nil {
return
}
err = db.verifySchemaEntry()
return
} // }}}
// Run executes the actual schema updates until there are no more available.
func (upgrader Upgrader) Run() (err error) { // {{{
var version int var version int
for dbName, dbase := range upgrader.databases { for dbName, db := range upgrader.databases {
version, err = dbase.Version() version, err = upgrader.version(dbName)
if err != nil { if err != nil {
return return
} }
upgrader.logCallback("version", fmt.Sprintf("%s.%s: %d", dbName, upgrader.schema, version)) upgrader.logCallback("version", fmt.Sprintf("%s: %d", dbName, version))
for { for {
version++ version++
@ -183,16 +132,23 @@ func (upgrader Upgrader) Run() (err error) { // {{{
break break
} }
upgrader.logCallback("exec", fmt.Sprintf("%s.%s: %d", dbName, upgrader.schema, version)) upgrader.logCallback("exec", fmt.Sprintf("%s: %d", dbName, version))
if _, err = dbase.db.Exec(context.Background(), string(sql)); err != nil { if _, err = db.db.Exec(string(sql)); err != nil {
return return
} }
if err = dbase.appendSchemaVersion(version); err != nil { _, err = upgrader.schemaDb.db.Exec(`
UPDATE _db.schema
SET
version=$1,
updated=NOW()
WHERE database=$2
`, version, dbName)
if err != nil {
return return
} }
} }
} }
return return
} // }}} }// }}}
// vim: foldmethod=marker // vim: foldmethod=marker