diff --git a/postgresql/provider.go b/postgresql/provider.go index 843dfc7b..5d6c5719 100644 --- a/postgresql/provider.go +++ b/postgresql/provider.go @@ -141,6 +141,7 @@ func Provider() terraform.ResourceProvider { "postgresql_extension": resourcePostgreSQLExtension(), "postgresql_grant": resourcePostgreSQLGrant(), "postgresql_grant_role": resourcePostgreSQLGrantRole(), + "postgresql_replication_slot": resourcePostgreSQLReplicationSlot(), "postgresql_schema": resourcePostgreSQLSchema(), "postgresql_role": resourcePostgreSQLRole(), }, diff --git a/postgresql/resource_postgresql_replication_slot.go b/postgresql/resource_postgresql_replication_slot.go new file mode 100644 index 00000000..5d6a56cb --- /dev/null +++ b/postgresql/resource_postgresql_replication_slot.go @@ -0,0 +1,204 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "log" + "strings" + + "github.com/hashicorp/terraform-plugin-sdk/helper/schema" +) + +func resourcePostgreSQLReplicationSlot() *schema.Resource { + return &schema.Resource{ + Create: PGResourceFunc(resourcePostgreSQLReplicationSlotCreate), + Read: PGResourceFunc(resourcePostgreSQLReplicationSlotRead), + Delete: PGResourceFunc(resourcePostgreSQLReplicationSlotDelete), + Exists: PGResourceExistsFunc(resourcePostgreSQLReplicationSlotExists), + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "database": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ForceNew: true, + Description: "Sets the database to add the replication slot to", + }, + "plugin": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Sets the output plugin to use", + }, + }, + } +} + +func resourcePostgreSQLReplicationSlotCreate(db *DBConnection, d *schema.ResourceData) error { + + name := d.Get("name").(string) + plugin := d.Get("plugin").(string) + databaseName := getDatabaseForReplicationSlot(d, db.client.databaseName) + + txn, err := startTransaction(db.client, databaseName) + if err != nil { + return err + } + defer deferredRollback(txn) + + sql := "SELECT FROM pg_create_logical_replication_slot($1, $2)" + if _, err := txn.Exec(sql, name, plugin); err != nil { + return err + } + + if err = txn.Commit(); err != nil { + return fmt.Errorf("Error creating ReplicationSlot: %w", err) + } + + d.SetId(generateReplicationSlotID(d, databaseName)) + + return resourcePostgreSQLReplicationSlotReadImpl(db, d) +} + +func resourcePostgreSQLReplicationSlotExists(db *DBConnection, d *schema.ResourceData) (bool, error) { + + var ReplicationSlotName string + + database, replicationSlotName, err := getDBReplicationSlotName(d, db.client) + if err != nil { + return false, err + } + + // Check if the database exists + exists, err := dbExists(db, database) + if err != nil || !exists { + return false, err + } + + txn, err := startTransaction(db.client, database) + if err != nil { + return false, err + } + defer deferredRollback(txn) + + query := "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE slot_name = $1 and database = $2" + err = txn.QueryRow(query, replicationSlotName, database).Scan(&ReplicationSlotName) + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, err + } + + return true, nil +} + +func resourcePostgreSQLReplicationSlotRead(db *DBConnection, d *schema.ResourceData) error { + return resourcePostgreSQLReplicationSlotReadImpl(db, d) +} + +func resourcePostgreSQLReplicationSlotReadImpl(db *DBConnection, d *schema.ResourceData) error { + database, replicationSlotName, err := getDBReplicationSlotName(d, db.client) + if err != nil { + return err + } + + txn, err := startTransaction(db.client, database) + if err != nil { + return err + } + defer deferredRollback(txn) + + var replicationSlotPlugin string + query := `SELECT plugin ` + + `FROM pg_catalog.pg_replication_slots ` + + `WHERE slot_name = $1 AND database = $2` + err = txn.QueryRow(query, replicationSlotName, database).Scan(&replicationSlotPlugin) + switch { + case err == sql.ErrNoRows: + log.Printf("[WARN] PostgreSQL ReplicationSlot (%s) not found for database %s", replicationSlotName, database) + d.SetId("") + return nil + case err != nil: + return fmt.Errorf("Error reading ReplicationSlot: %w", err) + } + + d.Set("name", replicationSlotName) + d.Set("plugin", replicationSlotPlugin) + d.Set("database", database) + d.SetId(generateReplicationSlotID(d, database)) + + return nil +} + +func resourcePostgreSQLReplicationSlotDelete(db *DBConnection, d *schema.ResourceData) error { + + replicationSlotName := d.Get("name").(string) + database := getDatabaseForReplicationSlot(d, db.client.databaseName) + + txn, err := startTransaction(db.client, database) + if err != nil { + return err + } + defer deferredRollback(txn) + + sql := "SELECT pg_drop_replication_slot($1)" + if _, err := txn.Exec(sql, replicationSlotName); err != nil { + return err + } + + if err = txn.Commit(); err != nil { + return fmt.Errorf("Error deleting ReplicationSlot: %w", err) + } + + d.SetId("") + + return nil +} + +func getDatabaseForReplicationSlot(d *schema.ResourceData, databaseName string) string { + if v, ok := d.GetOk("database"); ok { + databaseName = v.(string) + } + + return databaseName +} + +func generateReplicationSlotID(d *schema.ResourceData, databaseName string) string { + return strings.Join([]string{ + databaseName, + d.Get("name").(string), + }, ".") +} + +func getReplicationSlotNameFromID(ID string) string { + splitted := strings.Split(ID, ".") + return splitted[0] +} + +// getDBReplicationSlotName returns database and replication slot name. If we are importing this +// resource, they will be parsed from the resource ID (it will return an error if parsing failed) +// otherwise they will be simply get from the state. +func getDBReplicationSlotName(d *schema.ResourceData, client *Client) (string, string, error) { + database := getDatabaseForReplicationSlot(d, client.databaseName) + replicationSlotName := d.Get("name").(string) + + // When importing, we have to parse the ID to find replication slot and database names. + if replicationSlotName == "" { + parsed := strings.Split(d.Id(), ".") + if len(parsed) != 2 { + return "", "", fmt.Errorf("Replication Slot ID %s has not the expected format 'database.replication_slot': %v", d.Id(), parsed) + } + database = parsed[0] + replicationSlotName = parsed[1] + } + return database, replicationSlotName, nil +} diff --git a/postgresql/resource_postgresql_replication_slot_test.go b/postgresql/resource_postgresql_replication_slot_test.go new file mode 100644 index 00000000..69234151 --- /dev/null +++ b/postgresql/resource_postgresql_replication_slot_test.go @@ -0,0 +1,164 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/terraform" +) + +func TestAccPostgresqlReplicationSlot_Basic(t *testing.T) { + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + testSuperuserPreCheck(t) + }, + Providers: testAccProviders, + CheckDestroy: testAccCheckPostgresqlReplicationSlotDestroy, + Steps: []resource.TestStep{ + { + Config: ` + resource "postgresql_replication_slot" "myslot" { + name = "slot" + plugin = "test_decoding" + }`, + Check: resource.ComposeTestCheckFunc( + testAccCheckPostgresqlReplicationSlotExists("postgresql_replication_slot.myslot"), + resource.TestCheckResourceAttr( + "postgresql_replication_slot.myslot", "name", "slot"), + resource.TestCheckResourceAttr( + "postgresql_replication_slot.myslot", "plugin", "test_decoding"), + ), + }, + }, + }) +} + +func testAccCheckPostgresqlReplicationSlotDestroy(s *terraform.State) error { + client := testAccProvider.Meta().(*Client) + + for _, rs := range s.RootModule().Resources { + if rs.Type != "postgresql_replication_slot" { + continue + } + + database, ok := rs.Primary.Attributes[extDatabaseAttr] + if !ok { + return fmt.Errorf("No Attribute for database is set") + } + txn, err := startTransaction(client, database) + if err != nil { + return err + } + defer deferredRollback(txn) + + exists, err := checkReplicationSlotExists(txn, getReplicationSlotNameFromID(rs.Primary.ID)) + + if err != nil { + return fmt.Errorf("Error checking replication slot %s", err) + } + + if exists { + return fmt.Errorf("ReplicationSlot still exists after destroy") + } + } + + return nil +} + +func testAccCheckPostgresqlReplicationSlotExists(n string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Resource not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + + database, ok := rs.Primary.Attributes[extDatabaseAttr] + if !ok { + return fmt.Errorf("No Attribute for database is set") + } + + extName, ok := rs.Primary.Attributes[extNameAttr] + if !ok { + return fmt.Errorf("No Attribute for replication slot name is set") + } + + client := testAccProvider.Meta().(*Client) + txn, err := startTransaction(client, database) + if err != nil { + return err + } + defer deferredRollback(txn) + + exists, err := checkReplicationSlotExists(txn, extName) + + if err != nil { + return fmt.Errorf("Error checking replication slot %s", err) + } + + if !exists { + return fmt.Errorf("ReplicationSlot not found") + } + + return nil + } +} + +func TestAccPostgresqlReplicationSlot_Database(t *testing.T) { + skipIfNotAcc(t) + + dbSuffix, teardown := setupTestDatabase(t, true, true) + defer teardown() + + dbName, _ := getTestDBNames(dbSuffix) + + testAccPostgresqlReplicationSlotDatabaseConfig := fmt.Sprintf(` + resource "postgresql_replication_slot" "myslot" { + name = "slot" + plugin = "test_decoding" + database = "%s" + } + `, dbName) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + testSuperuserPreCheck(t) + }, + Providers: testAccProviders, + CheckDestroy: testAccCheckPostgresqlReplicationSlotDestroy, + Steps: []resource.TestStep{ + { + Config: testAccPostgresqlReplicationSlotDatabaseConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckPostgresqlReplicationSlotExists("postgresql_replication_slot.myslot"), + resource.TestCheckResourceAttr( + "postgresql_replication_slot.myslot", "name", "slot"), + resource.TestCheckResourceAttr( + "postgresql_replication_slot.myslot", "plugin", "test_decoding"), + resource.TestCheckResourceAttr( + "postgresql_replication_slot.myslot", "database", dbName), + ), + }, + }, + }) +} + +func checkReplicationSlotExists(txn *sql.Tx, slotName string) (bool, error) { + var _rez bool + err := txn.QueryRow("SELECT TRUE from pg_catalog.pg_replication_slots d WHERE slot_name=$1", slotName).Scan(&_rez) + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, fmt.Errorf("Error reading info about replication slot: %s", err) + } + + return true, nil +} diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index d580a869..43c49d0a 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -3,6 +3,12 @@ version: "3" services: postgres: image: postgres:${PGVERSION:-latest} + command: + - "postgres" + - "-c" + - "wal_level=logical" + - "-c" + - "max_replication_slots=10" environment: POSTGRES_PASSWORD: ${PGPASSWORD} ports: diff --git a/website/docs/r/postgresql_replication_slot.markdown b/website/docs/r/postgresql_replication_slot.markdown new file mode 100644 index 00000000..28a04417 --- /dev/null +++ b/website/docs/r/postgresql_replication_slot.markdown @@ -0,0 +1,27 @@ +--- +layout: "postgresql" +page_title: "PostgreSQL: postgresql_replication_slot" +sidebar_current: "docs-postgresql-resource-postgresql_replication_slot" +description: |- +Creates and manages a replication slot on a PostgreSQL server. +--- + +# postgresql\_replication\_slot + +The ``postgresql_replication_slot`` resource creates and manages a replication slot on a PostgreSQL +server. + + +## Usage + +```hcl +resource "postgresql_replication_slot" "my_slot" { + name = "my_slot" + plugin = "test_decoding" +} + +## Argument Reference + +* `name` - (Required) The name of the replication slot. +* `plugin` - (Required) Sets the output plugin. +* `database` - (Optional) Which database to create the replication slot on. Defaults to provider database. diff --git a/website/postgresql.erb b/website/postgresql.erb index bce73e5e..fd33f55c 100644 --- a/website/postgresql.erb +++ b/website/postgresql.erb @@ -28,6 +28,9 @@ > postgresql_grant_role + > + postgresql_replication_slot + > postgresql_role