Skip to content

Commit

Permalink
Merge pull request #8 from getyourguide/feat/dump-to-file
Browse files Browse the repository at this point in the history
Feat/dump to file
  • Loading branch information
ebrard authored May 31, 2024
2 parents 6a3d77a + 6edf8c8 commit 2b871f3
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 39 deletions.
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# PgOutputDump
A simple CLI to read pgoutput format from a replication slot

A simple CLI to read pgoutput format from a replication slot and optionally dump
it to a file. This file can later be read back using the same CLI.

# Usage

## Read replication stream from live database

Note: Java >= 11 is required.

```bash
Expand All @@ -16,5 +20,17 @@ java -jar target/PgOutputDump-jar-with-dependencies.jar \
-c 10
```

This tool will not create nor the slot nor the replication, they need to exist
To dump a copy of the binary stream, use the `--file output.bin` option. The file is simply written
as the payload size (int32) followed by the payload itself for each received replication event.

```bash

## Read replication stream from a local file

```bash
java -jar target/PgOutputDump-jar-with-dependencies.jar \
--file output.bin
```

Note: This tool will not create nor the slot nor the replication, they need to exist
beforehand.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.getyourguide</groupId>
<artifactId>PgOutputDump</artifactId>
<version>0.1</version>
<version>0.2</version>
<packaging>jar</packaging>

<properties>
Expand Down Expand Up @@ -81,7 +81,7 @@
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugin>

</plugins>
</build>
Expand Down
129 changes: 95 additions & 34 deletions src/main/java/org/getyourguide/App.java
Original file line number Diff line number Diff line change
@@ -1,57 +1,80 @@
package org.getyourguide;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.getyourguide.source.FileStreamSource;
import org.getyourguide.source.PgReplicationSource;
import org.getyourguide.source.Source;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;
import picocli.CommandLine;
import picocli.CommandLine.Option;

public class App implements Callable<Integer> {

private static final Logger LOGGER = Logger.getLogger(App.class.getName());

@Option(names = "--ssl", description = "Use secured connection")
boolean useSsl;

@Option(names = "--silent", description = "Do not display messages")
boolean isSilent;

@Option(names = "--dump", description = "File name to dump to")
String outFileName;

@Option(
names = {"-h", "--host"},
description = "The database hostname",
required = true)
required = false)
private String host;

@Option(
names = {"-f", "--file"},
description = "The WAL archive file to read from",
required = false)
private String file;

@Option(
names = {"-d", "--database"},
description = "The database to connect to",
required = true)
required = false)
private String dbName;

@Option(
names = {"-U", "--user"},
description = "The user login",
required = true)
required = false)
private String userName;

@Option(
names = {"-P", "--password"},
description = "The user password",
required = true)
required = false)
private String password;

@Option(
names = {"-s", "--slot"},
description = "The name of the slot, must already exist",
required = true)
required = false)
private String slot;

@Option(
names = {"-p", "--publication"},
description = "The name of the publication, must already exist",
required = true)
required = false)
private String publication;

@Option(
Expand All @@ -60,48 +83,60 @@ public class App implements Callable<Integer> {
required = false)
private Integer maxCount;

private PGReplicationStream stream;
private Connection conn;
private DataOutputStream dos;
private Source source;
private AtomicBoolean closed = new AtomicBoolean(false);

public static void main(String[] args) {
int exitCode = new CommandLine(new App()).execute(args);
System.exit(exitCode);
}

@Override
public Integer call() throws SQLException, InterruptedException {

String url = String.format("jdbc:postgresql://%s/%s?ssl=%s", host, dbName, useSsl);
Properties props = new Properties();
PGProperty.USER.set(props, userName);
PGProperty.PASSWORD.set(props, password);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

conn = DriverManager.getConnection(url, props);
PGConnection replConnection = conn.unwrap(PGConnection.class);

stream =
replConnection
.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(slot)
.withSlotOption("proto_version", 4)
.withSlotOption("publication_names", publication)
.withSlotOption("messages", "true")
.start();
public Integer call() throws SQLException, InterruptedException, IOException {
LOGGER.info("Application starting");

if (file == null) {
String url = String.format("jdbc:postgresql://%s/%s?ssl=%s", host, dbName, useSsl);
Properties props = new Properties();
PGProperty.USER.set(props, userName);
PGProperty.PASSWORD.set(props, password);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

conn = DriverManager.getConnection(url, props);
PGConnection replConnection = conn.unwrap(PGConnection.class);
source = new PgReplicationSource(replConnection, slot, publication);
} else {
source = new FileStreamSource(file);
}

if (outFileName != null) {
File file = new File(outFileName);
var fos = new FileOutputStream(file);
this.dos = new DataOutputStream(fos);
}

Runtime.getRuntime().addShutdownHook(new Thread(this::close) {});

var decoder = new PgOutputDecoder();

var msgCount = 0;
var byteWritten = 0;

while (!closed.get()) {

while (true) {
ByteBuffer msg;

ByteBuffer msg = stream.readPending();
try {
msg = source.readPending();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}

var lsn = source.getLastReceiveLSN();

if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
Expand All @@ -110,6 +145,28 @@ public Integer call() throws SQLException, InterruptedException {

msgCount += 1;

if (outFileName != null) {
int len = msg.remaining();
byte[] data = new byte[len];
msg.get(data);

for (int i = 0; i < len + 4; i++) {
byteWritten++;
if (byteWritten % (10 * 1024 * 1024) == 0) {
System.out.printf(
"[%s] %s MB written so far to file\n",
LocalDateTime.now(), dos.size() / 1024 / 1024);
}
}

dos.writeInt(len);
dos.write(data);
msg.rewind();
}

if (isSilent) continue;

System.out.printf("----| %s |----\n", lsn);
decoder.decodeMessage(msg);

if (maxCount != null && msgCount == maxCount) {
Expand All @@ -121,8 +178,12 @@ public Integer call() throws SQLException, InterruptedException {
}

private void close() {
System.out.print("Closing application\n");
closed.set(true);
try {
stream.close();
dos.flush();
dos.close();
source.close();
conn.close();
} catch (Exception ignore) {
// no handling
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/getyourguide/PgOutputDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public void decodeMessage(ByteBuffer msg) {
}
default:
{
System.out.printf("Type %s not implemented", type);
System.out.printf("Type %s not implemented\n", type);
System.out.printf("Message: %s\n", new String(msg.array()));
}
previousType = type;
}
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/getyourguide/models/Lsn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.getyourguide.models;

import java.nio.ByteBuffer;

public class Lsn {
private final long value;

public Lsn(long lsn) {
this.value = lsn;
}

@Override
public String toString() {
return String.format("%s, [hex:%s]", value, asHexString());
}

private String asHexString() {
final ByteBuffer buf = ByteBuffer.allocate(8);
buf.putLong(value);
buf.position(0);

final int logicalXlog = buf.getInt();
final int segment = buf.getInt();
return String.format("%X/%X", logicalXlog, segment);
}
}
48 changes: 48 additions & 0 deletions src/main/java/org/getyourguide/source/FileStreamSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.getyourguide.source;

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;

public class FileStreamSource implements Source {

private int count = 0;
private final DataInputStream dataInputStream;

public FileStreamSource(String fileName) {
try {
dataInputStream = new DataInputStream(new FileInputStream(fileName));
} catch (FileNotFoundException e) {
throw new RuntimeException("File not found: " + fileName);
}
}

@Override
public ByteBuffer readPending() throws Exception {
count++;
if (dataInputStream.available() == 0) {
System.out.printf("End of File: %s events read\n", count);
throw new RuntimeException("End of File");
}

// Read the size of the message
var size = dataInputStream.readInt();
System.out.printf("Reading %s bytes\n", size);

// Read the message
var data = dataInputStream.readNBytes(size);
return ByteBuffer.wrap(data);
}

@Override
public long getLastReceiveLSN() {
return 0;
}

@Override
public void close() throws IOException {
dataInputStream.close();
}
}
Loading

0 comments on commit 2b871f3

Please sign in to comment.