[tcp] add support for otTcpForwardProgress callback (#7583)

This commit adds support for the remaining TCP callback, which was not
yet supported.

Originally, the unimplemented callback was otTcpSendReady, which would
indicate to the application when new data added to the TCP send buffer
would be sent out immediately. There was also discussion of adding an
otTcpBytesAcked callback, which, together with the
otTcpSendByExtension() call, would allow a circular buffer to be
implemented efficiently on top of the otLinkedBuffer API.

Ultimately, it seemed best to generalize the otTcpSendReady callback
to include both cases where bytes are acked by the connection peer,
and where the send buffer drains, allowing new data to be sent
immediately. The reasoning behind this decision is that both the
otTcpSendReady and otTcpBytesAcked callbacks are triggered by the same
event --- an ACK received from the connection peer --- and that it may
require the application to have to "coordinate state" across
callbacks. Having a single callback function to indicate both
conditions seemed like it could simplify applications significantly.

The new, combined, callback is otTcpForwardProgress. This commit
implements support and documentation for this callback function.
This commit is contained in:
Sam Kumar 2022-04-22 09:12:12 -07:00 committed by GitHub
parent 4bd7ab4b79
commit d79468bb18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 262 additions and 101 deletions

View File

@ -53,7 +53,7 @@ extern "C" {
* @note This number versions both OpenThread platform and user APIs.
*
*/
#define OPENTHREAD_API_VERSION (203)
#define OPENTHREAD_API_VERSION (204)
/**
* @addtogroup api-instance

View File

@ -29,7 +29,7 @@
/**
* @file
* @brief
* This file defines the OpenThread TCP API.
* This file defines the OpenThread TCP API.
*
*/
@ -64,7 +64,7 @@ typedef struct otLinkedBuffer
{
struct otLinkedBuffer *mNext; ///< Pointer to the next linked buffer in the chain, or NULL if it is the end.
const uint8_t * mData; ///< Pointer to data referenced by this linked buffer.
uint16_t mLength; ///< Length of this linked buffer (number of bytes).
size_t mLength; ///< Length of this linked buffer (number of bytes).
} otLinkedBuffer;
struct otTcpEndpoint;
@ -95,47 +95,74 @@ typedef void (*otTcpEstablished)(otTcpEndpoint *aEndpoint);
typedef void (*otTcpSendDone)(otTcpEndpoint *aEndpoint, otLinkedBuffer *aData);
/**
* This callback informs the application that the first @p aNumBytes in the
* send buffer have been acknowledged by the connection peer and that their
* underlying memory can be reclaimed by the application.
* This callback informs the application if forward progress has been made in
* transferring data from the send buffer to the recipient. This callback is
* not necessary for correct TCP operation. Most applications can just rely on
* the otTcpSendDone() callback to reclaim linked buffers once the TCP stack is
* done using them. The purpose of this callback is to support advanced
* applications that benefit from finer-grained information about how the
* the connection is making forward progress in transferring data to the
* connection peer.
*
* This callback is not necessary for correct TCP operation. Most applications
* can just rely on the otTcpSendDone() callback. If an application wants
* fine-grained feedback as memory in the send buffer becomes available again
* (instead of waiting for an entire linked buffer's worth of data becomes
* available) or some indication as to whether the connection is making forward
* progress, it can register this callback.
* This callback's operation is closely tied to TCP's send buffer. The send
* buffer can be understood as having two regions. First, there is the
* "in-flight" region at the head (front) of the send buffer. It corresponds
* to data which has been sent to the recipient, but is not yet acknowledged.
* Second, there is the "backlog" region, which consists of all data in the
* send buffer that is not in the "in-flight" region. The "backlog" region
* corresponds to data that is queued for sending, but has not yet been sent.
*
* @param[in] aEndpoint The TCP endpoint for the connection.
* @param[in] aNumBytes The number of bytes newly acknowledged by the connection peer.
* The callback is invoked in response to two types of events. First, the
* "in-flight" region of the send buffer may shrink (e.g., when the recipient
* acknowledges data that we sent earlier). Second, the "backlog" region of the
* send buffer may shrink (e.g., new data was sent out). These two conditions
* often occur at the same time, in response to an ACK segment from the
* connection peer, which is why they are combined in a single callback.
*
* The TCP stack only uses the @p aInSendBuffer bytes at the tail of the send
* buffer; when @p aInSendBuffer decreases by an amount x, it means that x
* additional bytes that were formerly at the head of the send buffer are no
* longer part of the send buffer and can now be reclaimed (i.e., overwritten)
* by the application. Note that the otLinkedBuffer structure itself can only
* be reclaimed once all bytes that it references are no longer part of the
* send buffer.
*
* This callback subsumes otTcpSendDone(), in the following sense: applications
* can determine when linked buffers can be reclaimed by comparing
* @p aInSendBuffer with how many bytes are in each linked buffer. However, we
* expect otTcpSendDone(), which directly conveys which otLinkedBuffers can be
* reclaimed, to be much simpler to use. If both callbacks are registered and
* are triggered by the same event (e.g., the same ACK segment received), then
* the otTcpSendDone() callback will be triggered first, followed by this
* callback.
*
* Additionally, this callback provides @p aBacklog, which indicates how many
* bytes of data in the send buffer are not yet in flight. For applications
* that only want to add data to the send buffer when there is an assurance
* that it will be sent out soon, it may be desirable to only send out data
* when @p aBacklog is suitably small (0 or close to 0). For example, an
* application may use @p aBacklog so that it can react to queue buildup by
* dropping or aggregating data to avoid creating a backlog of data.
*
* After a call to otTcpSendByReference() or otTcpSendByExtension() with a
* positive number of bytes, the otTcpForwardProgress() callback is guaranteed
* to be called, to indicate when the bytes that were added to the send buffer
* are sent out. The call to otTcpForwardProgress() may be made immediately
* after the bytes are added to the send buffer (if some of those bytes are
* immediately sent out, reducing the backlog), or sometime in the future (once
* the connection sends out some or all of the data, reducing the backlog). By
* "immediately," we mean that the callback is immediately scheduled for
* execution in a tasklet; to avoid reentrancy-related complexity, the
* otTcpForwardProgress() callback is never directly called from the
* otTcpSendByReference() or otTcpSendByExtension() functions.
*
* @param[in] aEndpoint The TCP endpoint for the connection.
* @param[in] aInSendBuffer The number of bytes in the send buffer (sum of "in-flight" and "backlog" regions).
* @param[in] aBacklog The number of bytes that are queued for sending but have not yet been sent (the "backlog"
* region).
*
*/
typedef void (*otTcpBytesAcked)(otTcpEndpoint *aEndpoint, size_t aNumBytes);
/**
* This callback informs the application that if data is added to the send
* buffer, some of it will be transmitted immediately without delay, as opposed
* to being queued for transmission once the peer ACKs some data.
*
* After a call to otTcpSendByReference() or otTcpSendByExtension(), the
* otTcpSendReady() callback is guaranteed to be called, either immediately (if
* the connection is already ready) or sometime in the future (once the
* connection becomes ready for more data).
*
* This callback is not necessary for correct TCP operation. If more data is
* added to the send buffer than can be transmitted without delay, it will
* simply be queued for transmission at a later time. This callback should be
* used only in cases where some assurance is desired that data added to the
* send buffer will be sent soon (e.g., TCP won't wait for the recipient to
* ACK some other data first before sending this data out). For example, you
* may use this callback if you'd rather have your data be dropped than develop
* a backlog of data in your send buffer. But for most applications, where this
* isn't a concern, it's expected that one would not use this callback at all.
*
* @param[in] aEndpoint The TCP endpoint for the connection.
*
*/
typedef void (*otTcpSendReady)(otTcpEndpoint *aEndpoint);
typedef void (*otTcpForwardProgress)(otTcpEndpoint *aEndpoint, size_t aInSendBuffer, size_t aBacklog);
/**
* This callback indicates the number of bytes available for consumption from
@ -226,7 +253,7 @@ struct otTcpEndpoint
otTcpEstablished mEstablishedCallback; ///< "Established" callback function
otTcpSendDone mSendDoneCallback; ///< "Send done" callback function
otTcpSendReady mSendReadyCallback; ///< "Send ready" callback function
otTcpForwardProgress mForwardProgressCallback; ///< "Forward progress" callback function
otTcpReceiveAvailable mReceiveAvailableCallback; ///< "Receive available" callback function
otTcpDisconnected mDisconnectedCallback; ///< "Disconnected" callback function
@ -234,6 +261,8 @@ struct otTcpEndpoint
otLinkedBuffer mReceiveLinks[2];
otSockAddr mSockAddr;
uint8_t mPendingCallbacks;
};
/**
@ -246,8 +275,7 @@ typedef struct otTcpEndpointInitializeArgs
otTcpEstablished mEstablishedCallback; ///< "Established" callback function
otTcpSendDone mSendDoneCallback; ///< "Send done" callback function
otTcpBytesAcked mBytesAckedCallback; ///< "Bytes acked" callback
otTcpSendReady mSendReadyCallback; ///< "Send ready" callback function
otTcpForwardProgress mForwardProgressCallback; ///< "Forward progress" callback function
otTcpReceiveAvailable mReceiveAvailableCallback; ///< "Receive available" callback function
otTcpDisconnected mDisconnectedCallback; ///< "Disconnected" callback function
@ -297,7 +325,9 @@ typedef struct otTcpEndpointInitializeArgs
* @retval OT_ERROR_FAILED Failed to open the TCP endpoint.
*
*/
otError otTcpEndpointInitialize(otInstance *aInstance, otTcpEndpoint *aEndpoint, otTcpEndpointInitializeArgs *aArgs);
otError otTcpEndpointInitialize(otInstance * aInstance,
otTcpEndpoint * aEndpoint,
const otTcpEndpointInitializeArgs *aArgs);
/**
* Obtains the otInstance that was associated with @p aEndpoint upon
@ -514,9 +544,10 @@ otError otTcpSendEndOfStream(otTcpEndpoint *aEndpoint);
*
* This immediately makes the TCP endpoint free for use for another connection
* and empties the send and receive buffers, transferring ownership of any data
* provided by the application in otTcpSendByReference() calls back to
* the application. The TCP endpoint's callbacks and memory for the receive
* buffer remain associated with the TCP endpoint.
* provided by the application in otTcpSendByReference() and
* otTcpSendByExtension() calls back to the application. The TCP endpoint's
* callbacks and memory for the receive buffer remain associated with the
* TCP endpoint.
*
* @param[in] aEndpoint A pointer to the TCP endpoint structure representing the TCP endpoint to abort.
*
@ -678,7 +709,9 @@ typedef struct otTcpListenerInitializeArgs
* @retval OT_ERROR_FAILED Failed to open the TCP listener.
*
*/
otError otTcpListenerInitialize(otInstance *aInstance, otTcpListener *aListener, otTcpListenerInitializeArgs *aArgs);
otError otTcpListenerInitialize(otInstance * aInstance,
otTcpListener * aListener,
const otTcpListenerInitializeArgs *aArgs);
/**
* Obtains the otInstance that was associated with @p aListener upon

View File

@ -42,7 +42,9 @@
using namespace ot;
otError otTcpEndpointInitialize(otInstance *aInstance, otTcpEndpoint *aEndpoint, otTcpEndpointInitializeArgs *aArgs)
otError otTcpEndpointInitialize(otInstance * aInstance,
otTcpEndpoint * aEndpoint,
const otTcpEndpointInitializeArgs *aArgs)
{
return AsCoreType(aEndpoint).Initialize(AsCoreType(aInstance), *aArgs);
}
@ -117,7 +119,9 @@ otError otTcpEndpointDeinitialize(otTcpEndpoint *aEndpoint)
return AsCoreType(aEndpoint).Deinitialize();
}
otError otTcpListenerInitialize(otInstance *aInstance, otTcpListener *aListener, otTcpListenerInitializeArgs *aArgs)
otError otTcpListenerInitialize(otInstance * aInstance,
otTcpListener * aListener,
const otTcpListenerInitializeArgs *aArgs)
{
return AsCoreType(aListener).Initialize(AsCoreType(aInstance), *aArgs);
}

View File

@ -41,6 +41,7 @@
#include "common/code_utils.hpp"
#include "common/error.hpp"
#include "common/instance.hpp"
#include "common/locator_getters.hpp"
#include "common/log.hpp"
#include "common/random.hpp"
#include "net/checksum.hpp"
@ -71,28 +72,31 @@ static_assert(offsetof(Tcp::Listener, mTcbListen) == 0, "mTcbListen field in otT
Tcp::Tcp(Instance &aInstance)
: InstanceLocator(aInstance)
, mTimer(aInstance, Tcp::HandleTimer)
, mTasklet(aInstance, Tcp::HandleTasklet)
, mEphemeralPort(kDynamicPortMin)
{
OT_UNUSED_VARIABLE(mEphemeralPort);
}
Error Tcp::Endpoint::Initialize(Instance &aInstance, otTcpEndpointInitializeArgs &aArgs)
Error Tcp::Endpoint::Initialize(Instance &aInstance, const otTcpEndpointInitializeArgs &aArgs)
{
Error error;
struct tcpcb &tp = GetTcb();
memset(&tp, 0x00, sizeof(tp));
SuccessOrExit(error = aInstance.Get<Tcp>().mEndpoints.Add(*this));
mContext = aArgs.mContext;
mEstablishedCallback = aArgs.mEstablishedCallback;
mSendDoneCallback = aArgs.mSendDoneCallback;
mSendReadyCallback = aArgs.mSendReadyCallback;
mForwardProgressCallback = aArgs.mForwardProgressCallback;
mReceiveAvailableCallback = aArgs.mReceiveAvailableCallback;
mDisconnectedCallback = aArgs.mDisconnectedCallback;
memset(mTimers, 0x00, sizeof(mTimers));
memset(&mSockAddr, 0x00, sizeof(mSockAddr));
memset(&tp, 0x00, sizeof(tp));
mPendingCallbacks = 0;
/*
* Initialize buffers --- formerly in initialize_tcb.
@ -186,16 +190,26 @@ exit:
Error Tcp::Endpoint::SendByReference(otLinkedBuffer &aBuffer, uint32_t aFlags)
{
Error error;
struct tcpcb &tp = GetTcb();
return BsdErrorToOtError(tcp_usr_send(&tp, (aFlags & OT_TCP_SEND_MORE_TO_COME) != 0, &aBuffer, 0));
size_t backlogBefore = GetBacklogBytes();
size_t sent = aBuffer.mLength;
SuccessOrExit(error = BsdErrorToOtError(tcp_usr_send(&tp, (aFlags & OT_TCP_SEND_MORE_TO_COME) != 0, &aBuffer, 0)));
PostCallbacksAfterSend(sent, backlogBefore);
exit:
return error;
}
Error Tcp::Endpoint::SendByExtension(size_t aNumBytes, uint32_t aFlags)
{
Error error;
bool moreToCome = (aFlags & OT_TCP_SEND_MORE_TO_COME) != 0;
struct tcpcb &tp = GetTcb();
bool moreToCome = (aFlags & OT_TCP_SEND_MORE_TO_COME) != 0;
struct tcpcb &tp = GetTcb();
size_t backlogBefore = GetBacklogBytes();
int bsdError;
VerifyOrExit(lbuf_head(&tp.sendbuf) != nullptr, error = kErrorInvalidState);
@ -203,6 +217,8 @@ Error Tcp::Endpoint::SendByExtension(size_t aNumBytes, uint32_t aFlags)
bsdError = tcp_usr_send(&tp, moreToCome ? 1 : 0, nullptr, aNumBytes);
SuccessOrExit(error = BsdErrorToOtError(bsdError));
PostCallbacksAfterSend(aNumBytes, backlogBefore);
exit:
return error;
}
@ -428,6 +444,49 @@ exit:
return calledUserCallback;
}
void Tcp::Endpoint::PostCallbacksAfterSend(size_t aSent, size_t aBacklogBefore)
{
size_t backlogAfter = GetBacklogBytes();
if (backlogAfter < aBacklogBefore + aSent && mForwardProgressCallback != nullptr)
{
mPendingCallbacks |= kForwardProgressCallbackFlag;
GetInstance().Get<Tcp>().mTasklet.Post();
}
}
bool Tcp::Endpoint::FirePendingCallbacks(void)
{
bool calledUserCallback = false;
if ((mPendingCallbacks & kForwardProgressCallbackFlag) != 0 && mForwardProgressCallback != nullptr)
{
mForwardProgressCallback(this, GetSendBufferBytes(), GetBacklogBytes());
calledUserCallback = true;
}
mPendingCallbacks = 0;
return calledUserCallback;
}
size_t Tcp::Endpoint::GetSendBufferBytes(void) const
{
const struct tcpcb &tp = GetTcb();
return lbuf_used_space(&tp.sendbuf);
}
size_t Tcp::Endpoint::GetInFlightBytes(void) const
{
const struct tcpcb &tp = GetTcb();
return tp.snd_max - tp.snd_una;
}
size_t Tcp::Endpoint::GetBacklogBytes(void) const
{
return GetSendBufferBytes() - GetInFlightBytes();
}
Address &Tcp::Endpoint::GetLocalIp6Address(void)
{
return *reinterpret_cast<Address *>(&GetTcb().laddr);
@ -465,7 +524,7 @@ exit:
return matches;
}
Error Tcp::Listener::Initialize(Instance &aInstance, otTcpListenerInitializeArgs &aArgs)
Error Tcp::Listener::Initialize(Instance &aInstance, const otTcpListenerInitializeArgs &aArgs)
{
Error error;
struct tcpcb_listen *tpl = &GetTcbListen();
@ -606,13 +665,14 @@ Error Tcp::HandleMessage(ot::Ip6::Header &aIp6Header, Message &aMessage, Message
int nextAction;
struct tcpcb * tp = &endpoint->GetTcb();
otLinkedBuffer *priorHead = lbuf_head(&tp->sendbuf);
otLinkedBuffer *priorHead = lbuf_head(&tp->sendbuf);
size_t priorBacklog = endpoint->GetSendBufferBytes() - endpoint->GetInFlightBytes();
memset(&sig, 0x00, sizeof(sig));
nextAction = tcp_input(ip6Header, tcpHeader, &aMessage, tp, nullptr, &sig);
if (nextAction != RELOOKUP_REQUIRED)
{
ProcessSignals(*endpoint, priorHead, sig);
ProcessSignals(*endpoint, priorHead, priorBacklog, sig);
ExitNow();
}
/* If the matching socket was in the TIME-WAIT state, then we try passive sockets. */
@ -634,14 +694,23 @@ exit:
return error;
}
void Tcp::ProcessSignals(Endpoint &aEndpoint, otLinkedBuffer *aPriorHead, struct tcplp_signals &aSignals)
void Tcp::ProcessSignals(Endpoint & aEndpoint,
otLinkedBuffer * aPriorHead,
size_t aPriorBacklog,
struct tcplp_signals &aSignals)
{
VerifyOrExit(IsInitialized(aEndpoint) && !aEndpoint.IsClosed());
if (aSignals.conn_established && aEndpoint.mEstablishedCallback != nullptr)
{
aEndpoint.mEstablishedCallback(&aEndpoint);
}
VerifyOrExit(IsInitialized(aEndpoint) && !aEndpoint.IsClosed());
if (aEndpoint.mSendDoneCallback != nullptr)
{
otLinkedBuffer *curr = aPriorHead;
for (int i = 0; i != aSignals.links_popped; i++)
for (uint32_t i = 0; i != aSignals.links_popped; i++)
{
otLinkedBuffer *next = curr->mNext;
@ -654,13 +723,19 @@ void Tcp::ProcessSignals(Endpoint &aEndpoint, otLinkedBuffer *aPriorHead, struct
}
VerifyOrExit(IsInitialized(aEndpoint) && !aEndpoint.IsClosed());
if (aSignals.conn_established && aEndpoint.mEstablishedCallback != nullptr)
if (aEndpoint.mForwardProgressCallback != nullptr)
{
aEndpoint.mEstablishedCallback(&aEndpoint);
size_t backlogBytes = aEndpoint.GetBacklogBytes();
if (aSignals.bytes_acked > 0 || backlogBytes < aPriorBacklog)
{
aEndpoint.mForwardProgressCallback(&aEndpoint, aEndpoint.GetSendBufferBytes(), backlogBytes);
aEndpoint.mPendingCallbacks &= ~kForwardProgressCallbackFlag;
}
}
VerifyOrExit(IsInitialized(aEndpoint) && !aEndpoint.IsClosed());
if ((aSignals.recvbuf_notempty || aSignals.rcvd_fin) && aEndpoint.mReceiveAvailableCallback != nullptr)
if ((aSignals.recvbuf_added || aSignals.rcvd_fin) && aEndpoint.mReceiveAvailableCallback != nullptr)
{
aEndpoint.mReceiveAvailableCallback(&aEndpoint, cbuf_used_space(&aEndpoint.GetTcb().recvbuf),
aEndpoint.GetTcb().reass_fin_index != -1,
@ -846,6 +921,25 @@ restart:
}
}
void Tcp::HandleTasklet(Tasklet &aTasklet)
{
OT_ASSERT(&aTasklet == &aTasklet.Get<Tcp>().mTasklet);
LogDebg("TCP tasklet invoked");
aTasklet.Get<Tcp>().ProcessCallbacks();
}
void Tcp::ProcessCallbacks(void)
{
for (Endpoint &endpoint : mEndpoints)
{
if (endpoint.FirePendingCallbacks())
{
mTasklet.Post();
break;
}
}
}
} // namespace Ip6
} // namespace ot

View File

@ -107,7 +107,7 @@ public:
* @retval kErrorFailed Failed to open the TCP endpoint.
*
*/
Error Initialize(Instance &aInstance, otTcpEndpointInitializeArgs &aArgs);
Error Initialize(Instance &aInstance, const otTcpEndpointInitializeArgs &aArgs);
/**
* Obtains the Instance that was associated with this Endpoint upon
@ -384,6 +384,13 @@ public:
void CancelTimer(uint8_t aTimerFlag);
bool FirePendingTimers(TimeMilli aNow, bool &aHasFutureTimer, TimeMilli &aEarliestFutureExpiry);
void PostCallbacksAfterSend(size_t aSent, size_t aBacklogBefore);
bool FirePendingCallbacks(void);
size_t GetSendBufferBytes(void) const;
size_t GetInFlightBytes(void) const;
size_t GetBacklogBytes(void) const;
Address & GetLocalIp6Address(void);
const Address &GetLocalIp6Address(void) const;
Address & GetForeignIp6Address(void);
@ -418,7 +425,7 @@ public:
* @retval kErrorFailed Failed to open the TCP listener.
*
*/
Error Initialize(Instance &aInstance, otTcpListenerInitializeArgs &aArgs);
Error Initialize(Instance &aInstance, const otTcpListenerInitializeArgs &aArgs);
/**
* Obtains the otInstance that was associated with this Listener upon
@ -656,7 +663,16 @@ private:
kDynamicPortMax = 65535, ///< Service Name and Transport Protocol Port Number Registry
};
void ProcessSignals(Endpoint &aEndpoint, otLinkedBuffer *aPriorHead, struct tcplp_signals &aSignals);
static constexpr uint8_t kEstablishedCallbackFlag = (1 << 0);
static constexpr uint8_t kSendDoneCallbackFlag = (1 << 1);
static constexpr uint8_t kForwardProgressCallbackFlag = (1 << 2);
static constexpr uint8_t kReceiveAvailableCallbackFlag = (1 << 3);
static constexpr uint8_t kDisconnectedCallbackFlag = (1 << 4);
void ProcessSignals(Endpoint & aEndpoint,
otLinkedBuffer * aPriorHead,
size_t aPriorBacklog,
struct tcplp_signals &aSignals);
static Error BsdErrorToOtError(int aBsdError);
bool CanBind(const SockAddr &aSockName);
@ -664,7 +680,11 @@ private:
static void HandleTimer(Timer &aTimer);
void ProcessTimers(void);
static void HandleTasklet(Tasklet &aTasklet);
void ProcessCallbacks(void);
TimerMilli mTimer;
Tasklet mTasklet;
LinkedList<Endpoint> mEndpoints;
LinkedList<Listener> mListeners;

View File

@ -1186,6 +1186,7 @@ tcp_do_segment(struct ip6_hdr* ip6, struct tcphdr *th, otMessage* msg,
{
uint32_t poppedbytes = lbuf_pop(&tp->sendbuf, acked, &sig->links_popped);
KASSERT(poppedbytes == acked, ("More bytes were acked than are in the send buffer"));
sig->bytes_acked += poppedbytes;
}
if (SEQ_GT(tp->snd_una, tp->snd_recover) &&
SEQ_LEQ(th->th_ack, tp->snd_recover))
@ -1329,10 +1330,9 @@ tcp_do_segment(struct ip6_hdr* ip6, struct tcphdr *th, otMessage* msg,
*/
if (!tpiscantrcv(tp)) {
size_t usedbefore = cbuf_used_space(&tp->recvbuf);
cbuf_write(&tp->recvbuf, msg, otMessageGetOffset(msg) + drop_hdrlen, tlen, cbuf_copy_from_message);
if (usedbefore == 0 && tlen > 0) {
sig->recvbuf_notempty = true;
if (tlen > 0) {
sig->recvbuf_added = true;
}
} else {
/*
@ -2213,10 +2213,12 @@ process_ACK:
tp->snd_wnd -= usedspace;
poppedbytes = lbuf_pop(&tp->sendbuf, usedspace, &sig->links_popped);
KASSERT(poppedbytes == usedspace, ("Could not fully empty send buffer"));
sig->bytes_acked += poppedbytes;
ourfinisacked = 1;
} else {
uint32_t poppedbytes = lbuf_pop(&tp->sendbuf, acked, &sig->links_popped);
KASSERT(poppedbytes == acked, ("Could not remove acked bytes from send buffer"));
sig->bytes_acked += poppedbytes;
tp->snd_wnd -= acked;
ourfinisacked = 0;
}
@ -2414,10 +2416,9 @@ step6:
* sbappendstream_locked(&so->so_rcv, m, 0);).
*/
if (!tpiscantrcv(tp)) {
size_t usedbefore = cbuf_used_space(&tp->recvbuf);
cbuf_write(&tp->recvbuf, msg, otMessageGetOffset(msg) + drop_hdrlen, tlen, cbuf_copy_from_message);
if (usedbefore == 0 && tlen > 0) {
sig->recvbuf_notempty = true;
if (tlen > 0) {
sig->recvbuf_added = true;
}
} else if (tlen > 0) {
/*

View File

@ -103,8 +103,8 @@ present:
KASSERT(merged == mergeable, ("Reassembly merge out of bounds: tried to merge %d, but merged %d", (int) mergeable, (int) merged));
if (tpiscantrcv(tp)) {
cbuf_pop(&tp->recvbuf, merged); // So no data really enters the buffer
} else if (usedbefore == 0 && merged > 0) {
sig->recvbuf_notempty = true;
} else if (merged > 0) {
sig->recvbuf_added = true;
}
} else {
/* If there is data in the buffer AND we can't receive more, then that must be because we received a FIN,

View File

@ -57,7 +57,7 @@ void lbuf_extend(struct lbufhead* buffer, size_t numbytes) {
buffer->tail->mLength += numbytes;
}
size_t lbuf_pop(struct lbufhead* buffer, size_t numbytes, int* ntraversed) {
size_t lbuf_pop(struct lbufhead* buffer, size_t numbytes, uint32_t* ntraversed) {
otLinkedBuffer* curr = buffer->head;
size_t bytesleft = numbytes;
size_t curroffset = buffer->offset;
@ -109,6 +109,6 @@ int lbuf_getrange(struct lbufhead* buffer, size_t offset, size_t numbytes,
return 0;
}
size_t lbuf_used_space(struct lbufhead* buffer) {
size_t lbuf_used_space(const struct lbufhead* buffer) {
return buffer->length;
}

View File

@ -65,7 +65,7 @@ void lbuf_extend(struct lbufhead* buffer, size_t numbytes);
NUMBYTES bytes in the buffer to begin with). *NTRAVERSED is incremented once
for each entry in the buffer that is no longer referenced and can be
reclaimed. */
size_t lbuf_pop(struct lbufhead* buffer, size_t numbytes, int* ntraversed);
size_t lbuf_pop(struct lbufhead* buffer, size_t numbytes, uint32_t* ntraversed);
/* Given a range of indices, specified by an OFFSET from the start and a
length NUMBYTES, this function locates the chain of linked buffer entries
@ -83,6 +83,6 @@ int lbuf_getrange(struct lbufhead* buffer, size_t offset, size_t numbytes,
struct otLinkedBuffer** last, size_t* lastextra);
/* Returns the total number of bytes stored in the buffer. */
size_t lbuf_used_space(struct lbufhead* buffer);
size_t lbuf_used_space(const struct lbufhead* buffer);
#endif

View File

@ -42,43 +42,52 @@ extern "C" {
#endif
#include <errno.h>
#include <openthread/ip6.h>
#include <openthread/message.h>
#include "bsdtcp/ip6.h"
#include "bsdtcp/tcp.h"
#include "bsdtcp/tcp_fsm.h"
#include "bsdtcp/tcp_timer.h"
#include "bsdtcp/tcp_var.h"
#include <openthread/ip6.h>
#include <openthread/message.h>
#define RELOOKUP_REQUIRED -1
#define CONN_LOST_NORMAL 0
struct tcplp_signals {
int links_popped;
bool conn_established;
bool recvbuf_notempty;
bool rcvd_fin;
struct tcplp_signals
{
uint32_t links_popped;
uint32_t bytes_acked;
bool conn_established;
bool recvbuf_added;
bool rcvd_fin;
};
/*
* Functions that the TCP protocol logic can call to interact with the rest of
* the system.
*/
otMessage* tcplp_sys_new_message(otInstance* instance);
void tcplp_sys_free_message(otInstance* instance, otMessage* pkt);
void tcplp_sys_send_message(otInstance* instance, otMessage* pkt, otMessageInfo* info);
uint32_t tcplp_sys_get_ticks();
uint32_t tcplp_sys_get_millis();
void tcplp_sys_set_timer(struct tcpcb* tcb, uint8_t timer_flag, uint32_t delay);
void tcplp_sys_stop_timer(struct tcpcb* tcb, uint8_t timer_flag);
struct tcpcb* tcplp_sys_accept_ready(struct tcpcb_listen* tpl, struct in6_addr* addr, uint16_t port);
bool tcplp_sys_accepted_connection(struct tcpcb_listen* tpl, struct tcpcb* accepted, struct in6_addr* addr, uint16_t port);
void tcplp_sys_connection_lost(struct tcpcb* tcb, uint8_t errnum);
void tcplp_sys_on_state_change(struct tcpcb* tcb, int newstate);
void tcplp_sys_log(const char* format, ...);
void tcplp_sys_panic(const char* format, ...);
bool tcplp_sys_autobind(otInstance *aInstance, const otSockAddr *aPeer, otSockAddr *aToBind, bool aBindAddress, bool aBindPort);
uint32_t tcplp_sys_generate_isn();
otMessage * tcplp_sys_new_message(otInstance *instance);
void tcplp_sys_free_message(otInstance *instance, otMessage *pkt);
void tcplp_sys_send_message(otInstance *instance, otMessage *pkt, otMessageInfo *info);
uint32_t tcplp_sys_get_ticks();
uint32_t tcplp_sys_get_millis();
void tcplp_sys_set_timer(struct tcpcb *tcb, uint8_t timer_flag, uint32_t delay);
void tcplp_sys_stop_timer(struct tcpcb *tcb, uint8_t timer_flag);
struct tcpcb *tcplp_sys_accept_ready(struct tcpcb_listen *tpl, struct in6_addr *addr, uint16_t port);
bool tcplp_sys_accepted_connection(struct tcpcb_listen *tpl,
struct tcpcb * accepted,
struct in6_addr * addr,
uint16_t port);
void tcplp_sys_connection_lost(struct tcpcb *tcb, uint8_t errnum);
void tcplp_sys_on_state_change(struct tcpcb *tcb, int newstate);
void tcplp_sys_log(const char *format, ...);
void tcplp_sys_panic(const char *format, ...);
bool tcplp_sys_autobind(otInstance * aInstance,
const otSockAddr *aPeer,
otSockAddr * aToBind,
bool aBindAddress,
bool aBindPort);
uint32_t tcplp_sys_generate_isn();
#ifdef __cplusplus
} // extern "C"