diff --git a/README.md b/README.md index 16235a2..570e3c6 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,6 @@ Welcome to the Diaspora Event Fabric Service documentation! Our goal in the Dias ### TODOs -- **Event Fabric** - - Implement usage logging - - **Trigger Improvements** - Enhance authentication mechanisms - Simplify usability diff --git a/docs/grafana-console/intro.md b/docs/grafana-console/intro.md index 688c8ce..66f9754 100644 --- a/docs/grafana-console/intro.md +++ b/docs/grafana-console/intro.md @@ -7,6 +7,7 @@ replace them below with appropriate credentials: - `AWS_ACCESS_KEY_ID`: `********************` - `AWS_SECRET_ACCESS_KEY`: `****************************` +The keys need IAM access to S3 for Thanos; for details, see [this article](https://aws.amazon.com/blogs/opensource/improving-ha-and-long-term-storage-for-prometheus-using-thanos-on-eks-with-s3/). ### 1. System updates and docker install ```bash diff --git a/examples/sdk_and_flows.ipynb b/examples/sdk_and_flows.ipynb new file mode 100644 index 0000000..65adb49 --- /dev/null +++ b/examples/sdk_and_flows.ipynb @@ -0,0 +1,944 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 10-Minute Quick Demo of Diaspora Event SDK and Action Provider Features\n", + "\n", + "#### SDK Features to Demo\n", + "| Section | SDK Features |\n", + "|---------|--------------|\n", + "| 1.1 | Register and list topics |\n", + "| 1.2 | List and update topic configs |\n", + "| 1.3 | Produce and consume messages |\n", + "| 1.4 | Create and update triggers, get log events |\n", + "\n", + "#### AP Features to Demo\n", + "| Section | AP Features |\n", + "|---------|-------------|\n", + "| 2.1 | Produce messages to AP with keys |\n", + "| 2.2 | Consume messages since a timestamp |\n", + "\n", + "For a comprehensive list of SDK features, visit the [Diaspora SDK repository](https://github.com/globus-labs/diaspora-event-sdk/blob/main/DiasporaDemo.ipynb).\n", + "\n", + "For a full list of AP features, refer to the [Diaspora Service documentation](https://haochenpan.github.io/diaspora-service/main/ap/examples/)."
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### SDK requirement\n", + "`%pip install 'diaspora-event-sdk[kafka-python]'`\n", + "\n", + "#### AP requirement (already satisfied through SDK requirements)\n", + "`%pip install globus-sdk`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 0.1 Import libraries and print SDK versions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"Import necessary libraries for Demo.\"\"\"\n", + "from __future__ import annotations\n", + "\n", + "import base64\n", + "import json\n", + "import os\n", + "import random\n", + "import string\n", + "import time\n", + "import uuid\n", + "\n", + "import globus_sdk\n", + "import globus_sdk.scopes\n", + "from diaspora_event_sdk import block_until_ready\n", + "from diaspora_event_sdk import Client\n", + "from diaspora_event_sdk import KafkaConsumer\n", + "from diaspora_event_sdk import KafkaProducer\n", + "from diaspora_event_sdk.version import __version__ as diaspora_sdk_version\n", + "from globus_sdk import __version__ as globus_sdk_version" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "c = Client()\n", + "print('Globus SDK version:', globus_sdk_version)\n", + "print('Diaspora Event SDK version:', diaspora_sdk_version)\n", + "print(\"User's OpenID:\", c.subject_openid)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 0.2 Create a cluster authentication credential and verify cluster connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.create_key())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert block_until_ready()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.1 Register Topics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "topic1 = 'topic-' + c.subject_openid[-12:]\n", + "print('Topic name:', topic1)\n", + "print(c.register_topic(topic1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "topic2_suffix = ''.join(random.choice(string.ascii_uppercase +\n", + " string.digits) for _ in range(8))\n", + "topic2 = 'topic-' + topic2_suffix\n", + "print('Topic name:', topic2)\n", + "print(c.register_topic(topic2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.list_topics())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.2 List and Update Topic Configs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.get_topic_configs(topic2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "configs = {\n", + " 'delete.retention.ms': 43200000,\n", + " 'retention.ms': 43200000,\n", + "}\n", + "print(c.update_topic_configs(topic2, configs))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.update_topic_partitions(topic2, 2))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.3 Produce and Consume Messages" + ] + }, + { + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "### 1.3.1 Synchronously produce messages to a registered topic." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "producer = KafkaProducer()\n", + "future = producer.send(\n", + " topic1, {'message': 'Synchronous message 1 from Diaspora SDK'})\n", + "print(future.get(timeout=10))\n", + "future = producer.send(\n", + " topic1, {'message': 'Synchronous message 2 from Diaspora SDK'})\n", + "print(future.get(timeout=10))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.3.2 Asynchronously produce batched messages to a registered topic." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "producer = KafkaProducer()\n", + "producer.send(topic1, {'message': 'Asynchronous message 3 from Diaspora SDK'})\n", + "producer.send(topic1, {'message': 'Asynchronous message 4 from Diaspora SDK'})\n", + "producer.flush()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.3.3 Consume messages from the beginning of the topic (need to interrupt). " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "consumer = KafkaConsumer(topic1, auto_offset_reset='earliest')\n", + "for message in consumer:\n", + " print(message)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1.4 Create and Update triggers, Get Execution Logs" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.1 Create a deployment package" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "trigger_package = f'{os.getcwd()}/my_deployment_package'\n", + "trigger_file = 'lambda_function.py'\n", + "trigger_name_in_def='lambda_handler'\n", + "\n", + "os.system(f'mkdir {trigger_package}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.2 Save code to `trigger_package/trigger_file`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "trigger_code = f'''import base64\n", + "\n", + "def {trigger_name_in_def}(event, context):\n", + " try:\n", + " print('EVENT:')\n", + " print(event)\n", + "\n", + " for partition, records in event['records'].items():\n", + " for record in records:\n", + " print(\"topic:\", record['topic'],\n", + " \"partition:\", record['partition'],\n", + " \"offset:\", record['offset'],\n", + " \"key:\", record.get(\"key\", \"NOT-SET\"),\n", + " \"value:\", base64.b64decode(record['value']))\n", + " except Exception as e:\n", + " print(\"ERROR:\", e)\n", + "'''\n", + "\n", + "with open(os.path.join(trigger_package, trigger_file), 'w') as f:\n", + " f.write(trigger_code)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.3 Zip the code in `trigger_file`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_zipped_code(lambda_function_package): # noqa: D103\n", + " print(f'Zipping {lambda_function_package}')\n", + " os.system(f'cd {lambda_function_package} && zip -r {lambda_function_package}.zip .') # noqa: E501\n", + " with open(f'{lambda_function_package}.zip', 'rb') as f:\n", + " return base64.b64encode(f.read()).decode('utf-8')\n", + "\n", + "\n", + "zipped_code = get_zipped_code(trigger_package)" + ] + }, + { + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "### 1.4.4 Inspect trigger info\n", + "\n", + "Note: one topic can be associated with multiple triggers\n", + "\n", + "`topic_name`: which topic to consume from\n", + "\n", + "`function_name`: along with topic_name, used to identify and delete the function\n", + "\n", + "`function_runtime`: a function runtime like `python3.11` and `python3.12`\n", + "\n", + "`function_handler`: py-file-name.function-name\n", + "\n", + "`function_code_zipped`: serialized function code\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "topic_name = 'topic-' + c.subject_openid[-12:]\n", + "trigger_name = f'lambda{random.randint(100, 999)}'\n", + "trigger_runtime = 'python3.11'\n", + "trigger_handler = f'{trigger_file.split(\".\")[0]}.{trigger_name_in_def}'\n", + "print(c.register_topic(topic_name))\n", + "print()\n", + "print('topic name:\\t\\t', topic_name)\n", + "print('trigger name:\\t\\t', trigger_name)\n", + "print('trigger runtime:\\t', trigger_runtime)\n", + "print('trigger handler:\\t', trigger_handler)\n", + "print('zipped trigger code:\\t', zipped_code)\n", + "print('length of the code:\\t', len(zipped_code))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.5 Create the trigger\n", + "\n", + "Note: the call blocks for a few seconds to wait for creation results or error message.\n", + "\n", + "Default values are listed in the table below, note that if the runtime is `python3.11` or `python3.12`, a layer with Globus SDK and Diaspora SDK will be attached.\n", + "\n", + "[Trigger parameter syntax (`Code`, etc.)](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html)\n", + "\n", + "| Trigger Parameter | Default Value |\n", + "|--------------------|------------------------------------|\n", + "| Runtime | python3.11 |\n", + "| Handler | lambda_function.lambda_handler |\n", + "| Code | {} |\n", + "| Timeout | 30 |\n", + "| MemorySize | 128 |\n", + "| Environment | {} |\n", + "| EphemeralStorage | {'Size': 512} |\n", + "| Layers | [] |\n", + "\n", + "\n", + "\n", + "[Invocation parameter syntax (`FilterCriteria`, etc.)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax)\n", + "\n", + "| Invocation Parameter | Default Value |\n", + "|--------------------------------|---------------|\n", + "| Enabled | True |\n", + "| BatchSize | 1 |\n", + "| FilterCriteria | {} |\n", + "| MaximumBatchingWindowInSeconds | 500ms |\n", + "| StartingPosition | LATEST |\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " Create a new trigger that response to events in a registered topic.\n", + " Note: the creation call takes around 10 seconds to return.\n", + " Note: for Python 3.12 runtime, use \n", + " arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer312:1\n", + " to enable the globus SDK in the trigger.\n", + " expected return (first time): {\"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): {\"status\": \"error\", \"message\": ...}\n", + "'''\n", + "\n", + "trigger_configs = {\n", + " 'Runtime': trigger_runtime,\n", + " 'Handler': trigger_handler,\n", + " 'Code': {'ZipFile': zipped_code},\n", + " 'Timeout': 3,\n", + " 'MemorySize': 128,\n", + " 'Environment': {},\n", + " 'EphemeralStorage': {'Size': 512},\n", + " 'Layers': 
['arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1'], # noqa: E501\n", + "}\n", + "invoke_configs = {\n", + " 'Enabled': True,\n", + " 'BatchSize': 1,\n", + " 'StartingPosition': 'LATEST',\n", + "}\n", + "print(c.create_trigger(topic_name, trigger_name,\n", + " trigger_configs, invoke_configs))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### List created triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " List all triggers created by the user.\n", + " Note: the print function below highlights the trigger name,\n", + " handler name, UUID, and the topic it taps on.\n", + " expected return:\n", + " trigger name: ... trigger handler name: ...\n", + " trigger uuid: ... trigger topic: ...\n", + "'''\n", + "\n", + "for function in c.list_triggers()['triggers']:\n", + " print('trigger name:', function['function_name'], '\\n',\n", + " 'trigger handler name:',\n", + " function['function_detail']['Configuration']['Handler'], '\\n',\n", + " 'trigger uuid:', function['triggers'][0]['UUID'], '\\n',\n", + " 'trigger topic:', function['triggers'][0]['Topics'][0], '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.6 Produce events to invoke the trigger and verify invocations by inspecting the latest log stream" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " Synchronously produce messages to a registered topic to invoke triggers.\n", + " expected return:\n", + " multiple RecordMetadata(...)\n", + "'''\n", + "\n", + "producer = KafkaProducer()\n", + "future = producer.send(\n", + " topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})\n", + "print(future.get(timeout=10))\n", + "future = producer.send(\n", + " topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})\n", + "print(future.get(timeout=10))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " Get the list of log streams belonging to the trigger.\n", + " Note: recent_log_stream_name may not contain logs of all invocations,\n", + " as some logs may exist in other streams.\n", + " expected return: {\"status\": \"success\", \"streams\": [...]}\n", + "'''\n", + "\n", + "streams_response = c.list_log_streams(trigger_name)\n", + "print(streams_response)\n", + "recent_log_stream_name = streams_response['streams'][0]['logStreamName']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " Retrieve the events in a particular log stream.\n", + " Note: this log stream may not contain logs of all trigger invocations,\n", + " as some logs may exist in other streams.\n", + " expected return: {\"status\": \"success\", \"events\": [...]}\n", + "'''\n", + "print(c.get_log_events(trigger_name, recent_log_stream_name))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.7 Trigger deletion call" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "'''\n", + " Delete the trigger identified by (topic_name, trigger_name).\n", + " expected return: {\"status\": \"success\", \"message\": ...}\n", + "'''\n", + "print(c.delete_trigger(topic_name, trigger_name))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ 
"'''\n", + " List all triggered created by the user.\n", + " expected return (if all triggers are deleted): None \n", + " expected return (otherwise): \n", + " {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n", + "'''\n", + "for function in c.list_triggers()['triggers']:\n", + " print('trigger name:', function['function_name'], '\\n',\n", + " 'trigger handler name:', \n", + " function['function_detail']['Configuration']['Handler'], '\\n',\n", + " 'trigger uuid:', function['triggers'][0]['UUID'], '\\n',\n", + " 'trigger topic:', function['triggers'][0]['Topics'][0], '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.4.8 Delete the local lambda package" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "os.system(f'rm -rf {trigger_package}')\n", + "os.system(f'rm {trigger_package}.zip')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2.1 Produce messages to AP with keys" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# ID of this tutorial notebook as registered with Globus Auth\n", + "CLIENT_ID = 'f794186b-f330-4595-b6c6-9c9d3e903e47'\n", + "\n", + "# Do a native app authentication flow to get tokens that allow us\n", + "# to interact with the Globus Flows service\n", + "\n", + "scopes = [\n", + " 'openid',\n", + " 'profile',\n", + " 'email',\n", + " globus_sdk.FlowsClient.scopes.manage_flows,\n", + " globus_sdk.FlowsClient.scopes.run_manage,\n", + "]\n", + "native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)\n", + "native_auth_client.oauth2_start_flow(requested_scopes=scopes)\n", + "print(f'Login Here:\\n\\n{native_auth_client.oauth2_get_authorize_url()}')\n", + "\n", + "auth_code = input('Authorization Code: ')\n", + "response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)\n", + "\n", + "tokens = response.by_resource_server\n", + "print(json.dumps(tokens, indent=2))\n", + "\n", + "flows_authorizer = globus_sdk.AccessTokenAuthorizer(\n", + " access_token=tokens['flows.globus.org']['access_token'],\n", + ")\n", + "flows_client = globus_sdk.FlowsClient(authorizer=flows_authorizer)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create an Auth client so we can look up identities\n", + "auth_authorizer = globus_sdk.AccessTokenAuthorizer(\n", + " access_token=tokens['auth.globus.org']['access_token'],\n", + ")\n", + "ac = globus_sdk.AuthClient(authorizer=auth_authorizer)\n", + "\n", + "# Get the user's primary identity\n", + "primary_identity = ac.oauth2_userinfo()\n", + "identity_id = primary_identity['sub']\n", + "\n", + "print(f\"Username: {primary_identity['preferred_username']}\")\n", + "print(f'ID: {identity_id}')\n", + "print('Topic to produce/consume:', topic1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2.1.1 Select a Topic" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "action_url = 'https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/'\n", + "\n", + "flow_definition = {\n", + " 'Comment': 'Publish messages to Diaspora Event Fabric',\n", + " 'StartAt': 'PublishMessages',\n", + " 'States': {\n", + " 'PublishMessages': {\n", + " 'Comment': 'Send messages to a specified topic in Diaspora',\n", + " 'Type': 'Action',\n", + " 'ActionUrl': 
action_url,\n", + " 'Parameters': {\n", + " 'action.$': '$.input.action',\n", + " 'topic.$': '$.input.topic',\n", + " 'msgs.$': '$.input.msgs',\n", + " 'keys.$': '$.input.keys',\n", + " },\n", + " 'ResultPath': '$.PublishMessages',\n", + " 'End': True,\n", + " },\n", + " },\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "flow_title = f'Diapora-AP-Flow-{str(uuid.uuid4())[:4]}'\n", + "flow = flows_client.create_flow(\n", + " title=flow_title,\n", + " definition=flow_definition,\n", + " # definition=flow_definition2,\n", + " input_schema={},\n", + ")\n", + "flow_id = flow['id']\n", + "flow_scope = globus_sdk.SpecificFlowClient(flow_id).scopes.make_mutable('user')\n", + "print(f\"Successfully created flow: '{flow_title} (ID: {flow_id})\")\n", + "print(f'View the flow in the Web App: https://app.globus.org/flows/{flow_id}')\n", + "\n", + "if flow_id not in tokens:\n", + " # Do a native app authentication flow and get tokens that\n", + " # include the newly deployed flow scope\n", + " native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)\n", + " native_auth_client.oauth2_start_flow(requested_scopes=flow_scope)\n", + " print(f'Login Here:\\n\\n{native_auth_client.oauth2_get_authorize_url()}')\n", + "\n", + " # Authenticate and come back with your authorization code;\n", + " # paste it into the prompt below.\n", + " auth_code = input('Authorization Code: ')\n", + " token_response = native_auth_client.oauth2_exchange_code_for_tokens(\n", + " auth_code,\n", + " )\n", + "\n", + " # Save the new token in a place where the flows client can retrieve it.\n", + " tokens[flow_id] = token_response.by_resource_server[flow_id]\n", + "\n", + " # These are the saved scopes for the flow\n", + " print(json.dumps(tokens, indent=2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "flow_input = {\n", + " 'input': {\n", + " 'action': 'produce',\n", + " 'topic': topic1,\n", + " 'msgs': [\n", + " {'content1': 'hello world1'},\n", + " {'content2': 'hello world2'},\n", + " {'content3': 'hello world3'},\n", + " ],\n", + " 'keys': [\n", + " 'my-key-123',\n", + " 'my-key-456',\n", + " 'my-key-789',\n", + " ],\n", + " },\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get a client for the flow\n", + "specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(\n", + " access_token=tokens[flow_id]['access_token'],\n", + ")\n", + "print(tokens[flow_id]['access_token'])\n", + "specific_flow_client = globus_sdk.SpecificFlowClient(\n", + " flow_id=flow_id,\n", + " authorizer=specific_flow_authorizer,\n", + ")\n", + "\n", + "# Run the flow\n", + "# Set a descriptive label for this flow run\n", + "run_label = f\"Diaspora AP Flow by {primary_identity['preferred_username']}\"\n", + "run = specific_flow_client.run_flow(\n", + " body=flow_input,\n", + " label=run_label,\n", + " tags=['tutorial', 'diaspora'],\n", + ")\n", + "\n", + "# Get run details\n", + "run_id = run['run_id']\n", + "run_status = run['status']\n", + "print('This flow can be monitored in the Web App:')\n", + "print(f'https://app.globus.org/runs/{run_id}')\n", + "print(f'Flow run started with ID: {run_id} - Status: {run_status}')\n", + "\n", + "# Poll the Flow service to check on the status of the flow\n", + "while run_status == 'ACTIVE':\n", + " time.sleep(5)\n", + " run = flows_client.get_run(run_id)\n", + " run_status = run['status']\n", 
+ " print(f'Run status: {run_status}')\n", + "\n", + "# Run completed\n", + "print(json.dumps(run.data, indent=2))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2.2 Consume messages since a timestamp" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "flow_definition_b = {\n", + " 'Comment': 'Consume messages to Diaspora Event Fabric',\n", + " 'StartAt': 'ConsumeMessages',\n", + " 'States': {\n", + " 'ConsumeMessages': {\n", + " 'Comment': 'Receive messages from a specified topic in Diaspora',\n", + " 'Type': 'Action',\n", + " 'ActionUrl': action_url,\n", + " 'Parameters': {\n", + " 'action.$': '$.input.action',\n", + " 'topic.$': '$.input.topic',\n", + " 'ts.$': '$.input.ts',\n", + " },\n", + " 'ResultPath': '$.ConsumeMessages',\n", + " 'End': True,\n", + " },\n", + " },\n", + "}\n", + "\n", + "flows_client.update_flow(flow_id,\n", + " definition=flow_definition_b,\n", + " input_schema={},\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "flow_input_b = {\n", + " 'input': {'action': 'consume', 'topic': topic1, 'ts': 1715930522000},\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get a client for the flow\n", + "specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(\n", + " access_token=tokens[flow_id]['access_token'],\n", + ")\n", + "print(tokens[flow_id]['access_token'])\n", + "specific_flow_client = globus_sdk.SpecificFlowClient(\n", + " flow_id=flow_id,\n", + " authorizer=specific_flow_authorizer,\n", + ")\n", + "\n", + "# Run the flow\n", + "# Set a descriptive label for this flow run\n", + "run_label = f\"Diaspora AP Flow by {primary_identity['preferred_username']}\"\n", + "run = specific_flow_client.run_flow(\n", + " body=flow_input_b,\n", + " label=run_label,\n", + " tags=['tutorial', 'diaspora'],\n", + ")\n", + "\n", + "# Get run details\n", + "run_id = run['run_id']\n", + "run_status = run['status']\n", + "print('This flow can be monitored in the Web App:')\n", + "print(f'https://app.globus.org/runs/{run_id}')\n", + "print(f'Flow run started with ID: {run_id} - Status: {run_status}')\n", + "\n", + "# Poll the Flow service to check on the status of the flow\n", + "while run_status == 'ACTIVE':\n", + " time.sleep(5)\n", + " run = flows_client.get_run(run_id)\n", + " run_status = run['status']\n", + " print(f'Run status: {run_status}')\n", + "\n", + "# Run completed\n", + "print(json.dumps(run.data, indent=2))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Clean up: Reset and unregister topics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.reset_topic(topic1))\n", + "print(c.reset_topic(topic2))\n", + "print(c.unregister_topic(topic1))\n", + "print(c.unregister_topic(topic2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(c.list_topics())" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + 
"nbformat_minor": 2 +}