Skip to content
This repository was archived by the owner on Jul 2, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
language: python
python:
- '2.7'
matrix:
include:
- python: 2.7
- python: 3.5

env:
BOTO_CONFIG=/dev/null

install:
- make develop
sudo: false
Expand Down
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
.PHONY:

develop:
# Hard to bootstrap these setup-requirements from setup.py unless we
# are happy to use easy_install. Lets pip them-
pip install pytest-runner setupext-pip~=1.0.5
python setup.py requirements --install-test-requirements --install-extra-requirements documentation
pip install pipenv
pipenv install --dev --deploy
python setup.py develop

test:
Expand Down
9 changes: 9 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
celery-cloudwatch = {editable = true, path = "."}

[dev-packages]
101 changes: 101 additions & 0 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions celery_cloudwatch/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import voluptuous as v
import yaml
import six
import logging
import logging.config

Expand All @@ -11,8 +10,8 @@

config_schema = v.Schema({
v.Optional('ccwatch', default={}): v.Schema({
v.Optional('broker', default=None): v.Any(None, six.binary_type),
v.Optional('camera', default="celery_cloudwatch.CloudWatchCamera"): v.Any(str, six.binary_type),
v.Optional('broker', default=None): v.Any(None, str),
v.Optional('camera', default="celery_cloudwatch.CloudWatchCamera"): str,
v.Optional('verbose', default=False): bool
}, extra=False),
v.Optional('camera', default={}): v.Schema({
Expand All @@ -21,20 +20,20 @@
}, extra=False),
v.Optional('cloudwatch-camera', default={}): v.Schema({
v.Optional('dryrun', default=False): bool,
v.Optional('namespace', default='celery'): six.binary_type,
v.Optional('namespace', default='celery'): str,
v.Optional('tasks', default=[]): v.Schema([
six.binary_type, v.Schema({
'name': six.binary_type,
str, v.Schema({
'name': str,
'dimensions': v.Schema({
v.Extra: six.binary_type
v.Extra: str
}, extra=True)
}, extra=False)
]),
v.Optional('task-groups', default=[]): [
v.Schema({
'tasks': [six.binary_type],
'tasks': [str],
'dimensions': v.Schema({
v.Extra: six.binary_type
v.Extra: str
})
})
],
Expand Down
19 changes: 10 additions & 9 deletions celery_cloudwatch/cloudwatch_camera.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import six
import sys
import traceback

Expand Down Expand Up @@ -40,14 +41,14 @@ def on_shutter(self, state):
try:
self.metrics = self._build_metrics(state)
except RuntimeError as r:
print r
print(r)

def after_shutter(self):
try:
self.metrics.send()
except:
print "Exception in user code:"
print '-'*60
print("Exception in user code:")
print('-'*60)
traceback.print_exc(file=sys.stdout)
finally:
self.metrics = None
Expand Down Expand Up @@ -86,7 +87,7 @@ def _build_metrics(self, state):

def _add_task_events(self, metrics, task_event_sent, task_event_started, task_event_succeeded, task_event_failed,
num_waiting_by_task, num_running_by_task, time_to_start, time_to_process):
for task_name, dimensions in self.task_mapping.iteritems():
for task_name, dimensions in six.iteritems(self.task_mapping):
metrics.add('CeleryEventSent', unit='Count', value=task_event_sent.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryEventStarted', unit='Count', value=task_event_started.get(task_name, 0), dimensions=dimensions)
metrics.add('CeleryEventSucceeded', unit='Count', value=task_event_succeeded.get(task_name, 0), dimensions=dimensions)
Expand Down Expand Up @@ -143,7 +144,7 @@ def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_ev


def xchunk(arr, size):
for x in xrange(0, len(arr), size):
for x in six.moves.range(0, len(arr), size):
yield arr[x:x+size]


Expand All @@ -169,7 +170,7 @@ def _serialize(self, metric_chunk):
}
index = 0
for metric in metric_chunk:
for key, val in metric.serialize().iteritems():
for key, val in six.iteritems(metric.serialize()):
params['MetricData.member.%d.%s' % (index + 1, key)] = val
index += 1
return params
Expand All @@ -178,8 +179,8 @@ def send(self):
for metric_chunk in xchunk(self.metrics, self._metric_chunk_size):
metrics = self._serialize(metric_chunk)
if self.verbose:
print 'PutMetricData'
print json.dumps(metrics, indent=2, sort_keys=True)
print('PutMetricData')
print(json.dumps(metrics, indent=2, sort_keys=True))
if self.aws_connection:
self.aws_connection.get_status('PutMetricData', metrics, verb="POST")

Expand Down Expand Up @@ -233,7 +234,7 @@ def _build_dimension_param(dimensions, params):
for dim_name in dimensions:
dim_value = dimensions[dim_name]
if dim_value:
if isinstance(dim_value, basestring):
if isinstance(dim_value, six.string_types):
dim_value = [dim_value]
for value in dim_value:
params['%s.%d.Name' % (prefix, i+1)] = dim_name
Expand Down
38 changes: 19 additions & 19 deletions celery_cloudwatch/print_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,42 @@ class PrintCamera(Camera):
clear_after = True

def on_shutter(self, state):
print '-----'
print('-----')

print 'Time to Start'
print('Time to Start')
total = Stats()
for method_name, stats in state.time_to_start.items():
print "%s avg:%.2fs, max:%.2fs, min: %.2fs" % (method_name, stats.average() or -1.0 , stats.maximum or -1.0, stats.minimum or -1.0)
print("%s avg:%.2fs, max:%.2fs, min: %.2fs" % (method_name, stats.average() or -1.0 , stats.maximum or -1.0, stats.minimum or -1.0))
total += stats

print ''
print 'Time to Process'
print('')
print('Time to Process')
total = Stats()
for method_name, stats in state.time_to_process.items():
print "%s avg:%.2fs, max:%.2fs, min: %.2fs" % (method_name, stats.average() or -1.0, stats.maximum or -1.0, stats.minimum or -1.0)
print("%s avg:%.2fs, max:%.2fs, min: %.2fs" % (method_name, stats.average() or -1.0, stats.maximum or -1.0, stats.minimum or -1.0))
total += stats
print "Total: avg:%.2fs, max:%.2fs, min: %.2fs" % (total.average() or -1.0, total.maximum or -1.0, total.minimum or -1.0)
print("Total: avg:%.2fs, max:%.2fs, min: %.2fs" % (total.average() or -1.0, total.maximum or -1.0, total.minimum or -1.0))

print ''
print 'Event Totals'
print('')
print('Event Totals')
methods = set(state.task_event_sent.keys() + state.task_event_started.keys() +
state.task_event_succeeded.keys() + state.task_event_failed.keys())
for method_name in methods:
if method_name in state.task_event_sent:
print "%s[%s]: %d" % (method_name, 'waiting', state.task_event_sent[method_name])
print("%s[%s]: %d" % (method_name, 'waiting', state.task_event_sent[method_name]))
if method_name in state.task_event_started:
print "%s[%s]: %d" % (method_name, 'running', state.task_event_started[method_name])
print("%s[%s]: %d" % (method_name, 'running', state.task_event_started[method_name]))
if method_name in state.task_event_succeeded:
print "%s[%s]: %d" % (method_name, 'completed', state.task_event_succeeded[method_name])
print("%s[%s]: %d" % (method_name, 'completed', state.task_event_succeeded[method_name]))
if method_name in state.task_event_failed:
print "%s[%s]: %d" % (method_name, 'failed', state.task_event_failed[method_name])
print("%s[%s]: %d" % (method_name, 'failed', state.task_event_failed[method_name]))

num_waiting_by_task, num_running_by_task = state.num_waiting_running_by_task()
print ''
print 'Queue Sizes'
print 'Waiting Tasks: %d' % sum(num_waiting_by_task.values())
print 'Running Tasks: %d' % sum(num_running_by_task.values())
print('')
print('Queue Sizes')
print('Waiting Tasks: %d' % sum(num_waiting_by_task.values()))
print('Running Tasks: %d' % sum(num_running_by_task.values()))

print ''
print ''
print('')
print('')

6 changes: 3 additions & 3 deletions celery_cloudwatch/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ def freeze_while(self, fun, *args, **kwargs):
try:
return fun(*args, **kwargs)
except:
print "Exception in user code:"
print '-'*60
print("Exception in user code:")
print('-'*60)
traceback.print_exc(file=sys.stdout)
print '-'*60
print('-'*60)
finally:
if clear_after:
self._clear()
Expand Down
2 changes: 1 addition & 1 deletion celery_cloudwatch/task_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ def proxy_event(self, event_name, fn):
return fn
else:
def proxy_event_fn(event):
print '[{}] - {}'.format(event_name, pprint.pformat(event))
print('[{}] - {}'.format(event_name, pprint.pformat(event)))
return fn(event)
return proxy_event_fn
4 changes: 1 addition & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv)
pytest_runner = ['pytest-runner'] if needs_pytest else []

needs_setupext_pip = {'requirements'}.intersection(sys.argv)
setupext_pip = ['setupext-pip~=1.0.5'] if needs_setupext_pip else []

here = os.path.abspath(os.path.dirname(__file__))


Expand All @@ -37,6 +34,7 @@ def read_markdown(*file_paths):
except ImportError:
return ''


setup(
name='celery-cloudwatch',
version=find_version("VERSION"),
Expand Down