Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Source amazon-sqs: Allow role based access #35483

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions airbyte-integrations/connectors/source-amazon-sqs/Dockerfile

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
"queue_url": "https://sqs.eu-west-1.amazonaws.com/840836244599/ab-airbyte-testing",
"region": "eu-west-1",
"access_key": "xxx",
"secret_key": "xxx"
"secret_key": "xxx",
"session_token": "xxx"
}
12 changes: 7 additions & 5 deletions airbyte-integrations/connectors/source-amazon-sqs/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from source_amazon_sqs.run import run
import sys

from airbyte_cdk.entrypoint import launch
from source_amazon_sqs import SourceAmazonSqs

if __name__ == "__main__":
run()
source = SourceAmazonSqs()
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 983fd355-6bf3-4709-91b5-37afa391eeb6
dockerImageTag: 0.1.1
dockerImageTag: 0.2.0
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.2.2@sha256:57703de3b4c4204bd68a7b13c9300f8e03c0189bffddaffc796f1da25d2dbea0
dockerRepository: airbyte/source-amazon-sqs
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-sqs
githubIssueLabel: source-amazon-sqs
Expand Down
1,501 changes: 1,501 additions & 0 deletions airbyte-integrations/connectors/source-amazon-sqs/poetry.lock

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions airbyte-integrations/connectors/source-amazon-sqs/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Poetry packaging metadata for the source-amazon-sqs Airbyte connector.
[build-system]
requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.2.0"
name = "source-amazon-sqs"
description = "Source implementation for Amazon SQS."
authors = [ "Airbyte <contact@airbyte.io>",]
license = "MIT"
readme = "README.md"
documentation = "https://docs.airbyte.com/integrations/sources/amazon-sqs"
homepage = "https://airbyte.com"
repository = "https://github.com/airbytehq/airbyte"
[[tool.poetry.packages]]
include = "source_amazon_sqs"

# Runtime dependencies: the Airbyte CDK plus boto3 for the SQS client.
[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^1"
boto3 = "^1.34.125"

# Console entry point: `source-amazon-sqs` invokes source_amazon_sqs.run:run.
[tool.poetry.scripts]
source-amazon-sqs = "source_amazon_sqs.run:run"

# Test-only dependencies; moto mocks the SQS/IAM AWS services in unit tests.
[tool.poetry.group.dev.dependencies]
requests-mock = "^1.9.3"
pytest-mock = "^3.6.1"
pytest = "^6.2"
moto = {version = "^5.0.9", extras = ["sqs", "iam"]}

This file was deleted.

40 changes: 0 additions & 40 deletions airbyte-integrations/connectors/source-amazon-sqs/setup.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@


import json
import logging
from datetime import datetime
from typing import Dict, Generator

import boto3
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
Expand Down Expand Up @@ -41,7 +41,7 @@ def change_message_visibility(self, message, visibility_timeout):
def parse_queue_name(self, url: str) -> str:
    """Return the queue name, i.e. the substring after the last '/' in *url*."""
    # str.rfind returns -1 when no slash is present, so the slice then yields the
    # whole string — identical to url.rsplit("/", 1)[-1].
    return url[url.rfind("/") + 1 :]

def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: json) -> AirbyteConnectionStatus:
try:
if "max_batch_size" in config:
# Max batch size must be between 1 and 10
Expand All @@ -62,9 +62,14 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
logger.debug("Amazon SQS Source Config Check - access_key (ends with): " + access_key[-1])
secret_key = config["secret_key"]
logger.debug("Amazon SQS Source Config Check - secret_key (ends with): " + secret_key[-1])
session_token = config.get("session_token")
if session_token:
logger.debug("Amazon SQS Source Config Check - session_token (ends with): " + session_token[-1])

logger.debug("Amazon SQS Source Config Check - Starting connection test ---")
session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
session = boto3.Session(
aws_access_key_id=access_key, aws_secret_access_key=secret_key, aws_session_token=session_token, region_name=queue_region
)
sqs = session.resource("sqs")
queue = sqs.Queue(url=queue_url)
if hasattr(queue, "attributes"):
Expand All @@ -79,7 +84,7 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
status=Status.FAILED, message=f"Amazon SQS Source Config Check - An exception occurred: {str(e)}"
)

def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: json) -> AirbyteCatalog:
streams = []

# Get the queue name by getting substring after last /
Expand All @@ -95,7 +100,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
return AirbyteCatalog(streams=streams)

def read(
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
self, logger: logging.Logger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
) -> Generator[AirbyteMessage, None, None]:
stream_name = self.parse_queue_name(config["queue_url"])
logger.debug("Amazon SQS Source Read - stream is: " + stream_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "Amazon SQS Source Spec",
"type": "object",
"required": ["queue_url", "region", "delete_messages"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"queue_url": {
"title": "Queue URL",
Expand Down Expand Up @@ -107,6 +107,13 @@
"examples": ["hu+qE5exxxxT6o/ZrKsxxxxxxBhxxXLexxxxxVKz"],
"airbyte_secret": true,
"order": 8
},
"session_token": {
"title": "AWS IAM Session Token",
"description": "The Temporary Session Token of the AWS IAM Role to use for pulling messages",
"type": "string",
"airbyte_secret": true,
"order": 9
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
#

import json
import logging
from typing import Any, Dict, Mapping

import boto3
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Status

# from airbyte_cdk.sources.source import Source
from moto import mock_iam, mock_sqs
from moto import mock_aws
from moto.core import set_initial_no_auth_action_count
from source_amazon_sqs import SourceAmazonSqs


@mock_iam
@mock_aws
def create_user_with_all_permissions():
client = boto3.client("iam", region_name="eu-west-1")
client.create_user(UserName="test_user1")
Expand Down Expand Up @@ -52,8 +52,7 @@ def get_catalog() -> Mapping[str, Any]:


@set_initial_no_auth_action_count(3)
@mock_sqs
@mock_iam
@mock_aws
def test_check():
# Create User
user = create_user_with_all_permissions()
Expand All @@ -67,15 +66,15 @@ def test_check():
# Create config
config = create_config(queue_url, user["AccessKeyId"], user["SecretAccessKey"], queue_region, False)
# Create AirbyteLogger
logger = AirbyteLogger()
logger = logging.Logger(name="test")
# Create Source
source = SourceAmazonSqs()
# Run check
status = source.check(logger, config)
assert status.status == Status.SUCCEEDED


@mock_sqs
@mock_aws
def test_discover():
# Create Queue
queue_name = "amazon-sqs-mock-queue"
Expand All @@ -85,7 +84,7 @@ def test_discover():
# Create config
config = create_config(queue_url, "xxx", "xxx", queue_region, False)
# Create AirbyteLogger
logger = AirbyteLogger()
logger = logging.Logger(name="test_2")
# Create Source
source = SourceAmazonSqs()
# Run discover
Expand All @@ -94,8 +93,7 @@ def test_discover():


@set_initial_no_auth_action_count(3)
@mock_sqs
@mock_iam
@mock_aws
def test_read():
# Create User
user = create_user_with_all_permissions()
Expand All @@ -112,7 +110,7 @@ def test_read():
# Create ConfiguredAirbyteCatalog
catalog = ConfiguredAirbyteCatalog(streams=get_catalog()["streams"])
# Create AirbyteLogger
logger = AirbyteLogger()
logger = logging.Logger(name="test_3")
# Create State
state = Dict[str, any]
# Create Source
Expand Down
5 changes: 4 additions & 1 deletion docs/integrations/sources/amazon-sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Required properties are 'Queue URL', 'AWS Region' and 'Delete Messages After Rea
- If `Delete Messages After Read` is `true` then `sqs:DeleteMessage` is also needed
- AWS IAM Secret Key (STRING)
- The Secret Key for the IAM User with permissions on this Queue
- AWS IAM Session Token (STRING)
- The temporary session token for the IAM role with permissions on this Queue

### Data loss warning

Expand Down Expand Up @@ -101,7 +103,8 @@ Extra care should be taken to understand this risk before enabling this option.
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------- | :-------------------------------- |
|:--------| :--------- | :-------------------------------------------------------- |:----------------------------------|
| 0.2.0 | 2024-02-21 | [#35483](https://github.com/airbytehq/airbyte/pull/35483) | Allow role based access |
| 0.1.1 | 2024-01-03 | [#33924](https://github.com/airbytehq/airbyte/pull/33924) | Add new ap-southeast-3 AWS region |
| 0.1.0 | 2021-10-10 | [#0000](https://github.com/airbytehq/airbyte/pull/0000) | Initial version |

Expand Down
Loading