Mesos 1.0.0 源码解析杂记

这里记录下分析mesos源码的所得,会比较杂,权当博主自己的备忘。

check disk usage

在本地搭建了一个mesos测试环境,发现log level是INFO,也就是说默认情况下日志信息是最丰富的。 发现agent每隔1分钟会统计一下磁盘的用量,发现agent的flag “disk watch interval”确实默认是1min。 在agent启动的时候,会spawn一个Slave的Process(这里的Process是libprocess里的概念,你可以暂时理解它为轻量级的进程), 这个Process在初始化的时候干了一件事:

delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);

这个逻辑也就是,disk_watch_interval时间之后,调用Slave::checkDiskUsage函数:

void Slave::checkDiskUsage()
{
  // TODO(vinod): We are making usage a Future, so that we can plug in
  // fs::usage() into async.
  // NOTE: We calculate disk usage of the file system on which the
  // slave work directory is mounted.
  Future<double>(::fs::usage(flags.work_dir))
    .onAny(defer(self(), &Slave::_checkDiskUsage, lambda::_1));
}


void Slave::_checkDiskUsage(const Future<double>& usage)
{
  if (!usage.isReady()) {
    LOG(ERROR) << "Failed to get disk usage: "
               << (usage.isFailed() ? usage.failure() : "future discarded");
  } else {
    executorDirectoryMaxAllowedAge = age(usage.get());
    LOG(INFO) << "Current disk usage " << std::setiosflags(std::ios::fixed)
              << std::setprecision(2) << 100 * usage.get() << "%."
              << " Max allowed age: " << executorDirectoryMaxAllowedAge;

    // We prune all directories whose deletion time is within
    // the next 'gc_delay - age'. Since a directory is always
    // scheduled for deletion 'gc_delay' into the future, only directories
    // that are at least 'age' old are deleted.
    gc->prune(flags.gc_delay - executorDirectoryMaxAllowedAge);
  }
  delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
}

虽然Future和Promise博主还没有从本质上掌握,但从这里的代码,基本可以看出逻辑。Process初始化的时候, delay一个函数调用,函数调用里面最后再发起一个delay,这样就可以定时的检查磁盘用量。

agent向master注册自己

通过阅读agent的代码发现:

Try<MasterDetector*> detector_ = MasterDetector::create(
      master, flags.master_detector);

通过创建一个MasterDetector来做Master的检测工作,而MasterDetector定义在Master文件夹中。 继续深入MasterDetector也没有什么好看的,如果没有指定zk的话,使用的是StandaloneMasterDetector, 否则就是使用zk来做Master检测。我们还得回头看看,这个MasterDetector创建了之后,是怎么用的。 我们继续看Agent Main的代码:

Slave* slave = new Slave(
      id,
      flags,
      detector,
      containerizer.get(),
      &files,
      &gc,
      &statusUpdateManager,
      resourceEstimator.get(),
      qosController.get(),
      authorizer_);

ok, detector作为一个引用,传递进了Slave的构造函数。 所以,我们得看看,在Slave Process中是如何使用detector的。 在Slave Process里有两个函数使用了detector:

void Slave::detected(const Future<Option<MasterInfo>>& _master)

void Slave::__recover(const Future<Nothing>& future)

最后发现,在recover的时候才会detect。 貌似Agent起来的时候就一定是RECOVERING的状态。

VersionProcess

在master和agent启动的时候,都会spawn一个VersionProcess, 这个VersionProcess到底是个什么鬼?


void VersionProcess::initialize()
{
  route("/", VERSION_HELP(), &VersionProcess::version);
}


Future<http::Response> VersionProcess::version(const http::Request& request)
{
  return OK(internal::version(), request.url.query.get("jsonp"));
}

原来是mesos设置了一个单独的Process来处理version的HTTP REST请求,参考:

vanJobs:~/my/mesos-api-try/agent$ ./version.py http://172.16.27.47:5051
{"build_date":"2016-08-01 15:11:35","build_time":1470035495.0,"build_user":"demo","git_sha":"c9b70582e9fccab8f6863b0bd3a812b5969a8c24","git_tag":"1.0.0","version":"1.0.0"}

Agent Recovery

在Agent启动的最后,有一行代码:

  // Do recovery.
  async(&state::recover, metaDir, flags.strict)
    .then(defer(self(), &Slave::recover, lambda::_1))
    .then(defer(self(), &Slave::_recover))
    .onAny(defer(self(), &Slave::__recover, lambda::_1));

可以看出,每次Agent启动都会尝试去Recover。那么Recover到底是个什么鬼? 这个Recovery看起来有些复杂,TBA。

Master->Framework Heartbeat

在Framework注册成功之后,Master会启动对Framework的heartbeat, 那么这个heartbeat是怎么实现的呢? 代码在master.cpp中:

// This process periodically sends heartbeats to a scheduler on the
// given HTTP connection.
class Heartbeater : public process::Process<Heartbeater>
{
public:
  Heartbeater(const FrameworkID& _frameworkId,
              const HttpConnection& _http,
              const Duration& _interval)
    : process::ProcessBase(process::ID::generate("heartbeater")),
      frameworkId(_frameworkId),
      http(_http),
      interval(_interval) {}

protected:
  virtual void initialize() override
  {
    heartbeat();
  }

private:
  void heartbeat()
  {
    // Only send a heartbeat if the connection is not closed.
    if (http.closed().isPending()) {
      VLOG(1) << "Sending heartbeat to " << frameworkId;

      scheduler::Event event;
      event.set_type(scheduler::Event::HEARTBEAT);

      http.send(event);
    }

    process::delay(interval, self(), &Self::heartbeat);
  }

  const FrameworkID frameworkId;
  HttpConnection http;
  const Duration interval;
};

可以从上面的代码中看出基本逻辑,发送heartbeat的地方是http.send(event), 那么我们需要研究一下 http和event, TBA。

Framework启动过程

参考examples里的test_framework.cpp:

driver = new MesosSchedulerDriver(
        &scheduler,
        framework,
        master.get(),
        implicitAcknowledgements,
        credential);
  } else {
    framework.set_principal("test-framework-cpp");

    driver = new MesosSchedulerDriver(
        &scheduler,
        framework,
        master.get(),
        implicitAcknowledgements);
  }

  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;

  // Ensure that the driver process terminates.
  driver->stop();

可以看出,启动一个Framework其实就是构造一个自己的Scheduler,然后传入到MesosSchedulerDriver的构造函数中, driver->run()一下。下面我们看一下driver->run()到底干了些什么事情:

多Framework的Offer竞态如何处理?

我们知道,一个ResourceOffer有可能提供给多个Framework, 某个Framework接受并在此Offer上运行Task, 另外的Framework就会收到Rescind消息表示该Offer已经撤销不能用了。那么这个过程到底是怎么样的? 我说的是精确的模型是什么? 在什么地方做的互斥和同步?

Offer的Filters是什么

在阅读SchedulerDriver接口时发现,lauchTasks的时候会可选传入一个Filters,那么这个Filters在mesos的 模型里到底是什么语义?

HTTP Framework和非HTTP Framework有什么不同?

在阅读mesos源码的过程中,在至少两处注释的地方看到HTTP Framework这个概念,那么究竟什么是HTTP Framework呢? 非HTTP Framework使用的是ShedulerDriver和master通信,而HTTP Framework使用的是Mesos这个类和Master通信。

对HTTP Framework有了进一步的认识,所谓HTTP Framework,意思就是实现Framework与Master交互的时候,完全通过Master暴露的HTTP API; 而非HTTP Framework则使用的是libmesos.so这个库,内部使用的是libprocess的跨进程通信,底层应该是socket。

Agent是怎样产生Executor的?

今天同事问了一个问题, 如何更新mesos的Executor? 因为暂时没有看到这一块的东西,所以临时补了一下。 Executor是我们自己写的一个可执行程序,最终表现成一个守护进程。而这个进程是有Agent产生的,那么怎么产生的呢? 由于Agent本身也是一个守护进程,可以猜测到底层是用了fork+exec,下面逐步从代码中论证。

首先Framework向Master发送lauchTask消息,Master会接着发送RunTaskMessage给Agent,Agent会调用 自身的Containerizer来launch Executor, 下面展示一段逻辑:

// Launch the container.
  Future<bool> launch;
  if (!executor->isCommandExecutor()) {
    // If the executor is _not_ a command executor, this means that
    // the task will include the executor to run. The actual task to
    // run will be enqueued and subsequently handled by the executor
    // when it has registered to the slave.
    launch = slave->containerizer->launch(
        containerId,
        executorInfo_, // Modified to include the task's resources, see above.
        executor->directory,
        user,
        slave->info.id(),
        slave->self(),
        info.checkpoint());
  } else {
    // An executor has _not_ been provided by the task and will
    // instead define a command and/or container to run. Right now,
    // these tasks will require an executor anyway and the slave
    // creates a command executor. However, it is up to the
    // containerizer how to execute those tasks and the generated
    // executor info works as a placeholder.
    // TODO(nnielsen): Obsolete the requirement for executors to run
    // one-off tasks.
    launch = slave->containerizer->launch(
        containerId,
        taskInfo,
        executorInfo_, // Modified to include the task's resources, see above.
        executor->directory,
        user,
        slave->info.id(),
        slave->self(),
        info.checkpoint());
  }

注释里有一段逻辑,如果当前的executor是command类型,那么直接使用bash或者一个容器来运行任务; 如果不是,那么就需要一个独立的executor来运行task,task会先排队,直到executor注册到slave之后, 才会处理task。底层是调用了containerizer里面的launcher, 最后会追踪到PosixLauncher:

Try<pid_t> PosixLauncher::fork(
    const ContainerID& containerId,
    const string& path,
    const vector<string>& argv,
    const Subprocess::IO& in,
    const Subprocess::IO& out,
    const Subprocess::IO& err,
    const Option<flags::FlagsBase>& flags,
    const Option<map<string, string>>& environment,
    const Option<int>& namespaces,
    vector<process::Subprocess::Hook> parentHooks)
{
  if (namespaces.isSome() && namespaces.get() != 0) {
    return Error("Posix launcher does not support namespaces");
  }

  if (pids.contains(containerId)) {
    return Error("Process has already been forked for container " +
                 stringify(containerId));
  }

  // If we are on systemd, then extend the life of the child. Any
  // grandchildren's lives will also be extended.
#ifdef __linux__
  if (systemd::enabled()) {
    parentHooks.emplace_back(Subprocess::Hook(&systemd::mesos::extendLifetime));
  }
#endif // __linux__

  Try<Subprocess> child = subprocess(
      path,
      argv,
      in,
      out,
      err,
      SETSID,
      flags,
      environment,
      None(),
      parentHooks);

大体的逻辑是传入可执行程序路径、标准输入输出错误、环境变量、flags等等新建一个子进程, 我们继续追踪一下subprocess这个接口,看它底层究竟是怎么做的。

#ifndef __WINDOWS__
    Try<pid_t> pid = internal::cloneChild(
        path,
        argv,
        set_sid,
        environment,
        _clone,
        parent_hooks,
        working_directory,
        watchdog,
        stdinfds,
        stdoutfds,
        stderrfds);

    if (pid.isError()) {
      return Error(pid.error());
    }

    process.data->pid = pid.get();
#else
    Try<PROCESS_INFORMATION> processInformation = internal::createChildProcess(
        path, argv, environment, stdinfds, stdoutfds, stderrfds);

    if (processInformation.isError()) {
      process::internal::close(stdinfds, stdoutfds, stderrfds);
      return Error(
          "Could not launch child process" + processInformation.error());
    }

    if (processInformation.get().dwProcessId == -1) {
      // Save the errno as 'close' below might overwrite it.
      ErrnoError error("Failed to clone");
      process::internal::close(stdinfds, stdoutfds, stderrfds);
      return error;
    }

    process.data->processInformation = processInformation.get();
    process.data->pid = processInformation.get().dwProcessId;
#endif // __WINDOWS__

可以看出libprocess在底层,对windows和unix体系分开处理。我们继续跟踪cloneChild: 这里就不贴代码了,最底层使用了linux系统调用clone,这种和fork类似,但是附带一些共享内存和文件描述符的功能, 在linux下常常用来实现线程。

Executor可执行文件是怎么被下载的?

我们知道Executor一般是一个可执行文件,可供Agent下载到本地执行,那么这段逻辑在哪里?另外,如果 我的Executor更新了,如果实时通知Agent更新?有了上面一个dive做铺垫,我们继续挖掘clone一个Executor传入的可执行文件路径, 看这个路径是怎么产生的。

Composing Containerizers

如果我们启动Agent的时候,设置–containerizers这个flag为多个值得时候,就会使用Composing Containerizer。 那么这个Composing Containerizer是什么鬼? 这里的Composing Containerizer实现的是compose模式,也没啥大不了的,也就是在多个Containerizer上面封装了一层, 不同的Task来,使用不同的Containerizer。

如何使用mesos自带的executor?

其实一直没有搞清楚如何使用mesos自己的executor。运行example里的Framework,其实都是自带的executor。今天终于搞清楚如何使用mesos-docker-executor了。 首先在Agent启动的时候,需要支持docker。其次在Framework实现任务分发的时候,对于TaskInfo我们把Executor留空,并且设置ContainerInfo和Command的值,这样 就可以定义一个在特定image里执行某条命令的docker container化得任务了。

Table of Contents