bots: Do AMQP interaction over SSL
Move from py-amqp to pika, as this works better for SNI and keeping the TCP connection alive. Change .tasks to support both AMQP and polling mode. The cloud now has to opt into AMQP by exporting the `$AMQP_SERVER` env var into pods. Closes #11109
This commit is contained in:
parent
da14d2b795
commit
eebed02b40
8
.tasks
8
.tasks
|
@ -13,8 +13,12 @@
|
|||
|
||||
set -ex
|
||||
|
||||
# Scan for all tests
|
||||
bots/tests-scan
|
||||
# Consume item from task queue or scan for all tests
|
||||
if [ -n "$AMQP_SERVER" ]; then
|
||||
echo bots/run-queue --amqp "$AMQP_SERVER"
|
||||
else
|
||||
bots/tests-scan
|
||||
fi
|
||||
|
||||
# When run automated, randomize to minimize stampeding herd
|
||||
if [ -t 0 ]; then
|
||||
|
|
|
@ -21,39 +21,49 @@ MAX_PRIORITY = 9
|
|||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
import ssl
|
||||
|
||||
import amqp
|
||||
import pika
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Read and print messages from the queue without acknowleding them')
|
||||
parser.add_argument('--amqp', default='localhost',
|
||||
help='The URL of the AMQP server to consume from')
|
||||
parser.add_argument('--queue', default='tasks',
|
||||
help='The name of the queue to consume from')
|
||||
parser.add_argument('-n', '--number', default=0,
|
||||
help='Number of queue items to show, starting from the front (0=all)')
|
||||
parser.add_argument('-t', '--timeout', default=0.5,
|
||||
help='Time to wait for messages')
|
||||
parser.add_argument('--amqp', default='localhost:5671',
|
||||
help='The host:port of the AMQP server to consume from (default: %(default)s)')
|
||||
opts = parser.parse_args()
|
||||
|
||||
connection = amqp.Connection(host=opts.amqp)
|
||||
connection.connect()
|
||||
channel = connection.channel()
|
||||
try:
|
||||
channel.queue_declare(queue=opts.queue, passive=True, auto_delete=False)
|
||||
except amqp.exceptions.NotFound:
|
||||
sys.stdout.write('queue is empty\n')
|
||||
return 0
|
||||
channel.basic_qos(0, opts.number, True)
|
||||
host, port = opts.amqp.split(':')
|
||||
except ValueError:
|
||||
parser.error('Please format --amqp as host:port')
|
||||
|
||||
def callback(msg):
|
||||
sys.stdout.write("{0}\n".format(msg.body))
|
||||
msg.channel.basic_reject(msg.delivery_tag, requeue=True)
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
host=host,
|
||||
port=int(port),
|
||||
ssl=True,
|
||||
ssl_options=pika.SSLOptions(
|
||||
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
||||
cafile='/run/secrets/webhook/ca.pem',
|
||||
keyfile='/run/secrets/webhook/amqp-client.key',
|
||||
certfile='/run/secrets/webhook/amqp-client.pem',
|
||||
server_hostname=host),
|
||||
credentials=pika.credentials.ExternalCredentials()))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.basic_consume(callback=callback, queue=opts.queue)
|
||||
time.sleep(opts.timeout)
|
||||
sys.stdout.flush()
|
||||
def print_queue(queue):
|
||||
try:
|
||||
declare_result = channel.queue_declare(queue=queue, passive=True, auto_delete=False)
|
||||
except pika.exceptions.ChannelClosed:
|
||||
print("queue {0} doesn't exist".format(queue))
|
||||
return
|
||||
for i in range(declare_result.method.message_count):
|
||||
method_frame, header_frame, body = channel.basic_get(queue=queue)
|
||||
if method_frame:
|
||||
print(body)
|
||||
|
||||
print('public queue:')
|
||||
print_queue('public')
|
||||
print('rhel queue:')
|
||||
print_queue('rhel')
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -21,40 +21,72 @@ MAX_PRIORITY = 9
|
|||
|
||||
import argparse
|
||||
import json
|
||||
import random
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request, urllib.parse, urllib.error
|
||||
|
||||
import pika
|
||||
from task import REDHAT_PING
|
||||
|
||||
# Check if we have access to Red Hat network
|
||||
try:
|
||||
urllib.request.urlopen(REDHAT_PING).read()
|
||||
run_redhat_tasks = True
|
||||
except IOError:
|
||||
run_redhat_tasks = False
|
||||
|
||||
import amqp
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Bot: read a single test command from the queue and execute it')
|
||||
parser.add_argument('--amqp', default='localhost',
|
||||
help='The URL of the AMQP server to consume from')
|
||||
parser.add_argument('--queue', default='tasks',
|
||||
help='The name of the queue to consume from')
|
||||
parser.add_argument('--amqp', default='localhost:5671',
|
||||
help='The host:port of the AMQP server to consume from (default: %(default)s)')
|
||||
opts = parser.parse_args()
|
||||
|
||||
connection = amqp.Connection(host=opts.amqp)
|
||||
connection.connect()
|
||||
channel = connection.channel()
|
||||
try:
|
||||
channel.queue_declare(queue=opts.queue, passive=True, auto_delete=False)
|
||||
except amqp.exceptions.NotFound:
|
||||
sys.stdout.write('queue is empty\n')
|
||||
host, port = opts.amqp.split(':')
|
||||
except ValueError:
|
||||
parser.error('Please format --amqp as host:port')
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
host=host,
|
||||
port=int(port),
|
||||
ssl=True,
|
||||
ssl_options=pika.SSLOptions(
|
||||
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
||||
cafile='/run/secrets/webhook/ca.pem',
|
||||
keyfile='/run/secrets/webhook/amqp-client.key',
|
||||
certfile='/run/secrets/webhook/amqp-client.pem',
|
||||
server_hostname=host),
|
||||
credentials=pika.credentials.ExternalCredentials()))
|
||||
channel = connection.channel()
|
||||
|
||||
try:
|
||||
declare_public_result = channel.queue_declare(queue='public', passive=True, auto_delete=False)
|
||||
declare_rhel_result = channel.queue_declare(queue='rhel', passive=True, auto_delete=False)
|
||||
except pika.exceptions.ChannelClosed:
|
||||
print("One of the queues doesn't exist")
|
||||
return 0
|
||||
|
||||
channel.basic_qos(0, 1, True)
|
||||
queue='public'
|
||||
if run_redhat_tasks:
|
||||
# Try the rhel queue if the public queue is empty
|
||||
if declare_public_result.method.message_count == 0:
|
||||
queue = 'rhel'
|
||||
# If both are non-empty, shuffle
|
||||
elif declare_rhel_result.method.message_count > 0:
|
||||
queue = ['public', 'rhel'][random.randrange(2)]
|
||||
|
||||
# Get one item from the queue if present
|
||||
msg = channel.basic_get(queue=opts.queue)
|
||||
if msg:
|
||||
body = json.loads(msg.body)
|
||||
method_frame, header_frame, body = channel.basic_get(queue=queue)
|
||||
if method_frame:
|
||||
body = json.loads(body)
|
||||
sys.stderr.write("Consuming {0} task:\n{1}\n".format(body['type'],json.dumps(body, indent=2, sort_keys=True)))
|
||||
sys.stderr.flush()
|
||||
if subprocess.call(body['command'], shell=True) in [0, 2]:
|
||||
channel.basic_ack(msg.delivery_tag)
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
|
||||
channel.close()
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -90,6 +90,7 @@ import contextlib
|
|||
import os
|
||||
import json
|
||||
import pipes
|
||||
import ssl
|
||||
import sys
|
||||
import time
|
||||
import itertools
|
||||
|
@ -101,7 +102,7 @@ from task import github, label, REDHAT_PING
|
|||
|
||||
no_amqp = False
|
||||
try:
|
||||
import amqp
|
||||
import pika
|
||||
except ImportError:
|
||||
no_amqp = True
|
||||
|
||||
|
@ -128,13 +129,11 @@ def main():
|
|||
parser.add_argument('-s', '--sha', default=None,
|
||||
help='SHA beloging to pull request to scan for tasks')
|
||||
parser.add_argument('--amqp', default=None,
|
||||
help='The URL of the AMQP server to publish to')
|
||||
parser.add_argument('--queue', default='tasks',
|
||||
help='The name of the queue to publish to')
|
||||
help='The host:port of the AMQP server to publish to (format host:port)')
|
||||
|
||||
opts = parser.parse_args()
|
||||
if opts.amqp and no_amqp:
|
||||
sys.stderr.write("AMQP URL specified but python-amqp not available\n")
|
||||
sys.stderr.write("AMQP host:port specified but python-amqp not available\n")
|
||||
return 1
|
||||
api = github.GitHub(repo=opts.repo)
|
||||
|
||||
|
@ -172,14 +171,29 @@ def default_policy():
|
|||
return policy
|
||||
|
||||
@contextlib.contextmanager
|
||||
def distributed_queue(host, queue):
|
||||
connection = amqp.Connection(host=host)
|
||||
connection.connect()
|
||||
def distributed_queue(amqp_server):
|
||||
try:
|
||||
host, port = amqp_server.split(':')
|
||||
except ValueError:
|
||||
sys.stderr.write('Please format --amqp as host:port\n')
|
||||
sys.exit(1)
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
host=host,
|
||||
port=int(port),
|
||||
ssl=True,
|
||||
ssl_options=pika.SSLOptions(
|
||||
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
||||
cafile='/run/secrets/webhook/ca.pem',
|
||||
keyfile='/run/secrets/webhook/amqp-client.key',
|
||||
certfile='/run/secrets/webhook/amqp-client.pem',
|
||||
server_hostname=host),
|
||||
credentials=pika.credentials.ExternalCredentials()))
|
||||
channel = connection.channel()
|
||||
arguments = {
|
||||
"x-max-priority": MAX_PRIORITY
|
||||
}
|
||||
channel.queue_declare(queue=queue, arguments=arguments, auto_delete=False)
|
||||
channel.queue_declare(queue='public', arguments=arguments, auto_delete=False)
|
||||
channel.queue_declare(queue='rhel', arguments=arguments, auto_delete=False)
|
||||
yield channel
|
||||
connection.close()
|
||||
|
||||
|
@ -202,8 +216,8 @@ def tests_human(priority, name, number, revision, ref, context, base, repo, bots
|
|||
|
||||
# Prepare a test invocation command
|
||||
def tests_invoke(priority, name, number, revision, ref, context, base, repo, bots_ref, options):
|
||||
if not run_redhat_tasks and (context in REDHAT_VERIFY or
|
||||
context in REDHAT_EXTERNAL_PROJECTS.get(repo, [])):
|
||||
if not options.amqp and not run_redhat_tasks and (context in REDHAT_VERIFY or
|
||||
context in REDHAT_EXTERNAL_PROJECTS.get(repo, [])):
|
||||
return ''
|
||||
|
||||
try:
|
||||
|
@ -270,8 +284,10 @@ def queue_test(priority, name, number, revision, ref, context, base, repo, bots_
|
|||
"ref": ref,
|
||||
"name": name,
|
||||
}
|
||||
msg = amqp.Message(body=json.dumps(body), priority=priority)
|
||||
queue.basic_publish(msg, routing_key=options.queue)
|
||||
if (context in REDHAT_VERIFY or context in REDHAT_EXTERNAL_PROJECTS.get(repo, [])):
|
||||
queue.basic_publish('', 'rhel', json.dumps(body), properties=pika.BasicProperties(priority=priority))
|
||||
else:
|
||||
queue.basic_publish('', 'public', json.dumps(body), properties=pika.BasicProperties(priority=priority))
|
||||
|
||||
def prioritize(status, title, labels, priority, context):
|
||||
state = status.get("state", None)
|
||||
|
@ -461,7 +477,7 @@ def scan_for_pull_tasks(api, policy, opts, repo):
|
|||
if not opts.amqp:
|
||||
func = lambda x: tests_invoke(*x, options=opts)
|
||||
return list(map(func, results))
|
||||
with distributed_queue(opts.amqp, opts.queue) as queue:
|
||||
with distributed_queue(opts.amqp) as queue:
|
||||
func = lambda x: queue_test(*x, queue=queue, options=opts)
|
||||
return list(map(func, results))
|
||||
|
||||
|
|
Loading…
Reference in New Issue