
Code ✂️: Cloud Object Storage

💡 The following is an object-oriented implementation for storing and retrieving objects in S3 buckets. It handles gzip compression and decompression as well as CRUD operations on objects stored in AWS S3; a short usage sketch follows the listing.
import datetime as dt
import gzip
import io
import json
import logging
import os
import threading

import boto3
import pandas as pd

import conf
from project.utils.functional import ItemArray


class S3Object:
    store = None

    def __init__(self, key_id=None, body=None):
        assert S3Object.store is not None
        assert key_id is not None
        self._key_id = key_id
        # Only fetch from S3 when no body was supplied; an empty body is valid.
        self._body = body if body is not None else self.get()
        self.data = None

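    # Serialise a DataFrame to CSV in memory and return a gzip-compressed object.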
    @classmethod
    def from_df(cls, df, key_id, **kwargs):
        header = kwargs.pop('header', False)
        index = kwargs.pop('index', False)

        buffer = io.StringIO()
        df.to_csv(buffer, header=header, index=index, **kwargs)
        obj = cls(body=buffer.getvalue(), key_id=key_id)
        obj.compress()

        return obj

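    # Read the stored body (decompressing by default) and parse it into a DataFrame.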
    def to_df(self, **kwargs):
        compression = kwargs.pop('compression', True)

        header = kwargs.pop('header', 'infer')
        names = kwargs.pop('names', None)
        dtype = kwargs.pop('dtype', str)

        if compression:
            self.decompress()
        else:
            self.read()

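        # Normalise '?' placeholders in the raw CSV to spaces before parsing.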
        body = self._body.replace('?', ' ')
        self._body = body

        csv_buffer = io.StringIO(body)

        return pd.read_csv(
            csv_buffer,
            header=header,
            names=names,
            dtype=dtype,
            **kwargs
        )

    def read(self, encoding='utf-8'):
        if self.data:
            return self.data
        self._read(encoding)
        return self._body

    def _read(self, encoding):
        if isinstance(self._body, str):
            # The body has already been read or decompressed into text.
            self.data = self._body
            return
        try:
            body = self._body.read().decode(encoding)
            self._body = body
            self.data = self._body
        except Exception as e:
            logging.error(f"Could not read object '{self._key_id}': {e}")

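    # Gzip the text body into an in-memory bytes buffer, ready for upload.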
    def _compress(self, encoding):
        body = self._body

        gz_buffer = io.BytesIO()

        with gzip.GzipFile(mode='wb', fileobj=gz_buffer) as gz_file:
            gz_file.write(body.encode(encoding))
        self._body = gz_buffer.getvalue()

        return self

    def _decompress(self, encoding):
        try:
            body = self._body
            # GzipFile expects a file-like object, so wrap raw bytes in a buffer.
            if isinstance(body, bytes):
                body = io.BytesIO(body)

            with gzip.GzipFile(mode='rb', fileobj=body) as gz_file:
                new_body = gz_file.read().decode(encoding)
        except Exception as e:
            logging.error(
                f"{e}: Object '{self._key_id}' does not have gzip compression "
                "applied. Falling back to a plain read..."
            )
            new_body = self.get().read().decode(encoding)
        self._body = new_body

        return self

    def decompress(self, encoding='utf-8'):
        return self._decompress(encoding)

    def compress(self, encoding='utf-8'):
        return self._compress(encoding)

    def put(self):
        return S3Object.store._put(key_id=self._key_id, obj=self._body)

    def get(self):
        return S3Object.store._get(key_id=self._key_id)['Body']

    def __repr__(self):
        return f'Document {self._key_id} in bucket: {self.store._bucket}'


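# S3Object delegates transport to a shared CloudObjectStorage instance via its
# class-level 'store' reference; the class below manages the boto3 client, a
# locally cached bucket directory, and the CRUD entry points.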
class CloudObjectStorage:
    _name = "CloudObjectStorage"
    _api_client = None
    _dir = set()
    _last_update = dt.datetime.now() - dt.timedelta(days=1)
    _lock = threading.Lock()

    def __init__(self, bucket=conf.S3_BUCKET, api_client=None):
        self._bucket = bucket
        # Reuse the client cached on the class before falling back to an
        # injected or freshly created one.
        self._api_client = (
            CloudObjectStorage._api_client
            or api_client
            or self._create_client()
        )

    def _create_client(self):
        # Double-checked locking: only one thread builds the shared client.
        with CloudObjectStorage._lock:
            if CloudObjectStorage._api_client is None:
                CloudObjectStorage._api_client = boto3.client(
                    's3',
                    aws_access_key_id=conf.AWS_KEY,
                    aws_secret_access_key=conf.AWS_SECRET
                )
        return CloudObjectStorage._api_client

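    # Accessing this property binds the store to S3Object, so instances can
    # fetch and upload themselves.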
    @property
    def S3Object(self):
        S3Object.store = self
        return S3Object

    def list_all_objects(self, prefix=None, suffix=None, **kwargs):
        cache_exists = '.s3.cache.txt' in os.listdir(conf.DATA_DIR)
        # Refresh by default, and always when no local cache exists yet.
        refresh = kwargs.get('refresh', True) or not cache_exists
        if refresh and self._dir_is_stale():
            self._update_dir()
        directory = ItemArray(self._dir)
        if prefix or suffix:
            return directory.filter(
                lambda o: o.startswith(prefix or "") and o.endswith(suffix or "")
            )
        return directory

    def _load_dir(self):
        filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
        try:
            if os.path.isfile(filepath):
                with open(filepath, mode='r') as f:
                    s3_dir = json.load(f)
                # The cache is a single-entry mapping: timestamp -> list of keys.
                _last_update_str = next(iter(s3_dir))
                CloudObjectStorage._last_update = dt.datetime.strptime(
                    _last_update_str, '%b %d %Y %I:%M%p'
                )
                self._dir.update(s3_dir[_last_update_str])
        except Exception as e:
            logging.error(f"Could not load the S3 directory cache: {e}")

    def _save_dir(self):
        filepath = os.path.join(conf.DATA_DIR, '.s3.cache.txt')
        try:
            timestamp = dt.datetime.strftime(
                CloudObjectStorage._last_update, '%b %d %Y %I:%M%p'
            )
            s3_dir = {timestamp: list(CloudObjectStorage._dir)}
            with open(filepath, mode='w') as f:
                json.dump(s3_dir, f)
        except Exception as e:
            logging.error(f"Could not save the S3 directory cache: {e}")

    def _dir_is_stale(self):
        requires_init = len(CloudObjectStorage._dir) == 0
        is_stale = (
            CloudObjectStorage._last_update
            < dt.datetime.now() - dt.timedelta(minutes=90)
        )
        return requires_init or is_stale

    def _update_dir(self):
        self._load_dir()
        if self._dir_is_stale():
            object_list = self._api_client.list_objects_v2(Bucket=self._bucket)
            is_truncated = object_list.get("IsTruncated", False)
            if object_list.get("Contents"):
                self._dir.update([o["Key"] for o in object_list["Contents"]])

            # list_objects_v2 returns at most 1000 keys per call, so follow
            # the continuation token until the listing is complete.
            while is_truncated:
                continuation_token = object_list["NextContinuationToken"]
                object_list = self._api_client.list_objects_v2(
                    Bucket=self._bucket,
                    ContinuationToken=continuation_token
                )
                CloudObjectStorage._dir.update([
                    o["Key"] for o in object_list["Contents"]
                ])
                is_truncated = object_list["IsTruncated"]

            CloudObjectStorage._last_update = dt.datetime.now()
            self._save_dir()

    def get(self, key_id, compression=True):
        response = self._get(key_id=key_id)
        obj = self.S3Object(body=response['Body'], key_id=key_id)
        return obj.decompress() if compression else obj

    def put(self, obj, key_id, compression=True):
        obj = self.S3Object(body=obj, key_id=key_id)
        if compression:
            obj.compress()
        obj.put()
        return obj

    def _get(self, key_id):
        return self._api_client.get_object(Bucket=self._bucket, Key=key_id)

    def _put(self, obj, key_id):
        return self._api_client.put_object(
            Body=obj,
            Bucket=self._bucket,
            Key=key_id
        )
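
A minimal usage sketch, run in the same module as the listing above. It assumes the conf module supplies valid credentials and a reachable bucket; the key demo/cities.csv.gz and the sample data are purely illustrative.

storage = CloudObjectStorage()

# Round-trip a DataFrame: serialise to CSV, gzip, and upload...
df = pd.DataFrame({'city': ['Oslo', 'Lima'], 'temp': ['4', '19']})
obj = storage.S3Object.from_df(df, key_id='demo/cities.csv.gz')
obj.put()

# ...then download, gunzip, and parse it back. get() already decompressed
# the body, so to_df() must not decompress a second time.
restored = storage.get('demo/cities.csv.gz').to_df(
    compression=False,
    header=None,
    names=['city', 'temp']
)

# List keys under a prefix, backed by the local .s3.cache.txt directory cache.
print(storage.list_all_objects(prefix='demo/'))

Note that from_df writes the CSV without a header row by default, which is why the column names are supplied again on the way back in.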