"""
The api module houses the Client class, which provides the main interface
developers will use to interact with RabbitMQ. It also contains errors and
decorators used by the class.
"""
from . import http
import functools
import json
try:
# python 2.x
from urllib import quote
except ImportError:
# python 3.x
from urllib.parse import quote
[docs]class APIError(Exception):
"""Denotes a failure due to unexpected or invalid
input/output between the client and the API
"""
pass
[docs]class PermissionError(Exception):
"""
Raised if the operation requires admin permissions, and the user used to
instantiate the Client class does not have admin privileges.
"""
pass
[docs]class Client(object):
"""
Abstraction of the RabbitMQ Management HTTP API.
HTTP calls are delegated to the HTTPClient class for ease of testing,
cleanliness, separation of duty, flexibility, etc.
"""
urls = {'overview': 'overview',
'all_queues': 'queues',
'all_exchanges': 'exchanges',
'all_channels': 'channels',
'all_connections': 'connections',
'all_nodes': 'nodes',
'all_vhosts': 'vhosts',
'all_users': 'users',
'all_permissions': 'permissions',
'all_bindings': 'bindings',
'whoami': 'whoami',
'queues_by_vhost': 'queues/%s',
'queues_by_name': 'queues/%s/%s',
'exchanges_by_vhost': 'exchanges/%s',
'exchange_by_name': 'exchanges/%s/%s',
'live_test': 'aliveness-test/%s',
'purge_queue': 'queues/%s/%s/contents',
'channels_by_name': 'channels/%s',
'connections_by_name': 'connections/%s',
'bindings_by_source_exch': 'exchanges/%s/%s/bindings/source',
'bindings_by_dest_exch': 'exchanges/%s/%s/bindings/destination',
'bindings_on_queue': 'queues/%s/%s/bindings',
'bindings_between_exch_queue': 'bindings/%s/e/%s/q/%s',
'rt_bindings_between_exch_queue': 'bindings/%s/e/%s/q/%s/%s',
'get_from_queue': 'queues/%s/%s/get',
'publish_to_exchange': 'exchanges/%s/%s/publish',
'vhosts_by_name': 'vhosts/%s',
'vhost_permissions': 'permissions/%s/%s',
'users_by_name': 'users/%s',
'user_permissions': 'users/%s/permissions',
'vhost_permissions_get': 'vhosts/%s/permissions'
}
json_headers = {"content-type": "application/json"}
def __init__(self, api_url, user, passwd, timeout=5, scheme='http'):
"""
:param string api_url: base url for the broker API
:param string user: Username used to authenticate to the API.
:param string passwd: Password used to authenticate to the API.
:param int timeout: Integer number of seconds to wait for each call.
:param string scheme: HTTP scheme used to make the connection
Populates server attributes using passed-in parameters and
the HTTP API's 'overview' information.
"""
self.api_url = api_url
self.user = user
self.passwd = passwd
self.timeout = timeout
self.scheme = scheme
self.http = http.HTTPClient(
self.api_url,
self.user,
self.passwd,
self.timeout,
self.scheme
)
return
def _call(self, path, method, body=None, headers=None):
"""
Wrapper around http.do_call that transforms some HTTPError into
our own exceptions
"""
try:
resp = self.http.do_call(path, method, body, headers)
except http.HTTPError as err:
if err.status == 401:
raise PermissionError('Insufficient permissions to query ' +
'%s with user %s :%s' % (path, self.user, err))
raise
return resp
[docs] def is_alive(self, vhost='%2F'):
"""
Uses the aliveness-test API call to determine if the
server is alive and the vhost is active. The broker (not this code)
creates a queue and then sends/consumes a message from it.
:param string vhost: There should be no real reason to ever change
this from the default value, but it's there if you need to.
:returns bool: True if alive, False otherwise
:raises: HTTPError if *vhost* doesn't exist on the broker.
"""
uri = Client.urls['live_test'] % vhost
try:
resp = self._call(uri, 'GET')
except http.HTTPError as err:
if err.status == 404:
raise APIError("No vhost named '%s'" % vhost)
raise
if resp['status'] == 'ok':
return True
else:
return False
[docs] def get_whoami(self):
"""
A convenience function used in the event that you need to confirm that
the broker thinks you are who you think you are.
:returns dict whoami: Dict structure contains:
* administrator: whether the user is has admin privileges
* name: user name
* auth_backend: backend used to determine admin rights
"""
path = Client.urls['whoami']
whoami = self._call(path, 'GET')
return whoami
[docs] def get_overview(self):
"""
:rtype: dict
Data in the 'overview' depends on the privileges of the creds used,
but typically contains information about the management plugin version,
some high-level message stats, and aggregate queue totals. Admin-level
creds gets you information about the cluster node, listeners, etc.
"""
overview = self._call(Client.urls['overview'], 'GET')
return overview
[docs] def get_nodes(self):
"""
:rtype: dict
Returns a list of dictionaries, each containing the details of each
node of the cluster.
"""
nodes = self._call(Client.urls['all_nodes'], 'GET')
return nodes
[docs] def get_users(self):
"""
Returns a list of dictionaries, each containing the attributes of a
different RabbitMQ user.
:returns: a list of dictionaries, each representing a user.
"""
users = self._call(Client.urls['all_users'], 'GET')
return users
################################################
### VHOSTS
################################################
[docs] def get_all_vhosts(self):
"""
Lists the names of all RabbitMQ vhosts.
:returns: a list of dicts, each dict representing a vhost
on the broker.
"""
vhosts = self._call(Client.urls['all_vhosts'], 'GET')
return vhosts
[docs] def get_vhost_names(self):
"""
A convenience function for getting back only the vhost names instead of
the larger vhost dicts.
:returns list vhost_names: A list of just the vhost names.
"""
vhosts = self.get_all_vhosts()
vhost_names = [i['name'] for i in vhosts]
return vhost_names
[docs] def get_vhost(self, vname):
"""
Returns the attributes of a single named vhost in a dict.
:param string vname: Name of the vhost to get.
:returns dict vhost: Attribute dict for the named vhost
"""
vname = quote(vname, '')
path = Client.urls['vhosts_by_name'] % vname
vhost = self._call(path, 'GET', headers=Client.json_headers)
return vhost
[docs] def create_vhost(self, vname):
"""
Creates a vhost on the server to house exchanges.
:param string vname: The name to give to the vhost on the server
:returns: boolean
"""
vname = quote(vname, '')
path = Client.urls['vhosts_by_name'] % vname
return self._call(path, 'PUT',
headers=Client.json_headers)
[docs] def delete_vhost(self, vname):
"""
Deletes a vhost from the server. Note that this also deletes any
exchanges or queues that belong to this vhost.
:param string vname: Name of the vhost to delete from the server.
"""
vname = quote(vname, '')
path = Client.urls['vhosts_by_name'] % vname
return self._call(path, 'DELETE')
###############################################
## PERMISSIONS
###############################################
[docs] def get_permissions(self):
"""
:returns: list of dicts, or an empty list if there are no permissions.
"""
path = Client.urls['all_permissions']
conns = self._call(path, 'GET')
return conns
[docs] def get_vhost_permissions(self, vname):
"""
:returns: list of dicts, or an empty list if there are no permissions.
:param string vname: Name of the vhost to set perms on.
"""
vname = quote(vname, '')
path = Client.urls['vhost_permissions_get'] % (vname,)
conns = self._call(path, 'GET')
return conns
[docs] def get_user_permissions(self, username):
"""
:returns: list of dicts, or an empty list if there are no permissions.
:param string username: User to set permissions for.
"""
path = Client.urls['user_permissions'] % (username,)
conns = self._call(path, 'GET')
return conns
[docs] def set_vhost_permissions(self, vname, username, config, rd, wr):
"""
Set permissions for a given username on a given vhost. Both
must already exist.
:param string vname: Name of the vhost to set perms on.
:param string username: User to set permissions for.
:param string config: Permission pattern for configuration operations
for this user in this vhost.
:param string rd: Permission pattern for read operations for this user
in this vhost
:param string wr: Permission pattern for write operations for this user
in this vhost.
Permission patterns are regex strings. If you're unfamiliar with this,
you should definitely check out this section of the RabbitMQ docs:
http://www.rabbitmq.com/admin-guide.html#access-control
"""
vname = quote(vname, '')
body = json.dumps({"configure": config, "read": rd, "write": wr})
path = Client.urls['vhost_permissions'] % (vname, username)
return self._call(path, 'PUT', body,
headers=Client.json_headers)
[docs] def delete_permission(self, vname, username):
"""
Delete permission for a given username on a given vhost. Both
must already exist.
:param string vname: Name of the vhost to set perms on.
:param string username: User to set permissions for.
"""
vname = quote(vname, '')
path = Client.urls['vhost_permissions'] % (vname, username)
return self._call(path, 'DELETE')
[docs] def get_permission(self, vname, username):
"""
:returns: dicts of permissions.
:param string vname: Name of the vhost to set perms on.
:param string username: User to set permissions for.
"""
vname = quote(vname, '')
path = Client.urls['vhost_permissions'] % (vname, username)
return self._call(path, 'GET')
###############################################
## EXCHANGES
###############################################
[docs] def get_exchanges(self, vhost=None):
"""
:returns: A list of dicts
:param string vhost: A vhost to query for exchanges, or None (default),
which triggers a query for all exchanges in all vhosts.
"""
if vhost:
vhost = quote(vhost, '')
path = Client.urls['exchanges_by_vhost'] % vhost
else:
path = Client.urls['all_exchanges']
exchanges = self._call(path, 'GET')
return exchanges
[docs] def get_exchange(self, vhost, name):
"""
Gets a single exchange which requires a vhost and name.
:param string vhost: The vhost containing the target exchange
:param string name: The name of the exchange
:returns: dict
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['exchange_by_name'] % (vhost, name)
exch = self._call(path, 'GET')
return exch
[docs] def create_exchange(self,
vhost,
name,
xtype,
auto_delete=False,
durable=True,
internal=False,
arguments=None):
"""
Creates an exchange in the given vhost with the given name. As per the
RabbitMQ API documentation, a JSON body also needs to be included that
"looks something like this":
{"type":"direct",
"auto_delete":false,
"durable":true,
"internal":false,
"arguments":[]}
On success, the API returns a 204 with no content, in which case this
function returns True. If any other response is received, it's raised.
:param string vhost: Vhost to create the exchange in.
:param string name: Name of the proposed exchange.
:param string type: The AMQP exchange type.
:param bool auto_delete: Whether or not the exchange should be
dropped when the no. of consumers drops to zero.
:param bool durable: Whether you want this exchange to persist a
broker restart.
:param bool internal: Whether or not this is a queue for use by the
broker only.
:param list arguments: If given, should be a list. If not given, an
empty list is sent.
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['exchange_by_name'] % (vhost, name)
base_body = {"type": xtype, "auto_delete": auto_delete,
"durable": durable, "internal": internal,
"arguments": arguments or list()}
body = json.dumps(base_body)
self._call(path, 'PUT', body,
headers=Client.json_headers)
return True
[docs] def publish(self, vhost, xname, rt_key, payload, payload_enc='string',
properties=None):
"""
Publish a message to an exchange.
:param string vhost: vhost housing the target exchange
:param string xname: name of the target exchange
:param string rt_key: routing key for message
:param string payload: the message body for publishing
:param string payload_enc: encoding of the payload. The only choices
here are 'string' and 'base64'.
:param dict properties: a dict of message properties
:returns: boolean indicating success or failure.
"""
vhost = quote(vhost, '')
xname = quote(xname, '')
path = Client.urls['publish_to_exchange'] % (vhost, xname)
body = json.dumps({'routing_key': rt_key, 'payload': payload,
'payload_encoding': payload_enc,
'properties': properties or {}})
result = self._call(path, 'POST', body)
return result['routed']
[docs] def delete_exchange(self, vhost, name):
"""
Delete the named exchange from the named vhost. The API returns a 204
on success, in which case this method returns True, otherwise the
error is raised.
:param string vhost: Vhost where target exchange was created
:param string name: The name of the exchange to delete.
:returns bool: True on success.
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['exchange_by_name'] % (vhost, name)
self._call(path, 'DELETE')
return True
#############################################
## QUEUES
#############################################
[docs] def get_queues(self, vhost=None):
"""
Get all queues, or all queues in a vhost if vhost is not None.
Returns a list.
:param string vhost: The virtual host to list queues for. If This is
None (the default), all queues for the broker instance
are returned.
:returns: A list of dicts, each representing a queue.
:rtype: list of dicts
"""
if vhost:
vhost = quote(vhost, '')
path = Client.urls['queues_by_vhost'] % vhost
else:
path = Client.urls['all_queues']
queues = self._call(path, 'GET')
return queues or list()
[docs] def get_queue(self, vhost, name):
"""
Get a single queue, which requires both vhost and name.
:param string vhost: The virtual host for the queue being requested.
If the vhost is '/', note that it will be translated to '%2F' to
conform to URL encoding requirements.
:param string name: The name of the queue being requested.
:returns: A dictionary of queue properties.
:rtype: dict
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['queues_by_name'] % (vhost, name)
queue = self._call(path, 'GET')
return queue
[docs] def get_queue_depth(self, vhost, name):
"""
Get the number of messages currently in a queue. This is a convenience
function that just calls :meth:`Client.get_queue` and pulls
out/returns the 'messages' field from the dictionary it returns.
:param string vhost: The vhost of the queue being queried.
:param string name: The name of the queue to query.
:returns: Number of messages in the queue
:rtype: integer
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['queues_by_name'] % (vhost, name)
queue = self._call(path, 'GET')
depth = queue['messages']
return depth
[docs] def get_queue_depths(self, vhost, names=None):
"""
Get the number of messages currently sitting in either the queue
names listed in 'names', or all queues in 'vhost' if no 'names' are
given.
:param str vhost: Vhost where queues in 'names' live.
:param list names: OPTIONAL - Specific queues to show depths for. If
None, show depths for all queues in 'vhost'.
"""
vhost = quote(vhost, '')
if not names:
# get all queues in vhost
path = Client.urls['queues_by_vhost'] % vhost
queues = self._call(path, 'GET')
for queue in queues:
depth = queue['messages']
print("\t%s: %s" % (queue, depth))
else:
# get the named queues only.
for name in names:
depth = self.get_queue_depth(vhost, name)
print("\t%s: %s" % (name, depth))
[docs] def purge_queues(self, queues):
"""
Purge all messages from one or more queues.
:param list queues: A list of ('qname', 'vhost') tuples.
:returns: True on success
"""
for name, vhost in queues:
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['purge_queue'] % (vhost, name)
self._call(path, 'DELETE')
return True
[docs] def purge_queue(self, vhost, name):
"""
Purge all messages from a single queue. This is a convenience method
so you aren't forced to supply a list containing a single tuple to
the purge_queues method.
:param string vhost: The vhost of the queue being purged.
:param string name: The name of the queue being purged.
:rtype: None
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['purge_queue'] % (vhost, name)
return self._call(path, 'DELETE')
[docs] def create_queue(self, vhost, name, **kwargs):
"""
Create a queue. The API documentation specifies that all of the body
elements are optional, so this method only requires arguments needed
to form the URI
:param string vhost: The vhost to create the queue in.
:param string name: The name of the queue
More on these operations can be found at:
http://www.rabbitmq.com/amqp-0-9-1-reference.html
"""
vhost = quote(vhost, '')
name = quote(name, '')
path = Client.urls['queues_by_name'] % (vhost, name)
body = json.dumps(kwargs)
return self._call(path,
'PUT',
body,
headers=Client.json_headers)
[docs] def delete_queue(self, vhost, qname):
"""
Deletes the named queue from the named vhost.
:param string vhost: Vhost housing the queue to be deleted.
:param string qname: Name of the queue to delete.
Note that if you just want to delete the messages from a queue, you
should use purge_queue instead of deleting/recreating a queue.
"""
vhost = quote(vhost, '')
qname = quote(qname, '')
path = Client.urls['queues_by_name'] % (vhost, qname)
return self._call(path, 'DELETE', headers=Client.json_headers)
[docs] def get_messages(self, vhost, qname, count=1,
requeue=False, truncate=None, encoding='auto'):
"""
Gets <count> messages from the queue.
:param string vhost: Name of vhost containing the queue
:param string qname: Name of the queue to consume from
:param int count: Number of messages to get.
:param bool requeue: Whether to requeue the message after getting it.
This will cause the 'redelivered' flag to be set in the message on
the queue.
:param int truncate: The length, in bytes, beyond which the server will
truncate the message before returning it.
:returns: list of dicts. messages[msg-index]['payload'] will contain
the message body.
"""
vhost = quote(vhost, '')
base_body = {'count': count, 'requeue': requeue, 'encoding': encoding}
if truncate:
base_body['truncate'] = truncate
body = json.dumps(base_body)
qname = quote(qname, '')
path = Client.urls['get_from_queue'] % (vhost, qname)
messages = self._call(path, 'POST', body,
headers=Client.json_headers)
return messages
#########################################
# CONNS/CHANS & BINDINGS
#########################################
[docs] def get_connections(self):
"""
:returns: list of dicts, or an empty list if there are no connections.
"""
path = Client.urls['all_connections']
conns = self._call(path, 'GET')
return conns
[docs] def get_connection(self, name):
"""
Get a connection by name. To get the names, use get_connections.
:param string name: Name of connection to get
:returns dict conn: A connection attribute dictionary.
"""
name = quote(name, '')
path = Client.urls['connections_by_name'] % name
conn = self._call(path, 'GET')
return conn
[docs] def delete_connection(self, name):
"""
Close the named connection. The API returns a 204 on success,
in which case this method returns True, otherwise the
error is raised.
:param string name: The name of the connection to delete.
:returns bool: True on success.
"""
name = quote(name, '')
path = Client.urls['connections_by_name'] % name
self._call(path, 'DELETE')
return True
[docs] def get_channels(self):
"""
Return a list of dicts containing details about broker connections.
:returns: list of dicts
"""
path = Client.urls['all_channels']
chans = self._call(path, 'GET')
return chans
[docs] def get_channel(self, name):
"""
Get a channel by name. To get the names, use get_channels.
:param string name: Name of channel to get
:returns dict conn: A channel attribute dictionary.
"""
name = quote(name, '')
path = Client.urls['channels_by_name'] % name
chan = self._call(path, 'GET')
return chan
[docs] def get_bindings(self):
"""
:returns: list of dicts
"""
path = Client.urls['all_bindings']
bindings = self._call(path, 'GET')
return bindings
[docs] def get_queue_bindings(self, vhost, qname):
"""
Return a list of dicts, one dict per binding. The dict format coming
from RabbitMQ for queue named 'testq' is:
{"source":"sourceExch","vhost":"/","destination":"testq",
"destination_type":"queue","routing_key":"*.*","arguments":{},
"properties_key":"%2A.%2A"}
"""
vhost = quote(vhost, '')
qname = quote(qname, '')
path = Client.urls['bindings_on_queue'] % (vhost, qname)
bindings = self._call(path, 'GET')
return bindings
def get_bindings_from_exchange(self, vhost, exch):
pass
def get_bindings_to_exchange(self, vhost, exch):
pass
def get_bindings_between_exch_and_queue(self, vhost, exch, queue):
pass
[docs] def create_binding(self, vhost, exchange, queue, rt_key=None, args=None):
"""
Creates a binding between an exchange and a queue on a given vhost.
:param string vhost: vhost housing the exchange/queue to bind
:param string exchange: the target exchange of the binding
:param string queue: the queue to bind to the exchange
:param string rt_key: the routing key to use for the binding
:param list args: extra arguments to associate w/ the binding.
:returns: boolean
"""
vhost = quote(vhost, '')
exchange = quote(exchange, '')
queue = quote(queue, '')
body = json.dumps({'routing_key': rt_key, 'arguments': args or []})
path = Client.urls['bindings_between_exch_queue'] % (vhost,
exchange,
queue)
binding = self._call(path, 'POST', body=body,
headers=Client.json_headers)
return binding
[docs] def delete_binding(self, vhost, exchange, queue, rt_key):
"""
Deletes a binding between an exchange and a queue on a given vhost.
:param string vhost: vhost housing the exchange/queue to bind
:param string exchange: the target exchange of the binding
:param string queue: the queue to bind to the exchange
:param string rt_key: the routing key to use for the binding
"""
vhost = quote(vhost, '')
exchange = quote(exchange, '')
queue = quote(queue, '')
body = ''
path = Client.urls['rt_bindings_between_exch_queue'] % (vhost,
exchange,
queue,
rt_key)
return self._call(path, 'DELETE', headers=Client.json_headers)
[docs] def create_user(self, username, password, tags=""):
"""
Creates a user.
:param string username: The name to give to the new user
:param string password: Password for the new user
:param string tags: Comma-separated list of tags for the user
:returns: boolean
"""
path = Client.urls['users_by_name'] % username
body = json.dumps({'password': password, 'tags': tags})
return self._call(path, 'PUT', body=body,
headers=Client.json_headers)
[docs] def delete_user(self, username):
"""
Deletes a user from the server.
:param string username: Name of the user to delete from the server.
"""
path = Client.urls['users_by_name'] % username
return self._call(path, 'DELETE')