From 55c706ceb74aa4b615ef1c51e616096f47e3e006 Mon Sep 17 00:00:00 2001 From: George Date: Fri, 7 Feb 2025 13:52:46 -0800 Subject: [PATCH] `getLedgerEntries`: optionally use high-performance Core server (#353) * Replace getLedgerEntries DB queries with Core fetches * Add infrastructure for testing the new Core http query server * Sort entries in response according to request order * Only test the query server from protocol 23 onwards * Enable debug printouts for integration tests * Make sure all ports are allocated at once to minimize clashes --------- Co-authored-by: Alfonso Acosta --- .github/workflows/stellar-rpc.yml | 8 +- cmd/stellar-rpc/internal/config/main.go | 13 +- cmd/stellar-rpc/internal/config/options.go | 23 +- .../internal/config/test.soroban.rpc.config | 1 + cmd/stellar-rpc/internal/config/toml_test.go | 2 + cmd/stellar-rpc/internal/daemon/daemon.go | 43 ++- .../internal/daemon/interfaces/interfaces.go | 6 + .../internal/daemon/interfaces/noOpDaemon.go | 11 + cmd/stellar-rpc/internal/daemon/metrics.go | 4 + .../get_ledger_entries_test.go | 41 ++- .../captive-core-integration-tests.cfg.tmpl | 2 +- .../infrastructure/docker/docker-compose.yml | 5 +- .../integrationtest/infrastructure/test.go | 131 +++++---- cmd/stellar-rpc/internal/jsonrpc.go | 11 +- .../internal/methods/get_ledger_entries.go | 266 +++++++++++++----- 15 files changed, 427 insertions(+), 140 deletions(-) diff --git a/.github/workflows/stellar-rpc.yml b/.github/workflows/stellar-rpc.yml index 6b515d2d..f6ca14b3 100644 --- a/.github/workflows/stellar-rpc.yml +++ b/.github/workflows/stellar-rpc.yml @@ -104,10 +104,10 @@ jobs: STELLAR_RPC_INTEGRATION_TESTS_ENABLED: true STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }} STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core - PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 22.0.0-2138.721fd0a65.focal - PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:22.0.0-2138.721fd0a65.focal - PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 22.0.0-2138.721fd0a65.focal - PROTOCOL_22_CORE_DOCKER_IMG: stellar/stellar-core:22.0.0-2138.721fd0a65.focal + PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 22.1.0-2194.0241e79f7.focal + PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:22.1.0-2194.0241e79f7.focal + PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 22.1.1-2251.ac9f21ac7.focal~do~not~use~in~prd + PROTOCOL_22_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:22.1.1-2251.ac9f21ac7.focal-do-not-use-in-prd steps: - uses: actions/checkout@v4 diff --git a/cmd/stellar-rpc/internal/config/main.go b/cmd/stellar-rpc/internal/config/main.go index 710968f3..40a6a6f9 100644 --- a/cmd/stellar-rpc/internal/config/main.go +++ b/cmd/stellar-rpc/internal/config/main.go @@ -14,11 +14,14 @@ type Config struct { Strict bool - StellarCoreURL string - CaptiveCoreStoragePath string - StellarCoreBinaryPath string - CaptiveCoreConfigPath string - CaptiveCoreHTTPPort uint + StellarCoreURL string + CaptiveCoreStoragePath string + StellarCoreBinaryPath string + CaptiveCoreConfigPath string + CaptiveCoreHTTPPort uint16 + CaptiveCoreHTTPQueryPort uint16 + CaptiveCoreHTTPQueryThreadPoolSize uint16 + CaptiveCoreHTTPQuerySnapshotLedgers uint16 Endpoint string AdminEndpoint string diff --git a/cmd/stellar-rpc/internal/config/options.go b/cmd/stellar-rpc/internal/config/options.go index 07acd6ca..21283182 100644 --- a/cmd/stellar-rpc/internal/config/options.go +++ b/cmd/stellar-rpc/internal/config/options.go @@ -20,7 +20,8 @@ const ( OneDayOfLedgers = 17280 SevenDayOfLedgers = OneDayOfLedgers * 7 - defaultHTTPEndpoint = "localhost:8000" + defaultHTTPEndpoint = "localhost:8000" + defaultCaptiveCoreHTTPPort = 11626 // regular queries like /info ) // TODO: refactor and remove the linter exceptions @@ -84,7 +85,25 @@ func (cfg *Config) options() Options { Name: "stellar-captive-core-http-port", Usage: "HTTP port for Captive Core to listen on (0 disables the HTTP server)", ConfigKey: &cfg.CaptiveCoreHTTPPort, - DefaultValue: uint(11626), + DefaultValue: uint16(defaultCaptiveCoreHTTPPort), + }, + { + Name: "stellar-captive-core-http-query-port", + Usage: "HTTP port for Captive Core to listen on for high-performance queries like /getledgerentry (0 disables the HTTP server, must not conflict with CAPTIVE_CORE_HTTP_PORT)", + ConfigKey: &cfg.CaptiveCoreHTTPQueryPort, + DefaultValue: uint16(0), // Disabled by default, although it normally uses 11628 + }, + { + Name: "stellar-captive-core-http-query-thread-pool-size", + Usage: "Number of threads to use by Captive Core's high-performance query server", + ConfigKey: &cfg.CaptiveCoreHTTPQueryThreadPoolSize, + DefaultValue: uint16(runtime.NumCPU()), //nolint:gosec + }, + { + Name: "stellar-captive-core-http-query-snapshot-ledgers", + Usage: "Size of ledger history in Captive Core's high-performance query server (don't touch unless you know what you are doing)", + ConfigKey: &cfg.CaptiveCoreHTTPQuerySnapshotLedgers, + DefaultValue: uint16(4), }, { Name: "log-level", diff --git a/cmd/stellar-rpc/internal/config/test.soroban.rpc.config b/cmd/stellar-rpc/internal/config/test.soroban.rpc.config index 22e5475c..b5b26227 100644 --- a/cmd/stellar-rpc/internal/config/test.soroban.rpc.config +++ b/cmd/stellar-rpc/internal/config/test.soroban.rpc.config @@ -8,4 +8,5 @@ STELLAR_CORE_BINARY_PATH="/usr/bin/stellar-core" HISTORY_ARCHIVE_URLS=["http://localhost:1570"] DB_PATH="/opt/stellar/stellar-rpc/rpc_db.sqlite" STELLAR_CAPTIVE_CORE_HTTP_PORT=0 +STELLAR_CAPTIVE_CORE_HTTP_QUERY_PORT=11628 CHECKPOINT_FREQUENCY=64 diff --git a/cmd/stellar-rpc/internal/config/toml_test.go b/cmd/stellar-rpc/internal/config/toml_test.go index 16d8c2d8..c095e3ed 100644 --- a/cmd/stellar-rpc/internal/config/toml_test.go +++ b/cmd/stellar-rpc/internal/config/toml_test.go @@ -113,6 +113,8 @@ func TestRoundTrip(t *testing.T) { *v = "test" case *uint: *v = 42 + case *uint16: + *v = 22 case *uint32: *v = 32 case *time.Duration: diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index e9de64d9..17864ebe 100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -3,6 +3,7 @@ package daemon import ( "context" "errors" + "fmt" "net" "net/http" "net/http/pprof" @@ -49,6 +50,7 @@ const ( type Daemon struct { core *ledgerbackend.CaptiveStellarCore coreClient *CoreClientWithMetrics + coreQueryingClient interfaces.FastCoreClient ingestService *ingest.Service db *db.DB jsonRPCHandler *internal.Handler @@ -120,8 +122,19 @@ func (d *Daemon) Close() error { // newCaptiveCore creates a new captive core backend instance and returns it. func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbackend.CaptiveStellarCore, error) { + var queryServerParams *ledgerbackend.HTTPQueryServerParams + if cfg.CaptiveCoreHTTPQueryPort != 0 { + // Only try to enable the server if the port passed is non-zero + queryServerParams = &ledgerbackend.HTTPQueryServerParams{ + Port: cfg.CaptiveCoreHTTPQueryPort, + ThreadPoolSize: cfg.CaptiveCoreHTTPQueryThreadPoolSize, + SnapshotLedgers: cfg.CaptiveCoreHTTPQuerySnapshotLedgers, + } + } + + httpPort := uint(cfg.CaptiveCoreHTTPPort) captiveCoreTomlParams := ledgerbackend.CaptiveCoreTomlParams{ - HTTPPort: &cfg.CaptiveCoreHTTPPort, + HTTPPort: &httpPort, HistoryArchiveURLs: cfg.HistoryArchiveURLs, NetworkPassphrase: cfg.NetworkPassphrase, Strict: true, @@ -129,6 +142,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken EnforceSorobanDiagnosticEvents: true, EnforceSorobanTransactionMetaExtV1: true, CoreBinaryPath: cfg.StellarCoreBinaryPath, + HTTPQueryServerParams: queryServerParams, } captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromFile(cfg.CaptiveCoreConfigPath, captiveCoreTomlParams) if err != nil { @@ -156,12 +170,13 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { metricsRegistry := prometheus.NewRegistry() daemon := &Daemon{ - logger: logger, - core: core, - db: mustOpenDatabase(cfg, logger, metricsRegistry), - done: make(chan struct{}), - metricsRegistry: metricsRegistry, - coreClient: newCoreClientWithMetrics(createStellarCoreClient(cfg), metricsRegistry), + logger: logger, + core: core, + db: mustOpenDatabase(cfg, logger, metricsRegistry), + done: make(chan struct{}), + metricsRegistry: metricsRegistry, + coreClient: newCoreClientWithMetrics(createStellarCoreClient(cfg), metricsRegistry), + coreQueryingClient: createHighperfStellarCoreClient(cfg), } feewindows := daemon.mustInitializeStorage(cfg) @@ -235,6 +250,17 @@ func createStellarCoreClient(cfg *config.Config) stellarcore.Client { } } +func createHighperfStellarCoreClient(cfg *config.Config) interfaces.FastCoreClient { + // It doesn't make sense to create a client if the local server is not enabled + if cfg.CaptiveCoreHTTPQueryPort == 0 { + return nil + } + return &stellarcore.Client{ + URL: fmt.Sprintf("http://localhost:%d", cfg.CaptiveCoreHTTPQueryPort), + HTTP: &http.Client{Timeout: cfg.CoreRequestTimeout}, + } +} + func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *Daemon, feewindows *feewindow.FeeWindows, historyArchive *historyarchive.ArchiveInterface, ) *ingest.Service { @@ -486,3 +512,6 @@ func (d *Daemon) Run() { return } } + +// Ensure the daemon conforms to the interface +var _ interfaces.Daemon = (*Daemon)(nil) diff --git a/cmd/stellar-rpc/internal/daemon/interfaces/interfaces.go b/cmd/stellar-rpc/internal/daemon/interfaces/interfaces.go index 7cb08168..32158825 100644 --- a/cmd/stellar-rpc/internal/daemon/interfaces/interfaces.go +++ b/cmd/stellar-rpc/internal/daemon/interfaces/interfaces.go @@ -7,6 +7,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" proto "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/xdr" ) // Daemon defines the interface that the Daemon would be implementing. @@ -16,6 +17,7 @@ type Daemon interface { MetricsRegistry() *prometheus.Registry MetricsNamespace() string CoreClient() CoreClient + FastCoreClient() FastCoreClient GetCore() *ledgerbackend.CaptiveStellarCore } @@ -23,3 +25,7 @@ type CoreClient interface { Info(ctx context.Context) (*proto.InfoResponse, error) SubmitTransaction(ctx context.Context, txBase64 string) (*proto.TXResponse, error) } + +type FastCoreClient interface { + GetLedgerEntries(ctx context.Context, ledgerSeq uint32, keys ...xdr.LedgerKey) (proto.GetLedgerEntryResponse, error) +} diff --git a/cmd/stellar-rpc/internal/daemon/interfaces/noOpDaemon.go b/cmd/stellar-rpc/internal/daemon/interfaces/noOpDaemon.go index ca66802a..a9d3783a 100644 --- a/cmd/stellar-rpc/internal/daemon/interfaces/noOpDaemon.go +++ b/cmd/stellar-rpc/internal/daemon/interfaces/noOpDaemon.go @@ -7,6 +7,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" proto "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/xdr" ) // TODO: deprecate and rename to stellar_rpc @@ -41,6 +42,10 @@ func (d *NoOpDaemon) CoreClient() CoreClient { return d.coreClient } +func (d *NoOpDaemon) FastCoreClient() FastCoreClient { + return d.coreClient +} + func (d *NoOpDaemon) GetCore() *ledgerbackend.CaptiveStellarCore { return d.core } @@ -54,3 +59,9 @@ func (s noOpCoreClient) Info(context.Context) (*proto.InfoResponse, error) { func (s noOpCoreClient) SubmitTransaction(context.Context, string) (*proto.TXResponse, error) { return &proto.TXResponse{Status: proto.PreflightStatusOk}, nil } + +func (s noOpCoreClient) GetLedgerEntries(context.Context, + uint32, ...xdr.LedgerKey, +) (proto.GetLedgerEntryResponse, error) { + return proto.GetLedgerEntryResponse{}, nil +} diff --git a/cmd/stellar-rpc/internal/daemon/metrics.go b/cmd/stellar-rpc/internal/daemon/metrics.go index db799a59..803fabc1 100644 --- a/cmd/stellar-rpc/internal/daemon/metrics.go +++ b/cmd/stellar-rpc/internal/daemon/metrics.go @@ -111,6 +111,10 @@ func (d *Daemon) CoreClient() interfaces.CoreClient { return d.coreClient } +func (d *Daemon) FastCoreClient() interfaces.FastCoreClient { + return d.coreQueryingClient +} + func (d *Daemon) GetCore() *ledgerbackend.CaptiveStellarCore { return d.core } diff --git a/cmd/stellar-rpc/internal/integrationtest/get_ledger_entries_test.go b/cmd/stellar-rpc/internal/integrationtest/get_ledger_entries_test.go index 0314baee..d717414a 100644 --- a/cmd/stellar-rpc/internal/integrationtest/get_ledger_entries_test.go +++ b/cmd/stellar-rpc/internal/integrationtest/get_ledger_entries_test.go @@ -15,7 +15,18 @@ import ( ) func TestGetLedgerEntriesNotFound(t *testing.T) { - test := infrastructure.NewTest(t, nil) + t.Run("WithCore", func(t *testing.T) { + testGetLedgerEntriesNotFound(t, true) + }) + t.Run("WithoutCore", func(t *testing.T) { + testGetLedgerEntriesNotFound(t, false) + }) +} + +func testGetLedgerEntriesNotFound(t *testing.T, useCore bool) { + test := infrastructure.NewTest(t, &infrastructure.TestConfig{ + EnableCoreHTTPQueryServer: useCore, + }) client := test.GetRPCLient() hash := xdr.Hash{0xa, 0xb} @@ -48,7 +59,18 @@ func TestGetLedgerEntriesNotFound(t *testing.T) { } func TestGetLedgerEntriesInvalidParams(t *testing.T) { - test := infrastructure.NewTest(t, nil) + t.Run("WithCore", func(t *testing.T) { + testGetLedgerEntriesInvalidParams(t, true) + }) + t.Run("WithoutCore", func(t *testing.T) { + testGetLedgerEntriesInvalidParams(t, false) + }) +} + +func testGetLedgerEntriesInvalidParams(t *testing.T, useCore bool) { + test := infrastructure.NewTest(t, &infrastructure.TestConfig{ + EnableCoreHTTPQueryServer: useCore, + }) client := test.GetRPCLient() @@ -66,7 +88,18 @@ func TestGetLedgerEntriesInvalidParams(t *testing.T) { } func TestGetLedgerEntriesSucceeds(t *testing.T) { - test := infrastructure.NewTest(t, nil) + t.Run("WithCore", func(t *testing.T) { + testGetLedgerEntriesSucceeds(t, true) + }) + t.Run("WithoutCore", func(t *testing.T) { + testGetLedgerEntriesSucceeds(t, false) + }) +} + +func testGetLedgerEntriesSucceeds(t *testing.T, useCore bool) { + test := infrastructure.NewTest(t, &infrastructure.TestConfig{ + EnableCoreHTTPQueryServer: useCore, + }) _, contractID, contractHash := test.CreateHelloWorldContract() contractCodeKeyB64, err := xdr.MarshalBase64(xdr.LedgerKey{ @@ -117,7 +150,7 @@ func TestGetLedgerEntriesSucceeds(t *testing.T) { require.Equal(t, xdr.LedgerEntryTypeContractCode, firstEntry.Type) require.Equal(t, infrastructure.GetHelloWorldContract(), firstEntry.MustContractCode().Code) - require.Greater(t, result.Entries[1].LastModifiedLedger, uint32(0)) + require.Positive(t, result.Entries[1].LastModifiedLedger) require.LessOrEqual(t, result.Entries[1].LastModifiedLedger, result.LatestLedger) require.NotNil(t, result.Entries[1].LiveUntilLedgerSeq) require.Greater(t, *result.Entries[1].LiveUntilLedgerSeq, result.LatestLedger) diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/captive-core-integration-tests.cfg.tmpl b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/captive-core-integration-tests.cfg.tmpl index 670010c7..9abe840a 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/captive-core-integration-tests.cfg.tmpl +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/captive-core-integration-tests.cfg.tmpl @@ -21,4 +21,4 @@ PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" # should be "core" when running RPC in a container or "localhost:port" when running RPC in the host ADDRESS="${CORE_HOST_PORT}" -QUALITY="MEDIUM" +QUALITY="MEDIUM" \ No newline at end of file diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml index bc3150f8..3f6c0132 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml @@ -13,7 +13,8 @@ services: # Note: Please keep the image pinned to an immutable tag matching the Captive Core version. # This avoids implicit updates which break compatibility between # the Core container and captive core. - image: ${CORE_IMAGE:-stellar/stellar-core:22.0.0-2138.721fd0a65.focal} + image: ${CORE_IMAGE:-stellar/unsafe-stellar-core:22.1.1-2251.ac9f21ac7.focal-do-not-use-in-prd} + depends_on: - core-postgres environment: @@ -23,6 +24,8 @@ services: - "127.0.0.1:0:11625" # http - "127.0.0.1:0:11626" + # high-perf http + - "127.0.0.1:0:11628" # history archive - "127.0.0.1:0:1570" entrypoint: /usr/bin/env diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go index c054d530..f4db9eb6 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go @@ -11,6 +11,7 @@ import ( "path" "path/filepath" "regexp" + "runtime" "strconv" "strings" "sync" @@ -44,10 +45,11 @@ const ( captiveCoreConfigFilename = "captive-core-integration-tests.cfg" captiveCoreConfigTemplateFilename = captiveCoreConfigFilename + ".tmpl" - inContainerCoreHostname = "core" - inContainerCorePort = 11625 - inContainerCoreHTTPPort = 11626 - inContainerCoreArchivePort = 1570 + inContainerCoreHostname = "core" + inContainerCorePort = 11625 + inContainerCoreHTTPPort = 11626 + inContainerCoreHTTPQueryPort = 11628 + inContainerCoreArchivePort = 1570 // any unused port would do inContainerCaptiveCorePort = 11725 @@ -70,15 +72,22 @@ type TestConfig struct { SQLitePath string OnlyRPC *TestOnlyRPCConfig // Do not mark the test as running in parallel - NoParallel bool + NoParallel bool + EnableCoreHTTPQueryServer bool } type TestCorePorts struct { CorePort uint16 CoreHTTPPort uint16 CoreArchivePort uint16 - // This only needs to be an unconflicting port - captiveCorePort uint16 + + // These only need to be unconflicting ports + captiveCorePort uint16 + captiveCoreHTTPQueryPort uint16 + // TODO: this is only needed due to a quirk (bug) in the implementation of the + // core's query server, which requires the http port to be set to non-zero for the query server to be spawned. + // REMOVE once the bug is fixed. + captiveCoreHTTPPort uint16 } type TestPorts struct { @@ -107,10 +116,11 @@ type Test struct { daemon *daemon.Daemon - masterAccount txnbuild.Account - shutdownOnce sync.Once - shutdown func() - onlyRPC bool + masterAccount txnbuild.Account + shutdownOnce sync.Once + shutdown func() + onlyRPC bool + enableCoreHTTPQueryServer bool } func NewTest(t *testing.T, cfg *TestConfig) *Test { @@ -136,6 +146,11 @@ func NewTest(t *testing.T, cfg *TestConfig) *Test { shouldWaitForRPC = !cfg.OnlyRPC.DontWait } parallel = !cfg.NoParallel + i.enableCoreHTTPQueryServer = cfg.EnableCoreHTTPQueryServer + } + + if i.enableCoreHTTPQueryServer && GetCoreMaxSupportedProtocol() < 22 { + t.Skip("Core's HTTP Query server is only available from protocol 22") } if i.sqlitePath == "" { @@ -182,6 +197,10 @@ func (i *Test) spawnContainers() { if i.runRPCInContainer() { // The container needs to use the sqlite mount point i.rpcContainerSQLiteMountDir = filepath.Dir(i.sqlitePath) + if i.enableCoreHTTPQueryServer { + i.testPorts.captiveCoreHTTPQueryPort = inContainerCoreHTTPQueryPort + i.testPorts.captiveCoreHTTPPort = inContainerCoreHTTPPort + } i.generateCaptiveCoreCfgForContainer() rpcCfg := i.getRPConfigForContainer() i.generateRPCConfigFile(rpcCfg) @@ -271,9 +290,11 @@ func (i *Test) getRPConfigForContainer() rpcConfig { // The file will be inside the container captiveCoreConfigPath: "/stellar-core.cfg", // Any writable directory would do - captiveCoreStoragePath: "/tmp/captive-core", - archiveURL: fmt.Sprintf("http://%s:%d", inContainerCoreHostname, inContainerCoreArchivePort), - sqlitePath: "/db/" + filepath.Base(i.sqlitePath), + captiveCoreStoragePath: "/tmp/captive-core", + archiveURL: fmt.Sprintf("http://%s:%d", inContainerCoreHostname, inContainerCoreArchivePort), + sqlitePath: "/db/" + filepath.Base(i.sqlitePath), + captiveCoreHTTPPort: i.testPorts.captiveCoreHTTPPort, + captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, } } @@ -284,48 +305,55 @@ func (i *Test) getRPConfigForDaemon() rpcConfig { } return rpcConfig{ // Allocate port dynamically and then figure out what the port is - endPoint: "localhost:0", - adminEndpoint: "localhost:0", - stellarCoreURL: fmt.Sprintf("http://localhost:%d", i.testPorts.CoreHTTPPort), - coreBinaryPath: coreBinaryPath, - captiveCoreConfigPath: path.Join(i.rpcConfigFilesDir, captiveCoreConfigFilename), - captiveCoreStoragePath: i.t.TempDir(), - archiveURL: fmt.Sprintf("http://localhost:%d", i.testPorts.CoreArchivePort), - sqlitePath: i.sqlitePath, + endPoint: "localhost:0", + adminEndpoint: "localhost:0", + stellarCoreURL: fmt.Sprintf("http://localhost:%d", i.testPorts.CoreHTTPPort), + coreBinaryPath: coreBinaryPath, + captiveCoreConfigPath: path.Join(i.rpcConfigFilesDir, captiveCoreConfigFilename), + captiveCoreStoragePath: i.t.TempDir(), + archiveURL: fmt.Sprintf("http://localhost:%d", i.testPorts.CoreArchivePort), + sqlitePath: i.sqlitePath, + captiveCoreHTTPPort: i.testPorts.captiveCoreHTTPPort, + captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, } } type rpcConfig struct { - endPoint string - adminEndpoint string - stellarCoreURL string - coreBinaryPath string - captiveCoreConfigPath string - captiveCoreStoragePath string - archiveURL string - sqlitePath string + endPoint string + adminEndpoint string + stellarCoreURL string + coreBinaryPath string + captiveCoreConfigPath string + captiveCoreStoragePath string + captiveCoreHTTPQueryPort uint16 + captiveCoreHTTPPort uint16 + archiveURL string + sqlitePath string } func (vars rpcConfig) toMap() map[string]string { return map[string]string{ - "ENDPOINT": vars.endPoint, - "ADMIN_ENDPOINT": vars.adminEndpoint, - "STELLAR_CORE_URL": vars.stellarCoreURL, - "CORE_REQUEST_TIMEOUT": "2s", - "STELLAR_CORE_BINARY_PATH": vars.coreBinaryPath, - "CAPTIVE_CORE_CONFIG_PATH": vars.captiveCoreConfigPath, - "CAPTIVE_CORE_STORAGE_PATH": vars.captiveCoreStoragePath, - "STELLAR_CAPTIVE_CORE_HTTP_PORT": "0", - "FRIENDBOT_URL": FriendbotURL, - "NETWORK_PASSPHRASE": StandaloneNetworkPassphrase, - "HISTORY_ARCHIVE_URLS": vars.archiveURL, - "LOG_LEVEL": "debug", - "DB_PATH": vars.sqlitePath, - "INGESTION_TIMEOUT": "10m", - "HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers), - "CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency), - "MAX_HEALTHY_LEDGER_LATENCY": "10s", - "PREFLIGHT_ENABLE_DEBUG": "true", + "ENDPOINT": vars.endPoint, + "ADMIN_ENDPOINT": vars.adminEndpoint, + "STELLAR_CORE_URL": vars.stellarCoreURL, + "CORE_REQUEST_TIMEOUT": "2s", + "STELLAR_CORE_BINARY_PATH": vars.coreBinaryPath, + "CAPTIVE_CORE_CONFIG_PATH": vars.captiveCoreConfigPath, + "CAPTIVE_CORE_STORAGE_PATH": vars.captiveCoreStoragePath, + "STELLAR_CAPTIVE_CORE_HTTP_PORT": strconv.FormatUint(uint64(vars.captiveCoreHTTPPort), 10), + "STELLAR_CAPTIVE_CORE_HTTP_QUERY_PORT": strconv.FormatUint(uint64(vars.captiveCoreHTTPQueryPort), 10), + "STELLAR_CAPTIVE_CORE_HTTP_QUERY_THREAD_POOL_SIZE": strconv.Itoa(runtime.NumCPU()), + "STELLAR_CAPTIVE_CORE_HTTP_QUERY_SNAPSHOT_LEDGERS": "4", + "FRIENDBOT_URL": FriendbotURL, + "NETWORK_PASSPHRASE": StandaloneNetworkPassphrase, + "HISTORY_ARCHIVE_URLS": vars.archiveURL, + "LOG_LEVEL": "debug", + "DB_PATH": vars.sqlitePath, + "INGESTION_TIMEOUT": "10m", + "HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers), + "CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency), + "MAX_HEALTHY_LEDGER_LATENCY": "10s", + "PREFLIGHT_ENABLE_DEBUG": "true", } } @@ -449,9 +477,14 @@ func (i *Test) fillRPCDaemonPorts() { } func (i *Test) spawnRPCDaemon() { - // We need to get a free port. Unfortunately this isn't completely clash-Free + // We need to dynamically allocate port numbers since tests run in parallel. + // Unfortunately this isn't completely clash-free, // but there is no way to tell core to allocate the port dynamically i.testPorts.captiveCorePort = getFreeTCPPort(i.t) + if i.enableCoreHTTPQueryServer { + i.testPorts.captiveCoreHTTPQueryPort = getFreeTCPPort(i.t) + i.testPorts.captiveCoreHTTPPort = getFreeTCPPort(i.t) + } i.generateCaptiveCoreCfgForDaemon() rpcCfg := i.getRPConfigForDaemon() i.daemon = i.createRPCDaemon(rpcCfg) diff --git a/cmd/stellar-rpc/internal/jsonrpc.go b/cmd/stellar-rpc/internal/jsonrpc.go index 9285d14a..93ea6c5d 100644 --- a/cmd/stellar-rpc/internal/jsonrpc.go +++ b/cmd/stellar-rpc/internal/jsonrpc.go @@ -156,6 +156,15 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { retentionWindow := cfg.HistoryRetentionWindow + getLedgerEntriesHandler := methods.NewGetLedgerEntriesFromDBHandler(params.Logger, params.LedgerEntryReader) + if params.Daemon.FastCoreClient() != nil { + // Prioritize getting ledger entries from core if available + getLedgerEntriesHandler = methods.NewGetLedgerEntriesFromCoreHandler( + params.Logger, + params.Daemon.FastCoreClient(), + params.LedgerEntryReader) + } + handlers := []struct { methodName string underlyingHandler jrpc2.Handler @@ -222,7 +231,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { }, { methodName: protocol.GetLedgerEntriesMethodName, - underlyingHandler: methods.NewGetLedgerEntriesHandler(params.Logger, params.LedgerEntryReader), + underlyingHandler: getLedgerEntriesHandler, longName: toSnakeCase(protocol.GetLedgerEntriesMethodName), queueLimit: cfg.RequestBacklogGetLedgerEntriesQueueLimit, requestDurationLimit: cfg.MaxGetLedgerEntriesExecutionDuration, diff --git a/cmd/stellar-rpc/internal/methods/get_ledger_entries.go b/cmd/stellar-rpc/internal/methods/get_ledger_entries.go index d2dcf0ce..42c01c39 100644 --- a/cmd/stellar-rpc/internal/methods/get_ledger_entries.go +++ b/cmd/stellar-rpc/internal/methods/get_ledger_entries.go @@ -3,12 +3,15 @@ package methods import ( "context" "fmt" + "sort" "github.com/creachadair/jrpc2" + coreProto "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/xdr2json" "github.com/stellar/stellar-rpc/protocol" @@ -19,8 +22,112 @@ var ErrLedgerTTLEntriesCannotBeQueriedDirectly = "ledger ttl entries cannot be q const getLedgerEntriesMaxKeys = 200 -// NewGetLedgerEntriesHandler returns a JSON RPC handler to retrieve the specified ledger entries from Stellar Core. -func NewGetLedgerEntriesHandler(logger *log.Entry, ledgerEntryReader db.LedgerEntryReader) jrpc2.Handler { +type ledgerEntryGetter interface { + GetLedgerEntries(ctx context.Context, keys []xdr.LedgerKey) ([]db.LedgerKeyAndEntry, uint32, error) +} + +type coreLedgerEntryGetter struct { + coreClient interfaces.FastCoreClient + latestLedgerReader db.LedgerEntryReader +} + +func (c coreLedgerEntryGetter) GetLedgerEntries( + ctx context.Context, + keys []xdr.LedgerKey, +) ([]db.LedgerKeyAndEntry, uint32, error) { + latestLedger, err := c.latestLedgerReader.GetLatestLedgerSequence(ctx) + if err != nil { + return nil, 0, fmt.Errorf("could not get latest ledger: %w", err) + } + // Pass latest ledger here in case Core is ahead of us (0 would be Core's latest). + resp, err := c.coreClient.GetLedgerEntries(ctx, latestLedger, keys...) + if err != nil { + return nil, 0, fmt.Errorf("could not query captive core: %w", err) + } + + result := make([]db.LedgerKeyAndEntry, 0, len(resp.Entries)) + for _, entry := range resp.Entries { + // This could happen if the user tries to fetch a ledger entry that + // doesn't exist, making it a 404 equivalent, so just skip it. + if entry.State == coreProto.LedgerEntryStateNew { + continue + } + + var xdrEntry xdr.LedgerEntry + err := xdr.SafeUnmarshalBase64(entry.Entry, &xdrEntry) + if err != nil { + return nil, 0, fmt.Errorf("could not decode ledger entry: %w", err) + } + + // Generate the entry key. We cannot simply reuse the positional keys from the request since + // the response may miss unknown entries or be out of order. + key, err := xdrEntry.LedgerKey() + if err != nil { + return nil, 0, fmt.Errorf("could not obtain ledger key: %w", err) + } + newEntry := db.LedgerKeyAndEntry{ + Key: key, + Entry: xdrEntry, + } + if entry.Ttl != 0 { + newEntry.LiveUntilLedgerSeq = &entry.Ttl + } + result = append(result, newEntry) + } + + return result, latestLedger, nil +} + +type dbLedgerEntryGetter struct { + ledgerEntryReader db.LedgerEntryReader +} + +func (d dbLedgerEntryGetter) GetLedgerEntries(ctx context.Context, + keys []xdr.LedgerKey, +) ([]db.LedgerKeyAndEntry, uint32, error) { + tx, err := d.ledgerEntryReader.NewTx(ctx, false) + if err != nil { + return nil, 0, fmt.Errorf("could not create transaction: %w", err) + } + defer func() { + _ = tx.Done() + }() + + latestLedger, err := tx.GetLatestLedgerSequence() + if err != nil { + return nil, 0, fmt.Errorf("could not get latest ledger: %w", err) + } + + result, err := tx.GetLedgerEntries(keys...) + if err != nil { + return nil, 0, fmt.Errorf("could not get entries: %w", err) + } + + return result, latestLedger, nil +} + +// NewGetLedgerEntriesFromCoreHandler returns a JSON RPC handler which retrieves ledger entries from Stellar Core. +func NewGetLedgerEntriesFromCoreHandler( + logger *log.Entry, + coreClient interfaces.FastCoreClient, + latestLedgerReader db.LedgerEntryReader, +) jrpc2.Handler { + getter := coreLedgerEntryGetter{ + coreClient: coreClient, + latestLedgerReader: latestLedgerReader, + } + return newGetLedgerEntriesHandlerFromGetter(logger, getter) +} + +// NewGetLedgerEntriesFromDBHandler returns a JSON RPC handler which retrieves ledger entries from the database. +func NewGetLedgerEntriesFromDBHandler(logger *log.Entry, ledgerEntryReader db.LedgerEntryReader) jrpc2.Handler { + getter := dbLedgerEntryGetter{ + ledgerEntryReader: ledgerEntryReader, + } + return newGetLedgerEntriesHandlerFromGetter(logger, getter) +} + +func newGetLedgerEntriesHandlerFromGetter(logger *log.Entry, getter ledgerEntryGetter) jrpc2.Handler { return NewHandler(func(ctx context.Context, request protocol.GetLedgerEntriesRequest, ) (protocol.GetLedgerEntriesResponse, error) { if err := protocol.IsValidFormat(request.Format); err != nil { @@ -58,85 +165,33 @@ func NewGetLedgerEntriesHandler(logger *log.Entry, ledgerEntryReader db.LedgerEn ledgerKeys = append(ledgerKeys, ledgerKey) } - tx, err := ledgerEntryReader.NewTx(ctx, false) + ledgerKeysAndEntries, latestLedger, err := getter.GetLedgerEntries(ctx, ledgerKeys) if err != nil { + logger.WithError(err).WithField("request", request). + Info("could not obtain ledger entries") return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ Code: jrpc2.InternalError, - Message: "could not create read transaction", + Message: err.Error(), } } - defer func() { - _ = tx.Done() - }() - - latestLedger, err := tx.GetLatestLedgerSequence() + err = sortKeysAndEntriesAccordingToRequest(request.Keys, ledgerKeysAndEntries) if err != nil { return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ Code: jrpc2.InternalError, - Message: "could not get latest ledger", + Message: err.Error(), } } ledgerEntryResults := make([]protocol.LedgerEntryResult, 0, len(ledgerKeys)) - ledgerKeysAndEntries, err := tx.GetLedgerEntries(ledgerKeys...) - if err != nil { - logger.WithError(err).WithField("request", request). - Info("could not obtain ledger entries from storage") - return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: "could not obtain ledger entries from storage", - } - } - for _, ledgerKeyAndEntry := range ledgerKeysAndEntries { - switch request.Format { - case protocol.FormatJSON: - keyJs, err := xdr2json.ConvertInterface(ledgerKeyAndEntry.Key) - if err != nil { - return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: err.Error(), - } - } - entryJs, err := xdr2json.ConvertInterface(ledgerKeyAndEntry.Entry.Data) - if err != nil { - return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: err.Error(), - } - } - - ledgerEntryResults = append(ledgerEntryResults, protocol.LedgerEntryResult{ - KeyJSON: keyJs, - DataJSON: entryJs, - LastModifiedLedger: uint32(ledgerKeyAndEntry.Entry.LastModifiedLedgerSeq), - LiveUntilLedgerSeq: ledgerKeyAndEntry.LiveUntilLedgerSeq, - }) - - default: - keyXDR, err := xdr.MarshalBase64(ledgerKeyAndEntry.Key) - if err != nil { - return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: fmt.Sprintf("could not serialize ledger key %v", ledgerKeyAndEntry.Key), - } - } - - entryXDR, err := xdr.MarshalBase64(ledgerKeyAndEntry.Entry.Data) - if err != nil { - return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: fmt.Sprintf("could not serialize ledger entry data for ledger entry %v", ledgerKeyAndEntry.Entry), - } + result, err := ledgerKeyEntryToResult(ledgerKeyAndEntry, request.Format) + if err != nil { + return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: err.Error(), } - - ledgerEntryResults = append(ledgerEntryResults, protocol.LedgerEntryResult{ - KeyXDR: keyXDR, - DataXDR: entryXDR, - LastModifiedLedger: uint32(ledgerKeyAndEntry.Entry.LastModifiedLedgerSeq), - LiveUntilLedgerSeq: ledgerKeyAndEntry.LiveUntilLedgerSeq, - }) } + ledgerEntryResults = append(ledgerEntryResults, result) } response := protocol.GetLedgerEntriesResponse{ @@ -146,3 +201,82 @@ func NewGetLedgerEntriesHandler(logger *log.Entry, ledgerEntryReader db.LedgerEn return response, nil }) } + +type keyEntriesAndOrdering struct { + ordering []int + keyEntries []db.LedgerKeyAndEntry +} + +func (k keyEntriesAndOrdering) Len() int { + return len(k.keyEntries) +} + +func (k keyEntriesAndOrdering) Less(i, j int) bool { + return k.ordering[i] < k.ordering[j] +} + +func (k keyEntriesAndOrdering) Swap(i, j int) { + k.ordering[i], k.ordering[j] = k.ordering[j], k.ordering[i] + k.keyEntries[i], k.keyEntries[j] = k.keyEntries[j], k.keyEntries[i] +} + +func sortKeysAndEntriesAccordingToRequest(b64RequestKeys []string, keyEntries []db.LedgerKeyAndEntry) error { + // Create a map for the keys so that we can quickly query their order + requestKeyToOrder := make(map[string]int, len(b64RequestKeys)) + for i, key := range b64RequestKeys { + requestKeyToOrder[key] = i + } + // Obtain the expected ordering using the request keys + ordering := make([]int, len(keyEntries)) + for i, keyEntry := range keyEntries { + b64Key, err := xdr.MarshalBase64(keyEntry.Key) + if err != nil { + return err + } + order, ok := requestKeyToOrder[b64Key] + if !ok { + return fmt.Errorf("mismatching key in result: %s", b64Key) + } + ordering[i] = order + } + // Sort entries according to the orders + sort.Sort(keyEntriesAndOrdering{ + ordering: ordering, + keyEntries: keyEntries, + }) + return nil +} + +func ledgerKeyEntryToResult(keyEntry db.LedgerKeyAndEntry, format string) (protocol.LedgerEntryResult, error) { + result := protocol.LedgerEntryResult{} + switch format { + case protocol.FormatJSON: + keyJs, err := xdr2json.ConvertInterface(keyEntry.Key) + if err != nil { + return protocol.LedgerEntryResult{}, err + } + entryJs, err := xdr2json.ConvertInterface(keyEntry.Entry.Data) + if err != nil { + return protocol.LedgerEntryResult{}, err + } + result.KeyJSON = keyJs + result.DataJSON = entryJs + default: + keyXDR, err := xdr.MarshalBase64(keyEntry.Key) + if err != nil { + return protocol.LedgerEntryResult{}, fmt.Errorf("could not serialize ledger key %v: %w", keyEntry.Key, err) + } + + entryXDR, err := xdr.MarshalBase64(keyEntry.Entry.Data) + if err != nil { + err = fmt.Errorf("could not serialize ledger entry %v: %w", keyEntry.Entry, err) + return protocol.LedgerEntryResult{}, err + } + + result.KeyXDR = keyXDR + result.DataXDR = entryXDR + } + result.LastModifiedLedger = uint32(keyEntry.Entry.LastModifiedLedgerSeq) + result.LiveUntilLedgerSeq = keyEntry.LiveUntilLedgerSeq + return result, nil +}