diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f1d7ac71..e0e42d552 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ prevent_in_source_build() option(TD_ENABLE_JNI "Use \"ON\" to enable JNI-compatible TDLib API.") option(TD_ENABLE_DOTNET "Use \"ON\" to enable generation of C++/CLI or C++/CX TDLib API bindings.") +option(TD_EXPERIMENTAL_WATCH_OS "Use \"ON\" to enable watch os support.") if (TD_ENABLE_DOTNET AND (CMAKE_VERSION VERSION_LESS "3.1.0")) message(FATAL_ERROR "CMake 3.1.0 or higher is required. You are running version ${CMAKE_VERSION}.") diff --git a/td/mtproto/HandshakeActor.cpp b/td/mtproto/HandshakeActor.cpp index 9afca6ce7..a3cf20745 100644 --- a/td/mtproto/HandshakeActor.cpp +++ b/td/mtproto/HandshakeActor.cpp @@ -55,8 +55,8 @@ void HandshakeActor::return_connection(Status status) { CHECK(!raw_connection_promise_); return; } - if (status.is_error() && !raw_connection->debug_str_.empty()) { - status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_); + if (status.is_error() && !raw_connection->extra().debug_str.empty()) { + status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->extra().debug_str); } Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); if (raw_connection_promise_) { diff --git a/td/mtproto/Ping.cpp b/td/mtproto/Ping.cpp index e4ff9def3..b53181526 100644 --- a/td/mtproto/Ping.cpp +++ b/td/mtproto/Ping.cpp @@ -85,7 +85,7 @@ ActorOwn<> create_ping_actor(string debug, unique_ptr raw_connect raw_connection->close(); promise_.set_error(std::move(status)); } else { - raw_connection->rtt_ = ping_connection_->rtt(); + raw_connection->extra().rtt = ping_connection_->rtt(); if (raw_connection->stats_callback()) { raw_connection->stats_callback()->on_pong(); } diff --git a/td/mtproto/RawConnection.cpp b/td/mtproto/RawConnection.cpp index 835823498..6cefa1faf 100644 --- a/td/mtproto/RawConnection.cpp +++ b/td/mtproto/RawConnection.cpp @@ -15,148 +15,440 @@ #include "td/utils/Status.h" #include "td/utils/StorerBase.h" +#if TD_EXPERIMENTAL_WATCH_OS +#include "td/net/DarwinHttp.h" +#endif + #include namespace td { namespace mtproto { -void RawConnection::send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key, - uint64 quick_ack_token) { - PacketInfo info; - info.version = 2; - info.no_crypto_flag = false; - info.salt = salt; - info.session_id = session_id; - info.use_random_padding = transport_->use_random_padding(); - - auto packet = BufferWriter{Transport::write(storer, auth_key, &info), transport_->max_prepend_size(), - transport_->max_append_size()}; - Transport::write(storer, auth_key, &info, packet.as_slice()); - - bool use_quick_ack = false; - if (quick_ack_token != 0 && transport_->support_quick_ack()) { - auto tmp = quick_ack_to_token_.emplace(info.message_ack, quick_ack_token); - if (tmp.second) { - use_quick_ack = true; - } else { - LOG(ERROR) << "Quick ack " << info.message_ack << " collision"; - } +class RawConnectionDefault : public RawConnection { + public: + RawConnectionDefault(SocketFd socket_fd, TransportType transport_type, unique_ptr stats_callback) + : socket_fd_(std::move(socket_fd)) + , transport_(create_transport(transport_type)) + , stats_callback_(std::move(stats_callback)) { + transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer()); } - transport_->write(std::move(packet), use_quick_ack); -} - -uint64 RawConnection::send_no_crypto(const Storer &storer) { - PacketInfo info; - - info.no_crypto_flag = true; - auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), transport_->max_prepend_size(), - transport_->max_append_size()}; - Transport::write(storer, AuthKey(), &info, packet.as_slice()); - LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice()); - transport_->write(std::move(packet), false); - return info.message_id; -} - -Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) { - auto r = socket_fd_.flush_read(); - if (r.is_ok()) { - if (stats_callback_) { - stats_callback_->on_read(r.ok()); - } - callback.on_read(r.ok()); + void set_connection_token(StateManager::ConnectionToken connection_token) override { + connection_token_ = std::move(connection_token); } - while (transport_->can_read()) { - BufferSlice packet; - uint32 quick_ack = 0; - TRY_RESULT(wait_size, transport_->read_next(&packet, &quick_ack)); - if (!is_aligned_pointer<4>(packet.as_slice().ubegin())) { - BufferSlice new_packet(packet.size()); - new_packet.as_slice().copy_from(packet.as_slice()); - packet = std::move(new_packet); - } - LOG_CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin())) - << packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size; - if (wait_size != 0) { - constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024; - if (wait_size > MAX_PACKET_SIZE) { - return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size); - } - break; - } - - if (quick_ack != 0) { - TRY_STATUS(on_quick_ack(quick_ack, callback)); - continue; - } + bool can_send() const override { + return transport_->can_write(); + } + TransportType get_transport_type() const override { + return transport_->get_type(); + } + void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key, + uint64 quick_ack_token) override { PacketInfo info; info.version = 2; + info.no_crypto_flag = false; + info.salt = salt; + info.session_id = session_id; + info.use_random_padding = transport_->use_random_padding(); - TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info)); - switch (read_result.type()) { - case Transport::ReadResult::Quickack: { - TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback)); - break; + auto packet = BufferWriter{Transport::write(storer, auth_key, &info), transport_->max_prepend_size(), + transport_->max_append_size()}; + Transport::write(storer, auth_key, &info, packet.as_slice()); + + bool use_quick_ack = false; + if (quick_ack_token != 0 && transport_->support_quick_ack()) { + auto tmp = quick_ack_to_token_.emplace(info.message_ack, quick_ack_token); + if (tmp.second) { + use_quick_ack = true; + } else { + LOG(ERROR) << "Quick ack " << info.message_ack << " collision"; } - case Transport::ReadResult::Error: { - TRY_STATUS(on_read_mtproto_error(read_result.error())); - break; + } + + transport_->write(std::move(packet), use_quick_ack); + } + + uint64 send_no_crypto(const Storer &storer) override { + PacketInfo info; + + info.no_crypto_flag = true; + auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), transport_->max_prepend_size(), + transport_->max_append_size()}; + Transport::write(storer, AuthKey(), &info, packet.as_slice()); + LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice()); + transport_->write(std::move(packet), false); + return info.message_id; + } + + PollableFdInfo &get_poll_info() override { + return socket_fd_.get_poll_info(); + } + + StatsCallback *stats_callback() override { + return stats_callback_.get(); + } + + // NB: After first returned error, all subsequent calls will return error too. + Status flush(const AuthKey &auth_key, Callback &callback) override { + auto status = do_flush(auth_key, callback); + if (status.is_error()) { + if (stats_callback_ && status.code() != 2) { + stats_callback_->on_error(); } - case Transport::ReadResult::Packet: { - // If a packet was successfully decrypted, then it is ok to assume that the connection is alive - if (!auth_key.empty()) { - if (stats_callback_) { - stats_callback_->on_pong(); - } + has_error_ = true; + } + return status; + } + + bool has_error() const override { + return has_error_; + } + + void close() override { + transport_.reset(); + socket_fd_.close(); + } + + PublicFields &extra() override { + return extra_; + } + const PublicFields &extra() const override { + return extra_; + } + + private: + PublicFields extra_; + BufferedFd socket_fd_; + unique_ptr transport_; + std::map quick_ack_to_token_; + bool has_error_{false}; + + unique_ptr stats_callback_; + + StateManager::ConnectionToken connection_token_; + + Status flush_read(const AuthKey &auth_key, Callback &callback) { + auto r = socket_fd_.flush_read(); + if (r.is_ok()) { + if (stats_callback_) { + stats_callback_->on_read(r.ok()); + } + callback.on_read(r.ok()); + } + while (transport_->can_read()) { + BufferSlice packet; + uint32 quick_ack = 0; + TRY_RESULT(wait_size, transport_->read_next(&packet, &quick_ack)); + if (!is_aligned_pointer<4>(packet.as_slice().ubegin())) { + BufferSlice new_packet(packet.size()); + new_packet.as_slice().copy_from(packet.as_slice()); + packet = std::move(new_packet); + } + LOG_CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin())) + << packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size; + if (wait_size != 0) { + constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024; + if (wait_size > MAX_PACKET_SIZE) { + return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size); } - - TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet()))); break; } - case Transport::ReadResult::Nop: - break; - default: - UNREACHABLE(); + + if (quick_ack != 0) { + TRY_STATUS(on_quick_ack(quick_ack, callback)); + continue; + } + + PacketInfo info; + info.version = 2; + + TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info)); + switch (read_result.type()) { + case Transport::ReadResult::Quickack: { + TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback)); + break; + } + case Transport::ReadResult::Error: { + TRY_STATUS(on_read_mtproto_error(read_result.error())); + break; + } + case Transport::ReadResult::Packet: { + // If a packet was successfully decrypted, then it is ok to assume that the connection is alive + if (!auth_key.empty()) { + if (stats_callback_) { + stats_callback_->on_pong(); + } + } + + TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet()))); + break; + } + case Transport::ReadResult::Nop: + break; + default: + UNREACHABLE(); + } } - } - TRY_STATUS(std::move(r)); - return Status::OK(); -} - -Status RawConnection::on_read_mtproto_error(int32 error_code) { - if (error_code == -429) { - if (stats_callback_) { - stats_callback_->on_mtproto_error(); - } - return Status::Error(500, PSLICE() << "MTProto error: " << error_code); - } - if (error_code == -404) { - return Status::Error(-404, PSLICE() << "MTProto error: " << error_code); - } - return Status::Error(PSLICE() << "MTProto error: " << error_code); -} - -Status RawConnection::on_quick_ack(uint32 quick_ack, Callback &callback) { - auto it = quick_ack_to_token_.find(quick_ack); - if (it == quick_ack_to_token_.end()) { - LOG(WARNING) << Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack); + TRY_STATUS(std::move(r)); return Status::OK(); - // TODO: return Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack); } - auto token = it->second; - quick_ack_to_token_.erase(it); - callback.on_quick_ack(token).ignore(); - return Status::OK(); -} -Status RawConnection::flush_write() { - TRY_RESULT(size, socket_fd_.flush_write()); - if (size > 0 && stats_callback_) { - stats_callback_->on_write(size); + Status on_read_mtproto_error(int32 error_code) { + if (error_code == -429) { + if (stats_callback_) { + stats_callback_->on_mtproto_error(); + } + return Status::Error(500, PSLICE() << "MTProto error: " << error_code); + } + if (error_code == -404) { + return Status::Error(-404, PSLICE() << "MTProto error: " << error_code); + } + return Status::Error(PSLICE() << "MTProto error: " << error_code); } - return Status::OK(); + + Status on_quick_ack(uint32 quick_ack, Callback &callback) { + auto it = quick_ack_to_token_.find(quick_ack); + if (it == quick_ack_to_token_.end()) { + LOG(WARNING) << Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack); + return Status::OK(); + // TODO: return Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack); + } + auto token = it->second; + quick_ack_to_token_.erase(it); + callback.on_quick_ack(token).ignore(); + return Status::OK(); + } + + Status flush_write() { + TRY_RESULT(size, socket_fd_.flush_write()); + if (size > 0 && stats_callback_) { + stats_callback_->on_write(size); + } + return Status::OK(); + } + + Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT { + if (has_error_) { + return Status::Error("Connection has already failed"); + } + sync_with_poll(socket_fd_); + + // read/write + // EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too. + // just close connection and hope that read or write will not return this error too. + TRY_STATUS(socket_fd_.get_pending_error()); + + TRY_STATUS(flush_read(auth_key, callback)); + TRY_STATUS(callback.before_write()); + TRY_STATUS(flush_write()); + if (can_close_local(socket_fd_)) { + return Status::Error("Connection closed"); + } + return Status::OK(); + } +}; + +#if TD_EXPERIMENTAL_WATCH_OS +class RawConnectionHttp : public RawConnection { + public: + RawConnectionHttp(IPAddress ip_address, unique_ptr stats_callback) + : ip_address_(std::move(ip_address)), stats_callback_(std::move(stats_callback)) { + answers_ = std::make_shared>>(); + answers_->init(); + } + + void set_connection_token(StateManager::ConnectionToken connection_token) override { + connection_token_ = std::move(connection_token); + } + + bool can_send() const override { + return mode_ == Send; + } + TransportType get_transport_type() const override { + return mtproto::TransportType{mtproto::TransportType::Http, 0, mtproto::ProxySecret()}; + } + void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key, + uint64 quick_ack_token) override { + PacketInfo info; + info.version = 2; + info.no_crypto_flag = false; + info.salt = salt; + info.session_id = session_id; + info.use_random_padding = false; + + auto packet = BufferWriter{Transport::write(storer, auth_key, &info), 0, 0}; + Transport::write(storer, auth_key, &info, packet.as_slice()); + + send_packet(packet.as_buffer_slice()); + } + + uint64 send_no_crypto(const Storer &storer) override { + PacketInfo info; + + info.no_crypto_flag = true; + auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), 0, 0}; + Transport::write(storer, AuthKey(), &info, packet.as_slice()); + LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice()); + send_packet(packet.as_buffer_slice()); + return info.message_id; + } + + PollableFdInfo &get_poll_info() override { + return answers_->reader_get_event_fd().get_poll_info(); + } + + StatsCallback *stats_callback() override { + return stats_callback_.get(); + } + + // NB: After first returned error, all subsequent calls will return error too. + Status flush(const AuthKey &auth_key, Callback &callback) override { + auto status = do_flush(auth_key, callback); + if (status.is_error()) { + if (stats_callback_ && status.code() != 2) { + stats_callback_->on_error(); + } + has_error_ = true; + } + return status; + } + + bool has_error() const override { + return has_error_; + } + + void close() override { + } + + PublicFields &extra() override { + return extra_; + } + const PublicFields &extra() const override { + return extra_; + } + + private: + PublicFields extra_; + IPAddress ip_address_; + bool has_error_{false}; + EventFd event_fd_; + + enum Mode { Send, Receive } mode_{Send}; + + unique_ptr stats_callback_; + + StateManager::ConnectionToken connection_token_; + std::shared_ptr>> answers_; + std::vector to_send_; + + void send_packet(BufferSlice packet) { + CHECK(mode_ == Send); + mode_ = Receive; + to_send_.push_back(std::move(packet)); + } + + Status flush_read(const AuthKey &auth_key, Callback &callback) { + while (true) { + auto packets_n = answers_->reader_wait_nonblock(); + if (packets_n == 0) { + break; + } + for (int i = 0; i < packets_n; i++) { + TRY_RESULT(packet, answers_->reader_get_unsafe()); + if (stats_callback_) { + stats_callback_->on_read(packet.size()); + } + callback.on_read(packet.size()); + CHECK(mode_ == Receive); + mode_ = Send; + + PacketInfo info; + info.version = 2; + + TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info)); + switch (read_result.type()) { + case Transport::ReadResult::Quickack: { + break; + } + case Transport::ReadResult::Error: { + TRY_STATUS(on_read_mtproto_error(read_result.error())); + break; + } + case Transport::ReadResult::Packet: { + // If a packet was successfully decrypted, then it is ok to assume that the connection is alive + if (!auth_key.empty()) { + if (stats_callback_) { + stats_callback_->on_pong(); + } + } + + TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet()))); + break; + } + case Transport::ReadResult::Nop: + break; + default: + UNREACHABLE(); + } + } + } + + return Status::OK(); + } + + Status on_read_mtproto_error(int32 error_code) { + if (error_code == -429) { + if (stats_callback_) { + stats_callback_->on_mtproto_error(); + } + return Status::Error(500, PSLICE() << "MTProto error: " << error_code); + } + if (error_code == -404) { + return Status::Error(-404, PSLICE() << "MTProto error: " << error_code); + } + return Status::Error(PSLICE() << "MTProto error: " << error_code); + } + + Status flush_write() { + for (auto &packet : to_send_) { + TRY_STATUS(do_send(packet.as_slice())); + if (packet.size() > 0 && stats_callback_) { + stats_callback_->on_write(packet.size()); + } + } + to_send_.clear(); + return Status::OK(); + } + + Status do_send(Slice data) { + DarwinHttp::post(PSLICE() << "http://" << ip_address_.get_ip_str() << ":" << ip_address_.get_port() << "/api", data, + [answers = answers_](auto res) { answers->writer_put(std::move(res)); }); + return Status::OK(); + } + + Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT { + if (has_error_) { + return Status::Error("Connection has already failed"); + } + + TRY_STATUS(flush_read(auth_key, callback)); + TRY_STATUS(callback.before_write()); + TRY_STATUS(flush_write()); + return Status::OK(); + } +}; +#endif + +td::unique_ptr RawConnection::create(IPAddress ip_address, SocketFd socket_fd, + TransportType transport_type, + unique_ptr stats_callback) { +#if TD_EXPERIMENTAL_WATCH_OS + return td::make_unique(ip_address, std::move(stats_callback)); +#else + return td::make_unique(std::move(socket_fd), transport_type, std::move(stats_callback)); +#endif } } // namespace mtproto diff --git a/td/mtproto/RawConnection.h b/td/mtproto/RawConnection.h index 924ebab1c..90a602fd6 100644 --- a/td/mtproto/RawConnection.h +++ b/td/mtproto/RawConnection.h @@ -39,34 +39,20 @@ class RawConnection { virtual void on_error() = 0; // called on RawConnection error. Such error should be very rare on good connections. virtual void on_mtproto_error() = 0; }; - RawConnection() = default; - RawConnection(SocketFd socket_fd, TransportType transport_type, unique_ptr stats_callback) - : socket_fd_(std::move(socket_fd)) - , transport_(create_transport(transport_type)) - , stats_callback_(std::move(stats_callback)) { - transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer()); - } + virtual ~RawConnection() = default; + static td::unique_ptr create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type, + unique_ptr stats_callback); - void set_connection_token(StateManager::ConnectionToken connection_token) { - connection_token_ = std::move(connection_token); - } + virtual void set_connection_token(StateManager::ConnectionToken connection_token) = 0; - bool can_send() const { - return transport_->can_write(); - } - TransportType get_transport_type() const { - return transport_->get_type(); - } - void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key, - uint64 quick_ack_token = 0); - uint64 send_no_crypto(const Storer &storer); + virtual bool can_send() const = 0; + virtual TransportType get_transport_type() const = 0; + virtual void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key, + uint64 quick_ack_token = 0) = 0; + virtual uint64 send_no_crypto(const Storer &storer) = 0; - PollableFdInfo &get_poll_info() { - return socket_fd_.get_poll_info(); - } - StatsCallback *stats_callback() { - return stats_callback_.get(); - } + virtual PollableFdInfo &get_poll_info() = 0; + virtual StatsCallback *stats_callback() = 0; class Callback { public: @@ -86,65 +72,19 @@ class RawConnection { }; // NB: After first returned error, all subsequent calls will return error too. - Status flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT { - auto status = do_flush(auth_key, callback); - if (status.is_error()) { - if (stats_callback_ && status.code() != 2) { - stats_callback_->on_error(); - } - has_error_ = true; - } - return status; - } + virtual Status flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT = 0; + virtual bool has_error() const = 0; - bool has_error() const { - return has_error_; - } + virtual void close() = 0; - void close() { - transport_.reset(); - socket_fd_.close(); - } + struct PublicFields { + uint32 extra{0}; + string debug_str; + double rtt{0}; + }; - uint32 extra_{0}; - string debug_str_; - double rtt_{0}; - - private: - BufferedFd socket_fd_; - unique_ptr transport_; - std::map quick_ack_to_token_; - bool has_error_{false}; - - unique_ptr stats_callback_; - - StateManager::ConnectionToken connection_token_; - - Status flush_read(const AuthKey &auth_key, Callback &callback); - Status flush_write(); - - Status on_quick_ack(uint32 quick_ack, Callback &callback); - Status on_read_mtproto_error(int32 error_code); - - Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT { - if (has_error_) { - return Status::Error("Connection has already failed"); - } - sync_with_poll(socket_fd_); - - // read/write - // EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too. - // just close connection and hope that read or write will not return this error too. - TRY_STATUS(socket_fd_.get_pending_error()); - - TRY_STATUS(flush_read(auth_key, callback)); - TRY_STATUS(callback.before_write()); - TRY_STATUS(flush_write()); - if (can_close_local(socket_fd_)) { - return Status::Error("Connection closed"); - } - return Status::OK(); - } + virtual PublicFields &extra() = 0; + virtual const PublicFields &extra() const = 0; }; } // namespace mtproto diff --git a/td/mtproto/SessionConnection.h b/td/mtproto/SessionConnection.h index 82275ad01..ca9e1fb16 100644 --- a/td/mtproto/SessionConnection.h +++ b/td/mtproto/SessionConnection.h @@ -132,7 +132,7 @@ class SessionConnection bool is_main_ = false; int rtt() const { - return max(2, static_cast(raw_connection_->rtt_ * 1.5 + 1)); + return max(2, static_cast(raw_connection_->extra().rtt * 1.5 + 1)); } int32 read_disconnect_delay() const { diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index daf709ffa..98bb4c44b 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -591,7 +591,7 @@ class TestProxyRequest : public RequestOnceActor { }); child_ = - ConnectionCreator::prepare_connection(r_socket_fd.move_as_ok(), proxy_, mtproto_ip_address, get_transport(), + ConnectionCreator::prepare_connection(ip, r_socket_fd.move_as_ok(), proxy_, mtproto_ip_address, get_transport(), "Test", "TestPingDC2", nullptr, {}, false, std::move(connection_promise)); } @@ -613,7 +613,7 @@ class TestProxyRequest : public RequestOnceActor { }; auto handshake = make_unique(dc_id_, 3600); auto data = r_data.move_as_ok(); - auto raw_connection = make_unique(std::move(data.socket_fd), get_transport(), nullptr); + auto raw_connection = mtproto::RawConnection::create(data.ip_address, std::move(data.socket_fd), get_transport(), nullptr); child_ = create_actor( "HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique(), 10.0, PromiseCreator::lambda([actor_id = actor_id(this)](Result> raw_connection) { diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 013d39a3e..63dd6a617 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -323,7 +323,8 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise promise) { continue; } - auto r_socket_fd = SocketFd::open(info.option->get_ip_address()); + auto ip = info.option->get_ip_address(); + auto r_socket_fd = SocketFd::open(ip); if (r_socket_fd.is_error()) { LOG(DEBUG) << "Failed to open socket: " << r_socket_fd.error(); on_ping_main_dc_result(token, r_socket_fd.move_as_error()); @@ -331,7 +332,7 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise promise) { } ping_proxy_socket_fd( - r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(), + ip, r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(), PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, std::move(result)); })); @@ -371,28 +372,31 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address auto socket_fd = r_socket_fd.move_as_ok(); auto connection_promise = PromiseCreator::lambda( - [promise = std::move(promise), actor_id = actor_id(this), transport_type = extra.transport_type, + [ip_address, promise = std::move(promise), actor_id = actor_id(this), transport_type = extra.transport_type, debug_str = std::move(extra.debug_str)](Result r_connection_data) mutable { if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } - send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, r_connection_data.move_as_ok().socket_fd, - std::move(transport_type), std::move(debug_str), std::move(promise)); + send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, ip_address, + r_connection_data.move_as_ok().socket_fd, std::move(transport_type), std::move(debug_str), + std::move(promise)); }); CHECK(proxy.use_proxy()); auto token = next_token(); - auto ref = - prepare_connection(std::move(socket_fd), proxy, extra.mtproto_ip_address, extra.transport_type, "Ping", - extra.debug_str, nullptr, create_reference(token), false, std::move(connection_promise)); + auto ref = prepare_connection(extra.ip_address, std::move(socket_fd), proxy, extra.mtproto_ip_address, + extra.transport_type, "Ping", extra.debug_str, nullptr, create_reference(token), false, + std::move(connection_promise)); if (!ref.empty()) { children_[token] = {false, std::move(ref)}; } } -void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, - string debug_str, Promise promise) { +void ConnectionCreator::ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, + mtproto::TransportType transport_type, string debug_str, + Promise promise) { auto token = next_token(); - auto raw_connection = make_unique(std::move(socket_fd), std::move(transport_type), nullptr); + auto raw_connection = + mtproto::RawConnection::create(ip_address, std::move(socket_fd), std::move(transport_type), nullptr); children_[token] = { false, create_ping_actor(std::move(debug_str), std::move(raw_connection), nullptr, PromiseCreator::lambda([promise = std::move(promise)]( @@ -400,7 +404,7 @@ void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::Transp if (result.is_error()) { return promise.set_error(Status::Error(400, result.error().public_message())); } - auto ping_time = result.ok()->rtt_; + auto ping_time = result.ok()->extra().rtt; promise.set_value(std::move(ping_time)); }), create_reference(token))}; @@ -643,20 +647,20 @@ void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address, mtpro } auto socket_fd = r_socket_fd.move_as_ok(); - auto connection_promise = PromiseCreator::lambda( - [promise = std::move(promise), actor_id = actor_id(this), transport_type, - network_generation = network_generation_](Result r_connection_data) mutable { - if (r_connection_data.is_error()) { - return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); - } - auto raw_connection = - make_unique(r_connection_data.move_as_ok().socket_fd, transport_type, nullptr); - raw_connection->extra_ = network_generation; - promise.set_value(std::move(raw_connection)); - }); + auto connection_promise = PromiseCreator::lambda([promise = std::move(promise), actor_id = actor_id(this), + transport_type, network_generation = network_generation_, + ip_address](Result r_connection_data) mutable { + if (r_connection_data.is_error()) { + return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); + } + auto raw_connection = + mtproto::RawConnection::create(ip_address, r_connection_data.move_as_ok().socket_fd, transport_type, nullptr); + raw_connection->extra().extra = network_generation; + promise.set_value(std::move(raw_connection)); + }); auto token = next_token(); - auto ref = prepare_connection(std::move(socket_fd), Proxy(), IPAddress(), transport_type, "Raw", + auto ref = prepare_connection(ip_address, std::move(socket_fd), Proxy(), IPAddress(), transport_type, "Raw", PSTRING() << "to IP address " << ip_address, nullptr, create_reference(token), false, std::move(connection_promise)); if (!ref.empty()) { @@ -699,6 +703,9 @@ Result ConnectionCreator::find_connection(const Proxy &proxy, const IP bool prefer_ipv6 = G()->shared_config().get_option_boolean("prefer_ipv6") || (proxy.use_proxy() && proxy_ip_address.is_ipv6()); bool only_http = proxy.use_http_caching_proxy(); +#if TD_EXPERIMENTAL_WATCH_OS + only_http = true; +#endif TRY_RESULT(info, dc_options_set_.find_connection( dc_id, allow_media_only, proxy.use_proxy() && proxy.use_socks5_proxy(), prefer_ipv6, only_http)); extra.stat = info.stat; @@ -718,29 +725,33 @@ Result ConnectionCreator::find_connection(const Proxy &proxy, const IP if (proxy.use_proxy()) { extra.mtproto_ip_address = info.option->get_ip_address(); + extra.ip_address = proxy_ip_address; extra.debug_str = PSTRING() << (proxy.use_socks5_proxy() ? "Socks5" : (only_http ? "HTTP_ONLY" : "HTTP_TCP")) << ' ' << proxy_ip_address << " --> " << extra.mtproto_ip_address << extra.debug_str; - VLOG(connections) << "Create: " << extra.debug_str; - return SocketFd::open(proxy_ip_address); } else { + extra.ip_address = info.option->get_ip_address(); extra.debug_str = PSTRING() << info.option->get_ip_address() << extra.debug_str; - VLOG(connections) << "Create: " << extra.debug_str; - return SocketFd::open(info.option->get_ip_address()); } + VLOG(connections) << "Create: " << extra.debug_str; + return SocketFd::open(extra.ip_address); } -ActorOwn<> ConnectionCreator::prepare_connection( - SocketFd socket_fd, const Proxy &proxy, const IPAddress &mtproto_ip_address, mtproto::TransportType transport_type, - Slice actor_name_prefix, Slice debug_str, unique_ptr stats_callback, - ActorShared<> parent, bool use_connection_token, Promise promise) { +ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd socket_fd, const Proxy &proxy, + const IPAddress &mtproto_ip_address, + mtproto::TransportType transport_type, Slice actor_name_prefix, + Slice debug_str, + unique_ptr stats_callback, + ActorShared<> parent, bool use_connection_token, + Promise promise) { if (proxy.use_socks5_proxy() || proxy.use_http_tcp_proxy() || transport_type.secret.emulate_tls()) { VLOG(connections) << "Create new transparent proxy connection " << debug_str; class Callback : public TransparentProxy::Callback { public: - explicit Callback(Promise promise, + explicit Callback(Promise promise, IPAddress ip_address, unique_ptr stats_callback, bool use_connection_token, bool was_connected) : promise_(std::move(promise)) + , ip_address_(std::move(ip_address)) , stats_callback_(std::move(stats_callback)) , use_connection_token_(use_connection_token) , was_connected_(was_connected) { @@ -756,6 +767,7 @@ ActorOwn<> ConnectionCreator::prepare_connection( promise_.set_error(Status::Error(400, result.error().public_message())); } else { ConnectionData data; + data.ip_address = ip_address_; data.socket_fd = result.move_as_ok(); data.connection_token = std::move(connection_token_); data.stats_callback = std::move(stats_callback_); @@ -772,6 +784,7 @@ ActorOwn<> ConnectionCreator::prepare_connection( private: Promise promise_; StateManager::ConnectionToken connection_token_; + IPAddress ip_address_; unique_ptr stats_callback_; bool use_connection_token_; bool was_connected_{false}; @@ -779,8 +792,8 @@ ActorOwn<> ConnectionCreator::prepare_connection( VLOG(connections) << "Start " << (proxy.use_socks5_proxy() ? "Socks5" : (proxy.use_http_tcp_proxy() ? "HTTP" : "TLS")) << ": " << debug_str; - auto callback = make_unique(std::move(promise), std::move(stats_callback), use_connection_token, - !proxy.use_socks5_proxy()); + auto callback = make_unique(std::move(promise), ip_address, std::move(stats_callback), + use_connection_token, !proxy.use_socks5_proxy()); if (proxy.use_socks5_proxy()) { return ActorOwn<>(create_actor(PSLICE() << actor_name_prefix << "Socks5", std::move(socket_fd), mtproto_ip_address, proxy.user().str(), proxy.password().str(), @@ -801,6 +814,7 @@ ActorOwn<> ConnectionCreator::prepare_connection( VLOG(connections) << "Create new direct connection " << debug_str; ConnectionData data; + data.ip_address = ip_address; data.socket_fd = std::move(socket_fd); data.stats_callback = std::move(stats_callback); promise.set_result(std::move(data)); @@ -933,9 +947,9 @@ void ConnectionCreator::client_loop(ClientInfo &client) { td::make_unique(client.is_media ? media_net_stats_callback_ : common_net_stats_callback_, actor_id(this), client.hash, extra.stat); auto token = next_token(); - auto ref = prepare_connection(std::move(socket_fd), proxy, extra.mtproto_ip_address, extra.transport_type, Slice(), - extra.debug_str, std::move(stats_callback), create_reference(token), true, - std::move(promise)); + auto ref = prepare_connection(extra.ip_address, std::move(socket_fd), proxy, extra.mtproto_ip_address, + extra.transport_type, Slice(), extra.debug_str, std::move(stats_callback), + create_reference(token), true, std::move(promise)); if (!ref.empty()) { children_[token] = {true, std::move(ref)}; } @@ -963,7 +977,7 @@ void ConnectionCreator::client_create_raw_connection(Result r_co debug_str](Result> result) mutable { if (result.is_ok()) { VLOG(connections) << "Ready connection (" << (check_mode ? "" : "un") << "checked) " << result.ok().get() << ' ' - << tag("rtt", format::as_time(result.ok()->rtt_)) << ' ' << debug_str; + << tag("rtt", format::as_time(result.ok()->extra().rtt)) << ' ' << debug_str; } else { VLOG(connections) << "Failed connection (" << (check_mode ? "" : "un") << "checked) " << result.error() << ' ' << debug_str; @@ -977,12 +991,13 @@ void ConnectionCreator::client_create_raw_connection(Result r_co } auto connection_data = r_connection_data.move_as_ok(); - auto raw_connection = make_unique( - std::move(connection_data.socket_fd), std::move(transport_type), std::move(connection_data.stats_callback)); + auto raw_connection = + mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.socket_fd), + std::move(transport_type), std::move(connection_data.stats_callback)); raw_connection->set_connection_token(std::move(connection_data.connection_token)); - raw_connection->extra_ = network_generation; - raw_connection->debug_str_ = debug_str; + raw_connection->extra().extra = network_generation; + raw_connection->extra().debug_str = debug_str; if (check_mode) { VLOG(connections) << "Start check: " << debug_str << " " << (auth_data ? "with" : "without") << " auth data"; diff --git a/td/telegram/net/ConnectionCreator.h b/td/telegram/net/ConnectionCreator.h index 915d3e8be..8acbfc7d6 100644 --- a/td/telegram/net/ConnectionCreator.h +++ b/td/telegram/net/ConnectionCreator.h @@ -84,6 +84,7 @@ class ConnectionCreator : public NetQueryCallback { void ping_proxy(int32 proxy_id, Promise promise); struct ConnectionData { + IPAddress ip_address; SocketFd socket_fd; StateManager::ConnectionToken connection_token; unique_ptr stats_callback; @@ -91,7 +92,7 @@ class ConnectionCreator : public NetQueryCallback { static DcOptions get_default_dc_options(bool is_test); - static ActorOwn<> prepare_connection(SocketFd socket_fd, const Proxy &proxy, const IPAddress &mtproto_ip_address, + static ActorOwn<> prepare_connection(IPAddress ip_address, SocketFd socket_fd, const Proxy &proxy, const IPAddress &mtproto_ip_address, mtproto::TransportType transport_type, Slice actor_name_prefix, Slice debug_str, unique_ptr stats_callback, ActorShared<> parent, bool use_connection_token, @@ -232,6 +233,7 @@ class ConnectionCreator : public NetQueryCallback { DcOptionsSet::Stat *stat{nullptr}; mtproto::TransportType transport_type; string debug_str; + IPAddress ip_address; IPAddress mtproto_ip_address; bool check_mode{false}; }; @@ -246,7 +248,7 @@ class ConnectionCreator : public NetQueryCallback { void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise promise); - void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, string debug_str, + void ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, mtproto::TransportType transport_type, string debug_str, Promise promise); void on_ping_main_dc_result(uint64 token, Result result); diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 1c62b7596..d039a5033 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -110,7 +110,7 @@ class GenAuthKeyActor : public Actor { auto raw_connection = r_raw_connection.move_as_ok(); VLOG(dc) << "Receive raw connection " << raw_connection.get(); - network_generation_ = raw_connection->extra_; + network_generation_ = raw_connection->extra().extra; child_ = create_actor_on_scheduler( PSLICE() << name_ + "::HandshakeActor", G()->get_slow_net_scheduler_id(), std::move(handshake_), std::move(raw_connection), std::move(context_), 10, std::move(connection_promise_), @@ -1052,7 +1052,7 @@ void Session::connection_open_finish(ConnectionInfo *info, auto raw_connection = r_raw_connection.move_as_ok(); VLOG(dc) << "Receive raw connection " << raw_connection.get(); - if (raw_connection->extra_ != network_generation_) { + if (raw_connection->extra().extra != network_generation_) { LOG(WARNING) << "Got RawConnection with old network_generation"; info->state = ConnectionInfo::State::Empty; yield(); @@ -1087,7 +1087,7 @@ void Session::connection_open_finish(ConnectionInfo *info, mode_name = Slice("HttpLongPoll"); } } - auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->debug_str_; + auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->extra().debug_str; LOG(INFO) << "Finished to open connection " << name; info->connection = make_unique(mode, std::move(raw_connection), &auth_data_); if (can_destroy_auth_key()) { diff --git a/tdnet/CMakeLists.txt b/tdnet/CMakeLists.txt index fb0319855..bc2b0f6ef 100644 --- a/tdnet/CMakeLists.txt +++ b/tdnet/CMakeLists.txt @@ -44,8 +44,20 @@ set(TDNET_SOURCE td/net/TcpListener.h td/net/TransparentProxy.h td/net/Wget.h + + td/net/DarwinHttp.mm + td/net/DarwinHttp.h ) +if (TD_EXPERIMENTAL_WATCH_OS) +set (TDNET_SOURCE + ${TDNET_SOURCE} + td/net/DarwinHttp.mm + td/net/DarwinHttp.h +) +set_source_files_properties(td/net/DarwinHttp.mm PROPERTIES COMPILE_FLAGS -fobjc-arc) +endif() + #RULES #LIBRARIES @@ -66,6 +78,9 @@ if (WIN32) endif() endif() +find_library(FOUNDATION_LIBRARY Foundation REQUIRED) +target_link_libraries(tdnet PRIVATE ${FOUNDATION_LIBRARY}) + install(TARGETS tdnet EXPORT TdTargets LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" diff --git a/tdnet/td/net/DarwinHttp.h b/tdnet/td/net/DarwinHttp.h new file mode 100644 index 000000000..7dae980eb --- /dev/null +++ b/tdnet/td/net/DarwinHttp.h @@ -0,0 +1,13 @@ +#pragma once + +#include "td/actor/PromiseFuture.h" + +#include "td/utils/buffer.h" + +namespace td { +class DarwinHttp { + public: + static void get(CSlice url, Promise promise); + static void post(CSlice url, Slice data, Promise promise); +}; +} // namespace td diff --git a/tdnet/td/net/DarwinHttp.mm b/tdnet/td/net/DarwinHttp.mm new file mode 100644 index 000000000..df545661a --- /dev/null +++ b/tdnet/td/net/DarwinHttp.mm @@ -0,0 +1,57 @@ +#include "td/net/DarwinHttp.h" + +#import + +namespace td { +namespace { +NSString *to_ns_string(CSlice slice) { + return [NSString stringWithUTF8String:slice.c_str()]; +} + +NSData *to_ns_data(Slice data) { + return [NSData dataWithBytes:static_cast(data.data()) length:data.size()]; +} + +auto http_get(CSlice url) { + auto nsurl = [NSURL URLWithString:to_ns_string(url)]; + auto request = [NSURLRequest requestWithURL:nsurl]; + return request; +} + +auto http_post(CSlice url, Slice data) { + auto nsurl = [NSURL URLWithString:to_ns_string(url)]; + auto request = [NSMutableURLRequest requestWithURL:nsurl]; + [request setHTTPMethod:@"POST"]; + [request setHTTPBody:to_ns_data(data)]; + [request setValue:@"keep-alive" forHTTPHeaderField:@"Connection"]; + [request setValue:@"" forHTTPHeaderField:@"Host"]; + [request setValue:to_ns_string(PSLICE() << data.size()) forHTTPHeaderField:@"Content-Length"]; + [request setValue:@"application/x-www-form-urlencoded" forHTTPHeaderField:@"Content-Type"]; + return request; +} + +void http_send(NSURLRequest *request, Promise promise) { + __block auto callback = std::move(promise); + NSURLSessionDataTask* dataTask = + [NSURLSession.sharedSession + dataTaskWithRequest:request + completionHandler: + ^(NSData *data, NSURLResponse *response, NSError *error) { + if(error == nil) { + callback(BufferSlice(Slice((const char *)([data bytes]), [data length]))); + } else { + callback(Status::Error(static_cast([error code]))); + } + }]; + [dataTask resume]; +} +} + +void DarwinHttp::get(CSlice url, Promise promise) { + return http_send(http_get(url), std::move(promise)); +} + +void DarwinHttp::post(CSlice url, Slice data, Promise promise) { + return http_send(http_post(url, data), std::move(promise)); +} +} diff --git a/tdutils/td/utils/config.h.in b/tdutils/td/utils/config.h.in index f8b89aeb5..a9fad9efe 100644 --- a/tdutils/td/utils/config.h.in +++ b/tdutils/td/utils/config.h.in @@ -6,3 +6,4 @@ #cmakedefine01 TD_HAVE_COROUTINES #cmakedefine01 TD_HAVE_ABSL #cmakedefine01 TD_FD_DEBUG +#cmakedefine01 TD_EXPERIMENTAL_WATCH_OS diff --git a/tdutils/td/utils/port/IPAddress.cpp b/tdutils/td/utils/port/IPAddress.cpp index aa7fe330a..7206190d2 100644 --- a/tdutils/td/utils/port/IPAddress.cpp +++ b/tdutils/td/utils/port/IPAddress.cpp @@ -499,6 +499,9 @@ Status IPAddress::init_sockaddr(sockaddr *addr, socklen_t len) { Status IPAddress::init_socket_address(const SocketFd &socket_fd) { is_valid_ = false; + if (socket_fd.empty()) { + return Status::Error("Socket is empty"); + } auto socket = socket_fd.get_native_fd().socket(); socklen_t len = storage_size(); int ret = getsockname(socket, &sockaddr_, &len); @@ -511,6 +514,9 @@ Status IPAddress::init_socket_address(const SocketFd &socket_fd) { Status IPAddress::init_peer_address(const SocketFd &socket_fd) { is_valid_ = false; + if (socket_fd.empty()) { + return Status::Error("Socket is empty"); + } auto socket = socket_fd.get_native_fd().socket(); socklen_t len = storage_size(); int ret = getpeername(socket, &sockaddr_, &len); diff --git a/tdutils/td/utils/port/SocketFd.cpp b/tdutils/td/utils/port/SocketFd.cpp index 248623381..a29070859 100644 --- a/tdutils/td/utils/port/SocketFd.cpp +++ b/tdutils/td/utils/port/SocketFd.cpp @@ -590,6 +590,10 @@ Result SocketFd::from_native_fd(NativeFd fd) { } Result SocketFd::open(const IPAddress &address) { +#if TD_EXPERIMENTAL_WATCH_OS + return SocketFd{}; +#endif + NativeFd native_fd{socket(address.get_address_family(), SOCK_STREAM, IPPROTO_TCP)}; if (!native_fd) { return OS_SOCKET_ERROR("Failed to create a socket"); diff --git a/test/http.cpp b/test/http.cpp index 1cc5de26a..6f20dd6ec 100644 --- a/test/http.cpp +++ b/test/http.cpp @@ -11,6 +11,8 @@ #include "td/net/HttpQuery.h" #include "td/net/HttpReader.h" +#include "td/net/DarwinHttp.h" + #include "td/utils/AesCtrByteFlow.h" #include "td/utils/algorithm.h" #include "td/utils/base64.h" @@ -38,6 +40,9 @@ #include #include +#include +#include + REGISTER_TESTS(http) using namespace td; @@ -462,3 +467,37 @@ TEST(Http, gzip_bomb_with_limit) { } ASSERT_TRUE(ok); } + +struct Baton { + std::mutex mutex; + std::condition_variable cond; + bool is_ready{false}; + + void wait() { + std::unique_lock lock(mutex); + cond.wait(lock, [&] { return is_ready; }); + } + + void post() { + { + std::unique_lock lock(mutex); + is_ready = true; + } + cond.notify_all(); + } + + void reset() { + is_ready = false; + } +}; + +TEST(Http, Darwin) { + Baton baton; + //LOG(ERROR) << "???"; + td::DarwinHttp::get("http://example.com", [&](td::BufferSlice data) { + LOG(ERROR) << data.as_slice(); + baton.post(); + }); + //LOG(ERROR) << "!!!"; + baton.wait(); +} diff --git a/test/mtproto.cpp b/test/mtproto.cpp index 812d31d9e..fa737567c 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -216,8 +216,8 @@ class TestPingActor : public Actor { } ping_connection_ = mtproto::PingConnection::create_req_pq( - make_unique( - r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, + mtproto::RawConnection::create( + ip_address_, r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr), 3); @@ -330,13 +330,15 @@ class HandshakeTestActor : public Actor { } void loop() override { if (!wait_for_raw_connection_ && !raw_connection_) { - auto r_socket = SocketFd::open(get_default_ip_address()); + auto ip_address = get_default_ip_address(); + auto r_socket = SocketFd::open(ip_address); if (r_socket.is_error()) { finish(Status::Error(PSTRING() << "Failed to open socket: " << r_socket.error())); return stop(); } - raw_connection_ = make_unique( + raw_connection_ = mtproto::RawConnection::create( + ip_address, r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); } @@ -535,13 +537,15 @@ class FastPingTestActor : public Actor { void start_up() override { // Run handshake to create key and salt - auto r_socket = SocketFd::open(get_default_ip_address()); + auto ip_address = get_default_ip_address(); + auto r_socket = SocketFd::open(ip_address); if (r_socket.is_error()) { *result_ = Status::Error(PSTRING() << "Failed to open socket: " << r_socket.error()); return stop(); } - auto raw_connection = make_unique( + auto raw_connection = mtproto::RawConnection::create( + ip_address, r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); auto handshake = make_unique(get_default_dc_id(), 60 * 100 /*temp*/); create_actor( @@ -581,8 +585,8 @@ class FastPingTestActor : public Actor { return stop(); } connection_ = r_connection.move_as_ok(); - LOG(INFO) << "RTT: " << connection_->rtt_; - connection_->rtt_ = 0; + LOG(INFO) << "RTT: " << connection_->extra().rtt; + connection_->extra().rtt = 0; loop(); }