diff --git a/CHANGELOG.md b/CHANGELOG.md
index ca231bc6..aa067475 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+6.4.0
+-----
+
+- Add ``number_of_shards`` and ``number_of_replicas`` as possible options
+in the ``elasticsearch`` configuration file section (closes issue #78)
+
 6.3.7
 -----
 
diff --git a/README.rst b/README.rst
index 8194941b..c21a4949 100644
--- a/README.rst
+++ b/README.rst
@@ -161,6 +161,8 @@ The full set of configuration options are:
   - ``cert_path`` - str: Path to a trusted certificates
   - ``index_suffix`` - str: A suffix to apply to the index names
   - ``monthly_indexes`` - bool: Use monthly indexes instead of daily indexes
+  - ``number_of_shards`` - int: The number of shards to use when creating the index (Default: 1)
+  - ``number_of_replicas`` - int: The number of replicas to use when creating the index (Default: 1)
 - ``splunk_hec``
   - ``url`` - str: The URL of the Splunk HTTP Events Collector (HEC)
   - ``token`` - str: The HEC token
diff --git a/ci.ini b/ci.ini
index 22fb5724..3e35e9cf 100644
--- a/ci.ini
+++ b/ci.ini
@@ -6,3 +6,6 @@ debug = True
 [elasticsearch]
 hosts = http://localhost:9200
 ssl = False
+number_of_shards=2
+number_of_replicas=2
+
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 8f4674e4..f0569ce9 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -38,7 +38,7 @@
 from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
 from parsedmarc.utils import parse_email
 
-__version__ = "6.3.7"
+__version__ = "6.4.0"
 
 logging.basicConfig(
     format='%(levelname)8s:%(filename)s:%(lineno)d:'
diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py
index 88f5a4e7..361695fe 100644
--- a/parsedmarc/cli.py
+++ b/parsedmarc/cli.py
@@ -80,10 +80,15 @@ def process_reports(reports_):
             for report in reports_["aggregate_reports"]:
                 try:
                     if opts.elasticsearch_hosts:
+                        shards = opts.elasticsearch_number_of_shards
+                        replicas = opts.elasticsearch_number_of_replicas
                         elastic.save_aggregate_report_to_elasticsearch(
                             report,
                             index_suffix=opts.elasticsearch_index_suffix,
-                            monthly_indexes=opts.elasticsearch_monthly_indexes)
+                            monthly_indexes=opts.elasticsearch_monthly_indexes,
+                            number_of_shards=shards,
+                            number_of_replicas=replicas
+                        )
                 except elastic.AlreadySaved as warning:
                     logger.warning(warning.__str__())
                 except elastic.ElasticsearchError as error_:
@@ -107,11 +112,15 @@
         if opts.save_forensic:
             for report in reports_["forensic_reports"]:
                 try:
+                    shards = opts.elasticsearch_number_of_shards
+                    replicas = opts.elasticsearch_number_of_replicas
                     if opts.elasticsearch_hosts:
                         elastic.save_forensic_report_to_elasticsearch(
                             report,
                             index_suffix=opts.elasticsearch_index_suffix,
-                            monthly_indexes=opts.elasticsearch_monthly_indexes)
+                            monthly_indexes=opts.elasticsearch_monthly_indexes,
+                            number_of_shards=shards,
+                            number_of_replicas=replicas)
                 except elastic.AlreadySaved as warning:
                     logger.warning(warning.__str__())
                 except elastic.ElasticsearchError as error_:
@@ -195,6 +204,8 @@ def process_reports(reports_):
                      hec_index=None,
                      hec_skip_certificate_verification=False,
                      elasticsearch_hosts=None,
+                     elasticsearch_number_of_shards=1,
+                     elasticsearch_number_of_replicas=1,
                      elasticsearch_index_suffix=None,
                      elasticsearch_ssl=True,
                      elasticsearch_ssl_cert_path=None,
@@ -303,6 +314,14 @@
                 logger.critical("hosts setting missing from the "
                                 "elasticsearch config section")
                 exit(-1)
+            if "number_of_shards" in elasticsearch_config:
+                number_of_shards = elasticsearch_config.getint(
+                    "number_of_shards")
+                opts.elasticsearch_number_of_shards = number_of_shards
+            if "number_of_replicas" in elasticsearch_config:
+                number_of_replicas = elasticsearch_config.getint(
+                    "number_of_replicas")
+                opts.elasticsearch_number_of_replicas = number_of_replicas
             if "index_suffix" in elasticsearch_config:
                 opts.elasticsearch_index_suffix = elasticsearch_config[
                     "index_suffix"]
diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py
index 7da146ae..ebc642a1 100644
--- a/parsedmarc/elastic.py
+++ b/parsedmarc/elastic.py
@@ -271,7 +271,9 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
 
 def save_aggregate_report_to_elasticsearch(aggregate_report,
                                            index_suffix=None,
-                                           monthly_indexes=False):
+                                           monthly_indexes=False,
+                                           number_of_shards=1,
+                                           number_of_replicas=1):
     """
     Saves a parsed DMARC aggregate report to ElasticSearch
 
@@ -279,6 +281,8 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
         aggregate_report (OrderedDict): A parsed forensic report
         index_suffix (str): The suffix of the name of the index to save to
         monthly_indexes (bool): Use monthly indexes instead of daily indexes
+        number_of_shards (int): The number of shards to use in the index
+        number_of_replicas (int): The number of replicas to use in the index
 
     Raises:
         AlreadySaved
@@ -374,7 +378,9 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
     if index_suffix:
         index = "{0}_{1}".format(index, index_suffix)
     index = "{0}-{1}".format(index, index_date)
-    create_indexes([index])
+    index_settings = dict(number_of_shards=number_of_shards,
+                          number_of_replicas=number_of_replicas)
+    create_indexes([index], index_settings)
     agg_doc.meta.index = index
 
     try:
@@ -386,7 +392,9 @@
 
 def save_forensic_report_to_elasticsearch(forensic_report,
                                           index_suffix=None,
-                                          monthly_indexes=False):
+                                          monthly_indexes=False,
+                                          number_of_shards=1,
+                                          number_of_replicas=1):
     """
     Saves a parsed DMARC forensic report to ElasticSearch
 
@@ -395,6 +403,9 @@ def save_forensic_report_to_elasticsearch(forensic_report,
         index_suffix (str): The suffix of the name of the index to save to
         monthly_indexes (bool): Use monthly indexes instead of daily
             indexes
+        number_of_shards (int): The number of shards to use in the index
+        number_of_replicas (int): The number of replicas to use in the
+            index
 
     Raises:
         AlreadySaved
@@ -505,7 +516,9 @@
     else:
         index_date = arrival_date.strftime("%Y-%m-%d")
     index = "{0}-{1}".format(index, index_date)
-    create_indexes([index])
+    index_settings = dict(number_of_shards=number_of_shards,
+                          number_of_replicas=number_of_replicas)
+    create_indexes([index], index_settings)
     forensic_doc.meta.index = index
     try:
         forensic_doc.save()
diff --git a/setup.py b/setup.py
index c8f5c992..7d24ba85 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@
 from codecs import open
 from os import path
 
-__version__ = "6.3.7"
+__version__ = "6.4.0"
 
 description = "A Python package and CLI for parsing aggregate and " \
               "forensic DMARC reports"
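For context on how the two new options are consumed: the diff shows elastic.py building an index_settings dict and passing it to create_indexes([index], index_settings), but the body of create_indexes() itself is not part of this change. The sketch below is only an illustration of what such a helper could do, assuming the Elasticsearch layer is built on elasticsearch-dsl (suggested by the meta.index and .save() usage above); the name create_indexes_sketch and the example index name are hypothetical, not parsedmarc's actual implementation.

from elasticsearch_dsl import Index, connections


def create_indexes_sketch(names, settings=None):
    """Create each index that does not exist yet, applying optional settings."""
    for name in names:
        index = Index(name)
        if not index.exists():
            if settings:
                # e.g. settings = dict(number_of_shards=2, number_of_replicas=2)
                index.settings(**settings)
            index.create()


# Illustrative usage mirroring the calls added in elastic.py:
# connections.create_connection(hosts=["http://localhost:9200"])
# index_settings = dict(number_of_shards=2, number_of_replicas=2)
# create_indexes_sketch(["dmarc_aggregate-2019-01-01"], index_settings)

One design note on the settings themselves: they only take effect when an index is first created. In Elasticsearch, number_of_shards is fixed at creation time and cannot be changed on an existing index, while number_of_replicas can still be adjusted later through the index settings API.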