# -*- coding: utf-8 -*-
"""
CuttlePool.
:license: BSD 3-clause, see LICENSE for details.
"""
__version__ = '0.10.0-dev'

try:
    import threading
except ImportError:
    # Fall back to the drop-in stand-in when the ``threading`` module cannot
    # be imported.
    import dummy_threading as threading
import time
import warnings
import weakref

# Default for ``CuttlePool(overflow=...)``: no extra resources beyond capacity.
_OVERFLOW = 0
# Default for ``CuttlePool(timeout=...)``: ``None`` means wait indefinitely
# for a resource to be returned to the pool.
_TIMEOUT = None
class CuttlePool(object):
    r"""
    A resource pool.

    :param func factory: A factory that produces the desired resource.
    :param int capacity: Max number of resource instances in the pool.
    :param int overflow: The number of extra resource instances that can be
        made if the pool is exhausted. Defaults to ``0``.
    :param int timeout: Time in seconds to wait for a resource. Defaults to
        ``None``.
    :param resource_wrapper: A Resource subclass.
    :param \**kwargs: Keyword arguments that are passed to ``factory`` when
        a resource instance is created.

    :raises ValueError: If capacity <= 0 or overflow < 0 or timeout < 0.
    :raises TypeError: If timeout is not int or ``None``.
    """

    def __init__(self,
                 factory,
                 capacity,
                 overflow=_OVERFLOW,
                 timeout=_TIMEOUT,
                 resource_wrapper=None,
                 **kwargs):
        if capacity <= 0:
            raise ValueError('CuttlePool requires a minimum capacity of 1')
        if overflow < 0:
            raise ValueError('Overflow must be non negative integer')
        if timeout is not None:
            msg = 'Timeout must be non negative integer'
            # ``bool`` is a subclass of ``int``; keep rejecting it so that
            # ``timeout=True`` is still a TypeError.
            if isinstance(timeout, bool) or not isinstance(timeout, int):
                raise TypeError(msg)
            if timeout < 0:
                raise ValueError(msg)

        self._capacity = capacity
        self._overflow = overflow
        self._timeout = timeout
        self._factory = factory
        self._resource_wrapper = resource_wrapper or Resource
        self._factory_arguments = kwargs

        # The reference queue is divided in two sections. One section is a
        # queue of resources that are ready for use (the available region).
        # The other section is an unordered list of resources that are
        # currently in use and NoneType objects (the unavailable region).
        self._reference_queue = [None] * self.maxsize
        self._resource_start = self._resource_end = 0
        # _size is the number of existing resources. _available is the
        # number of available resources.
        self._size = self._available = 0

        # Required for locking the resource pool in multi-threaded
        # environments.
        self._lock = threading.RLock()
        # Notify thread waiting for resource that the queue is not empty when
        # a resource is returned to the pool.
        self._not_empty = threading.Condition(self._lock)

    @property
    def capacity(self):
        """
        The maximum capacity the pool will hold under normal circumstances.
        """
        return self._capacity

    @property
    def connection_arguments(self):
        """For compatibility with older versions, will be removed in 1.0."""
        # stacklevel=2 attributes the warning to the code reading the
        # deprecated property rather than to this module.
        warnings.warn(('connection_arguments is deprecated in favor of '
                       'factory_arguments and will be removed in 1.0'),
                      DeprecationWarning,
                      stacklevel=2)
        return self.factory_arguments

    @property
    def factory_arguments(self):
        """
        Return a copy of the factory arguments used to create a resource.
        """
        return self._factory_arguments.copy()

    @property
    def maxsize(self):
        """
        The maximum possible number of resource instances that can exist at any
        one time.
        """
        return self._capacity + self._overflow

    @property
    def overflow(self):
        """
        The number of additional resource instances the pool will create when
        it is at capacity.
        """
        return self._overflow

    @property
    def size(self):
        """
        The number of existing resource instances that have been made by the
        pool.

        :note: This is not the number of resources *in* the pool, but the
            number of existing resources. This includes resources in the
            pool and resources in use.

        .. warning:: This is not threadsafe. ``size`` can change when context
                     switches to another thread.
        """
        with self._lock:
            return self._size

    @property
    def timeout(self):
        """
        The duration to wait for a resource to be returned to the pool when the
        pool is depleted.
        """
        return self._timeout

    def _get(self, timeout, resource_wrapper=None):
        """
        Get a resource from the pool. If timeout is ``None`` waits
        indefinitely.

        :param timeout: Time in seconds to wait for a resource.
        :type timeout: int
        :param resource_wrapper: A Resource subclass.
        :return: A tuple containing a ``_ResourceTracker`` and ``Resource``.

        :raises PoolEmptyError: When timeout has elapsed and unable to
            retrieve resource.
        """
        if resource_wrapper is None:
            resource_wrapper = self._resource_wrapper

        with self._lock:
            if timeout is None:
                while self.empty():
                    self._not_empty.wait()
            else:
                time_end = time.time() + timeout
                while self.empty():
                    time_left = time_end - time.time()
                    # ``<=`` so that a zero (or exactly expired) timeout
                    # fails immediately instead of looping on ``wait(0)``
                    # until the clock advances.
                    if time_left <= 0:
                        raise PoolEmptyError
                    self._not_empty.wait(time_left)

            # Pop the tracker at the head of the available region.
            rtracker = self._reference_queue[self._resource_start]
            self._resource_start = (self._resource_start + 1) % self.maxsize
            self._available -= 1

            wrapped_resource = rtracker.wrap_resource(self, resource_wrapper)
            return rtracker, wrapped_resource

    def _get_tracker(self, resource):
        """
        Return the resource tracker that is tracking ``resource``.

        :param resource: A resource.
        :return: A resource tracker.
        :rtype: :class:`_ResourceTracker`

        :raises UnknownResourceError: If no tracker in the pool holds
            ``resource``.
        """
        with self._lock:
            for rt in self._reference_queue:
                # Identity comparison: the very same object must have been
                # produced by this pool.
                if rt is not None and resource is rt.resource:
                    return rt

            raise UnknownResourceError('Resource not created by pool')

    def _harvest_lost_resources(self):
        """Return lost resources to pool."""
        with self._lock:
            for i in self._unavailable_range():
                rtracker = self._reference_queue[i]
                # A tracker in the unavailable region whose wrapper is gone
                # was never explicitly returned; put it back.
                if rtracker is not None and rtracker.available():
                    self.put_resource(rtracker.resource)

    def _make_resource(self, resource_wrapper=None):
        """
        Returns a resource instance.

        :param resource_wrapper: A Resource subclass.
        :return: A tuple containing a ``_ResourceTracker`` and ``Resource``.

        :raises PoolFullError: If there is no free slot for a new resource.
        """
        if resource_wrapper is None:
            resource_wrapper = self._resource_wrapper

        with self._lock:
            for i in self._unavailable_range():
                if self._reference_queue[i] is None:
                    rtracker = _ResourceTracker(
                        self._factory(**self._factory_arguments))

                    self._reference_queue[i] = rtracker
                    self._size += 1

                    # Tell the resource tracker to wrap the resource. We
                    # return the resource tracker and the wrapped resource.
                    wrapped_resource = rtracker.wrap_resource(
                        self, resource_wrapper)
                    return rtracker, wrapped_resource

            raise PoolFullError

    def _put(self, rtracker):
        """
        Put a resource back in the queue.

        :param rtracker: A resource.
        :type rtracker: :class:`_ResourceTracker`

        :raises PoolFullError: If pool is full.
        :raises UnknownResourceError: If resource can't be found.
        """
        with self._lock:
            if self._available < self.capacity:
                for i in self._unavailable_range():
                    if self._reference_queue[i] is rtracker:
                        # i retains its value and will be used to swap with
                        # first "empty" space in queue.
                        break
                else:
                    raise UnknownResourceError

                # Swap the tracker into the slot just past the available
                # region, then grow the available region by one.
                j = self._resource_end
                rq = self._reference_queue
                rq[i], rq[j] = rq[j], rq[i]

                self._resource_end = (self._resource_end + 1) % self.maxsize
                self._available += 1

                self._not_empty.notify()
            else:
                raise PoolFullError

    def _remove(self, rtracker):
        """
        Remove a resource from the pool.

        :param rtracker: A resource.
        :type rtracker: :class:`_ResourceTracker`

        :raises ValueError: If ``rtracker`` is not in the reference queue
            (raised by ``list.index``).
        """
        with self._lock:
            i = self._reference_queue.index(rtracker)
            self._reference_queue[i] = None
            self._size -= 1

    def _unavailable_range(self):
        """
        Return a generator for the indices of the unavailable region of
        ``_reference_queue``.

        The bounds are computed under the lock, but the lock is released
        before iteration starts. Callers exit the iteration early (``break``
        or ``return``), and a generator that held the lock across ``yield``
        would only release it when finalized.
        """
        with self._lock:
            first = self._resource_end
            last = self._resource_start
            # The unavailable region wraps around the end of the list, or
            # spans the whole list when nothing is available.
            if last < first or self.empty():
                last += self.maxsize
            modulus = self.maxsize

        return (k % modulus for k in range(first, last))

    def empty(self):
        """Return ``True`` if pool is empty."""
        with self._lock:
            return self._available == 0

    def get_connection(self, connection_wrapper=None):
        """For compatibility with older versions, will be removed in 1.0."""
        warnings.warn(('get_connection() is deprecated in favor of '
                       'get_resource() and will be removed in 1.0'),
                      DeprecationWarning,
                      stacklevel=2)
        return self.get_resource(connection_wrapper)

    def get_resource(self, resource_wrapper=None):
        """
        Returns a ``Resource`` instance.

        :param resource_wrapper: A Resource subclass.
        :return: A ``Resource`` instance.

        :raises PoolEmptyError: If attempt to get resource fails or times
            out.
        """
        rtracker = None
        wrapped_resource = None

        if resource_wrapper is None:
            resource_wrapper = self._resource_wrapper

        if self.empty():
            self._harvest_lost_resources()

        try:
            # Try to get a resource from the pool. Do not wait.
            rtracker, wrapped_resource = self._get(0, resource_wrapper)
        except PoolEmptyError:
            pass

        if rtracker is None:
            # Could not find resource, try to make one.
            try:
                rtracker, wrapped_resource = self._make_resource(
                    resource_wrapper)
            except PoolFullError:
                pass

        if rtracker is None:
            # Could not find or make resource, so must wait for a resource
            # to be returned to the pool.
            try:
                rtracker, wrapped_resource = self._get(
                    self._timeout, resource_wrapper)
            except PoolEmptyError:
                pass

        if rtracker is None:
            raise PoolEmptyError

        # Ensure resource is active.
        if not self.ping(rtracker.resource):
            # Lock here to prevent another thread creating a resource in the
            # index that will have this resource removed. This ensures there
            # will be space for _make_resource() to place a newly created
            # resource.
            with self._lock:
                self._remove(rtracker)
                rtracker, wrapped_resource = self._make_resource(
                    resource_wrapper)

        # Ensure all resources leave pool with same attributes.
        # normalize_connection() is used since it calls
        # normalize_resource(), so if a user implements either one, the
        # resource will still be normalized. This will be changed in 1.0 to
        # call normalize_resource() when normalize_connection() is
        # removed.
        self.normalize_connection(rtracker.resource)

        return wrapped_resource

    def normalize_connection(self, connection):
        """For compatibility with older versions, will be removed in 1.0."""
        warnings.warn(('normalize_connection is deprecated in favor of '
                       'normalize_resource and will be removed in 1.0'),
                      DeprecationWarning,
                      stacklevel=2)
        return self.normalize_resource(connection)

    def normalize_resource(self, resource):
        """
        A user implemented function that resets the properties of the
        resource instance that was created by `factory`. This prevents
        unwanted behavior from a resource retrieved from the pool as it could
        have been changed when previously used.

        :param obj resource: A resource instance.
        """
        warnings.warn('Failing to implement normalize_resource() can '
                      'result in unwanted behavior.')

    def ping(self, resource):
        """
        A user implemented function that ensures the ``Resource`` object is
        open.

        :param obj resource: A ``Resource`` object.
        :return: A bool indicating if the resource is open (``True``) or
            closed (``False``).
        """
        warnings.warn('Failing to implement ping() can result in unwanted '
                      'behavior.')
        return True

    def put_connection(self, connection):
        """For compatibility with older versions, will be removed in 1.0."""
        warnings.warn(('put_connection is deprecated in favor of '
                       'put_resource and will be removed in 1.0'),
                      DeprecationWarning,
                      stacklevel=2)
        return self.put_resource(connection)

    def put_resource(self, resource):
        """
        Adds a resource back to the pool or discards it if the pool is full.

        :param resource: A resource object.

        :raises UnknownResourceError: If resource was not made by the
            pool.
        """
        rtracker = self._get_tracker(resource)

        try:
            self._put(rtracker)
        except PoolFullError:
            # The available region already holds ``capacity`` resources, so
            # forget this one instead of queueing it.
            self._remove(rtracker)
class _ResourceTracker(object):
    """
    Track if a resource is in use.

    The tracker keeps only a weak reference to the wrapper handed out for
    its resource, so it can tell when that wrapper no longer exists.

    :param resource: A resource instance.
    """

    def __init__(self, resource):
        self.resource = resource
        # Weak reference to the most recently created wrapper; ``None``
        # until ``wrap_resource()`` has been called.
        self._weakref = None

    def available(self):
        """
        Determine if resource available for use.

        :return: ``True`` if the resource has never been wrapped or if the
            last wrapper created for it no longer exists.
        :rtype: bool
        """
        return self._weakref is None or self._weakref() is None

    def wrap_resource(self, pool, resource_wrapper):
        """
        Return a resource wrapped in ``resource_wrapper``.

        :param pool: A pool instance.
        :type pool: :class:`CuttlePool`
        :param resource_wrapper: A wrapper class for the resource.
        :type resource_wrapper: :class:`Resource`
        :return: A wrapped resource.
        :rtype: :class:`Resource`
        """
        resource = resource_wrapper(self.resource, pool)
        # Remember the wrapper weakly so the tracker does not keep it alive.
        self._weakref = weakref.ref(resource)
        return resource
class Resource(object):
    """
    A wrapper around a resource instance.

    Attribute reads and writes that do not belong to the wrapper itself are
    forwarded to the wrapped resource. The wrapper can be used as a context
    manager; leaving the ``with`` block calls :meth:`close`.

    :param resource: A resource instance.
    :param pool: A resource pool.
    """

    def __init__(self, resource, pool):
        # Bypass the overridden __setattr__, which would otherwise forward
        # these assignments to the (not yet stored) resource.
        object.__setattr__(self, '_resource', resource)
        object.__setattr__(self, '_pool', pool)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __getattr__(self, name):
        """
        Gets attributes of resource object.

        :raises AttributeError: If the wrapper's own ``_resource`` or
            ``_pool`` field is requested but has not been set.
        """
        # __getattr__ only runs when normal lookup fails. If one of the
        # wrapper's own fields is missing (instance created without
        # __init__), looking up ``self._resource`` below would re-enter this
        # method forever, so fail with a plain AttributeError instead.
        if name in ('_resource', '_pool'):
            raise AttributeError(name)
        return getattr(self._resource, name)

    def __setattr__(self, name, value):
        """Sets attributes of resource object."""
        if name not in self.__dict__:
            setattr(self._resource, name, value)
        else:
            # The wrapper's own fields (``_resource``, ``_pool``) are set on
            # the wrapper itself.
            object.__setattr__(self, name, value)

    def close(self):
        """
        Returns the resource to the resource pool.

        Calling this again after the resource has been returned does
        nothing.
        """
        if self._resource is not None:
            self._pool.put_resource(self._resource)
            self._resource = None
            self._pool = None
class CuttlePoolError(Exception):
    """Base class for exceptions in this module."""


class PoolEmptyError(CuttlePoolError):
    """Exception raised when waiting for a resource from the pool times out."""


class PoolFullError(CuttlePoolError):
    """Exception raised when there is no space to add a resource."""


class UnknownResourceError(CuttlePoolError):
    """
    Exception raised when a resource is returned to the pool that was not
    made by the pool.
    """
class PoolConnection(Resource):
    """For compatibility with older versions, will be removed in 1.0."""

    def __init__(self, *args, **kwargs):
        # Warn on every instantiation, then defer to Resource for the
        # actual initialisation.
        warnings.warn(('PoolConnection is deprecated in favor of Resource and '
                       'will be removed in 1.0'), DeprecationWarning)
        super(PoolConnection, self).__init__(*args, **kwargs)