Skip to content

Commit

Permalink
Merge pull request #171 from PanDAWMS/ATLASPANDA-803
Browse files Browse the repository at this point in the history
Atlaspanda 803
  • Loading branch information
Foorth authored May 22, 2023
2 parents e7f5251 + 7323fa6 commit 618fdc3
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 41 deletions.
82 changes: 52 additions & 30 deletions core/libs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

_logger = logging.getLogger('bigpandamon')


def get_es_credentials(instance='es-atlas'):
def get_es_credentials(instance):
"""
Getting credentials from settings
:param instance: str, es-atlas or es-monit
Expand All @@ -23,47 +22,69 @@ def get_es_credentials(instance='es-atlas'):
es_host = None
es_user = None
es_password = None
if instance == 'es-atlas' and hasattr(settings, 'ES'):
es_host = settings.ES.get('esHost', None)
es_port = settings.ES.get('esPort', None)
es_host = es_host + ':' + es_port + '/es' if es_host else None
es_user = settings.ES.get('esUser', None)
es_password = settings.ES.get('esPassword', None)
elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'):
es_host = settings.ES_MONIT.get('esHost', None)
es_port = settings.ES_MONIT.get('esPort', None)
es_host = es_host + ':' + es_port + '/es' if es_host else None
es_user = settings.ES_MONIT.get('esUser', None)
es_password = settings.ES_MONIT.get('esPassword', None)
if settings.DEPLOYMENT == 'ORACLE_ATLAS':
if instance == 'es-atlas' and hasattr(settings, 'ES'):
es_host = settings.ES.get('esHost', None)
es_port = settings.ES.get('esPort', None)
es_host = es_host + ':' + es_port + '/es' if es_host else None
es_user = settings.ES.get('esUser', None)
es_password = settings.ES.get('esPassword', None)
elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'):
es_host = settings.ES_MONIT.get('esHost', None)
es_port = settings.ES_MONIT.get('esPort', None)
es_host = es_host + ':' + es_port + '/es' if es_host else None
es_user = settings.ES_MONIT.get('esUser', None)
es_password = settings.ES_MONIT.get('esPassword', None)
else:
if hasattr(settings, 'ES_CLUSTER'):
es_host = settings.ELASTIC.get('esHost', None)
es_port = settings.ELASTIC.get('esPort', None)
es_protocol = settings.ELASTIC.get('esProtocol', None)
es_path = settings.ELASTIC.get('esPath', None)
es_host = es_protocol + '://' + es_host + ':' + es_port + '/' + es_path if es_host else None
es_user = settings.ELASTIC.get('esUser', None)
es_password = settings.ELASTIC.get('esPassword', None)

if any(i is None for i in (es_host, es_user, es_password)):
raise Exception('ES cluster credentials was not found in settings')
else:
return es_host, es_user, es_password


def create_es_connection(verify_certs=True, timeout=2000, max_retries=10, retry_on_timeout=True, instance='es-atlas'):
def create_es_connection(instance='es-atlas', protocol='https', timeout=2000, max_retries=10,
retry_on_timeout=True):
"""
Create a connection to ElasticSearch cluster
"""
es_host, es_user, es_password = get_es_credentials(instance)

try:
connection = Elasticsearch(
['https://{0}'.format(es_host)],
http_auth=(es_user, es_password),
verify_certs=verify_certs,
timeout=timeout,
max_retries=max_retries,
retry_on_timeout=retry_on_timeout,
ca_certs='/etc/pki/tls/certs/ca-bundle.trust.crt'
)
if protocol == 'https':
ca_certs = settings.ES_CA_CERT

connection = Elasticsearch(
['{0}://{1}'.format(protocol, es_host)],
http_auth=(es_user, es_password),
verify_certs=True,
timeout=timeout,
max_retries=max_retries,
retry_on_timeout=retry_on_timeout,
ca_certs = ca_certs
)
else:
connection = Elasticsearch(
['{0}://{1}'.format(protocol, es_host)],
http_auth=(es_user, es_password),
timeout=timeout,
max_retries=max_retries,
retry_on_timeout=retry_on_timeout)
return connection

except Exception as ex:
_logger.error(ex)
return None


def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', search_string=''):
def get_payloadlog(id, es_conn, index, start=0, length=50, mode='pandaid', sort='asc', search_string=''):
"""
Get pilot logs from ATLAS ElasticSearch storage
"""
Expand All @@ -73,7 +94,8 @@ def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc',
total = 0
flag_running_job = True
end = start + length
s = Search(using=es_conn, index='atlas_pilotlogs*')

s = Search(using=es_conn, index=index)

s = s.source(["@timestamp", "@timestamp_nanoseconds", "level", "message", "PandaJobID", "TaskID",
"Harvester_WorkerID", "Harvester_ID"])
Expand Down Expand Up @@ -104,7 +126,6 @@ def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc',

return logs_list, flag_running_job, total


def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate', id_param='jeditaskid'):
"""
Push data to ElasticSearch cluster
Expand Down Expand Up @@ -186,7 +207,6 @@ def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate',

return result


def get_split_rule_info(es_conn, jeditaskid):
"""
Get split rule entries from ATLAS Elastic
Expand All @@ -195,7 +215,9 @@ def get_split_rule_info(es_conn, jeditaskid):
:return: split rule messages
"""
split_rules = []
s = Search(using=es_conn, index='atlas_jedilogs*')
jedi_logs_index = settings.ES_INDEX_JEDI_LOGS

s = Search(using=es_conn, index=jedi_logs_index)
s = s.source(['@timestamp', 'message'])
s = s.filter('term', jediTaskID='{0}'.format(jeditaskid))
q = Q("match", message='change_split_rule')
Expand Down
10 changes: 7 additions & 3 deletions core/libs/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,11 @@ def get_logs_by_taskid(jeditaskid):

tasks_logs = []

connection = create_es_connection()
es_conn = create_es_connection()

s = Search(using=connection, index='atlas_jedilogs-*')
jedi_logs_index = settings.ES_INDEX_JEDI_LOGS

s = Search(using=es_conn, index=jedi_logs_index)

s = s.filter('term', **{'jediTaskID': jeditaskid})

Expand All @@ -900,7 +902,9 @@ def get_logs_by_taskid(jeditaskid):
tasks_logs.append({'jediTaskID': jeditaskid, 'logname': type, 'loglevel': levelname,
'lcount': str(levelnames['doc_count'])})

s = Search(using=connection, index='atlas_pandalogs-*')
panda_logs_index = settings.ES_INDEX_PANDA_LOGS

s = Search(using=connection, index=panda_logs_index)

s = s.filter('term', **{'jediTaskID': jeditaskid})

Expand Down
6 changes: 6 additions & 0 deletions core/settings/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
# PanDA server URL
PANDA_SERVER_URL = os.environ.get('PANDA_SERVER_URL', 'https://pandaserver.cern.ch/server/panda')

# ElasticSearch
ES_INDEX_PANDA_LOGS = os.environ.get('ES_INDEX_PANDA_LOGS', 'atlas_pandalogs*')
ES_INDEX_JEDI_LOGS = os.environ.get('ES_INDEX_JEDI_LOGS', 'atlas_jedilogs*')
ES_INDEX_PILOT_LOGS = os.environ.get('ES_INDEX_PILOT_LOGS', 'atlas_pilotlogs*')
ES_CA_CERT = os.environ.get('ES_CA_CERT', '/etc/pki/tls/certs/ca-bundle.trust.crt')

# DB_ROUTERS for atlas's prodtask
DATABASE_ROUTERS = [
'core.dbrouter.ProdMonDBRouter',
Expand Down
27 changes: 19 additions & 8 deletions core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6766,11 +6766,13 @@ def esatlasPandaLoggerJson(request):
return response

if settings.DEPLOYMENT != 'ORACLE_ATLAS':
return HttpResponse('It does not exist for non ATLAS BipPanDA monintoring system', content_type='text/html')
return HttpResponse('It does not exist for non ATLAS BipPanDA monitoring system', content_type='text/html')

connection = create_es_connection()
es_conn = create_es_connection()

s = Search(using=connection, index='atlas_jedilogs-*')
jedi_logs_index = settings.ES_INDEX_JEDI_LOGS

s = Search(using=es_conn, index=jedi_logs_index)

s.aggs.bucket('jediTaskID', 'terms', field='jediTaskID', size=100) \
.bucket('type', 'terms', field='fields.type.keyword') \
Expand Down Expand Up @@ -6881,7 +6883,10 @@ def esatlasPandaLogger(request):
}
jediCat = ['cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7']

indices = ['atlas_pandalogs-', 'atlas_jedilogs-']
panda_index = settings.ES_INDEX_PANDA_LOGS[:-1]+'-'
jedi_index = settings.ES_INDEX_JEDI_LOGS[:-1]+'-'

indices = [panda_index, jedi_index]

panda = {}
jedi = {}
Expand All @@ -6895,7 +6900,7 @@ def esatlasPandaLogger(request):

res = s.execute()

if index == "atlas_pandalogs-":
if index == panda_index:
for cat in pandaCat:
panda[cat] = {}
for agg in res['aggregations']['logName']['buckets']:
Expand All @@ -6912,7 +6917,7 @@ def esatlasPandaLogger(request):
panda[cat][name][type][levelname] = {}
panda[cat][name][type][levelname]['logLevel'] = levelname
panda[cat][name][type][levelname]['lcount'] = str(levelnames['doc_count'])
elif index == "atlas_jedilogs-":
elif index == jedi_index:
for cat in jediCat:
jedi[cat] = {}
for agg in res['aggregations']['logName']['buckets']:
Expand Down Expand Up @@ -8251,7 +8256,11 @@ def initSelfMonitor(request):
else:
remote = request.META['REMOTE_ADDR']

urlProto = request.META['wsgi.url_scheme']
if 'wsgi.url_scheme' in request.META:
urlProto = request.META['wsgi.url_scheme']
else:
urlProto = 'http'

if 'HTTP_X_FORWARDED_PROTO' in request.META:
urlProto = request.META['HTTP_X_FORWARDED_PROTO']
urlProto = str(urlProto) + "://"
Expand Down Expand Up @@ -8762,7 +8771,9 @@ def getPayloadLog(request):
else:
search_string = request.POST['search']

payloadlog, job_running_flag, total = get_payloadlog(id, connection, start=start_var, length=length_var, mode=mode,
pilot_logs_index = settings.ES_INDEX_PILOT_LOGS

payloadlog, job_running_flag, total = get_payloadlog(id, connection, pilot_logs_index, start=start_var, length=length_var, mode=mode,
sort=sort, search_string=search_string)

log_content['payloadlog'] = payloadlog
Expand Down

0 comments on commit 618fdc3

Please sign in to comment.