notifs (old fmn): retire
We are retiring this in favor of the new service. Signed-off-by: Kevin Fenzi <kevin@scrye.com>
This commit is contained in:
parent
3808d867de
commit
20dc948173
|
@ -293,26 +293,6 @@ noc01.iad2.fedoraproject.org
|
|||
[noc_rdu_cc]
|
||||
cloud-noc-os01.rdu-cc.fedoraproject.org
|
||||
|
||||
[notifs:children]
|
||||
notifs_backend
|
||||
notifs_web
|
||||
|
||||
[notifs_stg:children]
|
||||
notifs_backend_stg
|
||||
notifs_web_stg
|
||||
|
||||
[notifs_backend]
|
||||
notifs-backend01.iad2.fedoraproject.org
|
||||
|
||||
[notifs_backend_stg]
|
||||
notifs-backend01.stg.iad2.fedoraproject.org
|
||||
|
||||
[notifs_web]
|
||||
notifs-web01.iad2.fedoraproject.org
|
||||
|
||||
[notifs_web_stg]
|
||||
notifs-web01.stg.iad2.fedoraproject.org
|
||||
|
||||
[memcached]
|
||||
memcached01.iad2.fedoraproject.org
|
||||
|
||||
|
@ -681,8 +661,6 @@ memcached01.stg.iad2.fedoraproject.org
|
|||
mm-backend01.stg.iad2.fedoraproject.org
|
||||
mm-crawler01.stg.iad2.fedoraproject.org
|
||||
mm-frontend01.stg.iad2.fedoraproject.org
|
||||
notifs-backend01.stg.iad2.fedoraproject.org
|
||||
notifs-web01.stg.iad2.fedoraproject.org
|
||||
odcs-backend01.stg.iad2.fedoraproject.org
|
||||
odcs-frontend01.stg.iad2.fedoraproject.org
|
||||
os-control01.stg.iad2.fedoraproject.org
|
||||
|
@ -806,7 +784,6 @@ badges_backend
|
|||
busgateway
|
||||
fedimg
|
||||
mbs_backend
|
||||
notifs_backend
|
||||
pkgs
|
||||
|
||||
[fedmsg_hubs_stg:children]
|
||||
|
@ -814,7 +791,6 @@ badges_backend_stg
|
|||
busgateway_stg
|
||||
fedimg_stg
|
||||
mbs_backend_stg
|
||||
notifs_backend_stg
|
||||
pkgs_stg
|
||||
|
||||
[fedmsg_ircs:children]
|
||||
|
@ -853,10 +829,6 @@ fedmsg_gateways_stg
|
|||
[python34_fedmsg:children]
|
||||
mailman
|
||||
#mailman_stg
|
||||
notifs_backend
|
||||
notifs_web
|
||||
notifs_backend_stg
|
||||
notifs_web_stg
|
||||
|
||||
## END fedmsg services
|
||||
|
||||
|
@ -1159,7 +1131,6 @@ mailman01.iad2.fedoraproject.org
|
|||
people02.fedoraproject.org
|
||||
pagure02.fedoraproject.org
|
||||
pkgs01.iad2.fedoraproject.org
|
||||
notifs-web01.iad2.fedoraproject.org
|
||||
#wiki01.iad2.fedoraproject.org
|
||||
|
||||
[nfs_servers]
|
||||
|
@ -1227,7 +1198,6 @@ mbs
|
|||
memcached
|
||||
mm
|
||||
nagios_iad2
|
||||
notifs
|
||||
oci_registry
|
||||
odcs
|
||||
openqa
|
||||
|
|
3
main.yml
3
main.yml
|
@ -48,9 +48,6 @@
|
|||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/mirrormanager.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/nfs-servers.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/noc.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/notifs-backend.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/notifs-web.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/nuancier.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/oci-registry.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/odcs.yml
|
||||
- import_playbook: /srv/web/infra/ansible/playbooks/groups/openqa-workers.yml
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/gallery.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/koji-hub.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/mailman.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/notifs-backend.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/notifs-web.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/packages.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/releng.yml
|
||||
- include_playbook: /srv/web/infra/ansible/playbooks/groups/wiki.yml
|
||||
|
|
|
@ -1,81 +0,0 @@
|
|||
# create a new notifs-backend server
|
||||
# NOTE: should be used with --limit most of the time
|
||||
# NOTE: make sure there is room/space for this server on the vmhost
|
||||
# NOTE: most of these vars_path come from group_vars/mirrorlist or from hostvars
|
||||
|
||||
- import_playbook: "/srv/web/infra/ansible/playbooks/include/virt-create.yml"
|
||||
vars:
|
||||
myhosts: "notifs_backend:notifs_backend_stg"
|
||||
|
||||
- name: dole out the generic configuration
|
||||
hosts: notifs_backend:notifs_backend_stg
|
||||
user: root
|
||||
gather_facts: True
|
||||
|
||||
vars_files:
|
||||
- /srv/web/infra/ansible/vars/global.yml
|
||||
- "/srv/private/ansible/vars.yml"
|
||||
- /srv/web/infra/ansible/vars/{{ ansible_distribution }}.yml
|
||||
|
||||
pre_tasks:
|
||||
- import_tasks: "{{ tasks_path }}/yumrepos.yml"
|
||||
|
||||
roles:
|
||||
- base
|
||||
- rkhunter
|
||||
- hosts
|
||||
# The proxies don't actually need to talk to these hosts so we won't bother
|
||||
# putting them on the vpn.
|
||||
#- { role: openvpn/client,
|
||||
# when: env != "staging" }
|
||||
- ipa/client
|
||||
- nagios_client
|
||||
- {role: zabbix/zabbix_agent, when: env == "staging"}
|
||||
- collectd/base
|
||||
- fedmsg/base
|
||||
- role: keytab/service
|
||||
owner_user: fedmsg
|
||||
owner_group: fedmsg
|
||||
service: fedmsg-hub-3
|
||||
# Set up for fedora-messaging
|
||||
- role: rabbit/user
|
||||
username: "notifs-backend{{ env_suffix }}"
|
||||
sent_topics: ^org\.fedoraproject\.{{ env_short }}\.(fmn|logger)\..*
|
||||
- sudo
|
||||
|
||||
tasks:
|
||||
- import_tasks: "{{ tasks_path }}/motd.yml"
|
||||
|
||||
handlers:
|
||||
- import_tasks: "{{ handlers_path }}/restart_services.yml"
|
||||
|
||||
- name: dole out the service-specific config
|
||||
hosts: notifs_backend:notifs_backend_stg
|
||||
user: root
|
||||
gather_facts: True
|
||||
|
||||
pre_tasks:
|
||||
- name: tell nagios to shush w.r.t. the backend since it usually complains
|
||||
nagios: action=downtime minutes=25 service=host host={{ inventory_hostname_short }}{{ env_suffix }}
|
||||
delegate_to: noc01.iad2.fedoraproject.org
|
||||
ignore_errors: true
|
||||
tags:
|
||||
- fedmsgdconfig
|
||||
- notifs/backend
|
||||
|
||||
roles:
|
||||
- fedmsg/hub
|
||||
- rabbitmq
|
||||
- memcached
|
||||
- notifs/backend
|
||||
- role: collectd/fedmsg-service
|
||||
process: fedmsg-hub
|
||||
when: inventory_hostname.startswith('notifs-backend01.iad2')
|
||||
|
||||
vars_files:
|
||||
- /srv/web/infra/ansible/vars/global.yml
|
||||
- "/srv/private/ansible/vars.yml"
|
||||
- /srv/web/infra/ansible/vars/{{ ansible_distribution }}.yml
|
||||
|
||||
handlers:
|
||||
- import_tasks: "{{ handlers_path }}/restart_services.yml"
|
|
@ -1,47 +0,0 @@
|
|||
---
|
||||
# create a new notifs-web server
|
||||
# NOTE: should be used with --limit most of the time
|
||||
# NOTE: make sure there is room/space for this server on the vmhost
|
||||
# NOTE: most of these vars_path come from group_vars/notifs-web* or from hostvars
|
||||
|
||||
- import_playbook: "/srv/web/infra/ansible/playbooks/include/virt-create.yml"
|
||||
vars:
|
||||
myhosts: "notifs_backend:notifs_backend_stg"
|
||||
|
||||
- name: make the box be real
|
||||
hosts: notifs_web:notifs_web_stg
|
||||
user: root
|
||||
gather_facts: True
|
||||
|
||||
vars_files:
|
||||
- /srv/web/infra/ansible/vars/global.yml
|
||||
- "/srv/private/ansible/vars.yml"
|
||||
- /srv/web/infra/ansible/vars/{{ ansible_distribution }}.yml
|
||||
|
||||
roles:
|
||||
- base
|
||||
- rkhunter
|
||||
- nagios_client
|
||||
- {role: zabbix/zabbix_agent, when: env == "staging"}
|
||||
- hosts
|
||||
- {role: openvpn/client,
|
||||
when: env != "staging"}
|
||||
- ipa/client
|
||||
- collectd/base
|
||||
- mod_wsgi
|
||||
- role: fedmsg/base
|
||||
# Set up for fedora-messaging
|
||||
- role: rabbit/user
|
||||
username: "notifs-web{{ env_suffix }}"
|
||||
sent_topics: ^org\.fedoraproject\.{{ env_short }}\.(fmn|logger)\..*
|
||||
- notifs/frontend
|
||||
- sudo
|
||||
|
||||
pre_tasks:
|
||||
- import_tasks: "{{ tasks_path }}/yumrepos.yml"
|
||||
|
||||
tasks:
|
||||
- import_tasks: "{{ tasks_path }}/motd.yml"
|
||||
|
||||
handlers:
|
||||
- import_tasks: "{{ handlers_path }}/restart_services.yml"
|
|
@ -15,9 +15,6 @@
|
|||
- moksha-hubs
|
||||
- moksha-hubs-stg
|
||||
|
||||
- notifs-web
|
||||
- notifs-web-stg
|
||||
|
||||
- datagrepper
|
||||
- datagrepper-stg
|
||||
|
||||
|
@ -53,7 +50,7 @@
|
|||
|
||||
# Also restart the frontend web services
|
||||
- name: bounce apache
|
||||
hosts: notifs_web:notifs_web_stg:datagrepper:datagrepper_stg
|
||||
hosts: datagrepper:datagrepper_stg
|
||||
user: root
|
||||
vars_files:
|
||||
- /srv/web/infra/ansible/vars/global.yml
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
{% if env == 'staging' %}
|
||||
suffix = 'stg.iad2.fedoraproject.org'
|
||||
{% else %}
|
||||
suffix = 'iad2.fedoraproject.org'
|
||||
{% endif %}
|
||||
|
||||
config = dict(
|
||||
endpoints={
|
||||
"fmn.notifs-backend01": [
|
||||
"tcp://notifs-backend01.%s:30%0.2i" % (suffix, i)
|
||||
for i in range(6)
|
||||
],
|
||||
},
|
||||
)
|
|
@ -1,28 +0,0 @@
|
|||
{% if datacenter == 'iad2' %}
|
||||
{% if env == 'staging' %}
|
||||
suffix = 'stg.iad2.fedoraproject.org'
|
||||
{% else %}
|
||||
suffix = 'iad2.fedoraproject.org'
|
||||
vpn_suffix = 'vpn.fedoraproject.org'
|
||||
{% endif %}
|
||||
{% else %}
|
||||
{% if env == 'staging' %}
|
||||
suffix = 'stg.fedoraproject.org'
|
||||
{% else %}
|
||||
suffix = 'fedoraproject.org'
|
||||
vpn_suffix = 'vpn.fedoraproject.org'
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
|
||||
config = dict(
|
||||
endpoints={
|
||||
"fmn.notifs-web01": [
|
||||
"tcp://notifs-web01.%s:30%0.2i" % (suffix, i)
|
||||
for i in range(16)
|
||||
],
|
||||
"fmn.notifs-web02": [
|
||||
"tcp://notifs-web02.%s:30%0.2i" % (suffix, i)
|
||||
for i in range(16)
|
||||
],
|
||||
},
|
||||
)
|
|
@ -184,18 +184,6 @@ backend badges-backend
|
|||
server badges-web01 badges-web01:80 check inter 10s rise 1 fall 2
|
||||
option httpchk GET /heartbeat
|
||||
|
||||
frontend notifs-web-frontend
|
||||
bind 0.0.0.0:10036
|
||||
default_backend notifs-web-backend
|
||||
|
||||
backend notifs-web-backend
|
||||
balance hdr(appserver)
|
||||
server notifs-web01 notifs-web01:80 check inter 10s rise 1 fall 2
|
||||
{% if env == "production" %}
|
||||
# server notifs-web02 notifs-web02:80 check inter 10s rise 1 fall 2
|
||||
{% endif %}
|
||||
option httpchk GET /notifications-old/_heartbeat
|
||||
|
||||
frontend github2fedmsg-frontend
|
||||
bind 0.0.0.0:10037
|
||||
default_backend github2fedmsg-backend
|
||||
|
|
|
@ -287,13 +287,6 @@ define service {
|
|||
use defaulttemplate
|
||||
}
|
||||
|
||||
define service {
|
||||
host_name notifs-backend01.iad2.fedoraproject.org
|
||||
service_description Check fedmsg-hub consumers exceptions
|
||||
check_command check_by_nrpe!check_fedmsg_cexceptions_notifs_backend
|
||||
use defaulttemplate
|
||||
}
|
||||
|
||||
#define service {
|
||||
# host_name bugzilla2fedmsg01.iad2.fedoraproject.org
|
||||
# service_description Check fedmsg-hub consumers exceptions
|
||||
|
@ -347,13 +340,6 @@ define service {
|
|||
use defaulttemplate
|
||||
}
|
||||
|
||||
define service {
|
||||
host_name notifs-backend01.iad2.fedoraproject.org
|
||||
service_description Check fedmsg-hub consumers backlog
|
||||
check_command check_by_nrpe!check_fedmsg_cbacklog_notifs_backend
|
||||
use defaulttemplate
|
||||
}
|
||||
|
||||
#define service {
|
||||
# host_name bugzilla2fedmsg01.iad2.fedoraproject.org
|
||||
# service_description Check fedmsg-hub consumers backlog
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
define service {
|
||||
host_name notifs-backend01.iad2.fedoraproject.org
|
||||
service_description Check backend irc queue size
|
||||
check_command check_by_nrpe!check_fmn_backend_irc_queue
|
||||
use defaulttemplate
|
||||
}
|
||||
|
||||
define service {
|
||||
host_name notifs-backend01.iad2.fedoraproject.org
|
||||
service_description Check backend email queue size
|
||||
check_command check_by_nrpe!check_fmn_backend_email_queue
|
||||
use defaulttemplate
|
||||
}
|
||||
|
||||
define service {
|
||||
host_name notifs-backend01.iad2.fedoraproject.org
|
||||
service_description Check worker queue size
|
||||
check_command check_by_nrpe!check_fmn_worker_queue
|
||||
use defaulttemplate
|
||||
}
|
|
@ -1,71 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
""" fmn-create-account USER
|
||||
|
||||
Create a new FMN account for an existing FAS user.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
import fedmsg
|
||||
import fedmsg.config
|
||||
|
||||
import fmn.lib
|
||||
import fmn.lib.models
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(__doc__)
|
||||
parser.add_argument('user', help='FAS username to disable.')
|
||||
parser.add_argument('--create-defaults', default=False, action='store_true',
|
||||
help='If specified, create the'
|
||||
'default preferences. Otherwise, no preferences are'
|
||||
'set.')
|
||||
parser.add_argument('--staging', default=False, action='store_true',
|
||||
help='Create the user with a staging openid.')
|
||||
parser.add_argument('--email', default=None,
|
||||
help='Email address to set for the account.')
|
||||
parser.add_argument('--ircnick', default=None,
|
||||
help='IRC nick to set for the account.')
|
||||
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def create(session, user, create_defaults, detail_values, staging):
|
||||
if staging:
|
||||
openid = '%s.id.stg.fedoraproject.org' % user
|
||||
else:
|
||||
openid = '%s.id.fedoraproject.org' % user
|
||||
|
||||
openid_url = 'http://%s/' % openid
|
||||
user = fmn.lib.models.User.get_or_create(
|
||||
session,
|
||||
openid=openid,
|
||||
openid_url=openid_url,
|
||||
create_defaults=create_defaults,
|
||||
detail_values=detail_values,
|
||||
)
|
||||
session.commit()
|
||||
print user
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parse_args()
|
||||
|
||||
config = fedmsg.config.load_config()
|
||||
config.update({
|
||||
'active': True,
|
||||
'name': 'relay_inbound',
|
||||
'cert_prefix': 'fmn',
|
||||
})
|
||||
fedmsg.init(**config)
|
||||
|
||||
details = {}
|
||||
if args.email:
|
||||
details['email'] = args.email
|
||||
if args.ircnick:
|
||||
details['irc nick'] = args.ircnick
|
||||
|
||||
session = fmn.lib.models.init(config['fmn.sqlalchemy.uri'])
|
||||
|
||||
create(session, args.user, args.create_defaults, details, args.staging)
|
|
@ -1,47 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
""" fmn-disable-account USER
|
||||
|
||||
Disables the FMN account for a user.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
import fedmsg
|
||||
import fedmsg.config
|
||||
|
||||
import fmn.lib
|
||||
import fmn.lib.models
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(__doc__)
|
||||
parser.add_argument('user', help='FAS username to disable.')
|
||||
parser.add_argument('--context', nargs='+', default=['irc', 'email'],
|
||||
help="Contexts to disable. Defaults to all.")
|
||||
return parser.parse_args()
|
||||
|
||||
def disable(session, user, contexts):
|
||||
openid = '%s.id.fedoraproject.org' % user
|
||||
for context in contexts:
|
||||
pref = fmn.lib.models.Preference.load(session, openid, context)
|
||||
if pref:
|
||||
print "Disabling %r for %r" % (context, openid)
|
||||
pref.set_enabled(session, False)
|
||||
else:
|
||||
print "No context %r found for %r" % (context, openid)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parse_args()
|
||||
|
||||
config = fedmsg.config.load_config()
|
||||
config.update({
|
||||
'active': True,
|
||||
'name': 'relay_inbound',
|
||||
'cert_prefix': 'fmn',
|
||||
})
|
||||
fedmsg.init(**config)
|
||||
|
||||
session = fmn.lib.models.init(config['fmn.sqlalchemy.uri'])
|
||||
|
||||
disable(session, args.user, args.context)
|
|
@ -1,252 +0,0 @@
|
|||
# This file is part of Moksha.
|
||||
# Copyright (C) 2008-2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
:mod:`moksha.hub.api.consumer` - The Moksha Consumer API
|
||||
========================================================
|
||||
Moksha provides a simple API for creating "consumers" of message topics.
|
||||
|
||||
This means that your consumer is instantiated when the MokshaHub is initially
|
||||
loaded, and receives each message for the specified topic through the
|
||||
:meth:`Consumer.consume` method.
|
||||
|
||||
.. moduleauthor:: Luke Macken <lmacken@redhat.com>
|
||||
.. moduleauthor:: Ralph Bean <rbean@redhat.com>
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import logging
|
||||
log = logging.getLogger('moksha.hub')
|
||||
|
||||
import six.moves.queue as queue
|
||||
from collections import deque
|
||||
|
||||
from kitchen.iterutils import iterate
|
||||
from moksha.common.lib.helpers import create_app_engine
|
||||
from moksha.common.lib.converters import asbool
|
||||
import moksha.hub.reactor
|
||||
|
||||
|
||||
class Consumer(object):
|
||||
""" A message consumer """
|
||||
topic = ''
|
||||
|
||||
# Automatically decode JSON data
|
||||
jsonify = True
|
||||
|
||||
# Internal use only
|
||||
_initialized = False
|
||||
_exception_count = 0
|
||||
|
||||
def __init__(self, hub):
|
||||
self.hub = hub
|
||||
self.log = log
|
||||
|
||||
# Set up a queue to communicate between the main twisted thread
|
||||
# receiving raw messages, and a worker thread that pulls items off
|
||||
# the queue to do "consume" work.
|
||||
self.incoming = queue.Queue()
|
||||
self.headcount_in = self.headcount_out = 0
|
||||
self._times = deque(maxlen=1024)
|
||||
|
||||
callback = self._consume
|
||||
if self.jsonify:
|
||||
callback = self._consume_json
|
||||
|
||||
for topic in iterate(self.topic):
|
||||
log.debug('Subscribing to consumer topic %s' % topic)
|
||||
self.hub.subscribe(topic.decode("utf-8"), callback)
|
||||
|
||||
# If the consumer specifies an 'app', then setup `self.engine` to
|
||||
# be a SQLAlchemy engine, along with a configured DBSession
|
||||
app = getattr(self, 'app', None)
|
||||
self.engine = self.DBSession = None
|
||||
if app:
|
||||
log.debug("Setting up individual engine for consumer")
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
self.engine = create_app_engine(app, hub.config)
|
||||
self.DBSession = sessionmaker(bind=self.engine)()
|
||||
|
||||
self.blocking_mode = asbool(self.hub.config.get('moksha.blocking_mode', False))
|
||||
if self.blocking_mode:
|
||||
log.info("Blocking mode true for %r. "
|
||||
"Messages handled as they arrive." % self)
|
||||
else:
|
||||
self.N = int(self.hub.config.get('moksha.workers_per_consumer', 1))
|
||||
log.info("Blocking mode false for %r. "
|
||||
"Messages to be queued and distributed to %r threads." % (
|
||||
self, self.N))
|
||||
for i in range(self.N):
|
||||
moksha.hub.reactor.reactor.callInThread(self._work_loop)
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def __json__(self):
|
||||
if self._initialized:
|
||||
backlog = self.incoming.qsize()
|
||||
headcount_out = self.headcount_out
|
||||
headcount_in = self.headcount_in
|
||||
times = list(self._times)
|
||||
else:
|
||||
backlog = None
|
||||
headcount_out = headcount_in = 0
|
||||
times = []
|
||||
|
||||
results = {
|
||||
"name": type(self).__name__,
|
||||
"module": type(self).__module__,
|
||||
"topic": self.topic,
|
||||
"initialized": self._initialized,
|
||||
"exceptions": self._exception_count,
|
||||
"jsonify": self.jsonify,
|
||||
"backlog": backlog,
|
||||
"headcount_out": headcount_out,
|
||||
"headcount_in": headcount_in,
|
||||
"times": times,
|
||||
}
|
||||
# Reset these counters before returning.
|
||||
self.headcount_out = self.headcount_in = 0
|
||||
self._exception_count = 0
|
||||
self._times.clear()
|
||||
return results
|
||||
|
||||
def debug(self, message):
|
||||
idx = threading.current_thread().ident
|
||||
log.debug("%r thread %r | %s" % (type(self).__name__, idx, message))
|
||||
|
||||
def _consume_json(self, message):
|
||||
""" Convert our AMQP messages into a consistent dictionary format.
|
||||
|
||||
This method exists because our STOMP & AMQP message brokers consume
|
||||
messages in different formats. This causes our messaging abstraction
|
||||
to leak into the consumers themselves.
|
||||
|
||||
:Note: We do not pass the message headers to the consumer (in this AMQP consumer)
|
||||
because the current AMQP.js bindings do not allow the client to change them.
|
||||
Thus, we need to throw any topic/queue details into the JSON body itself.
|
||||
"""
|
||||
try:
|
||||
body = json.loads(message.body)
|
||||
except:
|
||||
log.debug("Unable to decode message body to JSON: %r" % message.body)
|
||||
body = message.body
|
||||
topic = None
|
||||
|
||||
# Try some stuff for AMQP:
|
||||
try:
|
||||
topic = message.headers[0].routing_key
|
||||
except TypeError:
|
||||
# We didn't get a JSON dictionary
|
||||
pass
|
||||
except AttributeError:
|
||||
# We didn't get headers or a routing key?
|
||||
pass
|
||||
|
||||
# If that didn't work, it might be zeromq
|
||||
if not topic:
|
||||
try:
|
||||
topic = message.topic
|
||||
except AttributeError:
|
||||
# Weird. I have no idea...
|
||||
pass
|
||||
|
||||
message_as_dict = {'body': body, 'topic': topic}
|
||||
return self._consume(message_as_dict)
|
||||
|
||||
def _consume(self, message):
|
||||
self.headcount_in += 1
|
||||
if self.blocking_mode:
|
||||
# Do the work right now
|
||||
return self._do_work(message)
|
||||
else:
|
||||
# Otherwise, put the message in a queue for other threads to handle
|
||||
self.incoming.put(message)
|
||||
|
||||
def _work_loop(self):
|
||||
while True:
|
||||
# This is a blocking call. It waits until a message is available.
|
||||
message = self.incoming.get()
|
||||
# Then we are being asked to quit
|
||||
if message is StopIteration:
|
||||
break
|
||||
self._do_work(message)
|
||||
self.debug("Worker thread exiting.")
|
||||
|
||||
def _do_work(self, message):
|
||||
self.headcount_out += 1
|
||||
start = time.time()
|
||||
handled = True
|
||||
|
||||
self.debug("Worker thread picking a message.")
|
||||
try:
|
||||
self.validate(message)
|
||||
except Exception as e:
|
||||
log.warning("Received invalid message %r" % e)
|
||||
return False # Not handled
|
||||
|
||||
try:
|
||||
self.pre_consume(message)
|
||||
except Exception as e:
|
||||
self.log.exception(message)
|
||||
|
||||
try:
|
||||
self.consume(message)
|
||||
except Exception as e:
|
||||
handled = False # Not handled. Return this later.
|
||||
self.log.exception(message)
|
||||
# Keep track of how many exceptions we've hit in a row
|
||||
self._exception_count += 1
|
||||
|
||||
try:
|
||||
self.post_consume(message)
|
||||
except Exception as e:
|
||||
self.log.exception(message)
|
||||
|
||||
# Record how long it took to process this message (for stats)
|
||||
self._times.append(time.time() - start)
|
||||
|
||||
self.debug("Going back to waiting on the incoming queue. Message handled: %r" % handled)
|
||||
return handled
|
||||
|
||||
def validate(self, message):
|
||||
""" Override to implement your own validation scheme. """
|
||||
pass
|
||||
|
||||
def pre_consume(self, message):
|
||||
pass
|
||||
|
||||
def consume(self, message):
|
||||
raise NotImplementedError
|
||||
|
||||
def post_consume(self, message):
|
||||
pass
|
||||
|
||||
def send_message(self, topic, message):
|
||||
try:
|
||||
self.hub.send_message(topic, message)
|
||||
except Exception as e:
|
||||
log.error('Cannot send message: %s' % e)
|
||||
|
||||
def stop(self):
|
||||
for i in range(getattr(self, 'N', 0)):
|
||||
self.incoming.put(StopIteration)
|
||||
|
||||
if hasattr(self, 'hub'):
|
||||
self.hub.close()
|
||||
|
||||
if getattr(self, 'DBSession', None):
|
||||
self.DBSession.close()
|
|
@ -1,210 +0,0 @@
|
|||
"""
|
||||
This is a `fedmsg consumer`_ that subscribes to every topic on the message bus
|
||||
it is connected to. It has two tasks. The first is to place all incoming
|
||||
messages into a RabbitMQ message queue. The second is to manage the FMN caches.
|
||||
|
||||
FMN makes heavy use of caches since it needs to know who owns what packages and
|
||||
what user notification preferences are, both of which require expensive API
|
||||
queries to `FAS`_, `pkgdb`_, or the database.
|
||||
|
||||
.. _fedmsg consumer: http://www.fedmsg.com/en/latest/consuming/#the-hub-consumer-approach
|
||||
.. _FAS: https://accounts.fedoraproject.org/
|
||||
.. _pkgdb: https://admin.fedoraproject.org/pkgdb/
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import fedmsg.consumers
|
||||
import kombu
|
||||
|
||||
import fmn.lib
|
||||
import fmn.rules.utils
|
||||
from fmn import config
|
||||
from fmn.celery import RELOAD_CACHE_EXCHANGE_NAME
|
||||
from .util import (
|
||||
new_packager,
|
||||
new_badges_user,
|
||||
get_fas_email,
|
||||
get_fasjson_email
|
||||
)
|
||||
from fmn.tasks import find_recipients, REFRESH_CACHE_TOPIC, heat_fas_cache
|
||||
|
||||
|
||||
log = logging.getLogger("fmn")
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FMNConsumer(fedmsg.consumers.FedmsgConsumer):
|
||||
"""
|
||||
A `fedmsg consumer`_ that subscribes to all topics and re-publishes all
|
||||
messages to the ``workers`` exchange.
|
||||
|
||||
Attributes:
|
||||
topic (str): The topics this consumer is subscribed to. Set to ``*``
|
||||
(all topics).
|
||||
config_key (str): The key to set to ``True`` in the fedmsg config to
|
||||
enable this consumer. The key is ``fmn.consumer.enabled``.
|
||||
"""
|
||||
config_key = 'fmn.consumer.enabled'
|
||||
|
||||
def __init__(self, hub, *args, **kwargs):
|
||||
self.topic = config.app_conf['fmn.topics']
|
||||
|
||||
_log.info("FMNConsumer initializing")
|
||||
super(FMNConsumer, self).__init__(hub, *args, **kwargs)
|
||||
|
||||
self.uri = config.app_conf['fmn.sqlalchemy.uri']
|
||||
self.autocreate = config.app_conf['fmn.autocreate']
|
||||
self.junk_suffixes = config.app_conf['fmn.junk_suffixes']
|
||||
self.ignored_copr_owners = config.app_conf['ignored_copr_owners']
|
||||
|
||||
heat_fas_cache.apply_async()
|
||||
|
||||
_log.info("Loading rules from fmn.rules")
|
||||
self.valid_paths = fmn.lib.load_rules(root="fmn.rules")
|
||||
|
||||
session = self.make_session()
|
||||
session.close()
|
||||
|
||||
_log.info("FMNConsumer initialized")
|
||||
|
||||
def make_session(self):
|
||||
"""
|
||||
Initialize the database session and return it.
|
||||
|
||||
Returns:
|
||||
sqlalchemy.orm.scoping.scoped_session: An SQLAlchemy scoped session.
|
||||
Calling it returns the current Session, creating it using the
|
||||
scoped_session.session_factory if not present.
|
||||
"""
|
||||
return fmn.lib.models.init(self.uri)
|
||||
|
||||
def consume(self, raw_msg):
|
||||
"""
|
||||
This method is called when a message arrives on the fedmsg bus.
|
||||
|
||||
Args:
|
||||
raw_msg (dict): The raw fedmsg deserialized to a Python dictionary.
|
||||
"""
|
||||
session = self.make_session()
|
||||
try:
|
||||
self.work(session, raw_msg)
|
||||
session.commit() # transaction is committed here
|
||||
except:
|
||||
session.rollback() # rolls back the transaction
|
||||
raise
|
||||
|
||||
def work(self, session, raw_msg):
|
||||
"""
|
||||
This method is called when a message arrives on the fedmsg bus by the
|
||||
:meth:`.consume` method.
|
||||
|
||||
Args:
|
||||
session (sqlalchemy.orm.session.Session): The SQLAlchemy session to use.
|
||||
raw_msg (dict): The raw fedmsg deserialized to a Python dictionary.
|
||||
"""
|
||||
topic, msg = raw_msg['topic'], raw_msg['body']
|
||||
|
||||
for suffix in self.junk_suffixes:
|
||||
if topic.endswith(suffix):
|
||||
log.debug("Dropping %r", topic)
|
||||
return
|
||||
|
||||
# Ignore high-usage COPRs
|
||||
if topic.startswith('org.fedoraproject.prod.copr.') and \
|
||||
msg['msg'].get('owner') in self.ignored_copr_owners:
|
||||
log.debug('Dropping COPR %r by %r' % (topic, msg['msg']['owner']))
|
||||
return
|
||||
|
||||
_log.info("FMNConsumer received %s %s", msg['msg_id'], msg['topic'])
|
||||
|
||||
# First, do some cache management. This can be confusing because there
|
||||
# are two different caches, with two different mechanisms, storing two
|
||||
# different kinds of data. The first is a simple python dict that
|
||||
# contains the 'preferences' from the fmn database. The second is a
|
||||
# dogpile.cache (potentially stored in memcached, but configurable from
|
||||
# /etc/fedmsg.d/). The dogpile.cache cache stores pkgdb2
|
||||
# package-ownership relations. Both caches are held for a very long
|
||||
# time and update themselves dynamically here.
|
||||
|
||||
if '.fmn.' in topic:
|
||||
openid = msg['msg']['openid']
|
||||
_log.info('Broadcasting message to Celery workers to update cache for %s', openid)
|
||||
find_recipients.apply_async(
|
||||
({'topic': 'fmn.internal.refresh_cache', 'body': openid},),
|
||||
exchange=RELOAD_CACHE_EXCHANGE_NAME,
|
||||
routing_key=config.app_conf['celery']['task_default_queue'],
|
||||
)
|
||||
|
||||
# If a user has tweaked something in the pkgdb2 db, then invalidate our
|
||||
# dogpile cache.. but only the parts that have something to do with any
|
||||
# one of the users involved in the pkgdb2 interaction. Note that a
|
||||
# 'username' here could be an actual username, or a group name like
|
||||
# 'group::infra-sig'.
|
||||
if '.pkgdb.' in topic:
|
||||
usernames = fedmsg.meta.msg2usernames(msg, **config.app_conf)
|
||||
for username in usernames:
|
||||
log.info("Invalidating pkgdb2 dogpile cache for %r" % username)
|
||||
target = fmn.rules.utils.get_packages_of_user
|
||||
fmn.rules.utils.invalidate_cache_for(
|
||||
config.app_conf, target, username)
|
||||
|
||||
# Create a local account with all the default rules if a user is
|
||||
# identified by one of our 'selectors'. Here we can add all kinds of
|
||||
# new triggers that should create new FMN accounts. At this point in
|
||||
# time we only create new accounts if 1) a new user is added to the
|
||||
# packager group or 2) someone logs into badges.fp.o for the first
|
||||
# time.
|
||||
if self.autocreate:
|
||||
selectors = [new_packager, new_badges_user]
|
||||
candidates = [fn(topic, msg) for fn in selectors]
|
||||
for username in candidates:
|
||||
if not username:
|
||||
continue
|
||||
log.info("Autocreating account for %r" % username)
|
||||
openid = '%s.id.fedoraproject.org' % username
|
||||
openid_url = 'https://%s.id.fedoraproject.org' % username
|
||||
fasjson = config.app_conf.get("fasjson", {}).get("active")
|
||||
if fasjson:
|
||||
email = get_fasjson_email(config.app_conf, username)
|
||||
else:
|
||||
email = get_fas_email(config.app_conf, username)
|
||||
user = fmn.lib.models.User.get_or_create(
|
||||
session, openid=openid, openid_url=openid_url,
|
||||
create_defaults=True, detail_values=dict(email=email),
|
||||
)
|
||||
session.add(user)
|
||||
session.commit()
|
||||
_log.info('Broadcasting message to Celery workers to update cache for %s', openid)
|
||||
find_recipients.apply_async(
|
||||
({'topic': REFRESH_CACHE_TOPIC, 'body': openid},),
|
||||
exchange=RELOAD_CACHE_EXCHANGE_NAME,
|
||||
)
|
||||
|
||||
# Do the same dogpile.cache invalidation trick that we did above, but
|
||||
# here do it for fas group membership changes. (This is important
|
||||
# because someone could be in a group like the infra-sig which itself
|
||||
# has package-ownership relations in pkgdb. If membership in that
|
||||
# group changes we need to sync fas relationships to catch up and route
|
||||
# messages to the new group members).
|
||||
if '.fas.group.' in topic:
|
||||
usernames = fedmsg.meta.msg2usernames(msg, **config.app_conf)
|
||||
for username in usernames:
|
||||
log.info("Invalidating fas cache for %r" % username)
|
||||
target = fmn.rules.utils.get_groups_of_user
|
||||
fmn.rules.utils.invalidate_cache_for(config.app_conf, target, username)
|
||||
|
||||
# Finding recipients is computationally quite expensive so it's handled
|
||||
# by Celery worker processes. The results are then dropped into an AMQP
|
||||
# queue and processed by the backends.
|
||||
try:
|
||||
find_recipients.apply_async((raw_msg,))
|
||||
except kombu.exceptions.OperationalError:
|
||||
_log.exception('Dispatching task to find recipients failed')
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Gracefully halt this fedmsg consumer.
|
||||
"""
|
||||
log.info("Cleaning up FMNConsumer.")
|
||||
super(FMNConsumer, self).stop()
|
|
@ -1,64 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
import requests.exceptions
|
||||
from gssapi import Credentials, exceptions
|
||||
from requests.compat import urlencode, urljoin
|
||||
from requests_gssapi import HTTPSPNEGOAuth
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Client(object):
|
||||
"""
|
||||
A fasjson client to make very specific requests to fasjson.
|
||||
Necessary because the official fasjson-client library does not support
|
||||
python2.
|
||||
"""
|
||||
def __init__(self, url, principal=None):
|
||||
self.url = url.rstrip("/") + "/v1/"
|
||||
self.principal = principal
|
||||
os.environ["KRB5_CLIENT_KTNAME"] = "/etc/krb5.keytab"
|
||||
try:
|
||||
creds = Credentials(usage="initiate")
|
||||
except exceptions.GSSError as e:
|
||||
log.error("GSError. Unable to create credentials store.", e)
|
||||
gssapi_auth = HTTPSPNEGOAuth(opportunistic_auth=True, creds=creds)
|
||||
self.session = requests.Session()
|
||||
self.session.auth = gssapi_auth
|
||||
|
||||
def search(self, email):
|
||||
"""
|
||||
A very limited search built to only serve fmn's requirement of
|
||||
finding a user based on an email.
|
||||
"""
|
||||
# email must be an exact match in fasjson, so we will either have
|
||||
# 1 result or empty result
|
||||
search_string = "search/users" + "?" + urlencode({"email": email})
|
||||
endpoint = urljoin(self.url, search_string)
|
||||
|
||||
return self.session.get(endpoint).json()
|
||||
|
||||
def get_user(self, username):
|
||||
"""
|
||||
Get a specific user based on their username
|
||||
"""
|
||||
url_string = "users/" + username + "/"
|
||||
endpoint = urljoin(self.url, url_string)
|
||||
|
||||
return self.session.get(endpoint).json()
|
||||
|
||||
def list_all_entities(self, ent_name):
|
||||
"""
|
||||
Return all entities of a certain type. In fmn's case it is users.
|
||||
"""
|
||||
endpoint = urljoin(self.url, ent_name + "/")
|
||||
|
||||
next_page_url = endpoint + "?" + urlencode({"page_number": 1})
|
||||
while next_page_url:
|
||||
res = self.session.get(next_page_url).json()
|
||||
for item in res["result"]:
|
||||
yield item
|
||||
next_page_url = res.get("page", {}).get("next_page")
|
|
@ -1,198 +0,0 @@
|
|||
from __future__ import print_function
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import string
|
||||
import requests
|
||||
|
||||
import fedmsg
|
||||
import fedmsg.meta
|
||||
import fedora.client
|
||||
import fedora.client.fas2
|
||||
from dogpile.cache import make_region
|
||||
|
||||
from fmn import config
|
||||
from .fasjson_client import Client
|
||||
|
||||
fedmsg.meta.make_processors(**config.app_conf)
|
||||
|
||||
_cache = make_region(
|
||||
key_mangler=lambda key: "fmn.consumer:dogpile:" + key
|
||||
).configure(**config.app_conf['fmn.rules.cache'].copy())
|
||||
|
||||
log = logging.getLogger("moksha.hub")
|
||||
|
||||
default_url = 'https://accounts.fedoraproject.org/'
|
||||
creds = config.app_conf['fas_credentials']
|
||||
|
||||
fasjson = config.app_conf['fasjson']
|
||||
if fasjson.get('active'):
|
||||
client = Client(url=fasjson.get('url', default_url))
|
||||
else:
|
||||
client = fedora.client.fas2.AccountSystem(
|
||||
base_url=creds.get('base_url', default_url),
|
||||
username=creds['username'],
|
||||
password=creds['password'],
|
||||
)
|
||||
|
||||
|
||||
def make_fasjson_cache(**config):
|
||||
log.warning("Building the FASJSON cache into redis.")
|
||||
if _cache.get('fas_cache_built'):
|
||||
log.warning("FASJSON cache already built into redis.")
|
||||
return
|
||||
global client
|
||||
try:
|
||||
_add_to_cache(list(client.list_all_entities("users")))
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Something went wrong building cache with error: %s" % e)
|
||||
return
|
||||
|
||||
_cache.set('fas_cache_built', True)
|
||||
|
||||
|
||||
def make_fas_cache(**config):
|
||||
log.warning("Building the FAS cache into redis.")
|
||||
if _cache.get('fas_cache_built'):
|
||||
log.warning("FAS cache already built into redis.")
|
||||
return
|
||||
|
||||
global client
|
||||
timeout = socket.getdefaulttimeout()
|
||||
for key in string.ascii_lowercase:
|
||||
socket.setdefaulttimeout(600)
|
||||
try:
|
||||
log.info("Downloading FAS cache for %s*" % key)
|
||||
request = client.send_request(
|
||||
'/user/list',
|
||||
req_params={
|
||||
'search': '%s*' % key,
|
||||
'status': 'active'
|
||||
},
|
||||
auth=True)
|
||||
except fedora.client.ServerError as e:
|
||||
log.warning("Failed to download fas cache for %s %r" % (key, e))
|
||||
return {}
|
||||
finally:
|
||||
socket.setdefaulttimeout(timeout)
|
||||
|
||||
log.info("Caching necessary user data")
|
||||
for user in request['people']:
|
||||
nick = user['ircnick']
|
||||
if nick:
|
||||
_cache.set(str(nick), user['username'])
|
||||
|
||||
email = user['email']
|
||||
if email:
|
||||
_cache.set(str(email), user['username'])
|
||||
|
||||
del request
|
||||
|
||||
_cache.set('fas_cache_built', True)
|
||||
|
||||
|
||||
def _add_to_cache(users):
|
||||
for user in users:
|
||||
nicks = user.get('ircnicks', [])
|
||||
for nick in nicks:
|
||||
_cache.set(nick, user['username'])
|
||||
|
||||
emails = user.get('emails', [])
|
||||
for email in emails:
|
||||
_cache.set(email, user['username'])
|
||||
|
||||
|
||||
def update_nick(username):
|
||||
global client
|
||||
if config.app_conf.get('fasjson'):
|
||||
try:
|
||||
log.info("Downloading FASJSON cache for %s*" % username)
|
||||
response = client.get_user(username=username)
|
||||
_add_to_cache([response["result"]])
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Something went wrong updating the cache with error: %s" % e)
|
||||
else:
|
||||
try:
|
||||
log.info("Downloading FAS cache for %s*" % username)
|
||||
request = client.send_request(
|
||||
'/user/list',
|
||||
req_params={'search': '%s' % username},
|
||||
auth=True)
|
||||
except fedora.client.ServerError as e:
|
||||
log.warning(
|
||||
"Failed to download fas cache for %s: %r" % (username, e))
|
||||
return {}
|
||||
|
||||
log.info("Caching necessary data for %s" % username)
|
||||
for user in request['people']:
|
||||
nick = user['ircnick']
|
||||
if nick:
|
||||
_cache.set(nick, user['username'])
|
||||
|
||||
email = user['email']
|
||||
if email:
|
||||
_cache.set(email, user['username'])
|
||||
else:
|
||||
# If we couldn't find the nick in FAS, save it in the _cache as nick
|
||||
# so that we avoid calling FAS for every single filter we have to
|
||||
# run through
|
||||
_cache.set(username, username)
|
||||
|
||||
|
||||
def update_email(email):
|
||||
global client
|
||||
if config.app_conf.get('fasjson'):
|
||||
try:
|
||||
log.info("Downloading FASJSON cache for %s*" % email)
|
||||
response = client.search(email=email)
|
||||
_add_to_cache(response['result'])
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Something went wrong updating the cache with error: %s" % e)
|
||||
else:
|
||||
try:
|
||||
log.info("Downloading FAS cache for %s" % email)
|
||||
request = client.send_request(
|
||||
'/user/list',
|
||||
req_params={
|
||||
'search': '%s' % email,
|
||||
'by_email': 1,
|
||||
},
|
||||
auth=True)
|
||||
except fedora.client.ServerError as e:
|
||||
log.warning(
|
||||
"Failed to download fas cache for %s: %r" % (email, e))
|
||||
return {}
|
||||
|
||||
log.info("Caching necessary data for %s" % email)
|
||||
for user in request['people']:
|
||||
nick = user['ircnick']
|
||||
if nick:
|
||||
_cache.set(nick, user['username'])
|
||||
|
||||
email = user['email']
|
||||
if email:
|
||||
_cache.set(email, user['username'])
|
||||
else:
|
||||
# If we couldn't find the email in FAS, save it in the _cache as
|
||||
# email so that we avoid calling FAS for every single filter we
|
||||
# have to run through
|
||||
_cache.set(email, email)
|
||||
|
||||
|
||||
def nick2fas(nickname, **config):
|
||||
result = _cache.get(nickname)
|
||||
if not result:
|
||||
update_nick(nickname)
|
||||
result = _cache.get(nickname)
|
||||
return result or nickname
|
||||
|
||||
|
||||
def email2fas(email, **config):
|
||||
if email.endswith('@fedoraproject.org'):
|
||||
return email.rsplit('@', 1)[0]
|
||||
|
||||
result = _cache.get(email)
|
||||
if not result:
|
||||
update_email(email)
|
||||
result = _cache.get(email)
|
||||
return result or email
|
|
@ -1,475 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# This file is part of the FMN project.
|
||||
# Copyright (C) 2017 Red Hat, Inc.
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
#
|
||||
# This library is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this library; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
|
||||
"""
|
||||
This module contains the `Celery tasks`_ used by FMN.
|
||||
|
||||
.. _Celery tasks: http://docs.celeryproject.org/en/latest/
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import datetime
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
from fedmsg_meta_fedora_infrastructure import fasshim
|
||||
from kombu import Connection, Queue
|
||||
from kombu.pools import connections
|
||||
from celery import task
|
||||
import fedmsg
|
||||
import fedmsg.meta
|
||||
import fedmsg_meta_fedora_infrastructure
|
||||
import sqlalchemy
|
||||
|
||||
from . import config, lib as fmn_lib, formatters, exceptions
|
||||
from . import fmn_fasshim
|
||||
from .lib import models
|
||||
from .celery import app
|
||||
from .constants import BACKEND_QUEUE_PREFIX
|
||||
|
||||
|
||||
__all__ = ['find_recipients']
|
||||
|
||||
|
||||
_log = get_task_logger(__name__)
|
||||
|
||||
|
||||
REFRESH_CACHE_TOPIC = 'fmn.internal.refresh_cache'
|
||||
|
||||
|
||||
# Monkey patch fedmsg_meta modules
|
||||
fasshim.nick2fas = fmn_fasshim.nick2fas
|
||||
fasshim.email2fas = fmn_fasshim.email2fas
|
||||
fedmsg_meta_fedora_infrastructure.supybot.nick2fas = fmn_fasshim.nick2fas
|
||||
fedmsg_meta_fedora_infrastructure.anitya.email2fas = fmn_fasshim.email2fas
|
||||
fedmsg_meta_fedora_infrastructure.bz.email2fas = fmn_fasshim.email2fas
|
||||
fedmsg_meta_fedora_infrastructure.mailman3.email2fas = fmn_fasshim.email2fas
|
||||
fedmsg_meta_fedora_infrastructure.pagure.email2fas = fmn_fasshim.email2fas
|
||||
|
||||
|
||||
class _FindRecipients(task.Task):
|
||||
"""A Celery task sub-class that loads and caches user preferences."""
|
||||
|
||||
name = 'fmn.tasks.find_recipients'
|
||||
# Retry tasks every hour for 60 days before giving up
|
||||
default_retry_delay = 3600
|
||||
max_retries = 1440
|
||||
autoretry_for = (Exception,)
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize caches and other resources for the tasks that require user preferences.
|
||||
|
||||
This is run once per process, not per task.
|
||||
"""
|
||||
_log.info('Initializing the "%s" task', self.name)
|
||||
fedmsg.meta.make_processors(**config.app_conf)
|
||||
self._valid_paths = None
|
||||
self._user_preferences = None
|
||||
_log.info('Initialization complete for the "%s" task', self.name)
|
||||
|
||||
@property
|
||||
def valid_paths(self):
|
||||
"""
|
||||
A property that lazy-loads the valid paths for FMN rules.
|
||||
|
||||
This is done here rather in ``__init__`` so that users of this task
|
||||
don't load all the valid paths when the task is registered with
|
||||
Celery.
|
||||
"""
|
||||
if self._valid_paths is None:
|
||||
_log.info('Loading valid FMN rule paths')
|
||||
self._valid_paths = fmn_lib.load_rules(root="fmn.rules")
|
||||
_log.info('All FMN rule paths successfully loaded')
|
||||
return self._valid_paths
|
||||
|
||||
@property
|
||||
def user_preferences(self):
|
||||
"""
|
||||
A property that lazy-loads the user preferences.
|
||||
|
||||
This is done here rather in ``__init__`` so that users of this task
|
||||
don't load all the user preferences when the task is registered with
|
||||
Celery.
|
||||
"""
|
||||
if self._user_preferences is None:
|
||||
_log.info('Loading all user preferences from the database')
|
||||
self._user_preferences = fmn_lib.load_preferences(
|
||||
cull_disabled=True, cull_backends=['desktop'])
|
||||
_log.info('All user preferences successfully loaded from the database')
|
||||
return self._user_preferences
|
||||
|
||||
def run(self, message):
|
||||
"""
|
||||
A Celery task that finds a list of recipients for a message.
|
||||
|
||||
When the recipients have been found, it publishes an AMQP message for each
|
||||
context (backend) in the format::
|
||||
|
||||
{
|
||||
'context': <backend>,
|
||||
'recipients': [
|
||||
{
|
||||
"triggered_by_links": true,
|
||||
"markup_messages": false,
|
||||
"user": "jcline.id.fedoraproject.org",
|
||||
"filter_name": "firehose",
|
||||
"filter_oneshot": false,
|
||||
"filter_id": 7,
|
||||
"shorten_links": false,
|
||||
"verbose": true,
|
||||
},
|
||||
]
|
||||
'raw_msg': the message that this task handled,
|
||||
}
|
||||
|
||||
|
||||
Args:
|
||||
self (celery.Task): The instance of the Task object this function is bound to.
|
||||
message (dict): A fedmsg to find recipients for.
|
||||
"""
|
||||
_log.debug('Determining recipients for message "%r"', message)
|
||||
topic, message_body = message['topic'], message['body']
|
||||
|
||||
# We send a fake message with this topic as a broadcast to all workers in order for them
|
||||
# to refresh their caches, so if this message is a cache refresh notification stop early.
|
||||
if topic == REFRESH_CACHE_TOPIC:
|
||||
_log.info('Refreshing the user preferences for %s', message_body)
|
||||
fmn_lib.update_preferences(message_body, self.user_preferences)
|
||||
return
|
||||
|
||||
results = fmn_lib.recipients(
|
||||
self.user_preferences, message_body, self.valid_paths, config.app_conf)
|
||||
_log.info('Found %s recipients for message %s', sum(map(len, results.values())),
|
||||
message_body.get('msg_id', topic))
|
||||
|
||||
self._queue_for_delivery(results, message)
|
||||
|
||||
def _queue_for_delivery(self, results, message):
|
||||
"""
|
||||
Queue a processed message for delivery to its recipients.
|
||||
|
||||
The message is either delivered to the default AMQP exchange with the 'backends'
|
||||
routing key or placed in the database if the user has enabled batch delivery. If
|
||||
it is placed in the database, the :func:`batch_messages` task will handle its
|
||||
delivery.
|
||||
|
||||
Message format::
|
||||
{
|
||||
"context": "email",
|
||||
"recipient": dict,
|
||||
"fedmsg": dict,
|
||||
"formatted_message": <formatted_message>
|
||||
}
|
||||
|
||||
Args:
|
||||
results (dict): A dictionary where the keys are context names and the values are
|
||||
a list of recipients for that context. A recipient entry in the list is a
|
||||
dictionary. See :func:`fmn.lib.recipients` for the dictionary format.
|
||||
message (dict): The raw fedmsg to humanize and deliver to the given recipients.
|
||||
"""
|
||||
broker_url = config.app_conf['celery']['broker']
|
||||
|
||||
with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
|
||||
producer = conn.Producer()
|
||||
for context, recipients in results.items():
|
||||
_log.info('Dispatching messages for %d recipients for the %s backend',
|
||||
len(recipients), context)
|
||||
for recipient in recipients:
|
||||
_maybe_mark_filter_fired(recipient)
|
||||
|
||||
user = recipient['user']
|
||||
preference = self.user_preferences['{}_{}'.format(user, context)]
|
||||
if _batch(preference, context, recipient, message):
|
||||
continue
|
||||
|
||||
formatted_message = _format(context, message, recipient)
|
||||
|
||||
_log.info('Queuing message for delivery to %s on the %s backend', user, context)
|
||||
backend_message = {
|
||||
"context": context,
|
||||
"recipient": recipient,
|
||||
"fedmsg": message,
|
||||
"formatted_message": formatted_message,
|
||||
}
|
||||
routing_key = BACKEND_QUEUE_PREFIX + context
|
||||
producer.publish(backend_message, routing_key=routing_key,
|
||||
declare=[Queue(routing_key, durable=True)])
|
||||
|
||||
|
||||
def _maybe_mark_filter_fired(recipient):
|
||||
"""
|
||||
If the filter was a one-shot filter, try to mark it as triggered. If that fails,
|
||||
log the error and continue since there's not much else to be done.
|
||||
|
||||
Args:
|
||||
recipient (dict): The recipient dictionary.
|
||||
"""
|
||||
|
||||
if ('filter_oneshot' in recipient and recipient['filter_oneshot']):
|
||||
_log.info('Marking one-time filter as fired')
|
||||
session = models.Session()
|
||||
idx = recipient['filter_id']
|
||||
try:
|
||||
fltr = models.Filter.query.get(idx)
|
||||
fltr.fired(session)
|
||||
session.commit()
|
||||
except (sqlalchemy.exc.SQLAlchemyError, AttributeError):
|
||||
_log.exception('Unable to mark one-shot filter (id %s) as fired', idx)
|
||||
session.rollback()
|
||||
finally:
|
||||
models.Session.remove()
|
||||
|
||||
|
||||
def _batch(preference, context, recipient, message):
|
||||
"""
|
||||
Batch the message if the user wishes it.
|
||||
|
||||
Args:
|
||||
preference (dict): The user's preferences in dictionary form.
|
||||
context (str): The context to batch it for.
|
||||
recipient (dict): The recipient dictionary.
|
||||
message (dict): The fedmsg to batch.
|
||||
"""
|
||||
if preference.get('batch_delta') or preference.get('batch_count'):
|
||||
_log.info('User "%s" has batch delivery set; placing message in database',
|
||||
recipient['user'])
|
||||
session = models.Session()
|
||||
try:
|
||||
models.QueuedMessage.enqueue(session, recipient['user'], context, message)
|
||||
session.commit()
|
||||
return True
|
||||
except sqlalchemy.exc.SQLAlchemyError:
|
||||
_log.exception('Unable to queue message for batch delivery')
|
||||
session.rollback()
|
||||
finally:
|
||||
models.Session.remove()
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _format(context, message, recipient):
|
||||
"""
|
||||
Format the message(s) using the context and recipient to determine settings.
|
||||
|
||||
Args:
|
||||
context (str): The name of the context; this is used to determine what formatter
|
||||
function to use.
|
||||
message (dict or list): A fedmsg or list of fedmsgs to format.
|
||||
recipient (dict): A recipient dictionary passed on to the formatter function.
|
||||
|
||||
Raises:
|
||||
FmnError: If the message could not be formatted.
|
||||
"""
|
||||
formatted_message = None
|
||||
|
||||
# If it's a dictionary, it's a single message that doesn't need batching
|
||||
if isinstance(message, dict):
|
||||
if context == 'email':
|
||||
formatted_message = formatters.email(message['body'], recipient)
|
||||
elif context == 'irc':
|
||||
formatted_message = formatters.irc(message['body'], recipient)
|
||||
elif context == 'sse':
|
||||
try:
|
||||
formatted_message = formatters.sse(message['body'], recipient)
|
||||
except Exception:
|
||||
_log.exception('An exception occurred formatting the message '
|
||||
'for delivery: falling back to sending the raw fedmsg')
|
||||
formatted_message = message
|
||||
elif isinstance(message, list):
|
||||
if context == 'email':
|
||||
formatted_message = formatters.email_batch(
|
||||
[m['body'] for m in message], recipient)
|
||||
elif context == 'irc':
|
||||
formatted_message = formatters.irc_batch(
|
||||
[m['body'] for m in message], recipient)
|
||||
|
||||
if formatted_message is None:
|
||||
raise exceptions.FmnError(
|
||||
'The message was not formatted in any way, aborting!')
|
||||
|
||||
return formatted_message
|
||||
|
||||
|
||||
@app.task(name='fmn.tasks.batch_messages', ignore_results=True)
|
||||
def batch_messages():
|
||||
"""
|
||||
A task that collects all messages ready for batch delivery and queues them.
|
||||
|
||||
Messages for users of the batch feature are placed in the database by the
|
||||
:func:`find_recipients` task. Those messages are then picked up by this task,
|
||||
turned into a summary using the :mod:`fmn.formatters` module, and placed in
|
||||
the delivery service's AMQP queue.
|
||||
|
||||
This is intended to be run as a periodic task using Celery's beat service.
|
||||
"""
|
||||
session = models.Session()
|
||||
try:
|
||||
broker_url = config.app_conf['celery']['broker']
|
||||
with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
|
||||
producer = conn.Producer()
|
||||
for pref in models.Preference.list_batching(session):
|
||||
if not _batch_ready(pref):
|
||||
continue
|
||||
|
||||
queued_messages = models.QueuedMessage.list_for(
|
||||
session, pref.user, pref.context)
|
||||
_log.info('Batching %d queued messages for %s',
|
||||
len(queued_messages), pref.user.openid)
|
||||
|
||||
messages = [m.message for m in queued_messages]
|
||||
recipients = [
|
||||
{
|
||||
pref.context.detail_name: value.value,
|
||||
'user': pref.user.openid,
|
||||
'markup_messages': pref.markup_messages,
|
||||
'triggered_by_links': pref.triggered_by_links,
|
||||
'shorten_links': pref.shorten_links,
|
||||
}
|
||||
for value in pref.detail_values
|
||||
]
|
||||
for recipient in recipients:
|
||||
try:
|
||||
formatted_message = _format(pref.context.name, messages, recipient)
|
||||
except exceptions.FmnError:
|
||||
_log.error('A batch message for %r was not formatted, skipping!',
|
||||
recipient)
|
||||
continue
|
||||
|
||||
backend_message = {
|
||||
"context": pref.context.name,
|
||||
"recipient": recipient,
|
||||
"fedmsg": messages,
|
||||
"formatted_message": formatted_message,
|
||||
}
|
||||
routing_key = BACKEND_QUEUE_PREFIX + pref.context.name
|
||||
producer.publish(backend_message, routing_key=routing_key,
|
||||
declare=[Queue(routing_key, durable=True)])
|
||||
|
||||
for message in queued_messages:
|
||||
message.dequeue(session)
|
||||
session.commit()
|
||||
except sqlalchemy.exc.SQLAlchemyError:
|
||||
_log.exception('Failed to dispatch queued messages for delivery')
|
||||
session.rollback()
|
||||
finally:
|
||||
models.Session.remove()
|
||||
|
||||
|
||||
def _batch_ready(preference):
|
||||
"""
|
||||
Determine if a message batch is ready for a user.
|
||||
|
||||
Args:
|
||||
preference (models.Preference): The user preference entry which
|
||||
contains the user's batch preferences.
|
||||
Returns:
|
||||
bool: True if there's a batch ready.
|
||||
"""
|
||||
session = models.Session()
|
||||
try:
|
||||
count = models.QueuedMessage.count_for(session, preference.user, preference.context)
|
||||
if not count:
|
||||
return False
|
||||
|
||||
# Batch based on count
|
||||
if preference.batch_count is not None and preference.batch_count <= count:
|
||||
_log.info("Sending digest for %r per msg count", preference.user.openid)
|
||||
return True
|
||||
|
||||
# Batch based on time
|
||||
earliest = models.QueuedMessage.earliest_for(
|
||||
session, preference.user, preference.context)
|
||||
now = datetime.datetime.utcnow()
|
||||
delta = datetime.timedelta.total_seconds(now - earliest.created_on)
|
||||
if preference.batch_delta is not None and preference.batch_delta <= delta:
|
||||
_log.info("Sending digest for %r per time delta", preference.user.openid)
|
||||
return True
|
||||
except sqlalchemy.exc.SQLAlchemyError:
|
||||
_log.exception('Failed to determine if the batch is ready for %s', preference.user)
|
||||
session.rollback()
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@app.task(name='fmn.tasks.heat_fas_cache', ignore_results=True)
|
||||
def heat_fas_cache(): # pragma: no cover
|
||||
"""
|
||||
Fetch all users from FAS and populate the local Redis cache.
|
||||
|
||||
This is helpful to do once on startup since we'll need everyone's email or
|
||||
IRC nickname eventually.
|
||||
"""
|
||||
if config.app_conf['fasjson'].get('active'):
|
||||
fmn_fasshim.make_fasjson_cache(**config.app_conf)
|
||||
else:
|
||||
fmn_fasshim.make_fas_cache(**config.app_conf)
|
||||
|
||||
|
||||
@app.task(name='fmn.tasks.confirmations', ignore_results=True)
|
||||
def confirmations():
|
||||
"""
|
||||
Load all pending confirmations, create formatted messages, and dispatch them to the
|
||||
delivery service.
|
||||
|
||||
This is intended to be dispatched regularly via celery beat.
|
||||
"""
|
||||
session = models.Session()
|
||||
try:
|
||||
models.Confirmation.delete_expired(session)
|
||||
pending = models.Confirmation.query.filter_by(status='pending').all()
|
||||
broker_url = config.app_conf['celery']['broker']
|
||||
with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
|
||||
producer = conn.Producer()
|
||||
for confirmation in pending:
|
||||
message = None
|
||||
if confirmation.context.name == 'email':
|
||||
message = formatters.email_confirmation(confirmation)
|
||||
else:
|
||||
# The way the irc backend is currently written, it has to format the
|
||||
# confirmation itself. For now, just send an empty message, but in the
|
||||
# future it may be worth refactoring the irc backend to let us format here.
|
||||
message = ''
|
||||
recipient = {
|
||||
confirmation.context.detail_name: confirmation.detail_value,
|
||||
'user': confirmation.user.openid,
|
||||
'triggered_by_links': False,
|
||||
'confirmation': True,
|
||||
}
|
||||
backend_message = {
|
||||
"context": confirmation.context.name,
|
||||
"recipient": recipient,
|
||||
"fedmsg": {},
|
||||
"formatted_message": message,
|
||||
}
|
||||
_log.info('Dispatching confirmation message for %r', confirmation)
|
||||
confirmation.set_status(session, 'valid')
|
||||
routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name
|
||||
producer.publish(backend_message, routing_key=routing_key,
|
||||
declare=[Queue(routing_key, durable=True)])
|
||||
session.commit()
|
||||
except sqlalchemy.exc.SQLAlchemyError:
|
||||
_log.exception('Unable to handle confirmations')
|
||||
session.rollback()
|
||||
finally:
|
||||
models.Session.remove()
|
||||
|
||||
|
||||
#: A Celery task that accepts a message as input and determines the recipients.
|
||||
find_recipients = app.tasks[_FindRecipients.name]
|
|
@ -1,52 +0,0 @@
|
|||
import fedora.client
|
||||
import fasjson_client
|
||||
|
||||
import logging
|
||||
log = logging.getLogger("fmn")
|
||||
|
||||
|
||||
def new_packager(topic, msg):
|
||||
""" Returns a username if the message is about a new packager in FAS. """
|
||||
if '.fas.group.member.sponsor' in topic:
|
||||
group = msg['msg']['group']
|
||||
if group == 'packager':
|
||||
return msg['msg']['user']
|
||||
return None
|
||||
|
||||
|
||||
def new_badges_user(topic, msg):
|
||||
""" Returns a username if the message is about a new fedbadges user. """
|
||||
if '.fedbadges.person.login.first' in topic:
|
||||
return msg['msg']['user']['username']
|
||||
return None
|
||||
|
||||
|
||||
def get_fas_email(config, username):
|
||||
""" Return FAS email associated with a username.
|
||||
|
||||
We use this to try and get the right email for new autocreated users.
|
||||
We used to just use $USERNAME@fp.o, but when first created most users don't
|
||||
have that alias available yet.
|
||||
"""
|
||||
try:
|
||||
fas = fedora.client.AccountSystem(**config['fas_credentials'])
|
||||
person = fas.person_by_username(username)
|
||||
if person.get('email'):
|
||||
return person['email']
|
||||
raise ValueError("No email found: %r" % username)
|
||||
except Exception:
|
||||
log.exception("Failed to get FAS email for %r" % username)
|
||||
return '%s@fedoraproject.org' % username
|
||||
|
||||
|
||||
def get_fasjson_email(config, username):
|
||||
""" Return FASJSON email associated with a username. """
|
||||
try:
|
||||
fasjson = config["fasjson"]
|
||||
client = fasjson_client.Client(url=fasjson.get('url'))
|
||||
person = client.get_user(username=username).result
|
||||
|
||||
return person.get('emails')[0]
|
||||
except Exception:
|
||||
log.exception("Failed to get FASJSON email for %r" % username)
|
||||
return '%s@fedoraproject.org' % username
|
|
@ -1,95 +0,0 @@
|
|||
# This file is part of Moksha.
|
||||
# Copyright (C) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# Authors: Ralph Bean <rbean@redhat.com>
|
||||
|
||||
from moksha.hub.api import PollingProducer
|
||||
import os
|
||||
import string
|
||||
import zmq
|
||||
import json
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MonitoringProducer(PollingProducer):
|
||||
frequency = 5
|
||||
|
||||
ctx = None
|
||||
socket = None
|
||||
|
||||
def __init__(self, hub, *args, **kwargs):
|
||||
|
||||
key = 'moksha.monitoring.socket'
|
||||
endpoint = hub.config.get(key)
|
||||
if not endpoint:
|
||||
log.info("No %r defined. Monitoring disabled." % key)
|
||||
return
|
||||
|
||||
log.info("Establishing monitor sock at %r" % endpoint)
|
||||
|
||||
# Set up a special socket for ourselves
|
||||
self.ctx = zmq.Context()
|
||||
self.socket = self.ctx.socket(zmq.PUB)
|
||||
self.socket.bind(endpoint)
|
||||
|
||||
# If this is a unix socket (which is almost always is) then set some
|
||||
# permissions so that whatever monitoring service is deployed can talk
|
||||
# to us.
|
||||
mode = hub.config.get('moksha.monitoring.socket.mode')
|
||||
if endpoint.startswith("ipc://") and mode:
|
||||
mode = int(mode, base=8)
|
||||
path = endpoint.split("ipc://")[-1]
|
||||
os.chmod(path, mode)
|
||||
|
||||
super(MonitoringProducer, self).__init__(hub, *args, **kwargs)
|
||||
|
||||
def serialize(self, obj):
|
||||
if isinstance(obj, list):
|
||||
return [self.serialize(item) for item in obj]
|
||||
elif isinstance(obj, dict):
|
||||
return dict([(k, self.serialize(v)) for k, v in obj.items()])
|
||||
elif hasattr(obj, '__json__'):
|
||||
return obj.__json__()
|
||||
return obj
|
||||
|
||||
def poll(self):
|
||||
data = {
|
||||
"consumers": self.serialize(self.hub.consumers),
|
||||
"producers": self.serialize(self.hub.producers),
|
||||
}
|
||||
# Decode topics if they are byte array
|
||||
# This will prevent the json.dumps() to fail
|
||||
for consumer in data["consumers"]:
|
||||
decoded_topics = []
|
||||
for topic in consumer["topic"]:
|
||||
if isinstance(topic, bytes):
|
||||
decoded_topics.append(topic.decode())
|
||||
if decoded_topics:
|
||||
consumer["topic"] = decoded_topics
|
||||
|
||||
if self.socket:
|
||||
self.socket.send_string(json.dumps(data))
|
||||
|
||||
def stop(self):
|
||||
super(MonitoringProducer, self).stop()
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
|
||||
if self.ctx:
|
||||
self.ctx.term()
|
||||
self.ctx = None
|
|
@ -1,20 +0,0 @@
|
|||
/var/log/cron
|
||||
/var/log/maillog
|
||||
/var/log/messages
|
||||
/var/log/secure
|
||||
/var/log/spooler
|
||||
{
|
||||
sharedscripts
|
||||
postrotate
|
||||
/bin/kill -HUP `cat /var/run/syslogd.pid 2> /dev/null` 2> /dev/null || true
|
||||
endscript
|
||||
daily
|
||||
rotate 7
|
||||
missingok
|
||||
ifempty
|
||||
compress
|
||||
compresscmd /usr/bin/xz
|
||||
uncompresscmd /usr/bin/xz
|
||||
compressext .xz
|
||||
dateext
|
||||
}
|
|
@ -1,180 +0,0 @@
|
|||
---
|
||||
# Configuration for the notifications consumer
|
||||
|
||||
#- name: install needed packages
|
||||
# package: name={{ item }} state=present
|
||||
# with_items:
|
||||
# - python-fmn
|
||||
# - python-psycopg2
|
||||
# - libsemanage-python
|
||||
# - python-gssapi
|
||||
# - python-requests-gssapi
|
||||
# # Needed to produce nice long emails about koji builds
|
||||
# - koji
|
||||
# when:
|
||||
# inventory_hostname.startswith('notifs-backend01.iad2')
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/backend
|
||||
|
||||
- name: install needed packages
|
||||
package: name={{ item }} state=present
|
||||
with_items:
|
||||
- python3-fmn
|
||||
- python-psycopg2
|
||||
- python3-libsemanage
|
||||
- python3-gssapi
|
||||
- python3-pylibmc
|
||||
- python3-requests-gssapi
|
||||
# Needed to produce nice long emails about koji builds
|
||||
- koji
|
||||
# when:
|
||||
#inventory_hostname.startswith('notifs-backend02') or env == 'staging'
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: ensure systemd drop-in directory exists
|
||||
file: >
|
||||
dest=/etc/systemd/system/fedmsg-hub-3.service.d
|
||||
state=directory
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
# when:
|
||||
#inventory_hostname.startswith('notifs-backend02') or env == 'staging'
|
||||
|
||||
- name: ensure memcached is running
|
||||
systemd:
|
||||
state: started
|
||||
name: memcached
|
||||
|
||||
- name: install fedmsg-hub-3 systemd drop-in
|
||||
template: src=override.conf.j2 dest=/etc/systemd/system/fedmsg-hub-3.service.d/override.conf owner=root group=root mode=644
|
||||
notify:
|
||||
- reload systemd
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
# when:
|
||||
#inventory_hostname.startswith('notifs-backend02') or env == 'staging'
|
||||
|
||||
- name: hotfix python3-moksha-hub monitoring
|
||||
copy: >
|
||||
src=monitoring.py dest=/usr/lib/python3.10/site-packages/moksha/hub/monitoring.py
|
||||
owner=root group=root mode=0644
|
||||
notify:
|
||||
- restart fedmsg-hub
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: hotfix python3-moksha-hub
|
||||
copy: >
|
||||
src=consumer.py dest=/usr/lib/python3.10/site-packages/moksha/hub/api/consumer.py
|
||||
owner=root group=root mode=0644
|
||||
notify:
|
||||
- restart fedmsg-hub
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
# when:
|
||||
#inventory_hostname.startswith('notifs-backend02') or env == 'staging'
|
||||
|
||||
- name: copy database configuration
|
||||
template: >
|
||||
src={{ item }} dest=/etc/fedmsg.d/{{ item }}
|
||||
owner=fedmsg group=fedmsg mode=0600
|
||||
with_items:
|
||||
- fmn.consumer.py
|
||||
notify:
|
||||
- restart fedmsg-hub
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
#- name: Hotfix fmn for fasjson
|
||||
# copy: >
|
||||
# src=fasjson-port/{{ item }} dest=/usr/lib/python2.7/site-packages/fmn/{{ item }}
|
||||
# with_items:
|
||||
# - consumer.py
|
||||
# - fasjson_client.py
|
||||
# - fmn_fasshim.py
|
||||
# - tasks.py
|
||||
# - util.py
|
||||
# when:
|
||||
# inventory_hostname.startswith('notifs-backend01.iad') or env != 'staging'
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/backend
|
||||
|
||||
- name: Install fmn SSE configuration
|
||||
template: >
|
||||
src={{ item }} dest=/etc/fedmsg.d/{{ item }}
|
||||
owner=fedmsg group=fedmsg mode=0600
|
||||
with_items:
|
||||
- fmn.sse.py
|
||||
# when: env == "staging"
|
||||
notify:
|
||||
- restart fedmsg-hub
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: copy the alembic configuration for DBAs
|
||||
template: >
|
||||
src=alembic.ini dest=/usr/share/fmn/alembic.ini
|
||||
owner=root group=sysadmin-dba mode=0660
|
||||
notify:
|
||||
- restart fedmsg-hub
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: copy over admin utility scripts
|
||||
copy: src=bin/{{ item }} dest=/usr/local/bin/{{ item }} owner=root group=root mode=0755
|
||||
with_items:
|
||||
- fmn-create-account
|
||||
- fmn-disable-account
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: override the default syslog logrotate file
|
||||
copy: src=syslog-logrotate dest=/etc/logrotate.d/rsyslog
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
||||
|
||||
- name: start the workers and the backend
|
||||
service: name={{item}} enabled=yes state=started
|
||||
with_items:
|
||||
- fmn-backend@1
|
||||
- fmn-worker@1
|
||||
- fmn-worker@2
|
||||
- fmn-worker@3
|
||||
- fmn-worker@4
|
||||
- fmn-worker@5
|
||||
- fmn-worker@6
|
||||
- fmn-worker@7
|
||||
- fmn-worker@8
|
||||
- fmn-worker@9
|
||||
- fmn-worker@10
|
||||
- fmn-worker@11
|
||||
- fmn-worker@12
|
||||
- fmn-worker@13
|
||||
- fmn-worker@14
|
||||
- fmn-worker@15
|
||||
- fmn-worker@16
|
||||
- fmn-worker@17
|
||||
- fmn-worker@18
|
||||
- fmn-worker@19
|
||||
- fmn-worker@20
|
||||
- fmn-worker@21
|
||||
- fmn-worker@22
|
||||
- fmn-worker@23
|
||||
- fmn-worker@24
|
||||
- fmn-celerybeat
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/backend
|
|
@ -1,53 +0,0 @@
|
|||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = /usr/share/fmn/alembic/
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
#truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
sqlalchemy.url = postgresql://{{notifs_db_user}}:{{notifs_db_password}}@db01.iad2.fedoraproject.org/notifications
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
|
@ -1,192 +0,0 @@
|
|||
{% if env == 'staging' %}
|
||||
domain = "stg.fedoraproject.org"
|
||||
ircnick = "fedora-notifstg"
|
||||
{% else %}
|
||||
domain = "fedoraproject.org"
|
||||
ircnick = "fedora-notif"
|
||||
{% endif %}
|
||||
|
||||
base = "https://apps.%s/notifications/" % domain
|
||||
|
||||
|
||||
config = {
|
||||
{% if env == 'staging' %}
|
||||
# Pull in messages from production so we can more thoroughly test in stg.
|
||||
"endpoints": {
|
||||
"loopback-from-production": [
|
||||
"tcp://hub.fedoraproject.org:9940",
|
||||
],
|
||||
},
|
||||
{% endif %}
|
||||
|
||||
{% if env == 'staging' %}
|
||||
"fmn.topics": [
|
||||
b'org.fedoraproject.',
|
||||
b'org.centos.',
|
||||
b'org.release-monitoring.',
|
||||
],
|
||||
{% else %}
|
||||
"fmn.topics": [
|
||||
b'org.fedoraproject.prod.',
|
||||
b'org.centos.prod.',
|
||||
b'org.release-monitoring.prod.',
|
||||
],
|
||||
{% endif %}
|
||||
|
||||
# Consumer stuff
|
||||
"fmn.consumer.enabled": True,
|
||||
{% if env == 'staging' %}
|
||||
"fmn.sqlalchemy.uri": "postgresql://{{notifs_db_user}}:{{notifs_db_password}}@db01.stg.iad2.fedoraproject.org/notifications",
|
||||
{% else %}
|
||||
"fmn.sqlalchemy.uri": "postgresql://{{notifs_db_user}}:{{notifs_db_password}}@db01.iad2.fedoraproject.org/notifications",
|
||||
{% endif %}
|
||||
|
||||
{% if env != 'staging' %}
|
||||
# Auto create accounts for new packagers.
|
||||
"fmn.autocreate": True,
|
||||
{% else %}
|
||||
# Don't auto create accounts for new packagers in staging.
|
||||
"fmn.autocreate": False,
|
||||
{% endif %}
|
||||
|
||||
# Ignore rubygems coprs
|
||||
"ignored_copr_owners": ["@rubygems"],
|
||||
|
||||
# Just drop these topics without considering any preferences. They are noise that just clog us up.
|
||||
"fmn.junk_suffixes": [
|
||||
'.buildsys.package.list.change',
|
||||
'.buildsys.tag',
|
||||
'.buildsys.untag',
|
||||
'.buildsys.repo.init',
|
||||
'.buildsys.repo.done',
|
||||
'.buildsys.rpm.sign',
|
||||
'.faf.report.threshold1',
|
||||
'.github.status',
|
||||
'.anitya.project.version.update.v2', # Can't handle the v2, only the v1
|
||||
],
|
||||
|
||||
# This sets up four threads to handle incoming messages. At the time of
|
||||
# this commit, all of our fedmsg daemons are running in single-threaded
|
||||
# mode. If we turn it on globally, we should remove this setting.
|
||||
"moksha.workers_per_consumer": 3,
|
||||
"moksha.threadpool_size": 12,
|
||||
|
||||
# Some configuration for the rule processors
|
||||
{% if env == 'staging' %}
|
||||
"fmn.rules.utils.use_pkgdb2": False,
|
||||
'fmn.rules.utils.use_pagure_for_ownership': True,
|
||||
'fmn.rules.utils.pagure_api_url': 'https://src.stg.fedoraproject.org/api/',
|
||||
"fmn.rules.utils.pkgdb_url": "https://admin.stg.fedoraproject.org/pkgdb/api",
|
||||
{% else %}
|
||||
"fmn.rules.utils.use_pkgdb2": False,
|
||||
'fmn.rules.utils.use_pagure_for_ownership': True,
|
||||
'fmn.rules.utils.pagure_api_url': 'https://src.fedoraproject.org/api/',
|
||||
"fmn.rules.utils.pkgdb_url": "http://pkgdb01.iad2.fedoraproject.org/pkgdb/api",
|
||||
{% endif %}
|
||||
"fmn.rules.cache": {
|
||||
"backend": "dogpile.cache.pylibmc",
|
||||
"expiration_time": 60*60*24, # 1 day
|
||||
"arguments": {
|
||||
"url": ["127.0.0.1"],
|
||||
"binary": True,
|
||||
"behaviors": {"tcp_nodelay": True, "ketama": True},
|
||||
},
|
||||
},
|
||||
|
||||
# The notification backend uses this to build a fas cache of ircnicks
|
||||
# to fas usernames so it can act appropriately on certain message types.
|
||||
{% if env == 'staging' -%}
|
||||
"fasjson": {
|
||||
"active": True,
|
||||
"url": "https://fasjson.stg.fedoraproject.org/"
|
||||
},
|
||||
"fas_credentials": {
|
||||
"username": "{{fedoraStagingDummyUser}}",
|
||||
"password": "{{fedoraStagingDummyUserPassword}}",
|
||||
},
|
||||
{% else -%}
|
||||
"fasjson": {
|
||||
"active": True,
|
||||
"url": "https://fasjson.fedoraproject.org/"
|
||||
},
|
||||
"fas_credentials": {
|
||||
"username": "{{fedoraDummyUser}}",
|
||||
"password": "{{fedoraDummyUserPassword}}",
|
||||
},
|
||||
{% endif %}
|
||||
|
||||
|
||||
## Backend stuff ##
|
||||
"fmn.backends": ["email", "irc"], # android is disabled.
|
||||
|
||||
# Email
|
||||
"fmn.email.mailserver": "localhost:25",
|
||||
"fmn.email.from_address": "notifications@" + domain,
|
||||
|
||||
# IRC
|
||||
"fmn.irc.network": "irc.libera.chat",
|
||||
"fmn.irc.nickname": ircnick,
|
||||
"fmn.irc.timeout": 120,
|
||||
"fmn.irc.port": 6697,
|
||||
"fmn.irc.use_ssl": True,
|
||||
{% if env == 'staging' %}
|
||||
"fmn.irc.nickserv_pass": "{{fedora_notifstg_libera_pass}}",
|
||||
{% else %}
|
||||
"fmn.irc.nickserv_pass": "{{fedora_notif_libera_pass}}",
|
||||
{% endif %}
|
||||
|
||||
# Colors:
|
||||
"irc_color_lookup": {
|
||||
"fas": "light blue",
|
||||
"bodhi": "green",
|
||||
"git": "red",
|
||||
"wiki": "purple",
|
||||
"logger": "orange",
|
||||
"pkgdb": "teal",
|
||||
"buildsys": "yellow",
|
||||
"planet": "light green",
|
||||
"anitya": "light cyan",
|
||||
"fmn": "light blue",
|
||||
"hotness": "light green",
|
||||
},
|
||||
|
||||
# GCM - Android notifs
|
||||
"fmn.gcm.post_url": "{{ notifs_gcm_post_url }}",
|
||||
"fmn.gcm.api_key": "{{ notifs_gcm_api_key }}",
|
||||
|
||||
# Confirmation urls:
|
||||
"fmn.base_url": base,
|
||||
"fmn.acceptance_url": base + "confirm/accept/{secret}",
|
||||
"fmn.rejection_url": base + "confirm/reject/{secret}",
|
||||
"fmn.support_email": "notifications@" + domain,
|
||||
|
||||
# Generic stuff
|
||||
"logging": dict(
|
||||
loggers=dict(
|
||||
fmn={
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
"handlers": ["console"],
|
||||
},
|
||||
moksha={
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
"handlers": ["console"],
|
||||
},
|
||||
celery={
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
"handlers": ["console"],
|
||||
},
|
||||
twisted={
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
"handlers": ["console"],
|
||||
},
|
||||
),
|
||||
root={
|
||||
'level': 'WARNING',
|
||||
'handlers': ['console'],
|
||||
},
|
||||
),
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
config = {
|
||||
|
||||
# SSE
|
||||
"fmn.sse.pika.host": "localhost",
|
||||
"fmn.sse.pika.port": 5672,
|
||||
"fmn.sse.pika.msg_expiration": 3600000, # 1 hour in ms
|
||||
|
||||
# SSE Web server configuration
|
||||
"fmn.sse.webserver.tcp_port": 8080,
|
||||
# A list of interfaces to listen to ('127.0.0.1', for example); if none
|
||||
# are specified the server listens on all available interfaces.
|
||||
'fmn.sse.webserver.interfaces': [],
|
||||
|
||||
# A regular expression using the standard Python re syntax that defines a
|
||||
# whitelist of queues exposed by the SSE server.
|
||||
'fmn.sse.webserver.queue_whitelist': '.+\.id\.fedoraproject\.org$',
|
||||
|
||||
# A regular expression using the standard Python re syntax that defines a
|
||||
# blacklist for queues exposed by the SSE server. Any queue name that is
|
||||
# matched by the regular expression will return a HTTP 403 to the client.
|
||||
#
|
||||
# Note: This is applied _after_ the whitelist so if the queue is matched
|
||||
# by both regular expressions, the queue _will not_ be served.
|
||||
'fmn.sse.webserver.queue_blacklist': None,
|
||||
|
||||
# The value to use with the 'Access-Control-Allow-Origin' HTTP header
|
||||
'fmn.sse.webserver.allow_origin': '*',
|
||||
|
||||
# Define how many messages to prefetch from the AMQP server
|
||||
'fmn.sse.pika.prefetch_count': 5,
|
||||
}
|
|
@ -1,2 +0,0 @@
|
|||
[Service]
|
||||
Environment=KRB5_CLIENT_KTNAME=/etc/krb5.fedmsg-hub-3_notifs-backend01.{{ env_prefix }}iad2.fedoraproject.org.keytab
|
|
@ -1,111 +0,0 @@
|
|||
Fedora Notifications
|
||||
====================
|
||||
|
||||
Fedora Notifications is a family of systems built to manage end-user
|
||||
notifications triggered by `fedmsg <http://fedmsg.com>`_, the fedora FEDerated
|
||||
MeSsaGe bus.
|
||||
|
||||
The wins here are:
|
||||
|
||||
- Diverse kinds of notification media: Some users don't want email. At present
|
||||
we can do notifications over email and IRC privmsg. In the future we hope to
|
||||
add Android push notifications, desktop popups, and websocket integration
|
||||
across all of our apps.
|
||||
- A single place for end-users to manage notification preferences: As it stands
|
||||
right now, you have to update notification preferences (if you can do so at
|
||||
all) in many different apps in many different places. (bodhi, koji, pkgdb,
|
||||
etc..). With this app (Fedora Notifications), you can choose what you do and
|
||||
don't want to receive in one place -- right here.
|
||||
|
||||
- A single place for email code to live -- instead of being duplicated in every
|
||||
application that we write and deploy. This will ostensibly reduce the amount
|
||||
of code that the infrastructure team has to maintain.
|
||||
|
||||
----
|
||||
|
||||
In a nutshell, here's the way this application works:
|
||||
|
||||
- You login and set up some preferences here, in this webapp.
|
||||
- Events occur in Fedora Infrastructure and are broadcast over fedmsg.
|
||||
- This application receives those events and compares them against your
|
||||
preferences. If there's a match, then it forwards you a notification.
|
||||
|
||||
We maintain a `lot of applications <https://apps.fedoraproject.org>`_. Over
|
||||
time, there has been an initiative to get them all speaking a similar language
|
||||
on the backend with fedmsg. Take a look at the `list of fedmsg topics
|
||||
<http://fedmsg.com/en/latest/topics/>`_ to see what all is covered.
|
||||
|
||||
Some Terminology
|
||||
================
|
||||
|
||||
Rule
|
||||
----
|
||||
|
||||
This is smallest, most atomic object in the Fedora Notifications system. It is
|
||||
a simple rule that can be applied to a fedmsg message. It can evaluate to
|
||||
``True`` or ``False``.
|
||||
|
||||
It has a name and a description. Some examples of rules are:
|
||||
|
||||
- "is a *bodhi* message"
|
||||
- "is a *wiki edit* message"
|
||||
- "relates to the user *lmacken*"
|
||||
- "relates to the package *nethack*"
|
||||
- "relates to a package *owned by me in pkgdb*"
|
||||
|
||||
We have a long list of rules defined. You'll see them when you go to set up
|
||||
your first filter
|
||||
|
||||
Filter
|
||||
------
|
||||
|
||||
To craft your preferences, you will build filters out of rules. Filters have a
|
||||
name (that you give them). An example could be something like:
|
||||
|
||||
- My bodhi packager filter
|
||||
|
||||
- "is a bodhi message"
|
||||
- "relates to a package that I own"
|
||||
|
||||
You will receive notifications for this filter *if and only if* a given message
|
||||
**both** is a bodhi message and is about a package owned by you.
|
||||
|
||||
----
|
||||
|
||||
Note that, if you wanted to get notifications about bodhi updates created by
|
||||
multiple users, you would need to create distinct filters for each one.
|
||||
|
||||
- My bodhi lmacken filter
|
||||
|
||||
- "is a bodhi message"
|
||||
- "relates to the user **lmacken**"
|
||||
|
||||
- My bodhi toshio filter
|
||||
|
||||
- "is a bodhi message"
|
||||
- "relates to the user **toshio**"
|
||||
|
||||
You could not combine those both into the same filter, because *all rules on a
|
||||
filter* must evalulate to ``True`` for the filter to trigger a notification.
|
||||
|
||||
Messaging Context
|
||||
-----------------
|
||||
|
||||
This is the medium over which we'll send a message. You can have one set of
|
||||
preferences for an email messaging context, and another set of preferences for
|
||||
an irc messaging context.
|
||||
|
||||
When a fedmsg message arrives in the system, if *any one filter* on one of your
|
||||
messaging contexts evaluates to ``True``, then you will receive a notification
|
||||
for that context. If some filters evaluate to ``True`` for multiple contexts,
|
||||
you will receive notifications for all those contexts.
|
||||
|
||||
Dénouement
|
||||
==========
|
||||
|
||||
You can report `issues
|
||||
<https://github.com/fedora-infra/fmn/issues>`_ and find the
|
||||
`source <https://github.com/fedora-infra/fmn/>`_ on github.
|
||||
The development team hangs out in ``#fedora-apps``. Please do stop by and say
|
||||
hello.
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
"""
|
||||
GDPR SAR script for FMN.
|
||||
|
||||
Extract all preferences from a selected username and prints them in JSON to the
|
||||
standard output.
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import, unicode_literals, print_function
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import requests
|
||||
from six.moves.urllib.parse import urljoin
|
||||
|
||||
|
||||
ENV_USERNAME = "SAR_USERNAME"
|
||||
FMN_INSTANCE = "http://localhost/notifications-old/"
|
||||
FMN_CONTEXTS = ["email", "irc"]
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
def get_prefs(username, context):
|
||||
url = urljoin(
|
||||
FMN_INSTANCE,
|
||||
"api/{username}.id.fedoraproject.org/{context}/".format(
|
||||
username=username, context=context
|
||||
)
|
||||
)
|
||||
response = requests.get(url)
|
||||
if response.status_code >= 300:
|
||||
log.error("Could not get URL %s: %d %s",
|
||||
url, response.status_code, response.reason)
|
||||
return {}
|
||||
result = response.json()
|
||||
return result
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
try:
|
||||
username = os.environ[ENV_USERNAME]
|
||||
except KeyError as e:
|
||||
print("Missing environment variable. {}".format(e), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.WARNING,
|
||||
stream=sys.stderr,
|
||||
)
|
||||
result = {}
|
||||
for context in FMN_CONTEXTS:
|
||||
result[context] = get_prefs(username, context)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1,19 +0,0 @@
|
|||
#-*- coding: utf-8 -*-
|
||||
|
||||
# The three lines below are required to run on EL6 as EL6 has
|
||||
# two possible version of python-sqlalchemy and python-jinja2
|
||||
# These lines make sure the application uses the correct version.
|
||||
import __main__
|
||||
__main__.__requires__ = ['SQLAlchemy >= 0.7', 'jinja2 >= 2.4']
|
||||
import pkg_resources
|
||||
|
||||
import os
|
||||
os.environ['FMN_WEB_CONFIG'] = '/etc/fmn.web.cfg'
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level='INFO')
|
||||
|
||||
# The most import line to make the wsgi working
|
||||
from fmn.web.app import app as application
|
||||
# Dangerous.. only use when testing.
|
||||
#application.debug = True
|
|
@ -1,161 +0,0 @@
|
|||
---
|
||||
# Configuration for the Fedora Notifications webapp
|
||||
- name: install needed packages
|
||||
package: name={{ item }} state=present
|
||||
with_items:
|
||||
- python-fmn
|
||||
- python-psycopg2
|
||||
- libsemanage-python
|
||||
- python-memcached
|
||||
- python-flask-openid
|
||||
- python-bunch
|
||||
notify:
|
||||
- restart apache
|
||||
when:
|
||||
inventory_hostname.startswith('notifs-web02.iad2')
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: install needed packages
|
||||
package: name={{ item }} state=present
|
||||
with_items:
|
||||
- python3-fmn
|
||||
- python3-psycopg2
|
||||
- python3-libsemanage
|
||||
- python3-memcached
|
||||
- python3-flask-openid
|
||||
notify:
|
||||
- restart apache
|
||||
when:
|
||||
inventory_hostname.startswith('notifs-web01') or env == 'staging'
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
#- name: Install epel-testing fmn on stage
|
||||
# package: name={{ item }} state=present enablerepo=epel-testing
|
||||
# with_items:
|
||||
# - python-fmn
|
||||
# when: env == "staging"
|
||||
# notify:
|
||||
# - restart apache
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/frontend
|
||||
|
||||
- name: copy fmn app configuration
|
||||
template: >
|
||||
src={{ item }} dest=/etc/fedmsg.d/{{ item }}
|
||||
owner=apache group=apache mode=0600
|
||||
with_items:
|
||||
- fmn.web.py
|
||||
notify:
|
||||
- restart apache
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: destroy a bogus config file brought in by python-datanommer-models
|
||||
file: dest=/etc/fedmsg.d/datanommer.py state=absent
|
||||
notify:
|
||||
- restart apache
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: setup symlink to fedora theme
|
||||
file: >
|
||||
src=/usr/share/fmn/static/bootstrap-3.3.4-fedora
|
||||
dest=/usr/share/fmn/static/bootstrap
|
||||
state=link
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: copy fmn httpd config
|
||||
template: >
|
||||
src=fmn.web.conf dest=/etc/httpd/conf.d/fmn.web.conf
|
||||
owner=apache group=apache mode=0644
|
||||
notify:
|
||||
- restart apache
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: copy custom wsgi file
|
||||
copy: src=fmn.web.wsgi dest=/usr/share/fmn/fmn.web.wsgi mode=0644
|
||||
notify:
|
||||
- restart apache
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
- name: copy app configuration
|
||||
template: >
|
||||
src=fmn.web.cfg dest=/etc/fmn.web.cfg
|
||||
owner=root group=apache mode=0640
|
||||
notify:
|
||||
- restart apache
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
|
||||
#- name: create live docs directory
|
||||
# file: >
|
||||
# dest=/usr/lib/python2.6/site-packages/fmn/web/docs/
|
||||
# state=directory
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/frontend
|
||||
|
||||
#- name: copy live docs
|
||||
# copy: src={{ item }} dest=/usr/lib/python2.6/site-packages/fmn/web/docs
|
||||
# with_fileglob: fedora-sitedocs/*.rst
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/frontend
|
||||
|
||||
#- name: apply selinux type to static files
|
||||
# file: >
|
||||
# dest=/usr/share/fmn/static
|
||||
# setype=httpd_sys_content_t
|
||||
# state=directory
|
||||
# recurse=yes
|
||||
# tags:
|
||||
# - notifs
|
||||
# - notifs/frontend
|
||||
# - selinux
|
||||
|
||||
- name: ensure selinux lets httpd talk to postgres, memcached, and mail
|
||||
seboolean: name={{item}} state=yes persistent=yes
|
||||
with_items:
|
||||
- httpd_can_network_connect_db
|
||||
- httpd_can_network_memcache
|
||||
- httpd_can_sendmail
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
- selinux
|
||||
|
||||
- name: ensure selinux let's httpd talk to ipsilon
|
||||
seboolean: name={{item}} state=yes persistent=yes
|
||||
with_items:
|
||||
- httpd_can_network_connect
|
||||
tags:
|
||||
- notifs
|
||||
- notifs/frontend
|
||||
- selinux
|
||||
|
||||
- name: Install SAR script
|
||||
copy:
|
||||
src: fmn-sar.py
|
||||
dest: /usr/local/bin/fmn-sar.py
|
||||
mode: 0755
|
||||
owner: apache
|
||||
group: apache
|
||||
when: inventory_hostname.startswith('notifs-web01')
|
||||
tags:
|
||||
- notifs
|
||||
- SAR
|
||||
- GDPR
|
|
@ -1,23 +0,0 @@
|
|||
# See /etc/fedmsg.d/fmn.web.py for the db url
|
||||
|
||||
SECRET_KEY = '{{ notifs_secret_key }}'
|
||||
|
||||
FMN_ADMINS = [
|
||||
'kevin.id.fedoraproject.org', 'kevin.id.stg.fedoraproject.org',
|
||||
'ralph.id.fedoraproject.org', 'ralph.id.stg.fedoraproject.org',
|
||||
'pingou.id.fedoraproject.org', 'pingou.id.stg.fedoraproject.org',
|
||||
'ryanlerch.id.fedoraproject.org', 'ryanlerch.id.stg.fedoraproject.org',
|
||||
]
|
||||
|
||||
{% if env == 'staging' %}
|
||||
FAS_OPENID_CHECK_CERT = False
|
||||
FMN_FEDORA_OPENID = 'https://id.stg.fedoraproject.org'
|
||||
{% else %}
|
||||
FAS_OPENID_CHECK_CERT = True
|
||||
FMN_FEDORA_OPENID = 'https://id.fedoraproject.org'
|
||||
{% endif %}
|
||||
|
||||
FMN_ALLOW_FAS_OPENID = True
|
||||
FMN_ALLOW_GOOGLE_OPENID = False
|
||||
FMN_ALLOW_YAHOO_OPENID = False
|
||||
FMN_ALLOW_GENERIC_OPENID = False
|
|
@ -1,14 +0,0 @@
|
|||
Alias /notifications-old/static /usr/share/fmn/static
|
||||
|
||||
WSGIDaemonProcess fmn user=apache group=apache maximum-requests=1000 display-name=fmn processes={{ wsgi_procs }} threads={{ wsgi_threads }}
|
||||
WSGISocketPrefix run/wsgi
|
||||
WSGIRestrictStdout On
|
||||
WSGIRestrictSignal Off
|
||||
WSGIPythonOptimize 1
|
||||
|
||||
WSGIScriptAlias /notifications-old /usr/share/fmn/fmn.web.wsgi
|
||||
|
||||
<Location /notifications-old>
|
||||
WSGIProcessGroup fmn
|
||||
Require all granted
|
||||
</Location>
|
|
@ -1,51 +0,0 @@
|
|||
config = {
|
||||
# This is for *our* database
|
||||
{% if env == 'staging' %}
|
||||
"fmn.sqlalchemy.uri": "postgresql://{{notifs_db_user}}:{{notifs_db_password}}@db01.stg.iad2.fedoraproject.org/notifications",
|
||||
{% else %}
|
||||
"fmn.sqlalchemy.uri": "postgresql://{{notifs_db_user}}:{{notifs_db_password}}@db01.iad2.fedoraproject.org/notifications",
|
||||
{% endif %}
|
||||
# And this is for the datanommer database
|
||||
"datanommer.sqlalchemy.url": "postgresql://{{datanommerDBUser}}:{{ (env == 'production')|ternary(datanommerDBPassword, datanommer_stg_db_password) }}@db-datanommer01.iad2.fedoraproject.org/datanommer",
|
||||
|
||||
{% if env == 'staging' %}
|
||||
"fmn.backends": ["email", "irc", "android"],
|
||||
{% else %}
|
||||
"fmn.backends": ["email", "irc"], # android is disabled.
|
||||
{% endif %}
|
||||
|
||||
"fmn.web.default_login": "fedora_login",
|
||||
|
||||
{% if env == 'staging' %}
|
||||
"fas_credentials": {
|
||||
"username": "{{fedoraDummyUser}}",
|
||||
"password": "{{fedoraDummyUserPassword}}",
|
||||
"base_url": "https://accounts.stg.fedoraproject.org",
|
||||
},
|
||||
{% else %}
|
||||
"fas_credentials": {
|
||||
"username": "{{fedoraDummyUser}}",
|
||||
"password": "{{fedoraDummyUserPassword}}",
|
||||
},
|
||||
{% endif %}
|
||||
|
||||
# We need to know this to call VERFY to validate new addresses.
|
||||
"fmn.email.mailserver": "bastion01.iad2.fedoraproject.org:25",
|
||||
|
||||
# Some configuration for the rule processors
|
||||
"fmn.rules.utils.use_pkgdb2": False,
|
||||
"fmn.rules.utils.use_pagure_for_ownership": True,
|
||||
{% if env == 'staging' %}
|
||||
"fmn.rules.utils.pagure_api_url": "https://src.stg.fedoraproject.org/api/",
|
||||
{% else %}
|
||||
'fmn.rules.utils.pagure_api_url': 'https://src.fedoraproject.org/api/',
|
||||
{% endif %}
|
||||
|
||||
"fmn.rules.cache": {
|
||||
"backend": "dogpile.cache.memcached",
|
||||
"expiration_time": 3600, # 3600 is 1 hour
|
||||
"arguments": {
|
||||
"url": "memcached01:11211",
|
||||
},
|
||||
},
|
||||
}
|
|
@ -385,9 +385,6 @@ ALLOWDEVFILE=/dev/.udev/db/*
|
|||
ALLOWDEVFILE=/dev/.udev/rules.d/99-root.rules
|
||||
ALLOWDEVFILE=/dev/.udev/uevent_seqnum
|
||||
ALLOWDEVFILE=/dev/md/autorebuild.pid
|
||||
{% if ansible_hostname == 'notifs-backend01' %}
|
||||
ALLOWDEVFILE=/dev/shm/fmn-cache.dbm
|
||||
{% endif %}
|
||||
{% if inventory_hostname in groups['virtservers'] or inventory_hostname in groups['openqa_workers'] or inventory_hostname in groups['openqa_lab_workers'] %}
|
||||
# libvirt spice device makes a /dev/shm/spice file
|
||||
ALLOWDEVFILE=/dev/shm/spice.*
|
||||
|
|
Loading…
Reference in New Issue