Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Replication Slot Resource #70

Merged
merged 2 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions postgresql/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func Provider() terraform.ResourceProvider {
"postgresql_extension": resourcePostgreSQLExtension(),
"postgresql_grant": resourcePostgreSQLGrant(),
"postgresql_grant_role": resourcePostgreSQLGrantRole(),
"postgresql_replication_slot": resourcePostgreSQLReplicationSlot(),
"postgresql_schema": resourcePostgreSQLSchema(),
"postgresql_role": resourcePostgreSQLRole(),
},
Expand Down
204 changes: 204 additions & 0 deletions postgresql/resource_postgresql_replication_slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package postgresql

import (
"database/sql"
"fmt"
"log"
"strings"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
)

func resourcePostgreSQLReplicationSlot() *schema.Resource {
return &schema.Resource{
Create: PGResourceFunc(resourcePostgreSQLReplicationSlotCreate),
Read: PGResourceFunc(resourcePostgreSQLReplicationSlotRead),
Delete: PGResourceFunc(resourcePostgreSQLReplicationSlotDelete),
Exists: PGResourceExistsFunc(resourcePostgreSQLReplicationSlotExists),
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"database": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ForceNew: true,
Description: "Sets the database to add the replication slot to",
},
"plugin": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "Sets the output plugin to use",
},
},
}
}

func resourcePostgreSQLReplicationSlotCreate(db *DBConnection, d *schema.ResourceData) error {

name := d.Get("name").(string)
plugin := d.Get("plugin").(string)
databaseName := getDatabaseForReplicationSlot(d, db.client.databaseName)

txn, err := startTransaction(db.client, databaseName)
if err != nil {
return err
}
defer deferredRollback(txn)

sql := "SELECT FROM pg_create_logical_replication_slot($1, $2)"
if _, err := txn.Exec(sql, name, plugin); err != nil {
return err
}

if err = txn.Commit(); err != nil {
return fmt.Errorf("Error creating ReplicationSlot: %w", err)
}

d.SetId(generateReplicationSlotID(d, databaseName))

return resourcePostgreSQLReplicationSlotReadImpl(db, d)
}

func resourcePostgreSQLReplicationSlotExists(db *DBConnection, d *schema.ResourceData) (bool, error) {

var ReplicationSlotName string

database, replicationSlotName, err := getDBReplicationSlotName(d, db.client)
if err != nil {
return false, err
}

// Check if the database exists
exists, err := dbExists(db, database)
if err != nil || !exists {
return false, err
}

txn, err := startTransaction(db.client, database)
if err != nil {
return false, err
}
defer deferredRollback(txn)

query := "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE slot_name = $1 and database = $2"
err = txn.QueryRow(query, replicationSlotName, database).Scan(&ReplicationSlotName)
switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, err
}

return true, nil
}

func resourcePostgreSQLReplicationSlotRead(db *DBConnection, d *schema.ResourceData) error {
return resourcePostgreSQLReplicationSlotReadImpl(db, d)
}

func resourcePostgreSQLReplicationSlotReadImpl(db *DBConnection, d *schema.ResourceData) error {
database, replicationSlotName, err := getDBReplicationSlotName(d, db.client)
if err != nil {
return err
}

txn, err := startTransaction(db.client, database)
if err != nil {
return err
}
defer deferredRollback(txn)

var replicationSlotPlugin string
query := `SELECT plugin ` +
`FROM pg_catalog.pg_replication_slots ` +
`WHERE slot_name = $1 AND database = $2`
err = txn.QueryRow(query, replicationSlotName, database).Scan(&replicationSlotPlugin)
switch {
case err == sql.ErrNoRows:
log.Printf("[WARN] PostgreSQL ReplicationSlot (%s) not found for database %s", replicationSlotName, database)
d.SetId("")
return nil
case err != nil:
return fmt.Errorf("Error reading ReplicationSlot: %w", err)
}

d.Set("name", replicationSlotName)
d.Set("plugin", replicationSlotPlugin)
d.Set("database", database)
d.SetId(generateReplicationSlotID(d, database))

return nil
}

func resourcePostgreSQLReplicationSlotDelete(db *DBConnection, d *schema.ResourceData) error {

replicationSlotName := d.Get("name").(string)
database := getDatabaseForReplicationSlot(d, db.client.databaseName)

txn, err := startTransaction(db.client, database)
if err != nil {
return err
}
defer deferredRollback(txn)

sql := "SELECT pg_drop_replication_slot($1)"
if _, err := txn.Exec(sql, replicationSlotName); err != nil {
return err
}

if err = txn.Commit(); err != nil {
return fmt.Errorf("Error deleting ReplicationSlot: %w", err)
}

d.SetId("")

return nil
}

func getDatabaseForReplicationSlot(d *schema.ResourceData, databaseName string) string {
if v, ok := d.GetOk("database"); ok {
databaseName = v.(string)
}

return databaseName
}

func generateReplicationSlotID(d *schema.ResourceData, databaseName string) string {
return strings.Join([]string{
databaseName,
d.Get("name").(string),
}, ".")
}

func getReplicationSlotNameFromID(ID string) string {
splitted := strings.Split(ID, ".")
return splitted[0]
}

// getDBReplicationSlotName returns database and replication slot name. If we are importing this
// resource, they will be parsed from the resource ID (it will return an error if parsing failed)
// otherwise they will be simply get from the state.
func getDBReplicationSlotName(d *schema.ResourceData, client *Client) (string, string, error) {
database := getDatabaseForReplicationSlot(d, client.databaseName)
replicationSlotName := d.Get("name").(string)

// When importing, we have to parse the ID to find replication slot and database names.
if replicationSlotName == "" {
parsed := strings.Split(d.Id(), ".")
if len(parsed) != 2 {
return "", "", fmt.Errorf("Replication Slot ID %s has not the expected format 'database.replication_slot': %v", d.Id(), parsed)
}
database = parsed[0]
replicationSlotName = parsed[1]
}
return database, replicationSlotName, nil
}
164 changes: 164 additions & 0 deletions postgresql/resource_postgresql_replication_slot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package postgresql

import (
"database/sql"
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/terraform"
)

func TestAccPostgresqlReplicationSlot_Basic(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
testSuperuserPreCheck(t)
},
Providers: testAccProviders,
CheckDestroy: testAccCheckPostgresqlReplicationSlotDestroy,
Steps: []resource.TestStep{
{
Config: `
resource "postgresql_replication_slot" "myslot" {
name = "slot"
plugin = "test_decoding"
}`,
Check: resource.ComposeTestCheckFunc(
testAccCheckPostgresqlReplicationSlotExists("postgresql_replication_slot.myslot"),
resource.TestCheckResourceAttr(
"postgresql_replication_slot.myslot", "name", "slot"),
resource.TestCheckResourceAttr(
"postgresql_replication_slot.myslot", "plugin", "test_decoding"),
),
},
},
})
}

func testAccCheckPostgresqlReplicationSlotDestroy(s *terraform.State) error {
client := testAccProvider.Meta().(*Client)

for _, rs := range s.RootModule().Resources {
if rs.Type != "postgresql_replication_slot" {
continue
}

database, ok := rs.Primary.Attributes[extDatabaseAttr]
if !ok {
return fmt.Errorf("No Attribute for database is set")
}
txn, err := startTransaction(client, database)
if err != nil {
return err
}
defer deferredRollback(txn)

exists, err := checkReplicationSlotExists(txn, getReplicationSlotNameFromID(rs.Primary.ID))

if err != nil {
return fmt.Errorf("Error checking replication slot %s", err)
}

if exists {
return fmt.Errorf("ReplicationSlot still exists after destroy")
}
}

return nil
}

func testAccCheckPostgresqlReplicationSlotExists(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Resource not found: %s", n)
}

if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set")
}

database, ok := rs.Primary.Attributes[extDatabaseAttr]
if !ok {
return fmt.Errorf("No Attribute for database is set")
}

extName, ok := rs.Primary.Attributes[extNameAttr]
if !ok {
return fmt.Errorf("No Attribute for replication slot name is set")
}

client := testAccProvider.Meta().(*Client)
txn, err := startTransaction(client, database)
if err != nil {
return err
}
defer deferredRollback(txn)

exists, err := checkReplicationSlotExists(txn, extName)

if err != nil {
return fmt.Errorf("Error checking replication slot %s", err)
}

if !exists {
return fmt.Errorf("ReplicationSlot not found")
}

return nil
}
}

func TestAccPostgresqlReplicationSlot_Database(t *testing.T) {
skipIfNotAcc(t)

dbSuffix, teardown := setupTestDatabase(t, true, true)
defer teardown()

dbName, _ := getTestDBNames(dbSuffix)

testAccPostgresqlReplicationSlotDatabaseConfig := fmt.Sprintf(`
resource "postgresql_replication_slot" "myslot" {
name = "slot"
plugin = "test_decoding"
database = "%s"
}
`, dbName)

resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
testSuperuserPreCheck(t)
},
Providers: testAccProviders,
CheckDestroy: testAccCheckPostgresqlReplicationSlotDestroy,
Steps: []resource.TestStep{
{
Config: testAccPostgresqlReplicationSlotDatabaseConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckPostgresqlReplicationSlotExists("postgresql_replication_slot.myslot"),
resource.TestCheckResourceAttr(
"postgresql_replication_slot.myslot", "name", "slot"),
resource.TestCheckResourceAttr(
"postgresql_replication_slot.myslot", "plugin", "test_decoding"),
resource.TestCheckResourceAttr(
"postgresql_replication_slot.myslot", "database", dbName),
),
},
},
})
}

func checkReplicationSlotExists(txn *sql.Tx, slotName string) (bool, error) {
var _rez bool
err := txn.QueryRow("SELECT TRUE from pg_catalog.pg_replication_slots d WHERE slot_name=$1", slotName).Scan(&_rez)
switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, fmt.Errorf("Error reading info about replication slot: %s", err)
}

return true, nil
}
6 changes: 6 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ version: "3"
services:
postgres:
image: postgres:${PGVERSION:-latest}
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=10"
environment:
POSTGRES_PASSWORD: ${PGPASSWORD}
ports:
Expand Down
Loading