bigtable-backup-tool: Improvements #895

Merged
93 changes: 71 additions & 22 deletions tools/bigtable-backup/bigtable-backup.py
@@ -8,10 +8,11 @@
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
bigtable_backup_job_last_run_seconds = Gauge('bigtable_backup_job_last_run_seconds', 'Last time a bigtable backup job ran at', registry=registry)
bigtable_backup_job_last_success_seconds = Gauge('bigtable_backup_job_last_success_seconds', 'Last time a bigtable backup job successfully finished', registry=registry)
bigtable_backup_job_runtime_seconds = Gauge('bigtable_backup_job_runtime_seconds', 'Runtime of last successfully finished bigtable backup job', registry=registry)
bigtable_backup_job_backups_created = Gauge('bigtable_backup_job_backups_created', 'Number of backups created during last run', registry=registry)
bigtable_backup_job_last_run_seconds = Gauge('bigtable_backup_job_last_run_seconds', 'Last time a bigtable backup job ran at.', registry=registry)
bigtable_backup_job_last_success_seconds = Gauge('bigtable_backup_job_last_success_seconds', 'Last time a bigtable backup job successfully finished.', registry=registry)
bigtable_backup_job_runtime_seconds = Gauge('bigtable_backup_job_runtime_seconds', 'Runtime of last successfully finished bigtable backup job.', registry=registry)
bigtable_backup_job_backups_created = Gauge('bigtable_backup_job_backups_created', 'Number of backups created during last run.', registry=registry)
bigtable_backup_job_last_active_table_backup_time_seconds = Gauge('bigtable_backup_job_last_active_table_backup_time_seconds', 'Last time an active table was backed up at.', registry=registry)

job_backup_active_periodic_table = "backup-active-periodic-table"
job_ensure_backups = "ensure-backups"
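For context on how gauges registered this way are exported: a minimal sketch of pushing the registry to a Prometheus Pushgateway with prometheus_client, assuming a placeholder endpoint and namespace (the tool's own push helpers appear further down in the diff):

    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

    registry = CollectorRegistry()
    last_run = Gauge('bigtable_backup_job_last_run_seconds',
                     'Last time a bigtable backup job ran at.', registry=registry)
    last_run.set_to_current_time()
    # 'localhost:9091' and the 'namespace' grouping value are assumed placeholders
    push_to_gateway('localhost:9091', job='backup-active-periodic-table',
                    registry=registry, grouping_key={'namespace': 'default'})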
@@ -27,32 +28,18 @@ def backup_active_periodic_table(args):
    table_id = args.bigtable_table_id_prefix + str(int(time.time() / args.periodic_table_duration))
    create_backup(table_id, args)

    bigtable_backup_job_last_active_table_backup_time_seconds.set_to_current_time()
    push_job_finished_metric(args.prom_push_gateway_endpoint, args.namespace, job_backup_active_periodic_table, int(time.time() - start_time))
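The table id above is derived by integer-dividing wall-clock time by the table period; a worked sketch, with an assumed 7-day period and an assumed prefix:

    import time

    periodic_table_duration = 604800  # assumed: 7 days in seconds
    prefix = "chunks_"                # assumed --bigtable-table-id-prefix value

    # At t = 1600000000, int(1600000000 / 604800) == 2645,
    # so the active table id would be "chunks_2645".
    table_id = prefix + str(int(time.time() / periodic_table_duration))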


def ensure_backups(args):
    push_job_started_metric(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups)
    start_time = time.time()

    # ensure-backups job specific metrics
    bigtable_backup_job_num_tables_backed_up = Gauge('bigtable_backup_job_num_tables_backed_up', 'Number of table backups found during last run', registry=registry)
    bigtable_backup_job_num_backup_ups = Gauge('bigtable_backup_job_num_backup_ups', 'Sum of number of backups per table found during last run', registry=registry)

    # Read all the existing backups
    popen = subprocess.Popen(['bigtable-backup', 'list-backups', '-ojson', '--backup-path', args.destination_path],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    popen.wait()

    # build and push metrics related to existing backups
    backups = json.loads(popen.stdout.readline())
    if (args.duration == None and args.period_from == None) or (args.duration != None and args.period_from != None):
        raise ValueError("Exactly one of --duration or --period-from must be set")

    bigtable_backup_job_num_tables_backed_up.set(len(backups))
    for __, timestamps in backups.items():
        bigtable_backup_job_num_backup_ups.inc(len(timestamps))

    push_metrics(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups)
    backups = list_backups(args.destination_path)

    if args.period_from == None:
        period_from = datetime.utcnow() - timedelta(days=args.duration)
@@ -61,7 +48,7 @@ def ensure_backups(args):

    oldest_table_number = int(args.period_from.timestamp() / args.periodic_table_duration)
    newest_table_number = int(args.period_to.timestamp() / args.periodic_table_duration)
    active_table_number = time.time() / args.periodic_table_duration
    active_table_number = int(time.time() / args.periodic_table_duration)

    print("Checking that the right backups exist")
    while oldest_table_number <= newest_table_number:
@@ -71,6 +58,10 @@
            print("backup for {} not found".format(table_id))
            create_backup(table_id, args)
            bigtable_backup_job_backups_created.inc(1)
            # compare the numeric table suffix; table_id itself is a string
            if int(table_id.rsplit("_", 1)[-1]) == active_table_number:
                bigtable_backup_job_last_active_table_backup_time_seconds.set_to_current_time()

    num_backups_deleted = 0

    print("Checking whether all the backups are created after their period is over and deleting old unwanted backups")
    for table_id, timestamps in backups.items():
@@ -86,12 +77,58 @@ def ensure_backups(args):
        if table_number != active_table_number and len(timestamps) > 1:
            for timestamp in timestamps[:-1]:
                delete_backup(table_id, str(timestamp), args)
                num_backups_deleted += 1

    if args.delete_out_of_range_backups:
        num_backups_deleted += delete_out_of_range_backups(oldest_table_number, newest_table_number, backups, args)

    set_ensure_backups_specific_metrics(args, num_backups_deleted, active_table_number)
    push_job_finished_metric(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups, int(time.time() - start_time))

def find_last_timestamp_from_table_number(table_number, periodic_secs):
    return ((table_number + 1) * periodic_secs) - 1
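find_last_timestamp_from_table_number gives the final second covered by a table's period; for instance, with the same assumed 7-day period:

    # Table 2645 covers [2645 * 604800, 2646 * 604800 - 1], i.e.
    # [1599696000, 1600300799], so its last timestamp is 1600300799.
    assert find_last_timestamp_from_table_number(2645, 604800) == 1600300799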

def list_backups(backup_path):
    popen = subprocess.Popen(['bigtable-backup', 'list-backups', '-ojson', '--backup-path', backup_path],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    popen.wait()

    return json.loads(popen.stdout.readline())
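The exact JSON emitted by 'bigtable-backup list-backups -ojson' is not shown in this diff; from how the result is consumed (table-id keys mapped to lists of backup timestamps), it is presumably shaped like this assumed example:

    # Assumed output shape, inferred from backups.items() usage:
    # {
    #     "chunks_2644": [1599695999],
    #     "chunks_2645": [1600000000, 1600300799]
    # }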

def set_ensure_backups_specific_metrics(args, num_backups_deleted, active_table_number):
    # ensure-backups job specific metrics
    bigtable_backup_job_tables_backed_up = Gauge('bigtable_backup_job_tables_backed_up', 'Number of active and inactive tables backed up.', ['kind'], registry=registry)
    bigtable_backup_job_backups = Gauge('bigtable_backup_job_backups', 'Number of backups for all active and inactive tables.', ['kind'], registry=registry)
    bigtable_backup_job_backups_deleted = Gauge('bigtable_backup_job_backups_deleted', 'Number of backups deleted during last run.', registry=registry)
    bigtable_backup_job_expected_inactive_table_backups = Gauge('bigtable_backup_job_expected_inactive_table_backups', 'Expected number of backups for inactive tables.', registry=registry)

    duration = args.duration
    if args.duration == None:
        duration = (args.period_to - args.period_from).days

    # there should be 1 backup per inactive table
    periodic_table_duration_in_days = args.periodic_table_duration / 86400
    bigtable_backup_job_expected_inactive_table_backups.set(int(duration / periodic_table_duration_in_days))

    bigtable_backup_job_backups_deleted.set(num_backups_deleted)

    backups = list_backups(args.destination_path)
    inactive_table_backups_count = 0

    # count each table's backups, labelled by whether the table is active or inactive
    for table_id, timestamps in backups.items():
        table_number = int(table_id.rsplit("_", 1)[-1])

        label = 'active'
        if active_table_number != table_number:
            label = 'inactive'
            inactive_table_backups_count += 1

        bigtable_backup_job_backups.labels(label).inc(len(timestamps))

    bigtable_backup_job_tables_backed_up.labels('inactive').set(inactive_table_backups_count)
    if len(backups) != inactive_table_backups_count:
        bigtable_backup_job_tables_backed_up.labels('active').set(1)
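With the 'kind' label split above, the series pushed for one run would look roughly like this (values are illustrative, not taken from the PR):

    bigtable_backup_job_tables_backed_up{kind="active"} 1
    bigtable_backup_job_tables_backed_up{kind="inactive"} 14
    bigtable_backup_job_backups{kind="active"} 2
    bigtable_backup_job_backups{kind="inactive"} 14
    bigtable_backup_job_expected_inactive_table_backups 14
    bigtable_backup_job_backups_deleted 3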

def valid_date(s):
    try:
@@ -131,7 +168,6 @@ def create_backup(table_id, args):
    else:
        print("Backup created for table {}".format(table_id))


def delete_backup(table_id, timestamp, args):
    popen = subprocess.Popen(['bigtable-backup', 'delete-backup', '--bigtable-table-id', table_id,
                              '--backup-path', args.destination_path, "--backup-timestamp", timestamp],
Expand Down Expand Up @@ -163,6 +199,17 @@ def push_metrics(endpoint, namespace, job):
except Exception as e:
print("failed to push metrics with error {}".format(e))

def delete_out_of_range_backups(oldest_table_number, newest_table_number, backups, args):
num_backups_deleted = 0
for table_id, timestamps in backups.items():
table_number = int(table_id.rsplit("_", 1)[-1])
if table_number < oldest_table_number or table_number > newest_table_number:
for timestamp in timestamps:
delete_backup(table_id, timestamp, args)
num_backups_deleted += 1

return num_backups_deleted
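A table is out of range when its numeric suffix falls outside [oldest_table_number, newest_table_number]; a small sketch with assumed values:

    # With oldest_table_number=2642 and newest_table_number=2645, only the
    # backups for "chunks_2641" and "chunks_2646" would be deleted above;
    # "chunks_2643" is in range and kept.
    backups = {
        "chunks_2641": [1598313599],
        "chunks_2643": [1599523199],
        "chunks_2646": [1601337599],
    }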


def main():
    parser = argparse.ArgumentParser()
@@ -193,6 +240,8 @@ def main():
    ensure_backups_parser.add_argument('--period-from', type=valid_date, help="Backups should exist starting from this date. Must not be set with --duration")
    ensure_backups_parser.add_argument('--period-to', type=valid_date,
                                       default=datetime.utcnow().strftime("%Y-%m-%d"))
    ensure_backups_parser.add_argument('--delete-out-of-range-backups', action='store_true', default=False,
                                       help="Delete backups which fall outside the range of the duration for which backups are being ensured")
    ensure_backups_parser.set_defaults(func=ensure_backups)

    args = parser.parse_args()
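Hypothetical invocations of the two subcommands (subcommand and flag spellings are inferred from the args attributes used above; all values are placeholders):

    python bigtable-backup.py backup-active-periodic-table \
        --bigtable-table-id-prefix chunks_ \
        --periodic-table-duration 604800 \
        --destination-path gs://example-backups \
        --prom-push-gateway-endpoint localhost:9091 \
        --namespace default

    python bigtable-backup.py ensure-backups \
        --duration 14 \
        --periodic-table-duration 604800 \
        --destination-path gs://example-backups \
        --prom-push-gateway-endpoint localhost:9091 \
        --namespace default \
        --delete-out-of-range-backups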