obsync: tear out rgw
Yehuda Sadeh committed Nov 22, 2011
1 parent a859763 commit ebe5fc6
Showing 1 changed file with 1 addition and 261 deletions.
262 changes: 1 addition & 261 deletions src/obsync/obsync
@@ -28,8 +28,6 @@ import hashlib
import mimetypes
import os
from StringIO import StringIO
import rados
import rgw
import re
import shutil
import string
@@ -45,13 +43,9 @@ global opts
# Translation table mapping users in the source to users in the destination.
global xuser

# Librgw instance
global lrgw
lrgw = None

###### Usage #######
USAGE = """
obsync synchronizes S3, Rados, and local objects. The source and destination
obsync synchronizes S3 and local objects. The source and destination
can both be local or both remote.
Examples:
@@ -84,22 +78,6 @@ defaults.
obsync (options) [source] [destination]"""

###### Constants #######
ACL_XATTR = "rados.acl"
META_XATTR_PREFIX = "rados.meta."
CONTENT_TYPE_XATTR = "rados.content_type"

RGW_META_BUCKET_NAME = ".rgw"
RGW_USERS_UID_BUCKET_NAME = ".users.uid"
RGW_META_ETAG = "user.rgw.etag"
RGW_META_PREFIX = "user.x-amz-meta-"
RGW_META_CONTENT_TYPE = "user.rgw.content_type"
RGW_META_ACL = "user.rgw.acl"

def vvprint(s):
if (opts.more_verbose):
print s

###### Exception classes #######
class ObsyncException(Exception):
def __init__(self, ty, e):
@@ -550,15 +528,6 @@ class Store(object):
else:
is_secure = os.environ.has_key("SRC_SECURE")
return S3Store(s3_url, create, akey, skey, is_secure)
rados_url = strip_prefix("rgw:", url)
if (rados_url):
dst_owner = None
if (is_dst):
if not os.environ.has_key("DST_OWNER"):
raise ObsyncArgumentParsingException("You must set \
DST_OWNER when uploading files to RgwStore.")
dst_owner = os.environ["DST_OWNER"]
return RgwStore(rados_url, create, akey, skey, dst_owner)
file_url = strip_prefix("file://", url)
if (file_url):
return FileStore(file_url, create)
@@ -865,235 +834,6 @@ class FileStore(Store):
if (opts.more_verbose):
print "FileStore: removed %s" % obj.name

###### Rgw store #######
class RgwStoreIterator(object):
"""RgwStore iterator"""
def __init__(self, it, rgw_store):
self.it = it # has type rados.ObjectIterator
self.rgw_store = rgw_store
self.prefix = self.rgw_store.key_prefix
self.prefix_len = len(self.rgw_store.key_prefix)
def __iter__(self):
return self
def next(self):
rados_obj = None
while True:
# This will raise StopIteration when there are no more objects to
# iterate on
rados_obj = self.it.next()
# do the prefixes match?
if rados_obj.key[:self.prefix_len] == self.prefix:
break
ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key)
if (ret == None):
raise ObsyncPermanentException("internal iterator error")
return ret

class RgwStore(Store):
def __init__(self, url, create, akey, skey, owner):
global lrgw
if (lrgw == None):
lrgw = rgw.Rgw()
self.owner = owner
self.user_exists_cache = {}
self.users_uid_ioctx = None
# Parse the rados url
conf_end = string.find(url, ":")
if (conf_end == -1):
raise ObsyncPermanentException("RgwStore URLs are of the form \
rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
self.conf_file_path = url[0:conf_end]
bucket_end = url.find(":", conf_end+1)
if (bucket_end == -1):
self.rgw_bucket_name = url[conf_end+1:]
self.key_prefix = ""
else:
self.rgw_bucket_name = url[conf_end+1:bucket_end]
self.key_prefix = url[bucket_end+1:]
if (self.rgw_bucket_name == ""):
raise ObsyncPermanentException("RgwStore URLs are of the form \
rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
if (opts.more_verbose):
print "self.conf_file_path = '" + self.conf_file_path + "', ",
print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
print "self.key_prefix = '" + self.key_prefix + "'"
self.rados = rados.Rados()
self.rados.conf_read_file(self.conf_file_path)
self.rados.connect()
if self.owner != None and not self.user_exists(ACL_TYPE_CANON_USER + self.owner):
raise ObsyncPermanentException("Unknown owner! DST_OWNER=%s" % self.owner)
if (not self.rados.pool_exists(self.rgw_bucket_name)):
if (create):
self.create_rgw_bucket(self.rgw_bucket_name)
else:
raise ObsyncPermanentException("NonexistentStore")
elif self.owner == None:
# Figure out what owner we should use when creating objects.
# We use the owner of the destination bucket
ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
try:
bin_ = ioctx.get_xattr(self.rgw_bucket_name, RGW_META_ACL)
xml = lrgw.acl_bin2xml(bin_)
acl = AclPolicy.from_xml(xml)
self.owner = acl.owner_id
if (opts.more_verbose):
print "using owner \"%s\"" % self.owner
finally:
ioctx.close()
self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
Store.__init__(self, "rgw:" + url)
def create_rgw_bucket(self, rgw_bucket_name):
global lrgw
""" Create an rgw bucket named 'rgw_bucket_name' """
if (self.owner == None):
raise ObsyncArgumentParsingException("Can't create a bucket \
without knowing who should own it. Please set DST_OWNER")
self.rados.create_pool(self.rgw_bucket_name)
ioctx = None
try:
ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
ioctx.write(rgw_bucket_name, "", 0)
print "ioctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
"user.rgw.acl=" + self.owner + ")"
new_bucket_acl = "\
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \
<Owner><ID>%s</ID></Owner><AccessControlList>\
<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
xsi:type=\"CanonicalUser\"><ID>%s</ID> \
<DisplayName>display-name</DisplayName></Grantee> \
<Permission>FULL_CONTROL</Permission></Grant>\
</AccessControlList></AccessControlPolicy>" % (self.owner, self.owner)
new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl)
ioctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin)
finally:
if (ioctx):
ioctx.close()
def obsync_obj_from_rgw(self, obj_name):
"""Create an obsync object from a Rados object"""
try:
size, tm = self.ioctx.stat(obj_name)
except rados.ObjectNotFound:
return None
md5 = None
meta = {}
for k,v in self.ioctx.get_xattrs(obj_name):
if k == RGW_META_ETAG:
md5 = v
elif k == RGW_META_CONTENT_TYPE:
meta[CONTENT_TYPE_XATTR] = v
elif k[:len(RGW_META_PREFIX)] == RGW_META_PREFIX:
meta["rados.meta." + k[len(RGW_META_PREFIX):]] = v
elif opts.more_verbose:
print "ignoring unknown xattr " + k
if (md5 == None):
raise ObsyncPermanentException("error on object %s: expected to find " + \
"extended attribute %s" % (obj_name, RGW_META_ETAG))
if (opts.more_verbose):
print "meta = " + str(meta)
return Object(obj_name, md5, size, meta)
def __str__(self):
return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name
def get_acl(self, obj):
global lrgw
bin_ = None
try:
bin_ = self.ioctx.get_xattr(obj.name, RGW_META_ACL)
except rados.NoData:
return LocalAcl.get_empty(obj.name)
xml = lrgw.acl_bin2xml(bin_)
return LocalAcl.from_xml(obj.name, xml)
def make_local_copy(self, obj):
temp_file = None
temp_file_f = None
try:
# read the object from rgw in chunks
temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
temp_file_f = open(temp_file.name, 'w')
off = 0
while True:
buf = self.ioctx.read(obj.name, offset = off, length = 8192)
if (len(buf) == 0):
break
temp_file_f.write(buf)
if (len(buf) < 8192):
break
off += 8192
temp_file_f.close()
except Exception, e:
if (temp_file_f):
temp_file_f.close()
if (temp_file):
os.unlink(temp_file.name)
raise ObsyncTemporaryException(e)
return LocalCopy(obj.name, temp_file.name, True)
def all_objects(self):
it = self.ioctx.list_objects()
return RgwStoreIterator(it, self)
def locate_object(self, obj):
return self.obsync_obj_from_rgw(obj.name)
def user_exists(self, user):
if (self.user_exists_cache.has_key(user)):
return self.user_exists_cache[user]
if user[:len(ACL_TYPE_CANON_USER)] == ACL_TYPE_CANON_USER:
if (self.users_uid_ioctx == None):
# will be closed in __del__
self.users_uid_ioctx = self.rados.open_ioctx(RGW_USERS_UID_BUCKET_NAME)
try:
self.users_uid_ioctx.stat(user[len(ACL_TYPE_CANON_USER):])
except rados.ObjectNotFound:
return False
self.user_exists_cache[user] = True
return True
elif user[:len(ACL_TYPE_EMAIL_USER)] == ACL_TYPE_EMAIL_USER:
raise ObsyncPermanentException("rgw target can't handle email users yet.")
elif user[:len(ACL_TYPE_GROUP)] == ACL_TYPE_GROUP:
raise ObsyncPermanentException("rgw target can't handle groups yet.")
else:
raise ObsyncPermanentException("can't understand user name %s" % user)
def upload(self, local_copy, src_acl, obj):
global lrgw
if (opts.more_verbose):
print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
"obj='" + obj.name + "'"
if (opts.dry_run):
return
local_copy_f = open(local_copy.path, 'r')
off = 0
while True:
buf = local_copy_f.read(8192)
if ((len(buf) == 0) and (off != 0)):
break
self.ioctx.write(obj.name, buf, off)
if (len(buf) < 8192):
break
off += 8192
self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
if (src_acl.acl_policy == None):
ap = AclPolicy.create_default(self.owner)
else:
ap = src_acl.acl_policy
for user in ap.get_all_users():
if not self.user_exists(user):
raise ObsyncPermanentException("You must provide an --xuser entry to translate \
user %s into something valid for the rgw destination.")
xml = ap.to_xml()
bin_ = lrgw.acl_xml2bin(xml)
self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_)
content_type = "application/octet-stream"
for k,v in obj.meta.items():
if k == CONTENT_TYPE_XATTR:
content_type = v
elif k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX:
self.ioctx.set_xattr(obj.name,
RGW_META_PREFIX + k[len(META_XATTR_PREFIX):], v)
self.ioctx.set_xattr(obj.name, "user.rgw.content_type", content_type)
def remove(self, obj):
if (opts.dry_run):
return
self.ioctx.remove_object(obj.name)
if (opts.more_verbose):
print "RgwStore: removed %s" % obj.name

###### Functions #######
def delete_unreferenced(src, dst):
""" delete everything from dst that is not referenced in src """
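For orientation, a minimal sketch (not part of the commit) of the store-selection logic that remains once the rgw: branch above is removed: URLs are matched by prefix and dispatched to the two remaining backends. S3Store, FileStore, and strip_prefix are taken from the diff context; the "s3://" prefix, the argument list, and the final error are assumptions.

    # Illustrative sketch only -- the "s3://" prefix and the fallback error are
    # assumptions; the S3Store/FileStore calls mirror the diff context above.
    def make_store(url, create, akey, skey, is_secure):
        s3_url = strip_prefix("s3://", url)        # assumed S3 URL prefix
        if (s3_url):
            return S3Store(s3_url, create, akey, skey, is_secure)
        file_url = strip_prefix("file://", url)    # local filesystem backend
        if (file_url):
            return FileStore(file_url, create)
        raise ObsyncArgumentParsingException("unrecognized URL: %s" % url)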
