// edsUpdate triggers an EDS update for the given instances func(s *ServiceEntryStore)edsUpdate(instances []*model.ServiceInstance) { allInstances := []*model.ServiceInstance{}
// Find all keys we need to lookup keys := map[instancesKey]struct{}{} for _, i := range instances { keys[makeInstanceKey(i)] = struct{}{} }
s.maybeRefreshIndexes()
s.storeMutex.RLock() for key := range keys { for _, i := range s.instances[key] { allInstances = append(allInstances, i...) } } s.storeMutex.RUnlock()
// This was a delete iflen(allInstances) == 0 { for k := range keys { _ = s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, nil) } return } ... }
如果实例有更新则直接发送更新 EDS 的请求:
// edsUpdate triggers an EDS update for the given instances func(s *ServiceEntryStore)edsUpdate(instances []*model.ServiceInstance) { ... endpoints := make(map[instancesKey][]*model.IstioEndpoint) for _, instance := range allInstances { port := instance.ServicePort key := makeInstanceKey(instance) endpoints[key] = append(endpoints[key], &model.IstioEndpoint{ Address: instance.Endpoint.Address, EndpointPort: instance.Endpoint.EndpointPort, ServicePortName: port.Name, Labels: instance.Endpoint.Labels, UID: instance.Endpoint.UID, ServiceAccount: instance.Endpoint.ServiceAccount, Network: instance.Endpoint.Network, Locality: instance.Endpoint.Locality, LbWeight: instance.Endpoint.LbWeight, TLSMode: instance.Endpoint.TLSMode, }) }
for k, eps := range endpoints { _ = s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, eps) } }
s.storeMutex.RLock() // We will only select entries in the same namespace entries := s.seWithSelectorByNamespace[curr.Namespace] s.storeMutex.RUnlock()
// if there are no service entries, return now to avoid taking unnecessary locks iflen(entries) == 0 { return }
log.Debugf("Handle event %s for workload entry %s in namespace %s", event, curr.Name, curr.Namespace) instances := []*model.ServiceInstance{} for _, se := range entries { workloadLabels := labels.Collection{wle.Labels} if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) { // Not a match, skip this one continue } instance := convertWorkloadEntryToServiceInstances(wle, se.services, se.entry) instances = append(instances, instance...) }
switch event { case model.EventUpdate: os := convertServices(old) if selectorChanged(old, curr) { // Consider all services are updated. mark := make(map[host.Name]*model.Service, len(cs)) for _, svc := range cs { mark[svc.Hostname] = svc updatedSvcs = append(updatedSvcs, svc) } for _, svc := range os { if _, f := mark[svc.Hostname]; !f { updatedSvcs = append(updatedSvcs, svc) } } } else { addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(os, cs) } case model.EventDelete: deletedSvcs = cs case model.EventAdd: addedSvcs = cs default: // this should not happen unchangedSvcs = cs }
iflen(unchangedSvcs) > 0 { // If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do EDS update. // If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do // full push (as fqdn endpoints go via strict_dns clusters in cds). currentServiceEntry := curr.Spec.(*networking.ServiceEntry) oldServiceEntry := old.Spec.(*networking.ServiceEntry) if currentServiceEntry.Resolution == networking.ServiceEntry_DNS { if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) { // fqdn endpoints have changed. Need full push for _, svc := range unchangedSvcs { configsUpdated[model.ConfigKey{ Kind: gvk.ServiceEntry, Name: string(svc.Hostname), Namespace: svc.Attributes.Namespace}] = struct{}{} } } } }
iflen(unchangedSvcs) > 0 && !fullPush { // IP endpoints in a STATIC service entry has changed. We need EDS update // If will do full-push, leave the edsUpdate to that. // XXX We should do edsUpdate for all unchangedSvcs since we begin to calculate service // data according to this "configsUpdated" and thus remove the "!willFullPush" condition. instances := convertInstances(curr, unchangedSvcs) key := configKey{ kind: serviceEntryConfigType, name: curr.Name, namespace: curr.Namespace, } // If only instances have changed, just update the indexes for the changed instances. s.updateExistingInstances(key, instances) s.edsUpdate(instances) return }
if fullPush { // When doing a full push, for added and updated services trigger an eds update // so that endpoint shards are updated. var instances []*model.ServiceInstance iflen(addedSvcs) > 0 { instances = append(instances, convertInstances(curr, addedSvcs)...) } iflen(updatedSvcs) > 0 { instances = append(instances, convertInstances(curr, updatedSvcs)...) } iflen(unchangedSvcs) > 0 { currentServiceEntry := curr.Spec.(*networking.ServiceEntry) oldServiceEntry := old.Spec.(*networking.ServiceEntry) // Non DNS service entries are sent via EDS. So we should compare and update if such endpoints change. if currentServiceEntry.Resolution != networking.ServiceEntry_DNS { if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) { instances = append(instances, convertInstances(curr, unchangedSvcs)...) } } } s.edsUpdate(instances)
// If service entry is deleted, cleanup endpoint shards for services. for _, svc := range deletedSvcs { s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete) }