Bump pinotdb version (#27201)
Issue python-pinot-dbapi/pinot-dbapi#47 is resolved and the fix is released in 0.4.8, so we can bump the package version.
a0x8o committed Oct 24, 2022
1 parent 603e591 commit d6a4c25
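The referenced fix first shipped in pinotdb 0.4.8, so any environment picking up this change needs at least that release. As a purely illustrative sketch (not part of the commit, and the ">= 0.4.8" bound is an assumption rather than the provider's actual pin), a quick local check could look like:

# Illustrative only: confirm the installed pinotdb includes the fix for
# python-pinot-dbapi/pinot-dbapi#47, which the commit message says shipped in 0.4.8.
from importlib.metadata import version  # stdlib, Python 3.8+

from packaging.version import Version

installed = Version(version("pinotdb"))
if installed < Version("0.4.8"):
    raise RuntimeError(f"pinotdb {installed} is too old; upgrade to >= 0.4.8")
print(f"pinotdb {installed} is new enough")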
Showing 1,684 changed files with 44,622 additions and 43,423 deletions.
5 changes: 4 additions & 1 deletion BREEZE.rst
@@ -1408,7 +1408,10 @@ should be run on multiple combinations of Python, Kubernetes, Backend versions.
needed to run the CI Builds. You can also use the tool to test what tests will be run when you provide
a specific commit that Breeze should run the tests on.
More details about the algorithm used to pick the right tests can be
The selective-check command will produce the set of ``name=value`` pairs of outputs derived
from the context of the commit/PR to be merged via stderr output.
More details about the algorithm used to pick the right tests and the available outputs can be
found in `Selective Checks <dev/breeze/SELECTIVE_CHECKS.md>`_.
Those are all available flags of ``selective-check`` command:
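The documentation change above states that ``selective-check`` writes its outputs as ``name=value`` pairs to stderr. As an illustration only (not part of this diff; the exact ``breeze selective-check`` invocation and the ``docs-build`` output name are assumptions), a CI helper could collect those pairs like this:

# Sketch of consuming selective-check outputs. Assumes a `breeze` executable is
# on PATH, that `selective-check` is a valid subcommand, and that it prints one
# `name=value` pair per stderr line, as the documentation above describes.
import subprocess

proc = subprocess.run(
    ["breeze", "selective-check"],
    capture_output=True,
    text=True,
    check=True,
)
outputs = {}
for line in proc.stderr.splitlines():
    if "=" in line:
        name, _, value = line.partition("=")
        outputs[name.strip()] = value.strip()

print(outputs.get("docs-build"))  # hypothetical output name, used for illustration only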
12 changes: 6 additions & 6 deletions CONTRIBUTING.rst
@@ -616,12 +616,12 @@ apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana, async, atlas, atlas
azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, common.sql, crypto, dask, databricks,
datadog, dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord,
doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise,
google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira,
kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas,
papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce,
samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos,
kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo,
mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas, papermill, password,
pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment,
sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE
Provider packages
12 changes: 6 additions & 6 deletions INSTALL
@@ -100,12 +100,12 @@ apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana, async, atlas, atlas
azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, common.sql, crypto, dask, databricks,
datadog, dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord,
doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise,
google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira,
kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas,
papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce,
samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos,
kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo,
mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas, papermill, password,
pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment,
sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
4 changes: 3 additions & 1 deletion STATIC_CODE_CHECKS.rst
@@ -132,7 +132,9 @@ require Breeze Docker image to be build locally.
+--------------------------------------------------------+------------------------------------------------------------------+---------+
| ID | Description | Image |
+========================================================+==================================================================+=========+
| black | Run Black (the uncompromising Python code formatter) | |
| black | * Run black (python formatter) on core | |
| | * Run black (python formatter) on providers | |
| | * Run black (python formatter) on other | |
+--------------------------------------------------------+------------------------------------------------------------------+---------+
| blacken-docs | Run black on python code blocks in documentation files | |
+--------------------------------------------------------+------------------------------------------------------------------+---------+
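The IDs in this table are pre-commit hook IDs, so the split ``black`` entries above can be exercised directly. A small sketch (assuming pre-commit is installed and these hook IDs exist in the repository's .pre-commit-config.yaml; not part of the diff):

# Run the black-related static checks listed in the table via pre-commit.
import subprocess

for hook_id in ("black", "blacken-docs"):
    result = subprocess.run(
        ["pre-commit", "run", hook_id, "--all-files"],
        check=False,  # non-zero exit just means the hook reformatted or flagged files
    )
    print(f"{hook_id}: exit code {result.returncode}")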
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
@@ -833,7 +833,7 @@ def _execute(self, session=None):
pickle_id = pickle.id

executor = self.executor
executor.job_id = "backfill"
executor.job_id = self.id
executor.start()

ti_status.total_runs = len(dagrun_infos) # total dag runs in backfill
64 changes: 24 additions & 40 deletions airflow/models/dagrun.py
@@ -79,8 +79,8 @@
from airflow.models.dag import DAG
from airflow.models.operator import Operator


CreatedTasksType = TypeVar("CreatedTasksType")
CreatedTasks = TypeVar("CreatedTasks", Iterator["dict[str, Any]"], Iterator[TI])
TaskCreator = Callable[[Operator, Iterable[int]], CreatedTasks]


class TISchedulingDecision(NamedTuple):
@@ -854,11 +854,7 @@ def _emit_duration_stats_for_finished_state(self):
Stats.timing(f'dagrun.duration.failed.{self.dag_id}', duration)

@provide_session
def verify_integrity(
self,
*,
session: Session = NEW_SESSION,
):
def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
"""
Verifies the DagRun by checking for removed tasks or tasks that are not in the
database yet. It will set state to removed or add the task if required.
@@ -885,14 +881,12 @@ def task_filter(task: Operator) -> bool:
)

created_counts: dict[str, int] = defaultdict(int)

# Get task creator function
task_creator = self._get_task_creator(created_counts, task_instance_mutation_hook, hook_is_noop)

# Create the missing tasks, including mapped tasks
tasks = self._create_tasks(dag, task_creator, task_filter, session=session)

self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
tasks_to_create = (task for task in dag.task_dict.values() if task_filter(task))
tis_to_create = self._create_tasks(tasks_to_create, task_creator, session=session)
self._create_task_instances(self.dag_id, tis_to_create, created_counts, hook_is_noop, session=session)

def _check_for_removed_or_restored_tasks(
self, dag: DAG, ti_mutation_hook, *, session: Session
@@ -978,7 +972,7 @@ def _get_task_creator(
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[True],
) -> Callable[[Operator, tuple[int, ...]], Iterator[dict[str, Any]]]:
) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]]]:
...

@overload
@@ -987,15 +981,15 @@ def _get_task_creator(
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[False],
) -> Callable[[Operator, tuple[int, ...]], Iterator[TI]]:
) -> Callable[[Operator, Iterable[int]], Iterator[TI]]:
...

def _get_task_creator(
self,
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[True, False],
) -> Callable[[Operator, tuple[int, ...]], Iterator[dict[str, Any]] | Iterator[TI]]:
) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]] | Iterator[TI]]:
"""
Get the task creator function.
@@ -1008,7 +1002,7 @@ def _get_task_creator(
"""
if hook_is_noop:

def create_ti_mapping(task: Operator, indexes: tuple[int, ...]) -> Iterator[dict[str, Any]]:
def create_ti_mapping(task: Operator, indexes: Iterable[int]) -> Iterator[dict[str, Any]]:
created_counts[task.task_type] += 1
for map_index in indexes:
yield TI.insert_mapping(self.run_id, task, map_index=map_index)
@@ -1017,7 +1011,7 @@ def create_ti_mapping(task: Operator, indexes: tuple[int, ...]) -> Iterator[dict

else:

def create_ti(task: Operator, indexes: tuple[int, ...]) -> Iterator[TI]:
def create_ti(task: Operator, indexes: Iterable[int]) -> Iterator[TI]:
for map_index in indexes:
ti = TI(task, run_id=self.run_id, map_index=map_index)
ti_mutation_hook(ti)
@@ -1029,36 +1023,26 @@ def create_ti(task: Operator, indexes: tuple[int, ...]) -> Iterator[TI]:

def _create_tasks(
self,
dag: DAG,
task_creator: Callable[[Operator, tuple[int, ...]], CreatedTasksType],
task_filter: Callable[[Operator], bool],
tasks: Iterable[Operator],
task_creator: TaskCreator,
*,
session: Session,
) -> CreatedTasksType:
) -> CreatedTasks:
"""
Create missing tasks -- and expand any MappedOperator that _only_ have literals as input
:param dag: DAG object corresponding to the dagrun
:param task_creator: a function that creates tasks
:param task_filter: a function that filters tasks to create
:param session: the session to use
:param tasks: Tasks to create jobs for in the DAG run
:param task_creator: Function to create task instances
"""

def expand_mapped_literals(task: Operator) -> tuple[Operator, Sequence[int]]:
for task in tasks:
if not task.is_mapped:
return (task, (-1,))
task = cast("MappedOperator", task)
count = task.get_mapped_ti_count(self.run_id, session=session)
if not count:
return (task, (-1,))
return (task, range(count))

tasks_and_map_idxs = map(expand_mapped_literals, filter(task_filter, dag.task_dict.values()))

tasks: CreatedTasksType = itertools.chain.from_iterable( # type: ignore
itertools.starmap(task_creator, tasks_and_map_idxs) # type: ignore
)
return tasks
yield from task_creator(task, (-1,))
continue
count = cast(MappedOperator, task).get_mapped_ti_count(self.run_id, session=session)
if count:
yield from task_creator(task, range(count))
continue
yield from task_creator(task, (-1,))

def _create_task_instances(
self,
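The dagrun.py hunks above replace the ``itertools.chain``/``starmap`` pipeline with a plain generator: each task yields its instances directly, one per map index, with ``-1`` standing in when a task is unmapped or its mapped length cannot be resolved yet. A standalone illustration of that pattern (deliberately using plain Python objects rather than Airflow classes) might look like this:

# Standalone illustration of the generator pattern used by the refactored
# _create_tasks; strings stand in for operators, dicts for task instances.
from typing import Callable, Iterable, Iterator, Optional

InstanceCreator = Callable[[str, Iterable[int]], Iterator[dict]]


def create_instances(
    tasks: Iterable[str],
    mapped_length: Callable[[str], Optional[int]],
    creator: InstanceCreator,
) -> Iterator[dict]:
    for task in tasks:
        count = mapped_length(task)  # None => not a mapped task
        if count:                    # mapped task with a known expansion length
            yield from creator(task, range(count))
        else:                        # unmapped, or length not resolvable yet
            yield from creator(task, (-1,))


def creator(task: str, indexes: Iterable[int]) -> Iterator[dict]:
    for map_index in indexes:
        yield {"task": task, "map_index": map_index}


# "extract" is unmapped; "transform" expands into three mapped instances.
instances = list(
    create_instances(["extract", "transform"], {"extract": None, "transform": 3}.get, creator)
)
print(instances)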
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -1522,7 +1522,7 @@ def _run_raw_task(

if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.merge(self).task = self.task
if self.state == TaskInstanceState.SUCCESS:
self._register_dataset_changes(session=session)
session.commit()
2 changes: 1 addition & 1 deletion airflow/models/taskmixin.py
@@ -272,5 +272,5 @@ def get_direct_relatives(self, upstream: bool = False) -> Iterable[DAGNode]:
return self.downstream_list

def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
"""This is used by SerializedTaskGroup to serialize a task group's content."""
"""This is used by TaskGroupSerialization to serialize a task group's content."""
raise NotImplementedError()
16 changes: 8 additions & 8 deletions airflow/providers/airbyte/hooks/airbyte.py
@@ -33,10 +33,10 @@ class AirbyteHook(HttpHook):
:param api_version: Optional. Airbyte API version.
"""

conn_name_attr = 'airbyte_conn_id'
default_conn_name = 'airbyte_default'
conn_type = 'airbyte'
hook_name = 'Airbyte'
conn_name_attr = "airbyte_conn_id"
default_conn_name = "airbyte_default"
conn_type = "airbyte"
hook_name = "Airbyte"

RUNNING = "running"
SUCCEEDED = "succeeded"
@@ -121,19 +121,19 @@ def cancel_job(self, job_id: int) -> Any:

def test_connection(self):
"""Tests the Airbyte connection by hitting the health API"""
self.method = 'GET'
self.method = "GET"
try:
res = self.run(
endpoint=f"api/{self.api_version}/health",
headers={"accept": "application/json"},
extra_options={'check_response': False},
extra_options={"check_response": False},
)

if res.status_code == 200:
return True, 'Connection successfully tested'
return True, "Connection successfully tested"
else:
return False, res.text
except Exception as e:
return False, str(e)
finally:
self.method = 'POST'
self.method = "POST"
10 changes: 5 additions & 5 deletions airflow/providers/airbyte/operators/airbyte.py
@@ -47,7 +47,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
Only used when ``asynchronous`` is False.
"""

template_fields: Sequence[str] = ('connection_id',)
template_fields: Sequence[str] = ("connection_id",)

def __init__(
self,
@@ -71,18 +71,18 @@ def execute(self, context: Context) -> None:
"""Create Airbyte Job and wait to finish"""
self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job_object = self.hook.submit_sync_connection(connection_id=self.connection_id)
self.job_id = job_object.json()['job']['id']
self.job_id = job_object.json()["job"]["id"]

self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
if not self.asynchronous:
self.log.info('Waiting for job %s to complete', self.job_id)
self.log.info("Waiting for job %s to complete", self.job_id)
self.hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
self.log.info('Job %s completed successfully', self.job_id)
self.log.info("Job %s completed successfully", self.job_id)

return self.job_id

def on_kill(self):
"""Cancel the job if task is cancelled"""
if self.job_id:
self.log.info('on_kill: cancel the airbyte Job %s', self.job_id)
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
self.hook.cancel_job(self.job_id)
8 changes: 4 additions & 4 deletions airflow/providers/airbyte/sensors/airbyte.py
@@ -38,14 +38,14 @@ class AirbyteJobSensor(BaseSensorOperator):
:param api_version: Optional. Airbyte API version.
"""

template_fields: Sequence[str] = ('airbyte_job_id',)
ui_color = '#6C51FD'
template_fields: Sequence[str] = ("airbyte_job_id",)
ui_color = "#6C51FD"

def __init__(
self,
*,
airbyte_job_id: int,
airbyte_conn_id: str = 'airbyte_default',
airbyte_conn_id: str = "airbyte_default",
api_version: str = "v1",
**kwargs,
) -> None:
@@ -57,7 +57,7 @@ def __init__(
def poke(self, context: Context) -> bool:
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job = hook.get_job(job_id=self.airbyte_job_id)
status = job.json()['job']['status']
status = job.json()["job"]["status"]

if status == hook.FAILED:
raise AirflowException(f"Job failed: \n{job}")
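Taken together, the Airbyte hunks above show the trigger operator handing its job id to the sensor. A hedged usage sketch (an assumption about a typical DAG, not part of this commit; the connection IDs and the Airbyte ``connection_id`` UUID are placeholders):

# Wires AirbyteTriggerSyncOperator and AirbyteJobSensor together based on the
# parameters visible in the hunks above.
import pendulum

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

with DAG(
    dag_id="airbyte_sync_example",
    start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    trigger_sync = AirbyteTriggerSyncOperator(
        task_id="trigger_sync",
        airbyte_conn_id="airbyte_default",
        connection_id="00000000-0000-0000-0000-000000000000",  # placeholder UUID
        asynchronous=True,  # return immediately; the sensor below does the waiting
    )

    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id=trigger_sync.output,  # job id returned by execute()
    )

    trigger_sync >> wait_for_sync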