The following is an object-oriented implementation for storing and retrieving objects in AWS S3. It handles gzip compression and decompression as well as basic CRUD operations on objects in a bucket, and keeps a locally cached listing of the bucket's keys.
import datetime as dt
import functools
import gzip
import io
import json
import logging
import os
import threading
import boto3
import conf
import pandas as pd
from cached_property import cached_property
from project.utils.functional import ItemArray
class S3Object(object):
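    """A single object in the configured S3 bucket. Wraps the key and body and
    handles gzip compression/decompression, reading, uploading, and conversion
    to/from pandas DataFrames."""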
store = None
    def __init__(self, key_id=None, body=None):
        assert S3Object.store is not None
        assert key_id is not None
        self._key_id = key_id
        # Only fall back to fetching from S3 when no body was supplied at all.
        self._body = body if body is not None else self.get()
        self.data = None
@classmethod
def from_df(cls, df, key_id, **kwargs):
header = kwargs.pop('header', False)
index = kwargs.pop('index', False)
buffer = io.StringIO()
df.to_csv(buffer, header=header, index=index, **kwargs)
obj = cls(body=buffer.getvalue(), key_id=key_id)
obj.compress()
return obj
def to_df(self, **kwargs):
compression = kwargs.pop('compression', True)
header = kwargs.pop('header', 'infer')
names = kwargs.pop('names', None)
dtype = kwargs.pop('dtype', str)
if compression:
self.decompress()
else:
self.read()
body = self._body
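        # Replace stray '?' characters with spaces before handing the CSV to pandas.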
body = '\n'.join([l.replace('?', ' ') for l in body.split('\n')])
self._body = body
csv_buffer = io.StringIO(body)
return pd.read_csv(
csv_buffer,
header=header,
names=names,
dtype=dtype,
**kwargs
)
def read(self, encoding='utf-8'):
if self.data:
return self.data
self._read(encoding)
return self._body
    def _read(self, encoding):
        # The body may already be decoded text (e.g. after decompress()).
        if isinstance(self._body, str):
            self.data = self._body
            return
        try:
            body = self._body.read().decode(encoding)
            self._body = body
            self.data = self._body
        except Exception as e:
            logging.error(f"Could not read object '{self._key_id}': {e}")
def _compress(self, encoding):
body = self._body
gz_buffer = io.BytesIO()
with gzip.GzipFile(mode='wb', fileobj=gz_buffer) as gz_file:
gz_file.write(body.encode(encoding))
self._body = gz_buffer.getvalue()
return self
    def _decompress(self, encoding):
        body = self._body
        if isinstance(body, str):
            # Already decompressed and decoded.
            return self
        if isinstance(body, (bytes, bytearray)):
            body = io.BytesIO(body)
        try:
            with gzip.GzipFile(mode='rb', fileobj=body) as gz_file:
                new_body = gz_file.read().decode(encoding)
        except Exception as e:
            logging.error(
                f"{e}: Object '{self._key_id}' does not appear to be gzip-compressed. "
                "Retrying with a plain read..."
            )
            new_body = self.get().read().decode(encoding)
        self._body = new_body
        return self
def decompress(self, encoding='utf-8'):
return self._decompress(encoding)
def compress(self, encoding='utf-8'):
return self._compress(encoding)
    def put(self):
        return S3Object.store._put(key_id=self._key_id, obj=self._body)
def get(self):
return S3Object.store._get(key_id=self._key_id)['Body']
def __repr__(self):
return f'Document {self._key_id} in bucket: {self.store._bucket}'
class CloudObjectStorage:
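    """Thin wrapper around the boto3 S3 client. Creates and caches the client,
    maintains a locally cached directory of the bucket's keys, and exposes
    get/put with optional gzip compression."""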
_name = "CloudObjectStorage"
_api_client = False
_dir = set()
_last_update = dt.datetime.now() - dt.timedelta(days=1)
    def __init__(self, bucket=conf.S3_BUCKET, api_client=None):
        self.lock = threading.Lock()
        self._bucket = bucket
        # Prefer an explicitly supplied client over the class-level cached one.
        self._api_client = (
            api_client
            or CloudObjectStorage._api_client
            or self._create_client()
        )
def _create_client(self):
with self.lock:
client = boto3.client(
's3',
aws_access_key_id=conf.AWS_KEY,
aws_secret_access_key=conf.AWS_SECRET
)
CloudObjectStorage._api_client = client
return client
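    # Expose the S3Object class bound to this store, so callers can write
    # storage.S3Object(...) or storage.S3Object.from_df(...).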
@property
def S3Object(self):
S3Object.store = self
return S3Object
    def list_all_objects(self, prefix=None, suffix=None, **kwargs):
        refresh = kwargs.get('refresh', True)
        # _dir_is_stale() also covers the first call, when no listing is cached yet.
        if refresh and self._dir_is_stale():
            self._update_dir()
        directory = ItemArray(self._dir)
        if prefix or suffix:
            return directory.filter(
                lambda o: o.startswith(prefix or "") and o.endswith(suffix or "")
            )
        return directory
def _load_dir(self):
filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
striptime = lambda s: dt.datetime.strptime(s, '%b %d %Y %I:%M%p')
try:
if os.path.isfile(filepath):
with open(filepath, mode='r') as f:
s3_dir = json.load(f)
_last_update_str = list(s3_dir.keys())[0]
CloudObjectStorage._last_update = striptime(_last_update_str)
self._dir.update(s3_dir[_last_update_str])
        except Exception as e:
            logging.error(f"Could not load the S3 directory cache: {e}")
return
def _save_dir(self):
filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
timestring = lambda d: dt.datetime.strftime(d, '%b %d %Y %I:%M%p')
try:
s3_dir = {
timestring(CloudObjectStorage._last_update):
list(CloudObjectStorage._dir)
}
with open(filepath, mode='w') as f:
json.dump(s3_dir, f)
        except Exception as e:
            logging.error(f"Could not save the S3 directory cache: {e}")
return
    def _dir_is_stale(self):
        requires_init = len(CloudObjectStorage._dir) == 0
        is_stale = (
            CloudObjectStorage._last_update
            < dt.datetime.now() - dt.timedelta(minutes=90)
        )
        return requires_init or is_stale
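    # Refresh the cached key listing, paginating through list_objects_v2 results
    # and persisting the result to the local cache file.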
    def _update_dir(self):
        self._load_dir()
        if self._dir_is_stale():
            object_list = self._api_client.list_objects_v2(Bucket=self._bucket)
            is_truncated = object_list.get("IsTruncated", False)
            if object_list.get("Contents"):
                CloudObjectStorage._dir.update(
                    [o["Key"] for o in object_list["Contents"]]
                )
            while is_truncated:
                continuation_token = object_list["NextContinuationToken"]
                object_list = self._api_client.list_objects_v2(
                    Bucket=self._bucket,
                    ContinuationToken=continuation_token
                )
                CloudObjectStorage._dir.update([
                    o["Key"] for o in object_list["Contents"]
                ])
                is_truncated = object_list["IsTruncated"]
            CloudObjectStorage._last_update = dt.datetime.now()
            self._save_dir()
    def get(self, key_id, compression=True):
        try:
            response = self._get(key_id=key_id)
            obj = self.S3Object(body=response['Body'], key_id=key_id)
            return obj.decompress() if compression else obj
        except Exception as e:
            logging.error(f"Failed to get object '{key_id}': {e}")
            raise
    def put(self, obj, key_id, compression=True):
        try:
            obj = self.S3Object(body=obj, key_id=key_id)
            if compression:
                obj.compress()
            return obj.put()
        except Exception as e:
            logging.error(f"Failed to put object '{key_id}': {e}")
            raise
def _get(self, key_id):
return self._api_client.get_object(Bucket=self._bucket, Key=key_id)
def _put(self, obj, key_id):
return self._api_client.put_object(
Body=obj,
Bucket=self._bucket,
Key=key_id
)
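Usage could look roughly like the sketch below. It is illustrative only, not part of the implementation above: it assumes the conf module supplies S3_BUCKET, AWS_KEY, AWS_SECRET and DATA_DIR, that project.utils.functional.ItemArray is importable, and the key names and DataFrame are made up.
import pandas as pd
# Create the storage wrapper; the boto3 client is built lazily from the
# credentials in conf and cached on the class for reuse.
storage = CloudObjectStorage()
# Store a plain string, gzip-compressed by default.
storage.put(obj='hello world', key_id='examples/hello.txt.gz')
# Fetch it back; get() decompresses and read() returns the decoded text.
document = storage.get(key_id='examples/hello.txt.gz')
print(document.read())
# Round-trip a DataFrame: from_df() serialises to CSV and compresses the body.
df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
storage.S3Object.from_df(df, key_id='examples/table.csv.gz', header=True).put()
# to_df() handles decompression itself, so skip it in get().
restored = storage.get(key_id='examples/table.csv.gz', compression=False).to_df()
# List keys under a prefix using the locally cached directory listing.
keys = storage.list_all_objects(prefix='examples/')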