Add source to Binlog::sync.

This commit is contained in:
levlam 2024-02-02 15:09:17 +03:00
parent d79bd4b694
commit 8377726001
13 changed files with 65 additions and 56 deletions

View File

@ -1172,7 +1172,7 @@ void AuthManager::destroy_auth_keys() {
}
});
G()->td_db()->get_binlog_pmc()->set("auth", "destroy");
G()->td_db()->get_binlog_pmc()->force_sync(std::move(promise));
G()->td_db()->get_binlog_pmc()->force_sync(std::move(promise), "destroy_auth_keys");
}
void AuthManager::on_delete_account_result(NetQueryPtr &&net_query) {

View File

@ -5398,7 +5398,7 @@ void ContactsManager::set_my_id(UserId my_id) {
my_id_ = my_id;
G()->td_db()->get_binlog_pmc()->set("my_id", to_string(my_id.get()));
td_->option_manager_->set_option_integer("my_id", my_id_.get());
G()->td_db()->get_binlog_pmc()->force_sync(Promise<Unit>());
G()->td_db()->get_binlog_pmc()->force_sync(Promise<Unit>(), "set_my_id");
}
}
@ -8835,13 +8835,15 @@ void ContactsManager::on_import_contacts_finished(int64 random_id, vector<UserId
}
if (G()->use_chat_info_database()) {
G()->td_db()->get_binlog()->force_sync(PromiseCreator::lambda(
[log_event = log_event_store(all_imported_contacts_).as_slice().str()](Result<> result) mutable {
if (result.is_ok()) {
LOG(INFO) << "Save imported contacts to database";
G()->td_db()->get_sqlite_pmc()->set("user_imported_contacts", std::move(log_event), Auto());
}
}));
G()->td_db()->get_binlog()->force_sync(
PromiseCreator::lambda(
[log_event = log_event_store(all_imported_contacts_).as_slice().str()](Result<> result) mutable {
if (result.is_ok()) {
LOG(INFO) << "Save imported contacts to database";
G()->td_db()->get_sqlite_pmc()->set("user_imported_contacts", std::move(log_event), Auto());
}
}),
"on_import_contacts_finished");
}
for (size_t i = 0; i < result_size; i++) {
@ -8961,17 +8963,19 @@ void ContactsManager::save_contacts_to_database() {
transform(contacts_hints_.search_empty(100000).second, [](int64 key) { return UserId(key); });
G()->td_db()->get_binlog_pmc()->set("saved_contact_count", to_string(saved_contact_count_));
G()->td_db()->get_binlog()->force_sync(PromiseCreator::lambda([user_ids = std::move(user_ids)](Result<> result) {
if (result.is_ok()) {
LOG(INFO) << "Saved contacts to database";
G()->td_db()->get_sqlite_pmc()->set(
"user_contacts", log_event_store(user_ids).as_slice().str(), PromiseCreator::lambda([](Result<> result) {
if (result.is_ok()) {
send_closure(G()->contacts_manager(), &ContactsManager::save_next_contacts_sync_date);
}
}));
}
}));
G()->td_db()->get_binlog()->force_sync(
PromiseCreator::lambda([user_ids = std::move(user_ids)](Result<> result) {
if (result.is_ok()) {
LOG(INFO) << "Saved contacts to database";
G()->td_db()->get_sqlite_pmc()->set(
"user_contacts", log_event_store(user_ids).as_slice().str(), PromiseCreator::lambda([](Result<> result) {
if (result.is_ok()) {
send_closure(G()->contacts_manager(), &ContactsManager::save_next_contacts_sync_date);
}
}));
}
}),
"save_contacts_to_database");
}
void ContactsManager::on_get_contacts_failed(Status error) {

View File

@ -372,7 +372,7 @@ void DeviceTokenManager::save_info(int32 token_type) {
}
sync_cnt_++;
G()->td_db()->get_binlog_pmc()->force_sync(
create_event_promise(self_closure(this, &DeviceTokenManager::dec_sync_cnt)));
create_event_promise(self_closure(this, &DeviceTokenManager::dec_sync_cnt)), "DeviceTokenManager::save_info");
}
void DeviceTokenManager::dec_sync_cnt() {

View File

@ -677,7 +677,7 @@ void SecretChatActor::cancel_chat(bool delete_history, bool is_already_discarded
}
});
context_->binlog()->force_sync(std::move(on_sync));
context_->binlog()->force_sync(std::move(on_sync), "cancel_chat");
yield();
}
@ -1072,7 +1072,7 @@ void SecretChatActor::do_outbound_message_impl(unique_ptr<log_event::OutboundSec
if (log_event_id == 0) {
log_event_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*state->message));
LOG(INFO) << "Outbound secret message [save_log_event] start " << tag("log_event_id", log_event_id);
context_->binlog()->force_sync(std::move(save_log_event_finish));
context_->binlog()->force_sync(std::move(save_log_event_finish), "do_outbound_message_impl");
state->message->set_log_event_id(log_event_id);
} else {
LOG(INFO) << "Outbound secret message [save_log_event] skip " << tag("log_event_id", log_event_id);
@ -1329,7 +1329,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<log_event::Inbou
auto save_log_event_finish = PromiseCreator::join(std::move(save_changes_start), std::move(qts_promise));
if (need_sync) {
// TODO: lazy sync is enough
context_->binlog()->force_sync(std::move(save_log_event_finish));
context_->binlog()->force_sync(std::move(save_log_event_finish), "do_inbound_message_decrypted");
} else {
save_log_event_finish.set_value(Unit());
}
@ -1487,7 +1487,7 @@ void SecretChatActor::outbound_resend(uint64 state_id) {
"on_outbound_send_message_start");
}
});
context_->binlog()->force_sync(std::move(send_message_start));
context_->binlog()->force_sync(std::move(send_message_start), "outbound_resend");
}
Status SecretChatActor::outbound_rewrite_with_empty(uint64 state_id) {
@ -1668,7 +1668,7 @@ void SecretChatActor::on_outbound_send_message_error(uint64 state_id, Status err
}
});
if (need_sync) {
context_->binlog()->force_sync(std::move(send_message_start));
context_->binlog()->force_sync(std::move(send_message_start), "on_outbound_send_message_error");
} else {
send_message_start.set_value(Unit());
}
@ -1833,7 +1833,7 @@ Status SecretChatActor::on_update_chat(NetQueryPtr query) {
TRY_STATUS(on_update_chat(std::move(config)));
if (auth_state_.state == State::WaitRequestResponse) {
context_->secret_chat_db()->set_value(auth_state_);
context_->binlog()->force_sync(Promise<>());
context_->binlog()->force_sync(Promise<>(), "on_update_chat");
}
return Status::OK();
}

View File

@ -449,7 +449,7 @@ Status TdDb::init_sqlite(const Parameters &parameters, const DbKey &key, const D
binlog_pmc.erase("invalidate_old_featured_sticker_sets");
binlog_pmc.erase(AttachMenuManager::get_attach_menu_bots_database_key());
}
binlog_pmc.force_sync({});
binlog_pmc.force_sync(Auto(), "init_sqlite");
TRY_STATUS(db.exec("COMMIT TRANSACTION"));
@ -535,7 +535,7 @@ void TdDb::open_impl(Parameters parameters, Promise<OpenedDatabase> &&promise) {
sqlite_key = string(32, ' ');
Random::secure_bytes(sqlite_key);
binlog_pmc->set("sqlite_key", sqlite_key);
binlog_pmc->force_sync(Auto());
binlog_pmc->force_sync(Auto(), "TdDb::open_impl 1");
}
new_sqlite_key = DbKey::raw_key(std::move(sqlite_key));
} else {
@ -561,7 +561,7 @@ void TdDb::open_impl(Parameters parameters, Promise<OpenedDatabase> &&promise) {
}
if (drop_sqlite_key) {
binlog_pmc->erase("sqlite_key");
binlog_pmc->force_sync(Auto());
binlog_pmc->force_sync(Auto(), "TdDb::open_impl 2");
}
VLOG(td_init) << "Create concurrent_binlog_pmc";

View File

@ -213,8 +213,8 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
return it->second.first;
}
void force_sync(Promise<> &&promise) final {
binlog_->force_sync(std::move(promise));
void force_sync(Promise<> &&promise, const char *source) final {
binlog_->force_sync(std::move(promise), source);
}
void lazy_sync(Promise<> &&promise) {
@ -288,14 +288,14 @@ inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event
}
template <>
inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise) {
binlog_->sync();
inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise, const char *source) {
binlog_->sync(source);
promise.set_value(Unit());
}
template <>
inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) {
force_sync(std::move(promise));
force_sync(std::move(promise), "lazy_sync");
}
} // namespace td

View File

@ -48,7 +48,7 @@ class KeyValueSyncInterface {
virtual void erase_by_prefix(Slice prefix) = 0;
virtual void force_sync(Promise<> &&promise) = 0;
virtual void force_sync(Promise<> &&promise, const char *source) = 0;
virtual void close(Promise<> promise) = 0;
};

View File

@ -286,9 +286,9 @@ Status Binlog::close(bool need_sync) {
return Status::OK();
}
if (need_sync) {
sync();
sync("close");
} else {
flush();
flush("close");
}
fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure();
@ -373,7 +373,7 @@ void Binlog::do_event(BinlogEvent &&event) {
LOG(INFO) << "Load: init encryption";
} else {
CHECK(state_ == State::Reindex);
flush();
flush("do_event");
update_write_encryption();
//LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption";
}
@ -404,19 +404,21 @@ void Binlog::do_event(BinlogEvent &&event) {
fd_size_ += event_size;
}
void Binlog::sync() {
flush();
void Binlog::sync(const char *source) {
flush(source);
if (need_sync_) {
LOG(INFO) << "Sync binlog from " << source;
auto status = fd_.sync();
LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status;
need_sync_ = false;
}
}
void Binlog::flush() {
void Binlog::flush(const char *source) {
if (state_ == State::Load) {
return;
}
LOG(DEBUG) << "Flush binlog from " << source;
flush_events_buffer(true);
// NB: encryption happens during flush
if (byte_flow_flag_) {
@ -448,7 +450,7 @@ void Binlog::lazy_flush() {
buffer_reader_.sync_with_writer();
auto size = buffer_reader_.size() + events_buffer_size;
if (size > (1 << 14)) {
flush();
flush("lazy_flush");
} else if (size > 0 && need_flush_since_ == 0) {
need_flush_since_ = Time::now_cached();
}
@ -660,7 +662,7 @@ void Binlog::do_reindex() {
do_event(std::move(event)); // NB: no move is actually happens
});
{
flush();
flush("do_reindex");
if (start_size != 0) { // must sync creation of the file if it is non-empty
auto status = fd_.sync_barrier();
LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status;

View File

@ -109,8 +109,8 @@ class Binlog {
}
void add_event(BinlogEvent &&event);
void sync();
void flush();
void sync(const char *source);
void flush(const char *source);
void lazy_flush();
double need_flush_since() const {
return need_flush_since_;

View File

@ -74,7 +74,7 @@ class BinlogInterface {
return seq_no;
}
virtual void force_sync(Promise<> promise) = 0;
virtual void force_sync(Promise<> promise, const char *source) = 0;
virtual void force_flush() = 0;
virtual void change_key(DbKey db_key, Promise<> promise) = 0;

View File

@ -61,7 +61,8 @@ class BinlogActor final : public Actor {
try_flush();
}
void force_sync(Promise<> &&promise) {
void force_sync(Promise<> &&promise, const char *source) {
LOG(INFO) << "Force binlog sync from " << source;
auto seq_no = processor_.max_unfinished_seq_no();
if (processor_.max_finished_seq_no() == seq_no) {
do_immediate_sync(std::move(promise));
@ -72,7 +73,7 @@ class BinlogActor final : public Actor {
void force_flush() {
// TODO: use same logic as in force_sync
binlog_->flush();
binlog_->flush("force_flush");
flush_flag_ = false;
}
@ -115,7 +116,7 @@ class BinlogActor final : public Actor {
auto need_flush_since = binlog_->need_flush_since();
auto now = Time::now_cached();
if (now > need_flush_since + FLUSH_TIMEOUT - 1e-9) {
binlog_->flush();
binlog_->flush("try_flush");
} else {
if (!force_sync_flag_) {
flush_flag_ = true;
@ -161,7 +162,7 @@ class BinlogActor final : public Actor {
flush_flag_ = false;
wakeup_at_ = 0;
if (need_sync) {
binlog_->sync();
binlog_->sync("timeout_expired");
// LOG(ERROR) << "BINLOG SYNC";
set_promises(sync_promises_);
} else if (need_flush) {
@ -205,12 +206,14 @@ void ConcurrentBinlog::add_raw_event_impl(uint64 event_id, BufferSlice &&raw_eve
send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, event_id, std::move(raw_event), std::move(promise),
info);
}
void ConcurrentBinlog::force_sync(Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise));
void ConcurrentBinlog::force_sync(Promise<> promise, const char *source) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise), source);
}
void ConcurrentBinlog::force_flush() {
send_closure(binlog_actor_, &detail::BinlogActor::force_flush);
}
void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise));
}

View File

@ -41,7 +41,7 @@ class ConcurrentBinlog final : public BinlogInterface {
ConcurrentBinlog &operator=(ConcurrentBinlog &&) = delete;
~ConcurrentBinlog() final;
void force_sync(Promise<> promise) final;
void force_sync(Promise<> promise, const char *source) final;
void force_flush() final;
void change_key(DbKey db_key, Promise<> promise) final;

View File

@ -364,7 +364,7 @@ class FakeBinlog final
FakeBinlog() {
register_actor("FakeBinlog", this).release();
}
void force_sync(Promise<> promise) final {
void force_sync(Promise<> promise, const char *source) final {
if (pending_events_.empty()) {
pending_events_.emplace_back();
}
@ -639,7 +639,7 @@ class Master final : public Actor {
if (binlog_generation != binlog_generation_) {
return promise.set_error(Status::Error("Binlog generation mismatch"));
}
binlog_->force_sync(std::move(promise));
binlog_->force_sync(std::move(promise), "sync_binlog");
}
void on_closed() {
LOG(INFO) << "CLOSED";