This source file includes the following definitions.
- weak_ptr_factory_
- Start
- UpdateCredentials
- UpdateRegisteredIds
- Ready
- Invalidate
- InvalidateUnknownVersion
- InvalidateAll
- DispatchInvalidations
- SaveInvalidations
- EmitSavedInvalidations
- InformRegistrationStatus
- InformRegistrationFailure
- ReissueRegistrations
- InformError
- Acknowledge
- Drop
- WriteState
- DoRegistrationUpdate
- RequestDetailedStatus
- CollectDebugData
- StopForTest
- Stop
- GetState
- EmitStateChange
- GetThisAsAckHandler
- OnNetworkChannelStateChanged
#include "sync/notifier/sync_invalidation_listener.h"
#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/tracked_objects.h"
#include "google/cacheinvalidation/include/invalidation-client.h"
#include "google/cacheinvalidation/include/types.h"
#include "google/cacheinvalidation/types.pb.h"
#include "jingle/notifier/listener/push_client.h"
#include "sync/notifier/invalidation_util.h"
#include "sync/notifier/object_id_invalidation_map.h"
#include "sync/notifier/registration_manager.h"
namespace {

// Application name passed to the invalidation client factory callback in
// SyncInvalidationListener::Start().
const char kApplicationName[] = "chrome-sync";

}  // namespace
namespace syncer {
// Out-of-line definition of the (empty) Delegate destructor.
SyncInvalidationListener::Delegate::~Delegate() {
}
// Takes ownership of |network_channel|, hands it to the system resources and
// begins observing it. Both the ticl and the push-client state start out as
// DEFAULT_INVALIDATION_ERROR, and no delegate is attached until Start().
SyncInvalidationListener::SyncInvalidationListener(
    scoped_ptr<SyncNetworkChannel> network_channel)
    : sync_network_channel_(network_channel.Pass()),
      sync_system_resources_(sync_network_channel_.get(), this),
      delegate_(NULL),
      ticl_state_(DEFAULT_INVALIDATION_ERROR),
      push_client_state_(DEFAULT_INVALIDATION_ERROR),
      weak_ptr_factory_(this) {
  DCHECK(CalledOnValidThread());

  // The matching RemoveObserver() call lives in the destructor.
  sync_network_channel_->AddObserver(this);
}
// Stops observing the network channel and shuts down a running client, if
// there is one.
SyncInvalidationListener::~SyncInvalidationListener() {
  DCHECK(CalledOnValidThread());

  sync_network_channel_->RemoveObserver(this);
  Stop();

  // Stop() clears |delegate_| whenever a client was running.
  DCHECK(!delegate_);
}
// (Re)starts the listener.
//
// |create_invalidation_client_callback| builds the invalidation client; it is
// run with the system resources, the client type, |client_id|, the
// application name and this listener.
// |client_info| becomes the platform string of the system resources.
// |invalidation_bootstrap_data| is the initial state given to storage.
// |initial_unacked_invalidations| replaces the in-memory unacked map.
// |invalidation_state_tracker| must be initialized.
// |delegate| must be non-NULL; no delegate may be set when it is installed.
void SyncInvalidationListener::Start(
    const CreateInvalidationClientCallback&
        create_invalidation_client_callback,
    const std::string& client_id,
    const std::string& client_info,
    const std::string& invalidation_bootstrap_data,
    const UnackedInvalidationsMap& initial_unacked_invalidations,
    const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
    Delegate* delegate) {
  DCHECK(CalledOnValidThread());

  // Shut down whatever an earlier Start() may have left running.
  Stop();

  // Bring up the system resources and prime storage with the bootstrap data.
  sync_system_resources_.set_platform(client_info);
  sync_system_resources_.Start();
  sync_system_resources_.storage()->SetInitialState(
      invalidation_bootstrap_data);

  unacked_invalidations_map_ = initial_unacked_invalidations;
  invalidation_state_tracker_ = invalidation_state_tracker;
  DCHECK(invalidation_state_tracker_.IsInitialized());

  DCHECK(!delegate_);
  DCHECK(delegate);
  delegate_ = delegate;

  // iOS builds identify themselves with a dedicated client type.
#if defined(OS_IOS)
  const int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS;
#else
  const int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
#endif

  invalidation_client_.reset(
      create_invalidation_client_callback.Run(
          &sync_system_resources_, client_type, client_id,
          kApplicationName, this));
  invalidation_client_->Start();

  // The registration manager is bound to the client that was just created.
  registration_manager_.reset(
      new RegistrationManager(invalidation_client_.get()));
}
// Forwards the new |email| / |token| pair to the network channel.
void SyncInvalidationListener::UpdateCredentials(
    const std::string& email,
    const std::string& token) {
  DCHECK(CalledOnValidThread());

  sync_network_channel_->UpdateCredentials(email, token);
}
// Records |ids| as the registered set. The registration update itself only
// runs when the ticl is enabled and a registration manager exists; otherwise
// the ids are just remembered.
void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
  DCHECK(CalledOnValidThread());

  registered_ids_ = ids;

  // Deliberately looks at |ticl_state_| only, not at the combined GetState().
  if (ticl_state_ != INVALIDATIONS_ENABLED || !registration_manager_) {
    return;
  }
  DoRegistrationUpdate();
}
// Callback for |client| (which must be our own invalidation client) becoming
// ready: marks the ticl as enabled, reports the new state and pushes the
// current registrations.
void SyncInvalidationListener::Ready(
    invalidation::InvalidationClient* client) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());

  ticl_state_ = INVALIDATIONS_ENABLED;
  EmitStateChange();

  DoRegistrationUpdate();
}
// Handles a versioned invalidation coming from |client|.
//
// The client-level |ack_handle| is acknowledged right away. The invalidation
// is then converted into a syncer::Invalidation that uses this object as its
// ack handler, and dispatched.
void SyncInvalidationListener::Invalidate(
    invalidation::InvalidationClient* client,
    const invalidation::Invalidation& invalidation,
    const invalidation::AckHandle& ack_handle) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());

  client->Acknowledge(ack_handle);

  const invalidation::ObjectId& id = invalidation.object_id();

  // A missing payload is represented as the empty string.
  const std::string payload =
      invalidation.has_payload() ? invalidation.payload() : std::string();

  DVLOG(2) << "Received invalidation with version " << invalidation.version()
           << " for " << ObjectIdToString(id);

  Invalidation received =
      Invalidation::Init(id, invalidation.version(), payload);
  received.set_ack_handler(GetThisAsAckHandler());

  ObjectIdInvalidationMap single_invalidation;
  single_invalidation.Insert(received);
  DispatchInvalidations(single_invalidation);
}
// Handles an invalidation of |object_id| whose version is unknown: the
// client-level |ack_handle| is acknowledged and an unknown-version
// invalidation for that object is dispatched.
void SyncInvalidationListener::InvalidateUnknownVersion(
    invalidation::InvalidationClient* client,
    const invalidation::ObjectId& object_id,
    const invalidation::AckHandle& ack_handle) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  DVLOG(1) << "InvalidateUnknownVersion";

  client->Acknowledge(ack_handle);

  Invalidation no_version = Invalidation::InitUnknownVersion(object_id);
  no_version.set_ack_handler(GetThisAsAckHandler());

  ObjectIdInvalidationMap single_invalidation;
  single_invalidation.Insert(no_version);
  DispatchInvalidations(single_invalidation);
}
// Handles an "invalidate everything" notification: acknowledges the
// client-level |ack_handle|, then dispatches one unknown-version invalidation
// for every currently registered id.
void SyncInvalidationListener::InvalidateAll(
    invalidation::InvalidationClient* client,
    const invalidation::AckHandle& ack_handle) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  DVLOG(1) << "InvalidateAll";

  client->Acknowledge(ack_handle);

  ObjectIdInvalidationMap all_registered;
  for (ObjectIdSet::const_iterator id_it = registered_ids_.begin();
       id_it != registered_ids_.end(); ++id_it) {
    Invalidation no_version = Invalidation::InitUnknownVersion(*id_it);
    no_version.set_ack_handler(GetThisAsAckHandler());
    all_registered.Insert(no_version);
  }

  DispatchInvalidations(all_registered);
}
void SyncInvalidationListener::DispatchInvalidations(
const ObjectIdInvalidationMap& invalidations) {
DCHECK(CalledOnValidThread());
ObjectIdInvalidationMap to_save = invalidations;
ObjectIdInvalidationMap to_emit =
invalidations.GetSubsetWithObjectIds(registered_ids_);
SaveInvalidations(to_save);
EmitSavedInvalidations(to_emit);
}
void SyncInvalidationListener::SaveInvalidations(
const ObjectIdInvalidationMap& to_save) {
ObjectIdSet objects_to_save = to_save.GetObjectIds();
for (ObjectIdSet::const_iterator it = objects_to_save.begin();
it != objects_to_save.end(); ++it) {
UnackedInvalidationsMap::iterator lookup =
unacked_invalidations_map_.find(*it);
if (lookup == unacked_invalidations_map_.end()) {
lookup = unacked_invalidations_map_.insert(
std::make_pair(*it, UnackedInvalidationSet(*it))).first;
}
lookup->second.AddSet(to_save.ForObject(*it));
}
invalidation_state_tracker_.Call(
FROM_HERE,
&InvalidationStateTracker::SetSavedInvalidations,
unacked_invalidations_map_);
}
// Hands |to_emit| to the delegate. |delegate_| is dereferenced without a
// check, so a delegate must be attached when this is called.
void SyncInvalidationListener::EmitSavedInvalidations(
    const ObjectIdInvalidationMap& to_emit) {
  DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();

  delegate_->OnInvalidate(to_emit);
}
// Callback reporting the registration state of |object_id|. Any state other
// than REGISTERED marks the registration as lost in the registration manager.
void SyncInvalidationListener::InformRegistrationStatus(
    invalidation::InvalidationClient* client,
    const invalidation::ObjectId& object_id,
    InvalidationListener::RegistrationState new_state) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  DVLOG(1) << "InformRegistrationStatus: "
           << ObjectIdToString(object_id) << " " << new_state;

  if (new_state == InvalidationListener::REGISTERED) {
    return;  // Nothing to do for a successful registration.
  }
  registration_manager_->MarkRegistrationLost(object_id);
}
// Callback reporting that registering |object_id| failed.
//
// A transient failure marks the registration as lost in the registration
// manager; a non-transient one disables the id there. |error_message| is only
// logged.
void SyncInvalidationListener::InformRegistrationFailure(
    invalidation::InvalidationClient* client,
    const invalidation::ObjectId& object_id,
    bool is_transient,
    const std::string& error_message) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  // The separator before "is_transient=" keeps the object id from running
  // straight into the next field of the log line.
  DVLOG(1) << "InformRegistrationFailure: "
           << ObjectIdToString(object_id)
           << " is_transient=" << is_transient
           << ", message=" << error_message;

  if (is_transient) {
    registration_manager_->MarkRegistrationLost(object_id);
  } else {
    registration_manager_->DisableId(object_id);
  }
}
// Callback asking for registrations to be reissued: every registration is
// marked as lost in the registration manager. |prefix| and |prefix_length|
// are not used. Note that the log text differs from the method name.
void SyncInvalidationListener::ReissueRegistrations(
    invalidation::InvalidationClient* client,
    const std::string& prefix,
    int prefix_length) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  DVLOG(1) << "AllRegistrationsLost";

  registration_manager_->MarkAllRegistrationsLost();
}
// Callback reporting a ticl error. The error is logged, |ticl_state_| becomes
// INVALIDATION_CREDENTIALS_REJECTED for an AUTH_FAILURE and
// TRANSIENT_INVALIDATION_ERROR for anything else, and the new state is
// reported.
void SyncInvalidationListener::InformError(
    invalidation::InvalidationClient* client,
    const invalidation::ErrorInfo& error_info) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(client, invalidation_client_.get());
  LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
             << error_info.error_message()
             << " (transient = " << error_info.is_transient() << ")";

  const bool is_auth_failure =
      error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE;
  ticl_state_ = is_auth_failure ? INVALIDATION_CREDENTIALS_REJECTED
                                : TRANSIENT_INVALIDATION_ERROR;

  EmitStateChange();
}
// Marks the invalidation identified by |handle| as acknowledged for object
// |id| and sends the updated unacked map to the invalidation state tracker.
// An |id| with no entry in the unacked map is logged and ignored.
void SyncInvalidationListener::Acknowledge(
    const invalidation::ObjectId& id,
    const syncer::AckHandle& handle) {
  // This mutates |unacked_invalidations_map_|, so assert the same thread
  // affinity that the other public entry points in this file assert.
  DCHECK(CalledOnValidThread());

  UnackedInvalidationsMap::iterator lookup =
      unacked_invalidations_map_.find(id);
  if (lookup == unacked_invalidations_map_.end()) {
    DLOG(WARNING) << "Received acknowledgement for untracked object ID";
    return;
  }

  lookup->second.Acknowledge(handle);
  invalidation_state_tracker_.Call(
      FROM_HERE,
      &InvalidationStateTracker::SetSavedInvalidations,
      unacked_invalidations_map_);
}
// Records that the invalidation identified by |handle| was dropped for object
// |id| and sends the updated unacked map to the invalidation state tracker.
// An |id| with no entry in the unacked map is logged and ignored.
void SyncInvalidationListener::Drop(
    const invalidation::ObjectId& id,
    const syncer::AckHandle& handle) {
  // This mutates |unacked_invalidations_map_|, so assert the same thread
  // affinity that the other public entry points in this file assert.
  DCHECK(CalledOnValidThread());

  UnackedInvalidationsMap::iterator lookup =
      unacked_invalidations_map_.find(id);
  if (lookup == unacked_invalidations_map_.end()) {
    DLOG(WARNING) << "Received drop for untracked object ID";
    return;
  }

  lookup->second.Drop(handle);
  invalidation_state_tracker_.Call(
      FROM_HERE,
      &InvalidationStateTracker::SetSavedInvalidations,
      unacked_invalidations_map_);
}
// Passes |state| to the invalidation state tracker as the new bootstrap data.
void SyncInvalidationListener::WriteState(const std::string& state) {
  DCHECK(CalledOnValidThread());
  DVLOG(1) << "WriteState";

  invalidation_state_tracker_.Call(
      FROM_HERE,
      &InvalidationStateTracker::SetBootstrapData,
      state);
}
void SyncInvalidationListener::DoRegistrationUpdate() {
DCHECK(CalledOnValidThread());
const ObjectIdSet& unregistered_ids =
registration_manager_->UpdateRegisteredIds(registered_ids_);
for (ObjectIdSet::iterator it = unregistered_ids.begin();
it != unregistered_ids.end(); ++it) {
unacked_invalidations_map_.erase(*it);
}
invalidation_state_tracker_.Call(
FROM_HERE,
&InvalidationStateTracker::SetSavedInvalidations,
unacked_invalidations_map_);
ObjectIdInvalidationMap object_id_invalidation_map;
for (UnackedInvalidationsMap::iterator map_it =
unacked_invalidations_map_.begin();
map_it != unacked_invalidations_map_.end(); ++map_it) {
if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
continue;
}
map_it->second.ExportInvalidations(
GetThisAsAckHandler(),
&object_id_invalidation_map);
}
EmitSavedInvalidations(object_id_invalidation_map);
}
// Passes |callback| on to the network channel, then runs it once with this
// listener's own debug data.
void SyncInvalidationListener::RequestDetailedStatus(
    base::Callback<void(const base::DictionaryValue&)> callback) const {
  DCHECK(CalledOnValidThread());

  sync_network_channel_->RequestDetailedStatus(callback);

  scoped_ptr<base::DictionaryValue> debug_data(CollectDebugData());
  callback.Run(*debug_data);
}
// Builds a dictionary holding the push-client state, the ticl state and one
// entry per object in the unacked invalidation map (keyed by object name).
// The caller owns the result.
scoped_ptr<base::DictionaryValue>
SyncInvalidationListener::CollectDebugData() const {
  scoped_ptr<base::DictionaryValue> debug_data(new base::DictionaryValue());

  debug_data->SetString(
      "SyncInvalidationListener.PushClientState",
      std::string(InvalidatorStateToString(push_client_state_)));
  debug_data->SetString(
      "SyncInvalidationListener.TiclState",
      std::string(InvalidatorStateToString(ticl_state_)));

  scoped_ptr<base::DictionaryValue> unacked(new base::DictionaryValue());
  for (UnackedInvalidationsMap::const_iterator entry =
           unacked_invalidations_map_.begin();
       entry != unacked_invalidations_map_.end(); ++entry) {
    // Set() is given the released raw pointer of each value.
    unacked->Set(entry->first.name(), entry->second.ToValue().release());
  }
  debug_data->Set("SyncInvalidationListener.UnackedInvalidationsMap",
                  unacked.release());

  return debug_data.Pass();
}
// Thin wrapper that calls Stop().
void SyncInvalidationListener::StopForTest() {
  DCHECK(CalledOnValidThread());

  Stop();
}
// Shuts down a running client, if any: drops the registration manager, stops
// the system resources and the client, detaches the delegate and resets both
// state fields to DEFAULT_INVALIDATION_ERROR. Does nothing when no client
// exists.
void SyncInvalidationListener::Stop() {
  DCHECK(CalledOnValidThread());

  if (invalidation_client_.get() == NULL) {
    return;
  }

  // The registration manager is released before the client it was built on.
  registration_manager_.reset();
  sync_system_resources_.Stop();
  invalidation_client_->Stop();
  invalidation_client_.reset();

  delegate_ = NULL;

  ticl_state_ = DEFAULT_INVALIDATION_ERROR;
  push_client_state_ = DEFAULT_INVALIDATION_ERROR;
}
// Combines the ticl and push-client states into one invalidator state:
//   - INVALIDATION_CREDENTIALS_REJECTED if either side reports it,
//   - INVALIDATIONS_ENABLED only if both sides are enabled,
//   - TRANSIENT_INVALIDATION_ERROR in every other case.
InvalidatorState SyncInvalidationListener::GetState() const {
  DCHECK(CalledOnValidThread());

  const bool credentials_rejected =
      ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
      push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED;
  if (credentials_rejected) {
    return INVALIDATION_CREDENTIALS_REJECTED;
  }

  const bool both_enabled =
      ticl_state_ == INVALIDATIONS_ENABLED &&
      push_client_state_ == INVALIDATIONS_ENABLED;
  return both_enabled ? INVALIDATIONS_ENABLED : TRANSIENT_INVALIDATION_ERROR;
}
// Reports the combined invalidator state (see GetState()) to the delegate.
//
// The network channel observer is registered in the constructor, before
// Start() installs a delegate, and Stop() clears |delegate_| while that
// registration stays in place. A channel state change arriving in either
// window would otherwise dereference a NULL |delegate_|, so the notification
// is skipped when no delegate is attached. The state fields themselves are
// updated by the callers before this runs.
void SyncInvalidationListener::EmitStateChange() {
  DCHECK(CalledOnValidThread());

  if (!delegate_) {
    return;
  }
  delegate_->OnInvalidatorStateChange(GetState());
}
// Returns a WeakHandle<AckHandler> built from a weak pointer to this object.
WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
  DCHECK(CalledOnValidThread());

  return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
}
// Network channel observer callback: stores |invalidator_state| as the
// push-client state and reports the resulting combined state.
void SyncInvalidationListener::OnNetworkChannelStateChanged(
    InvalidatorState invalidator_state) {
  DCHECK(CalledOnValidThread());

  push_client_state_ = invalidator_state;
  EmitStateChange();
}
}