cassandra-driver Instrumentation (#1280)
* Add cassandradriver to tox

* Add initial cassandra instrumentation

* Add cassandra testing

* Refined instrumentation and tests

* Cover multiple async reactors in tests

* Separate async and sync tests

* Provide cluster options as fixture

* Add ORM Model tests for cqlengine

* Add cassandra runner to Github Actions

* Adjust cassandra tox env list

* Add skip for pypy libev tests

* Format trivy

* Address suggestions from code review

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
TimPansino and mergify[bot] authored Jan 29, 2025
1 parent 019b019 commit 82918c5
Showing 8 changed files with 654 additions and 27 deletions.
116 changes: 92 additions & 24 deletions .github/workflows/tests.yml
@@ -34,6 +34,7 @@ jobs:
runs-on: ubuntu-latest
needs:
- python
- cassandra
- elasticsearchserver07
- elasticsearchserver08
- firestore
@@ -74,29 +75,29 @@ jobs:

- name: Run Trivy vulnerability scanner in repo mode
if: ${{ github.event_name == 'pull_request' }}
uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
with:
scan-type: 'fs'
scan-type: "fs"
ignore-unfixed: true
format: table
exit-code: 1
severity: 'CRITICAL,HIGH,MEDIUM,LOW'
severity: "CRITICAL,HIGH,MEDIUM,LOW"

- name: Run Trivy vulnerability scanner in repo mode
if: ${{ github.event_name == 'schedule' }}
uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
with:
scan-type: 'fs'
scan-type: "fs"
ignore-unfixed: true
format: 'sarif'
output: 'trivy-results.sarif'
severity: 'CRITICAL,HIGH,MEDIUM,LOW'
format: "sarif"
output: "trivy-results.sarif"
severity: "CRITICAL,HIGH,MEDIUM,LOW"

- name: Upload Trivy scan results to GitHub Security tab
if: ${{ github.event_name == 'schedule' }}
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
sarif_file: "trivy-results.sarif"

# Combine and upload coverage data
coverage:
@@ -274,7 +275,7 @@ jobs:
- 8080:8080
- 8081:8081
- 8082:8082
# Set health checks to wait until nginx has started
# Set health checks to wait until container has started
options: >-
--health-cmd "service nginx status || exit 1"
--health-interval 10s
@@ -338,7 +339,7 @@ jobs:
ports:
- 8080:5432
- 8081:5432
# Set health checks to wait until postgres has started
# Set health checks to wait until container has started
options: >-
--health-cmd pg_isready
--health-interval 10s
@@ -402,7 +403,7 @@ jobs:
ports:
- 8080:5432
- 8081:5432
# Set health checks to wait until postgres has started
# Set health checks to wait until container has started
options: >-
--health-cmd pg_isready
--health-interval 10s
@@ -469,7 +470,7 @@ jobs:
ports:
- 8080:1433
- 8081:1433
# Set health checks to wait until mysql has started
# Set health checks to wait until container has started
options: >-
--health-cmd "/opt/mssql-tools/bin/sqlcmd -U SA -P $MSSQL_SA_PASSWORD -Q 'SELECT 1'"
--health-interval 10s
@@ -536,7 +537,7 @@ jobs:
ports:
- 8080:3306
- 8081:3306
# Set health checks to wait until mysql has started
# Set health checks to wait until container has started
options: >-
--health-cmd "mysqladmin ping -h localhost"
--health-interval 10s
@@ -701,7 +702,7 @@ jobs:
ports:
- 8080:6379
- 8081:6379
# Set health checks to wait until redis has started
# Set health checks to wait until container has started
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
@@ -765,7 +766,7 @@ jobs:
ports:
- 8080:8983
- 8081:8983
# Set health checks to wait until solr has started
# Set health checks to wait until container has started
options: >-
--health-cmd "curl localhost:8983/solr/collection/admin/ping | grep OK"
--health-interval 10s
@@ -827,7 +828,7 @@ jobs:
ports:
- 8080:11211
- 8081:11211
# Set health checks to wait until memcached has started
# Set health checks to wait until container has started
options: >-
--health-cmd "timeout 5 bash -c 'cat < /dev/null > /dev/udp/127.0.0.1/11211'"
--health-interval 10s
@@ -890,7 +891,7 @@ jobs:
RABBITMQ_PASSWORD: rabbitmq
ports:
- 5672:5672
# Set health checks to wait until rabbitmq has started
# Set health checks to wait until container has started
options: >-
--health-cmd "rabbitmq-diagnostics status"
--health-interval 10s
@@ -1026,7 +1027,7 @@ jobs:
ports:
- 8080:27017
- 8081:27017
# Set health checks to wait until mongodb has started
# Set health checks to wait until container has started
options: >-
--health-cmd "echo 'db.runCommand(\"ping\").ok' | mongo localhost:27017/test --quiet || exit 1"
--health-interval 10s
@@ -1088,7 +1089,7 @@ jobs:
ports:
- 8080:27017
- 8081:27017
# Set health checks to wait until mongodb has started
# Set health checks to wait until container has started
options: >-
--health-cmd "echo 'db.runCommand(\"ping\").ok' | mongosh localhost:27017/test --quiet || exit 1"
--health-interval 10s
@@ -1129,6 +1130,73 @@ jobs:
path: ./**/.coverage.*
retention-days: 1

cassandra:
env:
TOTAL_GROUPS: 1

strategy:
fail-fast: false
matrix:
group-number: [1]

runs-on: ubuntu-latest
container:
image: ghcr.io/newrelic/newrelic-python-agent-ci:latest
options: >-
--add-host=host.docker.internal:host-gateway
timeout-minutes: 30
services:
cassandra:
image: cassandra:5.0.2
env:
CASSANDRA_SEEDS: "cassandra"
CASSANDRA_CLUSTER_NAME: TestCluster
CASSANDRA_ENDPOINT_SNITCH: SimpleSnitch
CASSANDRA_NUM_TOKENS: "128"
ports:
- 8080:9042
- 8081:9042
# Set health checks to wait until container has started
options: >-
--health-cmd "cqlsh localhost 9042 -e 'describe cluster'"
--health-interval 30s
--health-timeout 5s
--health-retries 10
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # 4.1.1

- name: Fetch git tags
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
git fetch --tags origin
- name: Configure pip cache
run: |
mkdir -p /github/home/.cache/pip
chown -R $(whoami) /github/home/.cache/pip
- name: Get Environments
id: get-envs
run: |
echo "envs=$(tox -l | grep '^${{ github.job }}\-' | ./.github/workflows/get-envs.py)" >> $GITHUB_OUTPUT
env:
GROUP_NUMBER: ${{ matrix.group-number }}

- name: Test
run: |
tox -vv -e ${{ steps.get-envs.outputs.envs }} -p auto
env:
TOX_PARALLEL_NO_SPINNER: 1
PY_COLORS: 0

- name: Upload Coverage Artifacts
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # 4.3.1
with:
name: coverage-${{ github.job }}-${{ strategy.job-index }}
path: ./**/.coverage.*
retention-days: 1
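
Note on the "Get Environments" step above: the get-envs.py helper splits the matching tox environments across CI groups using the GROUP_NUMBER and TOTAL_GROUPS values. The actual script is not part of this diff; the following is only a hypothetical sketch of that kind of round-robin grouping, with all names assumed:

```python
#!/usr/bin/env python3
# Hypothetical stand-in for .github/workflows/get-envs.py (the real script is not
# shown in this diff). Reads tox environment names from stdin and prints the
# comma-separated subset assigned to this CI group.
import os
import sys


def pick_group(envs, group_number, total_groups):
    # Round-robin assignment: group N takes every Nth environment.
    return [env for index, env in enumerate(envs) if index % total_groups == group_number - 1]


if __name__ == "__main__":
    environments = [line.strip() for line in sys.stdin if line.strip()]
    group = int(os.environ.get("GROUP_NUMBER", "1"))
    total = int(os.environ.get("TOTAL_GROUPS", "1"))
    print(",".join(pick_group(environments, group, total)))
```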

elasticsearchserver07:
env:
TOTAL_GROUPS: 1
@@ -1152,7 +1220,7 @@ jobs:
ports:
- 8080:9200
- 8081:9200
# Set health checks to wait until elasticsearch has started
# Set health checks to wait until container has started
options: >-
--health-cmd "curl --silent --fail localhost:9200/_cluster/health || exit 1"
--health-interval 10s
@@ -1217,7 +1285,7 @@ jobs:
ports:
- 8080:9200
- 8081:9200
# Set health checks to wait until elasticsearch has started
# Set health checks to wait until container has started
options: >-
--health-cmd "curl --silent --fail localhost:9200/_cluster/health || exit 1"
--health-interval 10s
@@ -1279,7 +1347,7 @@ jobs:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:437.0.1-emulators
ports:
- 8080:8080
# Set health checks to wait 5 seconds in lieu of an actual healthcheck
# Set health checks to wait until container has started
options: >-
--health-cmd "echo success"
--health-interval 10s
@@ -1348,7 +1416,7 @@ jobs:
ports:
- 8080:6379
- 8081:6379
# Set health checks to wait until valkey has started
# Set health checks to wait until container has started
options: >-
--health-cmd "valkey-cli ping"
--health-interval 10s
5 changes: 5 additions & 0 deletions newrelic/config.py
@@ -3164,6 +3164,11 @@ def _process_module_builtin_defaults():
"instrument_gunicorn_app_base",
)

_process_module_definition("cassandra", "newrelic.hooks.datastore_cassandradriver", "instrument_cassandra")
_process_module_definition(
"cassandra.cluster", "newrelic.hooks.datastore_cassandradriver", "instrument_cassandra_cluster"
)

_process_module_definition("cx_Oracle", "newrelic.hooks.database_cx_oracle", "instrument_cx_oracle")

_process_module_definition("ibm_db_dbi", "newrelic.hooks.database_ibm_db_dbi", "instrument_ibm_db_dbi")
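
The two _process_module_definition() calls above register deferred hooks keyed on module import: when cassandra or cassandra.cluster is first imported by the application, the agent loads newrelic.hooks.datastore_cassandradriver and passes the freshly imported module to the named instrumentation function. A simplified sketch of that behavior (ignoring the agent's lazy import-hook machinery; the helper name here is illustrative):

```python
import importlib


def apply_instrumentation(module_name, hook_module, hook_function):
    # Simplified: import the target module and the hook module, then hand the
    # target module to the instrumentation function so it can wrap the relevant classes.
    target = importlib.import_module(module_name)
    hook = importlib.import_module(hook_module)
    getattr(hook, hook_function)(target)


# The cassandra.cluster registration above ultimately results in a call like:
# apply_instrumentation("cassandra.cluster", "newrelic.hooks.datastore_cassandradriver", "instrument_cassandra_cluster")
```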
119 changes: 119 additions & 0 deletions newrelic/hooks/datastore_cassandradriver.py
@@ -0,0 +1,119 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from newrelic.api.database_trace import DatabaseTrace, register_database_client
from newrelic.api.function_trace import wrap_function_trace
from newrelic.api.time_trace import current_trace
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.common.signature import bind_args

DBAPI2_MODULE = None
DEFAULT = object()


def wrap_Session_execute(wrapped, instance, args, kwargs):
# Most of this wrapper is lifted from DBAPI2 wrappers, which can't be used
# directly since Cassandra doesn't actually conform to DBAPI2.

trace = current_trace()
if not trace or trace.terminal_node():
# Exit early if there's no transaction, or if we're already under an existing DatabaseTrace
return wrapped(*args, **kwargs)

bound_args = bind_args(wrapped, args, kwargs)

sql_parameters = bound_args.get("parameters", DEFAULT)

sql = bound_args.get("query", None)
if not isinstance(sql, str):
statement = getattr(sql, "prepared_statement", sql) # Unbind BoundStatement
sql = getattr(statement, "query_string", statement) # Unpack query from SimpleStatement and PreparedStatement

database_name = getattr(instance, "keyspace", None)

host = None
port = None
try:
contact_points = instance.cluster.contact_points
if len(contact_points) == 1:
contact_point = next(iter(contact_points))
if isinstance(contact_point, str):
host = contact_point
port = instance.cluster.port
elif isinstance(contact_point, tuple):
host, port = contact_point
else: # Handle cassandra.connection.Endpoint types
host = contact_point.address
port = contact_point.port
except Exception:
pass

if sql_parameters is not DEFAULT:
with DatabaseTrace(
sql=sql,
sql_parameters=sql_parameters,
execute_params=(args, kwargs),
host=host,
port_path_or_id=port,
database_name=database_name,
dbapi2_module=DBAPI2_MODULE,
source=wrapped,
):
return wrapped(*args, **kwargs)
else:
with DatabaseTrace(
sql=sql,
execute_params=(args, kwargs),
host=host,
port_path_or_id=port,
database_name=database_name,
dbapi2_module=DBAPI2_MODULE,
source=wrapped,
):
return wrapped(*args, **kwargs)


def instrument_cassandra(module):
# Cassandra isn't DBAPI2 compliant, but DatabaseTrace needs a registered database client to function properly.
# We set the product name and the parameters for CQL parsing here, and leave the explain plan functionality unused.
global DBAPI2_MODULE
DBAPI2_MODULE = module

register_database_client(
module,
database_product="Cassandra",
quoting_style="single+double",
explain_query=None,
explain_stmts=(),
instance_info=None, # Already handled in wrappers
)


def instrument_cassandra_cluster(module):
if hasattr(module, "Session"):
# Cluster connect instrumentation, normally supplied by DBAPI2ConnectionFactory
wrap_function_trace(
module, "Cluster.connect", terminal=True, rollup=["Datastore/all", "Datastore/Cassandra/all"]
)

# Currently Session.execute() is a wrapper for calling Session.execute_async() and immediately waiting for
# the result. We therefore need to instrument Session.execute() in order to get timing information for sync
# query executions. We also need to instrument Session.execute_async() to at least get metrics for async
# queries, but we can't get timing information from that alone. An early exit condition is also needed for when
# the instrumented Session.execute_async() is called from within Session.execute().
wrap_function_wrapper(module, "Session.execute", wrap_Session_execute)

# This wrapper only provides metrics, not proper timing, for async queries, since their execution is distributed
# across potentially many threads at once. Accurate timing of async executions is left for a future change.
wrap_function_wrapper(module, "Session.execute_async", wrap_Session_execute)