// main wires the process together: it creates the memory storage, the sysfs
// handle and the collector HTTP client, builds the container manager from them,
// registers the HTTP and Prometheus handlers on a new ServeMux, starts the
// manager, and finally serves HTTP on argIp:argPort.
// NOTE(review): this listing looks abridged — closing braces are missing and the
// err values assigned below are not checked in the visible lines; confirm
// against the full file.
func main() {
memoryStorage, err := NewMemoryStorage()
sysFs, err := sysfs.NewRealSysFs()
// Built from the collectorCert/collectorKey flags; passed to the manager by pointer below.
collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)
containerManager, err := manager.New(memoryStorage, sysFs, *maxHousekeepingInterval, *allowDynamicHousekeeping, ignoreMetrics.MetricSet, &collectorHttpClient)
mux := http.NewServeMux()
// The pprof endpoints are only mounted when the enableProfiling flag is set.
if *enableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
// Register all HTTP handlers.
err = cadvisorhttp.RegisterHandlers(mux, containerManager, *httpAuthFile, *httpAuthRealm, *httpDigestFile, *httpDigestRealm)
cadvisorhttp.RegisterPrometheusHandler(mux, containerManager, *prometheusEndpoint, nil)
// Start the manager.
if err := containerManager.Start(); err != nil {
glog.Fatalf("Failed to start container manager: %v", err)
// Start launches the long-running goroutines that carry out the various monitoring operations.
// Install signal handler.
glog.Infof("Starting cAdvisor version: %s-%s on port %d", version.Info["version"], version.Info["revision"], *argPort)
addr := fmt.Sprintf("%s:%d", *argIp, *argPort)
// ListenAndServe only returns with an error, which is logged as fatal.
glog.Fatal(http.ListenAndServe(addr, mux))
// manager holds the state shared by the container-management code in this
// listing: the tracked containers, the stats cache, machine/filesystem info,
// housekeeping settings, and the channels used to stop background work.
type manager struct {
// Tracked containers, keyed by namespaced name; aliases are registered as extra keys.
containers map[namespacedContainerName]*containerData
// Lock released by createContainer around createContainerLocked.
containersLock sync.RWMutex
// Cache handed to every containerData; stats are added to it on each update.
memoryCache *memory.InMemoryCache
// Filesystem info built in New and passed to the container factories in Start.
fsInfo fs.FsInfo
// Copied from the result of machine.Info in New.
machineInfo info.MachineInfo
// One quit channel per background worker started by Start.
quitChannels []chan error
// Cgroup directory of the current process (from cgroups.GetThisCgroupDir("cpu")).
cadvisorContainer string
// Set to true in New when /rootfs/proc does not exist.
inHostNamespace bool
// Receives an event for every container created by createContainerLocked.
eventHandler events.EventManager
// Time at which New built this manager.
startupTime time.Time
// Housekeeping settings forwarded unchanged to each containerData.
maxHousekeepingInterval time.Duration
allowDynamicHousekeeping bool
// Metric set passed to the container factories when they are registered.
ignoreMetrics container.MetricSet
// Watchers appended by Start (rkt and raw in the visible code).
containerWatchers []watcher.ContainerWatcher
//Registers a channel to listen for events affecting subcontainers (recursively).
eventsChannel chan watcher.ContainerEvent
// HTTP client supplied by the caller of New.
collectorHttpClient *http.Client
// New takes a memory storage and returns a new manager.
//
// It detects the cgroup directory of the current process, gathers Docker and rkt
// details to build the filesystem context, decides whether it runs in the host
// namespace, and fills in a manager with the supplied housekeeping settings,
// ignored-metric set and collector HTTP client. Machine info and the event
// handler are attached before the manager is returned.
// NOTE(review): the listing is abridged — closing braces and the checks on the
// err values assigned below are not visible; confirm against the full file.
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, ignoreMetricsSet container.MetricSet, collectorHttpClient *http.Client) (Manager, error) {
// Detect the container we are running on.
selfContainer, err := cgroups.GetThisCgroupDir("cpu")
dockerStatus, err := docker.Status()
// Calls the Docker API to fetch Docker information.
rktPath, err := rkt.RktPath()
// The filesystem context combines the Docker root dir, storage driver and driver status with the rkt path.
context := fs.Context{
Docker: fs.DockerContext{
Root: docker.RootDir(),
Driver: dockerStatus.Driver,
DriverStatus: dockerStatus.DriverStatus,
RktPath: rktPath,
fsInfo, err := fs.NewFsInfo(context)
// If cAdvisor was started with host's rootfs mounted, assume that its running
// in its own namespaces.
inHostNamespace := false
// A missing /rootfs/proc is what flips inHostNamespace to true.
if _, err := os.Stat("/rootfs/proc"); os.IsNotExist(err) {
inHostNamespace = true
// Register for new subcontainers.
// The events channel is buffered with room for 16 events.
eventsChannel := make(chan watcher.ContainerEvent, 16)
newManager := &manager{
containers: make(map[namespacedContainerName]*containerData),
quitChannels: make([]chan error, 0, 2),
memoryCache: memoryCache,
fsInfo: fsInfo,
cadvisorContainer: selfContainer,
inHostNamespace: inHostNamespace,
startupTime: time.Now(),
maxHousekeepingInterval: maxHousekeepingInterval,
allowDynamicHousekeeping: allowDynamicHousekeeping,
ignoreMetrics: ignoreMetricsSet,
containerWatchers: []watcher.ContainerWatcher{},
eventsChannel: eventsChannel,
collectorHttpClient: collectorHttpClient,
// Machine info needs fsInfo and inHostNamespace, so it is filled in after the struct literal.
machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
newManager.machineInfo = *machineInfo
versionInfo, err := getVersionInfo()
glog.Infof("Version: %+v", *versionInfo)
newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy())
// Registers the eventHandler, which wraps the event-handling operations; it is effectively an event manager.
return newManager, nil
manager的Start方法(位于cadvisor/manager/manager.go):
// Start the container manager.
//
// It registers the docker, rkt, systemd and raw container factories, collects
// the rkt and raw container watchers, starts OOM watching, creates the root
// container "/", detects its subcontainers, and then starts watching for new
// containers and launches the global housekeeping goroutine. If no factory is
// registered it returns nil early without starting any housekeeping.
// NOTE(review): the listing is abridged — closing braces and most error checks
// are not visible; confirm against the full file.
func (self *manager) Start() error {
err := docker.Register(self, self.fsInfo, self.ignoreMetrics)
err = rkt.Register(self, self.fsInfo, self.ignoreMetrics)
// A failed rkt registration is only logged; the rkt watcher is created on success.
if err != nil {
glog.Warningf("Registration of the rkt container factory failed: %v", err)
} else {
// NOTE: the local variable watcher shadows the watcher package inside this branch.
watcher, err := rktwatcher.NewRktContainerWatcher()
if err != nil {
return err
self.containerWatchers = append(self.containerWatchers, watcher)
err = systemd.Register(self, self.fsInfo, self.ignoreMetrics)
err = raw.Register(self, self.fsInfo, self.ignoreMetrics)
rawWatcher, err := rawwatcher.NewRawContainerWatcher()
self.containerWatchers = append(self.containerWatchers, rawWatcher)
// Watch for OOMs.
err = self.watchForNewOoms()
// If there are no factories, don't start any housekeeping and serve the information we do have.
if !container.HasFactories() {
return nil
// Create root and then recover all containers.
err = self.createContainer("/", watcher.Raw)
glog.Infof("Starting recovery of all containers")
err = self.detectSubcontainers("/")
glog.Infof("Recovery completed")
// Watch for new container.
quitWatcher := make(chan error)
err = self.watchForNewContainers(quitWatcher)
// Watches the cgroup filesystem.
self.quitChannels = append(self.quitChannels, quitWatcher)
// Look for new containers in the main housekeeping thread.
quitGlobalHousekeeping := make(chan error)
self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping)
go self.globalHousekeeping(quitGlobalHousekeeping)
return nil
// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
defer m.containersLock.Unlock()
return m.createContainerLocked(containerName, watchSource)
// createContainerLocked builds the containerData for containerName, registers
// its collectors, stores it in m.containers under its name and each alias,
// emits a container-creation event, and starts the container's housekeeping.
// The name indicates the caller is expected to hold containersLock;
// createContainer is the visible wrapper around it.
// NOTE(review): the listing is abridged — closing braces, error checks and part
// of the alias map assignment are not visible; the accept result is unused in
// the visible lines. Confirm against the full file.
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
namespacedName := namespacedContainerName{
Name: containerName,
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
collectorManager, err := collector.NewCollectorManager()
// Usage logging is enabled only for cAdvisor's own container, and only when the logCadvisorUsage flag is set.
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping)
// Add collectors
// Collector configs are derived from the container's labels.
labels := handler.GetContainerLabels()
collectorConfigs := collector.GetCollectorConfigs(labels)
err = m.registerCollectors(collectorConfigs, cont)
// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
m.containers[namespacedName] = cont
for _, alias := range cont.info.Aliases {
Namespace: cont.info.Namespace,
Name: alias,
}] = cont
// The creation event uses the container's reference name and its spec's creation time.
contSpec, err := cont.handler.GetSpec()
contRef, err := cont.handler.ContainerReference()
newEvent := &info.Event{
ContainerName: contRef.Name,
Timestamp: contSpec.CreationTime,
EventType: info.EventContainerCreation,
err = m.eventHandler.AddEvent(newEvent)
// Start the container's housekeeping.
return cont.Start()
// Runs go c.housekeeping().
container.NewContainerHandler:创建containerHandler。主要是通过遍历factories(一个全局的factories map),根据containerName看是否能被该factory处理;如果可以处理,就调用对应factory的NewContainerHandler方法。rawFactory的CanHandleAndAccept逻辑很直接:如果dockeronly参数被设置为false,或者容器的name为"/",则CanHandleAndAccept都会返回true。factory注册的顺序先是dockerfactory,其次是rawfactory;在检测的时候是遍历factory,执行它们的CanHandleAndAccept方法,哪个先返回true,就先把相应的factory注册进去。所以以"/"命名的容器应该被rawfactory处理,后面的应该被dockerfactory处理。
// containerData is the per-container state used by the housekeeping code in
// this listing: the handler that supplies stats, the cache they are written to,
// optional load/summary readers, and the housekeeping settings.
type containerData struct {
// Source of stats, cgroup paths and the container reference.
handler container.ContainerHandler
// Carries the container's Name, Namespace and Aliases.
info containerInfo
// Cache that updateStats writes each stats sample into.
memoryCache *memory.InMemoryCache
lock sync.Mutex
// Optional; updateStats reads CPU load through it only when non-nil.
loadReader cpuload.CpuLoadReader
// Optional; updateStats adds each sample to it only when non-nil.
summaryReader *summary.StatsSummary
loadAvg float64 // smoothed load average seen so far.
housekeepingInterval time.Duration
maxHousekeepingInterval time.Duration
allowDynamicHousekeeping bool
lastUpdatedTime time.Time
lastErrorTime time.Time
// Decay value used for load average smoothing. Interval length of 10 seconds is used.
loadDecay float64
// Whether to log the usage of this container when it is updated.
logUsage bool
// Tells the container to stop.
stop chan bool
// Runs custom metric collectors.
collectorManager collector.CollectorManager
createContainer的最后一步是执行containerData的Start方法,实际上就是启动一个goroutine来执行c.housekeeping():
// Start launches the container's housekeeping loop in its own goroutine and
// returns immediately; it always returns nil.
func (c *containerData) Start() error {
go c.housekeeping()
return nil
// housekeeping is the per-container loop started by Start. Each iteration
// checks the stop channel, performs housekeeping, and computes the time of the
// next run via nextHousekeeping.
// NOTE(review): the loop body is heavily abridged in this listing — the stop
// branch, the housekeeping call and the sleep before the next run are not
// visible; confirm against the full file.
func (c *containerData) housekeeping() {
// Housekeep every second.
glog.V(3).Infof("Start housekeeping for container %q\n", c.info.Name)
lastHousekeeping := time.Now()
for {
select {
case <-c.stop:
// Stop housekeeping when signaled.
// Perform housekeeping.
start := time.Now()
next := c.nextHousekeeping(lastHousekeeping)
// Schedule the next housekeeping. Sleep until that time.
if time.Now().Before(next) {
} else {
// The scheduled time has already passed, so the next run is re-based on the current time.
next = time.Now()
lastHousekeeping = next
// housekeepingTick performs one housekeeping pass by refreshing the container's
// stats through updateStats.
// NOTE(review): handling of the returned err is not visible in this abridged listing.
func (c *containerData) housekeepingTick() {
err := c.updateStats()
// updateStats fetches a fresh stats sample from the handler, enriches it with
// CPU load (when a loadReader is set) and custom metrics (when collectors are
// configured and due), feeds it to the summary reader if present, and stores it
// in memoryCache under the container's reference. It returns customStatsErr.
// NOTE(review): the listing is abridged — closing braces, error checks and the
// assignment to customStatsErr are not visible; confirm against the full file.
func (c *containerData) updateStats() error {
stats, statsErr := c.handler.GetStats()
if c.loadReader != nil {
// TODO(vmarmol): Cache this path.
path, err := c.handler.GetCgroupPath("cpu")
if err == nil {
loadStats, err := c.loadReader.GetCpuLoad(c.info.Name, path)
stats.TaskStats = loadStats
// convert to 'milliLoad' to avoid floats and preserve precision.
stats.Cpu.LoadAverage = int32(c.loadAvg * 1000)
if c.summaryReader != nil {
err := c.summaryReader.AddSample(*stats)
var customStatsErr error
// Unchecked type assertion: this panics if collectorManager is not a *collector.GenericCollectorManager.
cm := c.collectorManager.(*collector.GenericCollectorManager)
if len(cm.Collectors) > 0 {
// Custom stats are collected only once the next collection time has passed.
if cm.NextCollectionTime.Before(time.Now()) {
customStats, err := c.updateCustomStats()
if customStats != nil {
stats.CustomMetrics = customStats
ref, err := c.handler.ContainerReference()
err = c.memoryCache.AddStats(ref, stats)
return customStatsErr