Skip to content

Commit

Permalink
Merge pull request rockstor#1002 from schakrava/886_part2
Browse files Browse the repository at this point in the history
886 part2
  • Loading branch information
schakrava committed Nov 7, 2015
2 parents 40754ac + 0ad8970 commit 0cb751d
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 171 deletions.
55 changes: 32 additions & 23 deletions src/rockstor/fs/btrfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,21 @@ def shares_info(mnt_pt):
share_ids.append(vol_id)
return shares_d

def parse_snap_details(mnt_pt, fields):
    """Return (snap_name, writable) for one subvolume list entry.

    mnt_pt -- mount point of the pool.
    fields -- split output line from 'btrfs subvolume list'; fields[-1] is
              the subvolume path relative to mnt_pt.

    Queries 'btrfs property get' on the subvolume to determine the ro flag.
    Returns (None, True) for a writable subvolume that is a direct child of
    the pool -- such a subvolume is a share, not a snapshot, so callers
    must skip entries whose snap_name is None.
    """
    writable = True
    snap_name = None
    o1, e1, rc1 = run_command([BTRFS, 'property', 'get',
                               '%s/%s' % (mnt_pt, fields[-1])])
    for l1 in o1:
        # property output contains a line of the form 'ro=true'/'ro=false'
        if (re.match('ro=', l1) is not None):
            if (l1.split('=')[1] == 'true'):
                writable = False
    if (writable is True):
        if (len(fields[-1].split('/')) == 1):
            #writable snapshot + direct child of pool. so we'll treat it as a share.
            #Bug fix: original code had 'continue' here, a SyntaxError since
            #this is not inside a loop; return None so the caller skips it.
            return snap_name, writable
    snap_name = fields[-1].split('/')[-1]
    return snap_name, writable

def snaps_info(mnt_pt, share_name):
o, e, rc = run_command([BTRFS, 'subvolume', 'list', '-u', '-p', '-q', mnt_pt])
Expand All @@ -307,32 +322,31 @@ def snaps_info(mnt_pt, share_name):
if (fields[-1] == share_name):
share_id = fields[1]
share_uuid = fields[12]

if (share_id is None):
raise Exception('Failed to get uuid of the share(%s) under mount(%s)'
% (share_name, mnt_pt))

o, e, rc = run_command([BTRFS, 'subvolume', 'list', '-s', '-p', '-q',
mnt_pt])
'-u', mnt_pt])

snaps_d = {}
snap_uuids = []
for l in o:
if (re.match('ID ', l) is not None):
fields = l.split()
#parent uuid must be share_uuid
if (fields[7] != share_id and fields[15] != share_uuid):
#parent uuid must be share_uuid or another snapshot's uuid
if (fields[7] != share_id and fields[15] != share_uuid and
fields[15] not in snap_uuids):
continue
writable = True
o1, e1, rc1 = run_command([BTRFS, 'property', 'get',
'%s/%s' % (mnt_pt, fields[-1])])
for l1 in o1:
if (re.match('ro=', l1) is not None):
if (l1.split('=')[1] == 'true'):
writable = False
if (writable is True):
if (len(fields[-1].split('/')) == 1):
#writable snapshot + direct child of pool. so we'll treat it as a share.
continue
snap_name = fields[-1].split('/')[-1]
snaps_d[snap_name] = ('0/%s' % fields[1], writable)
snap_name, writable = parse_snap_details(mnt_pt, fields)
snaps_d[snap_name] = ('0/%s' % fields[1], writable, )
#we rely on the observation that child snaps are listed after their
#parents, so no need to iterate through results separately. Instead,
#we add the uuid of a snap to the list and look up if it's a parent
#of subsequent entries.
snap_uuids.append(fields[17])

return snaps_d


Expand Down Expand Up @@ -386,16 +400,11 @@ def remove_snap(pool, share_name, snap_name):
return qgroup_destroy(qgroup, root_mnt)
else:
o, e, rc = run_command([BTRFS, 'subvolume', 'list', '-s', root_mnt])
snap = None
for l in o:
#just give the first match.
if (re.match('ID.*%s$' % snap_name, l) is not None):
snap = '%s/%s' % (root_mnt, l.split()[-1])
break
e_msg = ('This snapshot(%s) was created outside of Rockstor. If you '
'really want to delete it, you can do so manually with this '
'command: btrfs subvol delete %s' % (snap_name, snap))
raise Exception(e_msg)
return run_command([BTRFS, 'subvolume', 'delete', snap], log=True)


def add_snap_helper(orig, snap, readonly=False):
Expand Down Expand Up @@ -893,6 +902,6 @@ def get_oldest_snap(subvol_path, num_retain):
continue
snaps[int(fields[1])] = snap_fields[-1]
snap_ids = sorted(snaps.keys())
if (len(snap_ids) >= num_retain):
if (len(snap_ids) > num_retain):
return snaps[snap_ids[0]]
return None
53 changes: 31 additions & 22 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
from django.conf import settings
from contextlib import contextmanager
from util import (create_share, create_receive_trail, update_receive_trail,
create_snapshot, create_rshare, rshare_id, get_sender_ip,
delete_snapshot)
create_snapshot, create_rshare, rshare_id, delete_snapshot,
validate_src_share)
from fs.btrfs import (get_oldest_snap, remove_share, set_property, is_subvol)
from system.osi import run_command
from storageadmin.models import (Disk, Pool)
from storageadmin.models import (Disk, Pool, Appliance)
import shutil


Expand All @@ -56,6 +56,7 @@ def __init__(self, meta):
self.rid = None
self.rtid = None
self.meta_push = None
self.num_retain_snaps = 5
self.ctx = zmq.Context()
super(Receiver, self).__init__()

Expand Down Expand Up @@ -86,12 +87,21 @@ def run(self):
msg = ('Failed to get the sender ip from the uuid(%s) for meta: %s' %
(self.meta['uuid'], self.meta))
with self._clean_exit_handler(msg):
self.sender_ip = get_sender_ip(self.meta['uuid'], logger)
self.sender_ip = Appliance.objects.get(uuid=self.meta['uuid']).ip

msg = ('Failed to validate the source share(%s) on sender(uuid: %s '
'ip: %s) for meta: %s. Did the ip of the sender change?' %
(self.src_share, self.sender_id, self.sender_ip, self.meta))
with self._clean_exit_handler(msg):
validate_src_share(self.sender_id, self.src_share)

msg = ('Failed to connect to the sender(%s) on data_port(%s). meta: '
'%s. Aborting.' % (self.sender_ip, self.data_port, self.meta))
with self._clean_exit_handler(msg):
#@todo: add validation
#connection does not mean we are instantly connected.
#so no validation of that kind possible with pub sub
#for example, if the sender ip is different from what we have
#in the db, no error is raised here.
recv_sub = self.ctx.socket(zmq.SUB)
recv_sub.connect('tcp://%s:%d' % (self.sender_ip, self.data_port))
recv_sub.RCVTIMEO = 100
Expand All @@ -101,6 +111,7 @@ def run(self):
'meta_port(%d). meta: %s. Aborting.' %
(self.sender_ip, self.meta_port, self.meta))
with self._clean_exit_handler(msg):
#same comment from above applies here for push connection also.
self.meta_push = self.ctx.socket(zmq.PUSH)
self.meta_push.connect('tcp://%s:%d' % (self.sender_ip,
self.meta_port))
Expand Down Expand Up @@ -130,20 +141,23 @@ def run(self):
self.rid = rshare_id(sname, logger)

sub_vol = ('%s%s/%s' % (settings.MNT_PT, self.meta['pool'],
sname))
sname))
if (not is_subvol(sub_vol)):
msg = ('Failed to create parent subvolume %s' % sub_vol)
with self._clean_exit_handler(msg, ack=True):
run_command([BTRFS, 'subvolume', 'create', sub_vol])

snap_fp = ('%s/%s' % (sub_vol, self.snap_name))
snap_dir = ('%s%s/.snapshots/%s' % (settings.MNT_PT, self.meta['pool'],
sname))
run_command(['mkdir', '-p', snap_dir])
snap_fp = ('%s/%s' % (snap_dir, self.snap_name))
with self._clean_exit_handler(msg):
if (is_subvol(snap_fp)):
ack = {'msg': 'snap_exists',
'id': self.meta['id'], }
self.meta_push.send_json(ack)

cmd = [BTRFS, 'receive', sub_vol]
cmd = [BTRFS, 'receive', snap_dir]
msg = ('Failed to start the low level btrfs receive command(%s)'
'. Aborting.' % (cmd))
with self._clean_exit_handler(msg, ack=True):
Expand Down Expand Up @@ -198,28 +212,25 @@ def run(self):
if (recv_data == 'END_SUCCESS'):
data['receive_succeeded'] = ts.strftime(settings.SNAP_TS_FORMAT)
#delete the share, move the oldest snap to share
oldest_snap = get_oldest_snap(sub_vol, 3)
oldest_snap = get_oldest_snap(snap_dir, self.num_retain_snaps)
if (oldest_snap is not None):
snap_path = ('%s/%s' % (sub_vol, oldest_snap))
snap_path = ('%s/%s' % (snap_dir, oldest_snap))
share_path = ('%s%s/%s' %
(settings.MNT_PT, self.dest_pool,
sname))
msg = ('Failed to promote the oldest Snapshot(%s) '
'to Share(%s)' % (snap_path, share_path))
try:
pool = Pool.objects.get(name=self.dest_pool)
remove_share(pool, sname)
remove_share(pool, sname, '-1/-1')
set_property(snap_path, 'ro', 'false',
mount=False)
run_command(['/usr/bin/rm', '-rf', share_path],
throw=False)
shutil.move(snap_path, share_path)
set_property(share_path, 'ro', 'true',
mount=False)
delete_snapshot(sname, oldest_snap, logger)
except Exception, e:
logger.error(msg)
logger.exception(msg)
logger.error('%s. Exception: %s' % (msg, e.__str__()))
else:
logger.error('END_FAIL received for meta: %s. '
'Terminating.' % self.meta)
Expand Down Expand Up @@ -250,15 +261,14 @@ def run(self):
update_receive_trail(self.rtid, data, logger)
except zmq.error.Again:
recv_timeout_counter = recv_timeout_counter + 1
if (recv_timeout_counter > 600):
if (recv_timeout_counter > 600): #60 seconds
logger.error('Nothing received in the last 60 seconds '
'from the sender for meta: %s. Aborting.'
% self.meta)
'from the sender(%s) for meta: %s. Aborting.'
% (self.sender_ip, self.meta))
self._sys_exit(3)
except Exception, e:
msg = ('Exception occured while receiving fsdata')
msg = ('Exception occured while receiving fsdata: %s' % e.__str__())
logger.error(msg)
logger.exception(e)
rp.terminate()
out, err = rp.communicate()
data['receive_failed'] = datetime.utcnow().replace(tzinfo=utc).strftime(settings.SNAP_TS_FORMAT)
Expand All @@ -281,8 +291,7 @@ def run(self):
out, err = rp.communicate()
except Exception, e:
logger.debug('Exception while terminating receive. Probably '
'already terminated.')
logger.exception(e)
'already terminated: %s' % e.__str__())

ack = {'msg': 'receive_ok',
'id': self.meta['id'], }
Expand Down
Loading

0 comments on commit 0cb751d

Please sign in to comment.