bridge: send acks on receiving data

In Cockpit Navigator we want to support uploading large files and sent
data in chunks. As fsreplace1 lacks flow control navigator itself has to
make sure it doesn't flood the channel with data but before this commit
couldn't determine when to back off.

Add a new channel-level option — `send-acks` — for sending "ack" control
messages for each data frame we receive.  At first, this new option
supports a single value, `bytes`, which means to send ack messages with
the number of bytes acknowledged.  With this option enabled, the bridge
will send:

```
  {"command": "ack", "bytes": 16384}
```

for a received data frame containing 16384 bytes.  The bridge is also
free to combine acknowledgements (ie: the "bytes" field can be the sum
from multiple messages).

We can support this for all channels at the outset, even if the support
is somewhat preliminary: if do_data() returns None (as it does for all
existing channels) we send the acknowledgement immediately.  Channels
can decide to handle deferred sending on their own by returning True
(which is a new possibility).  We do that for AsyncChannel.

The implementation for ProtocolChannel is sufficient if the bottleneck
is the network upload speed but will fall over if the cause of
throttling is the command (or socket) not accepting data.  This is not
going to be trivial to fix, and will be handled in a later version.

Co-authored-by: Jelle van der Waa <jvanderwaa@redhat.com>
This commit is contained in:
Allison Karlitskaya 2024-04-08 11:49:34 +02:00
parent 0b3f1ab655
commit a605e9cdb7
3 changed files with 116 additions and 14 deletions

View File

@ -112,6 +112,8 @@ The following fields are defined:
* "capabilities": Optional, array of capability strings required from the bridge
* "session": Optional, set to "private" or "shared". Defaults to "shared"
* "flow-control": Optional boolean whether the channel should throttle itself via flow control.
* "send-acks": Set to "bytes" to send "ack" messages after processing each data frame
If "binary" is set to "raw" then this channel transfers binary messages.
@ -162,7 +164,13 @@ channels in the "fence" group are closed before resuming.
The "flow-control" option controls whether a channel should attempt to throttle
itself via flow control when sending or receiving large amounts of data. The
current default (when this option is not provided) is to not do flow control.
However, this default will likely change in the future.
However, this default will likely change in the future. This only impacts data
sent by the bridge to the browser.
If "send-acks" is set to "bytes" then the bridge will send acknowledgement
messages detailing the number of payload bytes that it has received and
processed. This mechanism is provided for senders (ie: in the browser) who
wish to throttle the data that they're sending to the bridge.
**Host values**

View File

@ -21,7 +21,7 @@ import logging
import traceback
from typing import BinaryIO, ClassVar, Dict, Generator, List, Mapping, Optional, Sequence, Set, Tuple, Type
from .jsonutil import JsonError, JsonObject, JsonValue, create_object, get_bool, get_str
from .jsonutil import JsonError, JsonObject, JsonValue, create_object, get_bool, get_enum, get_str
from .protocol import CockpitProblem
from .router import Endpoint, Router, RoutingRule
@ -92,6 +92,7 @@ class Channel(Endpoint):
_send_pings: bool = False
_out_sequence: int = 0
_out_window: int = SEND_WINDOW
_ack_bytes: bool
# Task management
_tasks: Set[asyncio.Task]
@ -106,15 +107,16 @@ class Channel(Endpoint):
group = ''
# input
def do_control(self, command, message):
def do_control(self, command: str, message: JsonObject) -> None:
# Break the various different kinds of control messages out into the
# things that our subclass may be interested in handling. We drop the
# 'message' field for handlers that don't need it.
if command == 'open':
self._tasks = set()
self.channel = message['channel']
self.channel = get_str(message, 'channel')
if get_bool(message, 'flow-control', default=False):
self._send_pings = True
self._ack_bytes = get_enum(message, 'send-acks', ['bytes'], None) is not None
self.group = get_str(message, 'group', 'default')
self.freeze_endpoint()
self.do_open(message)
@ -177,6 +179,10 @@ class Channel(Endpoint):
def do_ping(self, message: JsonObject) -> None:
self.send_pong(message)
def send_ack(self, data: bytes) -> None:
if self._ack_bytes:
self.send_control('ack', bytes=len(data))
def do_channel_data(self, channel: str, data: bytes) -> None:
# Already closing? Ignore.
if self._close_args is not None:
@ -184,13 +190,21 @@ class Channel(Endpoint):
# Catch errors and turn them into close messages
try:
self.do_data(data)
if not self.do_data(data):
self.send_ack(data)
except ChannelError as exc:
self.close(exc.get_attrs())
def do_data(self, _data: bytes) -> None:
def do_data(self, data: bytes) -> 'bool | None':
"""Handles incoming data to the channel.
Return value is True if the channel takes care of send acks on its own,
in which case it should call self.send_ack() on `data` at some point.
None or False means that the acknowledgement is sent automatically."""
# By default, channels can't receive data.
del data
self.close()
return True
# output
def ready(self, **kwargs: JsonValue) -> None:
@ -459,7 +473,7 @@ class AsyncChannel(Channel):
# Three possibilities for what we'll find:
# - None (EOF) → return None
# - a ping → send a pong
# - bytes → return it (possibly empty)
# - bytes (possibly empty) → ack the receipt, and return it
while True:
item = await self.receive_queue.get()
if item is None:
@ -467,6 +481,7 @@ class AsyncChannel(Channel):
if isinstance(item, Mapping):
self.send_pong(item)
else:
self.send_ack(item)
return item
async def write(self, data: bytes) -> None:
@ -504,13 +519,9 @@ class AsyncChannel(Channel):
def do_ping(self, message: JsonObject) -> None:
self.receive_queue.put_nowait(message)
def do_data(self, data: bytes) -> None:
if not isinstance(data, bytes):
# this will persist past this callback, so make sure we take our
# own copy, in case this was a memoryview into a bytearray.
data = bytes(data)
def do_data(self, data: bytes) -> bool:
self.receive_queue.put_nowait(data)
return True # we will send the 'ack' later (from read())
class GeneratorChannel(Channel):

View File

@ -20,7 +20,7 @@ import pytest
from cockpit._vendor.systemd_ctypes import bus
from cockpit.bridge import Bridge
from cockpit.channel import Channel
from cockpit.channel import AsyncChannel, Channel, ChannelRoutingRule
from cockpit.channels import CHANNEL_TYPES
from cockpit.jsonutil import JsonDict, JsonObject, JsonValue, get_bool, get_dict, get_int, json_merge_patch
from cockpit.packages import BridgeConfig
@ -538,6 +538,16 @@ async def test_fsreplace1(transport: MockTransport, tmp_path: Path) -> None:
await transport.check_close(channel=ch)
assert not myfile.exists()
# acks
ch = await transport.check_open('fsreplace1', path=str(myfile), send_acks='bytes')
transport.send_data(ch, b'some stuff')
await transport.assert_msg('', command='ack', bytes=10, channel=ch)
transport.send_data(ch, b'some more stuff')
await transport.assert_msg('', command='ack', bytes=15, channel=ch)
transport.send_done(ch)
await transport.assert_msg('', command='done', channel=ch)
await transport.check_close(channel=ch)
@pytest.mark.asyncio
async def test_fsreplace1_change_conflict(transport: MockTransport, tmp_path: Path) -> None:
@ -616,6 +626,13 @@ async def test_fsreplace1_error(transport: MockTransport, tmp_path: Path) -> Non
transport.send_done(ch)
await transport.assert_msg('', command='close', channel=ch, problem='not-found')
# invalid send-acks option
await transport.check_open('fsreplace1', path=str(tmp_path), send_acks='not-valid',
problem='protocol-error',
reply_keys={
'message': """attribute 'send-acks': invalid value "not-valid" not in ['bytes']"""
})
@pytest.mark.asyncio
@pytest.mark.parametrize('channeltype', CHANNEL_TYPES)
@ -735,6 +752,72 @@ def test_get_os_release(os_release: str, expected: str) -> None:
assert Bridge.get_os_release() == expected
class AckChannel(AsyncChannel):
payload = 'ack1'
async def run(self, options: JsonObject) -> None:
self.semaphore = asyncio.Semaphore(0)
self.ready()
while await self.read():
await self.semaphore.acquire()
@pytest.mark.asyncio
async def test_async_acks(bridge: Bridge, transport: MockTransport) -> None:
# Inject our mock channel type
for rule in bridge.routing_rules:
if isinstance(rule, ChannelRoutingRule):
rule.table['ack1'] = [AckChannel]
# invalid send-acks values
await transport.check_open('ack1', send_acks=True, problem='protocol-error')
await transport.check_open('ack1', send_acks='x', problem='protocol-error')
# open the channel with acks off
ch = await transport.check_open('ack1')
# send a bunch of data and get no acks
for _ in range(20):
transport.send_data(ch, b'x')
# this will assert that we receive only the close message (and no acks)
await transport.check_close(ch)
# open the channel with acks on
ch = await transport.check_open('ack1', send_acks='bytes')
# send a bunch of data
for _ in range(20):
transport.send_data(ch, b'x')
# we should get exactly one ack (from the first read) before things block
await transport.assert_msg('', channel=ch, command='ack', bytes=1)
# this will assert that we receive only the close message (and no additional acks)
await transport.check_close(ch)
# open the channel with acks on
ch = await transport.check_open('ack1', send_acks='bytes')
# fish the open channel out of the bridge
ack = bridge.open_channels[ch]
assert isinstance(ack, AckChannel)
# let's give ourselves a bit more headroom
for _ in range(5):
ack.semaphore.release()
# send a bunch of data and get some acks
for _ in range(10):
transport.send_data(ch, b'x')
for _ in range(6):
await transport.assert_msg('', channel=ch, command='ack', bytes=1)
# make sure that as we "consume" the data we get more acks:
for _ in range(4):
# no ack in the queue...
await transport.assert_empty()
ack.semaphore.release()
# ... but now there is.
await transport.assert_msg('', channel=ch, command='ack', bytes=1)
# make some more room (for data we didn't send)
for _ in range(5):
ack.semaphore.release()
# but we shouldn't have gotten any acks for those
await transport.check_close(ch)
@pytest.mark.asyncio
async def test_flow_control(transport: MockTransport, tmp_path: Path) -> None:
bigun = tmp_path / 'bigun'