The following is an object-oriented implementation for storing and retrieving objects in Amazon S3 buckets. It handles gzip compression and decompression as well as basic CRUD operations on objects stored in AWS S3, and can round-trip pandas DataFrames as CSV. A short usage sketch follows the listing.
import datetime as dt
import gzip
import io
import json
import logging
import os
import threading
import boto3
import conf
import pandas as pd
from project.utils.functional import ItemArray

class S3Object(object):
    # Storage backend is injected by CloudObjectStorage via its S3Object property.
    store = None

    def __init__(self, key_id=None, body=None):
        assert S3Object.store is not None
        assert key_id is not None
        self._key_id = key_id
        self._body = body or self.get()
        self.data = None

    @classmethod
    def from_df(cls, df, key_id, **kwargs):
        header = kwargs.pop('header', False)
        index = kwargs.pop('index', False)
        buffer = io.StringIO()
        df.to_csv(buffer, header=header, index=index, **kwargs)
        obj = cls(body=buffer.getvalue(), key_id=key_id)
        obj.compress()
        return obj

    def to_df(self, **kwargs):
        compression = kwargs.pop('compression', True)
        header = kwargs.pop('header', 'infer')
        names = kwargs.pop('names', None)
        dtype = kwargs.pop('dtype', str)
        if compression:
            self.decompress()
        else:
            self.read()
        # Replace '?' placeholders with spaces before parsing the CSV.
        body = '\n'.join(line.replace('?', ' ') for line in self._body.split('\n'))
        self._body = body
        csv_buffer = io.StringIO(body)
        return pd.read_csv(
            csv_buffer,
            header=header,
            names=names,
            dtype=dtype,
            **kwargs
        )

    def read(self, encoding='utf-8'):
        if self.data:
            return self.data
        self._read(encoding)
        return self._body

    def _read(self, encoding):
        try:
            body = self._body.read().decode(encoding)
            self._body = body
            self.data = self._body
        except Exception:
            # The body may already be a decoded string; leave it unchanged.
            return

    def _compress(self, encoding):
        body = self._body
        gz_buffer = io.BytesIO()
        with gzip.GzipFile(mode='wb', fileobj=gz_buffer) as gz_file:
            gz_file.write(body.encode(encoding))
        self._body = gz_buffer.getvalue()
        return self

    def _decompress(self, encoding):
        try:
            body = self._body
            with gzip.GzipFile(mode='rb', fileobj=body) as gz_file:
                new_body = gz_file.read().decode(encoding)
        except Exception as e:
            logging.error(
                f"{e}: Object '{self._key_id}' does not have gzip "
                "compression applied. Retrying..."
            )
            new_body = self.get().read().decode(encoding)
        self._body = new_body
        return self

    def decompress(self, encoding='utf-8'):
        return self._decompress(encoding)

    def compress(self, encoding='utf-8'):
        return self._compress(encoding)

    def put(self):
        return S3Object.store._put(key_id=self._key_id, obj=self._body)

    def get(self):
        return S3Object.store._get(key_id=self._key_id)['Body']

    def __repr__(self):
        return f'Document {self._key_id} in bucket: {self.store._bucket}'


class CloudObjectStorage:
    _name = "CloudObjectStorage"
    _api_client = None
    _dir = set()
    _last_update = dt.datetime.now() - dt.timedelta(days=1)

    def __init__(self, bucket=conf.S3_BUCKET, api_client=None):
        self.lock = threading.Lock()
        self._bucket = bucket
        # Prefer an explicitly supplied client, then the cached class-level
        # client, and only create a new one as a last resort.
        self._api_client = (
            api_client
            or CloudObjectStorage._api_client
            or self._create_client()
        )

    def _create_client(self):
        with self.lock:
            client = boto3.client(
                's3',
                aws_access_key_id=conf.AWS_KEY,
                aws_secret_access_key=conf.AWS_SECRET
            )
            CloudObjectStorage._api_client = client
            return client

    @property
    def S3Object(self):
        # Bind this storage instance to the S3Object class before handing it out.
        S3Object.store = self
        return S3Object

    def list_all_objects(self, prefix=None, suffix=None, **kwargs):
        cache_exists = '.s3.cache.txt' in os.listdir(conf.DATA_DIR)
        # Refresh the key listing when requested, or when no local cache exists yet.
        refresh = kwargs.get('refresh', True) or not cache_exists
        if refresh and self._dir_is_stale():
            self._update_dir()
        directory = ItemArray(self._dir)
        if prefix or suffix:
            return directory.filter(
                lambda o: o.startswith(prefix or "") and o.endswith(suffix or "")
            )
        return directory

    def _load_dir(self):
        filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
        striptime = lambda s: dt.datetime.strptime(s, '%b %d %Y %I:%M%p')
        try:
            if os.path.isfile(filepath):
                with open(filepath, mode='r') as f:
                    s3_dir = json.load(f)
                _last_update_str = list(s3_dir.keys())[0]
                CloudObjectStorage._last_update = striptime(_last_update_str)
                self._dir.update(s3_dir[_last_update_str])
        except Exception as e:
            logging.error(e)
            return

    def _save_dir(self):
        filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
        timestring = lambda d: dt.datetime.strftime(d, '%b %d %Y %I:%M%p')
        try:
            s3_dir = {
                timestring(CloudObjectStorage._last_update):
                    list(CloudObjectStorage._dir)
            }
            with open(filepath, mode='w') as f:
                json.dump(s3_dir, f)
        except Exception as e:
            logging.error(e)
            return

    def _dir_is_stale(self):
        requires_init = len(CloudObjectStorage._dir) == 0
        is_stale = (
            CloudObjectStorage._last_update
            < dt.datetime.now() - dt.timedelta(minutes=90)
        )
        return requires_init or is_stale

    def _update_dir(self):
        self._load_dir()
        if self._dir_is_stale():
            object_list = self._api_client.list_objects_v2(Bucket=self._bucket)
            is_truncated = object_list.get("IsTruncated", False)
            if object_list.get("Contents"):
                self._dir.update([o["Key"] for o in object_list["Contents"]])
            # Page through the bucket until the listing is no longer truncated.
            while is_truncated:
                continuation_token = object_list["NextContinuationToken"]
                object_list = self._api_client.list_objects_v2(
                    Bucket=self._bucket,
                    ContinuationToken=continuation_token
                )
                self._dir.update([
                    o["Key"] for o in object_list["Contents"]
                ])
                is_truncated = object_list["IsTruncated"]
            CloudObjectStorage._last_update = dt.datetime.now()
            self._save_dir()

    def get(self, key_id, compression=True):
        response = self._get(key_id=key_id)
        obj = self.S3Object(body=response['Body'], key_id=key_id)
        if compression:
            obj.decompress()
        return obj

    def put(self, obj, key_id, compression=True):
        # Wrap the raw body in an S3Object, optionally compress it, and upload once.
        obj = self.S3Object(body=obj, key_id=key_id)
        if compression:
            obj.compress()
        return obj.put()

    def _get(self, key_id):
        return self._api_client.get_object(Bucket=self._bucket, Key=key_id)

    def _put(self, obj, key_id):
        return self._api_client.put_object(
            Body=obj,
            Bucket=self._bucket,
            Key=key_id
        )
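
To illustrate how the pieces fit together, here is a minimal usage sketch. It assumes the conf module provides S3_BUCKET, AWS_KEY, AWS_SECRET and DATA_DIR as above, that the listing lives in a module named storage (a placeholder name), and that the bucket and key names are placeholders for ones you can write to:

import pandas as pd

from storage import CloudObjectStorage  # 'storage' is a placeholder module name

store = CloudObjectStorage()  # uses conf.S3_BUCKET and a shared boto3 client

# Upload a DataFrame as a gzip-compressed CSV.
df = pd.DataFrame({'id': ['1', '2'], 'name': ['foo', 'bar']})
obj = store.S3Object.from_df(df, key_id='reports/example.csv.gz')
obj.put()

# Upload a raw string; put() compresses by default.
store.put('hello world', key_id='notes/hello.txt')

# Download and rebuild the DataFrame. compression=False because get() already
# decompressed the body; values come back as strings since to_df defaults dtype to str.
fetched = store.get('reports/example.csv.gz')
df_back = fetched.to_df(compression=False, header=None, names=['id', 'name'])

# List cached keys, optionally filtered by prefix and/or suffix.
keys = store.list_all_objects(prefix='reports/', suffix='.gz')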