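SRS Streaming Media Server Clusters: Edge Mode (2)

This article walks through debugging Edge-mode publishing (push) and playback (pull) in gdb and analyzes the source code behind both paths.

1. Classes involved in Edge push and pull

(1) The constructor below shows that, in Edge mode, both pulling and pushing are managed by the SrsSource class. The source code and call stacks later in this article confirm this.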

SrsSource::SrsSource()
{
    req = NULL;
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
    mix_correct = false;
    mix_queue = new SrsMixQueue();

    _can_publish = true;
    _pre_source_id = _source_id = -1;
    die_at = 0;

    // Pull on the edge: origin to edge.
    play_edge = new SrsPlayEdge();
    // Push on the edge: edge to origin.
    publish_edge = new SrsPublishEdge();
    // GOP cache.
    gop_cache = new SrsGopCache();
    // Routing on the origin.
    hub = new SrsOriginHub();
    // Metadata cache.
    meta = new SrsMetaCache();

    is_monotonically_increase = false;
    last_packet_time = 0;

    _srs_config->subscribe(this);
    atc = false;
}

(2) SrsEdgeUpstream is the upstream abstraction the edge uses to pull a stream from the origin.

(3) SrsEdgeRtmpUpstream is the RTMP implementation that pulls from the origin; it inherits from the class above.

(4) Pull (play) side: SrsEdgeIngester runs on the edge and ingests the stream from the origin.

// The edge used to ingest stream from origin.
class SrsEdgeIngester : public ISrsCoroutineHandler
{
private:
    SrsSource* source;
    SrsPlayEdge* edge;
    SrsRequest* req;
    SrsCoroutine* trd;
    SrsLbRoundRobin* lb;
    SrsEdgeUpstream* upstream;
public:
    SrsEdgeIngester();
    virtual ~SrsEdgeIngester();
public:
    virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
    virtual srs_error_t start();
    virtual void stop();
    virtual std::string get_curr_origin();
// Interface ISrsReusableThread2Handler
public:
    virtual srs_error_t cycle();
private:
    virtual srs_error_t do_cycle();
private:
    virtual srs_error_t ingest(std::string& redirect);
    virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
};

(5) SrsEdgeForwarder is the class that pushes the stream from the edge to the origin.

(6) Push (publish) side:

class SrsPublishEdge
{
private:
    SrsEdgeState state;
    SrsEdgeForwarder* forwarder;
public:
    SrsPublishEdge();
    virtual ~SrsPublishEdge();
public:
    virtual void set_queue_size(srs_utime_t queue_size);
public:
    virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
    virtual bool can_publish();
    // When client publish stream on edge.
    virtual srs_error_t on_client_publish();
    // Proxy publish stream to edge
    virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
    // Proxy unpublish stream to edge.
    virtual void on_proxy_unpublish();
};

2. Edge-mode push: source analysis

(1) The push path starts by reading the origin list from the configuration file.

SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
    SrsConfDirective* conf = get_vhost(vhost);
    if (!conf) {
        return NULL;
    }

    conf = conf->get("cluster");
    if (!conf) {
        return NULL;
    }

    return conf->get("origin");
}

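(2) As covered in the previous article, when a publisher pushes a stream in Edge mode, the edge forwards it to an origin first. On playback, if the edge already has the stream cached it serves the player directly; otherwise it has to pull from the origin. When several origins are configured, the edge picks one of them with lb->select(conf->args).

Publishing starts by pushing to the origin: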

srs_error_t SrsEdgeForwarder::start()
{
    srs_error_t err = srs_success;

    // reset the error code.
    send_error_code = ERROR_SUCCESS;

    std::string url;
    if (true) {
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
        srs_assert(conf);

        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
        srs_parse_hostport(server, server, port);

        // support vhost transform for edge,
        // @see https://github.com/ossrs/srs/issues/372
        std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
        vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

        url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
    }

    // open socket.
    srs_freep(sdk);
    srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT;
    srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
    sdk = new SrsSimpleRtmpClient(url, cto, sto);

    if ((err = sdk->connect()) != srs_success) {
        return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
    }

    if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
        return srs_error_wrap(err, "sdk publish");
    }

    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());

    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }
    srs_trace("edge-fwr publish url %s", url.c_str());

    return err;
}

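(3) If one origin goes down, the next publish is pushed to another origin. With several origins, they are tried one after another in the order they appear in the configuration file, left to right.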
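The left-to-right selection described in (3) can be pictured with a small sketch. RoundRobinSketch and split_hostport are hypothetical names made up for this illustration, not the SRS classes; the only facts borrowed from the listing above are that one entry is taken from the origin list on each call and that a missing port falls back to a default (1935 is the standard RTMP port). The host names are placeholders.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the balancer; not the SRS implementation.
class RoundRobinSketch {
public:
    // Returns the next origin in configuration order, wrapping around.
    const std::string& select(const std::vector<std::string>& origins) {
        assert(!origins.empty());
        const std::string& picked = origins[next_ % origins.size()];
        ++next_;
        return picked;
    }

private:
    std::size_t next_ = 0;
};

// Splits "host:port"; uses default_port when the entry has no port.
inline std::pair<std::string, int> split_hostport(const std::string& entry, int default_port) {
    std::size_t pos = entry.rfind(':');
    if (pos == std::string::npos) {
        return {entry, default_port};
    }
    return {entry.substr(0, pos), std::stoi(entry.substr(pos + 1))};
}
```

Because the counter advances on every call, a publish that is retried after a failed connection naturally lands on the next origin in the list.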

(4) Start SRS under gdb:

gdb ./objs/srs

(5) Pass the edge configuration file, set a breakpoint on main, and run the program:

set args -c conf/edge1.conf

b main

r

Set a breakpoint on the origin lookup:

b SrsConfig::get_vhost_edge_origin

Set a breakpoint on the check that decides whether the current node is an edge:

b SrsConfig::get_vhost_is_edge

(6) The source that decides whether the current node is an edge:

bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)
{
    static bool DEFAULT = false;

    SrsConfDirective* conf = vhost;
    if (!conf) {
        return DEFAULT;
    }

    conf = conf->get("cluster");
    if (!conf) {
        return DEFAULT;
    }

    conf = conf->get("mode");
    if (!conf || conf->arg0().empty()) {
        return DEFAULT;
    }

    return "remote" == conf->arg0();
}
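Both lookups walk the same nested directives: vhost, then cluster, then mode or origin. The sketch below rebuilds that walk on a toy tree to make the decision explicit. DirectiveSketch, make_vhost, is_edge and edge_origin are hypothetical names for this illustration and only mirror the get()/arg0() shape seen in the listings; they are not the SRS configuration API.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical miniature of a config directive: a name, its arguments, and children.
struct DirectiveSketch {
    std::string name;
    std::vector<std::string> args;
    std::vector<DirectiveSketch> children;

    // Returns the first child with the given name, or nullptr.
    const DirectiveSketch* get(const std::string& child) const {
        for (const DirectiveSketch& d : children) {
            if (d.name == child) return &d;
        }
        return nullptr;
    }
    std::string arg0() const { return args.empty() ? std::string() : args[0]; }
};

// Same decision as get_vhost_is_edge(): the node is an edge when cluster.mode is "remote".
inline bool is_edge(const DirectiveSketch* vhost) {
    if (!vhost) return false;
    const DirectiveSketch* conf = vhost->get("cluster");
    if (!conf) return false;
    conf = conf->get("mode");
    if (!conf || conf->arg0().empty()) return false;
    return conf->arg0() == "remote";
}

// Same walk as get_vhost_edge_origin(): cluster.origin, or nullptr when absent.
inline const DirectiveSketch* edge_origin(const DirectiveSketch* vhost) {
    assert(vhost != nullptr);  // precondition of this sketch only
    const DirectiveSketch* conf = vhost->get("cluster");
    return conf ? conf->get("origin") : nullptr;
}

// Builds a vhost holding: cluster { mode <mode>; origin <origins...>; }
inline DirectiveSketch make_vhost(const std::string& mode, const std::vector<std::string>& origins) {
    DirectiveSketch cluster{"cluster", {}, {}};
    cluster.children.push_back(DirectiveSketch{"mode", {mode}, {}});
    cluster.children.push_back(DirectiveSketch{"origin", origins, {}});
    DirectiveSketch vhost{"vhost", {}, {}};
    vhost.children.push_back(cluster);
    return vhost;
}
```

Any mode value other than "remote", or a missing cluster or mode directive, leaves the node treated as a non-edge, which matches the DEFAULT of false in the listing.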

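Continue execution:

c

At this point, start publishing. Note that the stream is pushed to the edge node (port 19350 in this example). If you are not sure how to publish, see the previous article.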

(7) Execution now stops at the breakpoint. Print the call stack:

bt

The call stack is:

0 SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)  at src/app/srs_app_config.cpp:5063
1 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:472
2 SrsRtmpConn::service_cycle()  at src/app/srs_app_rtmp_conn.cpp:388
3 SrsRtmpConn::do_cycle()  at src/app/srs_app_rtmp_conn.cpp:209
4 SrsConnection::cycle()  at src/app/srs_app_conn.cpp:171
5 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
6 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
7 _st_thread_main at sched.c:337
8 st_thread_create at sched.c:616

Continue again:

c

The stop that matters most is the one in get_vhost_edge_origin.

(8) Print the call stack:

bt

The call stack is:

0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source)  at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle()  at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle()  at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle()  at src/app/srs_app_conn.cpp:171
10 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616

(9) In the stack above, the key difference is in SrsRtmpConn::acquire_publish, which branches on whether the node is an edge:

srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
{
    srs_error_t err = srs_success;

    SrsRequest* req = info->req;

    if (!source->can_publish(info->edge)) {
        return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
    }

    // when edge, ignore the publish event, directly proxy it.
    if (info->edge) {
        // Edge node.
        if ((err = source->on_edge_start_publish()) != srs_success) {
            return srs_error_wrap(err, "rtmp: edge start publish");
        }
    } else {
        // Not an edge node.
        if ((err = source->on_publish()) != srs_success) {
            return srs_error_wrap(err, "rtmp: source publish");
        }
    }

    return err;
}

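(10) The edge branch looks like this:

srs_error_t SrsSource::on_edge_start_publish()
{
    // Push the stream to the origin.
    return publish_edge->on_client_publish();
}

It calls SrsPublishEdge::on_client_publish(), whose job is to push the stream from the edge to the origin. The source shows that the actual pushing is delegated to the forwarder.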

srs_error_t SrsPublishEdge::on_client_publish()
{
    srs_error_t err = srs_success;

    // error when not init state.
    if (state != SrsEdgeStateInit) {
        return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state");
    }

    // @see https://github.com/ossrs/srs/issues/180
    // to avoid multiple publish the same stream on the same edge,
    // directly enter the publish stage.
    if (true) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStatePublish;
        srs_trace("edge change from %d to state %d (push).", pstate, state);
    }

    // start to forward stream to origin.
    err = forwarder->start();

    // @see https://github.com/ossrs/srs/issues/180
    // when failed, revert to init
    if (err != srs_success) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge revert from %d to state %d (push), error %s", pstate, state, srs_error_desc(err).c_str());
    }

    return err;
}
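The state handling in on_client_publish() is easier to see in isolation: the edge claims the publish state before starting the forwarder and reverts to the initial state if the start fails. The following is a minimal sketch under stated assumptions; PublishEdgeSketch and EdgeStateSketch are hypothetical names, and the forwarder is reduced to a callback that returns true on success.

```cpp
#include <cassert>
#include <functional>
#include <utility>

enum class EdgeStateSketch { Init, Publish };

// Hypothetical reduction of the publish-side state handling.
class PublishEdgeSketch {
public:
    // start_forwarder stands in for forwarder->start().
    explicit PublishEdgeSketch(std::function<bool()> start_forwarder)
        : start_forwarder_(std::move(start_forwarder)) {
        assert(start_forwarder_);
    }

    // Returns false when the edge is already publishing or the forwarder fails.
    bool on_client_publish() {
        if (state_ != EdgeStateSketch::Init) {
            return false;  // a second publisher for the same stream is rejected
        }
        state_ = EdgeStateSketch::Publish;  // claim the state before connecting
        if (!start_forwarder_()) {
            state_ = EdgeStateSketch::Init;  // revert so a later publish can retry
            return false;
        }
        return true;
    }

    EdgeStateSketch state() const { return state_; }

private:
    std::function<bool()> start_forwarder_;
    EdgeStateSketch state_ = EdgeStateSketch::Init;
};
```

Setting the state before the forwarder starts closes the window in which a second client could begin publishing the same stream while the first connection to the origin is still being set up.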

(11) Every stream pushed from the edge to the origin goes through a forwarder, and starting the forwarder also starts a coroutine. The relevant part is the tail of SrsEdgeForwarder::start(), listed in full in step (2) of this section:

    srs_freep(trd);
    // Start the coroutine.
    trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());

    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }

(12) Every coroutine has a do_cycle(). Here sdk is the RTMP client that connects the edge to the origin:

srs_error_t SrsEdgeForwarder::do_cycle()
{
    srs_error_t err = srs_success;

    sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);

    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "edge forward pull");
        }

        if (send_error_code != ERROR_SUCCESS) {
            srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
            continue;
        }

        // read from client.
        if (true) {
            SrsCommonMessage* msg = NULL;
            // sdk is the RTMP client connected to the origin.
            err = sdk->recv_message(&msg);

            srs_verbose("edge loop recv message. ret=%d", ret);
            if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
                srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
                send_error_code = srs_error_code(err);
                srs_error_reset(err);
                continue;
            }

            srs_error_reset(err);
            srs_freep(msg);
        }

        // forward all messages.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        // Read the pending messages from the queue.
        if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "queue dumps packets");
        }

        pprint->elapse();

        // pithy print
        if (pprint->can_print()) {
            sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count);
        }

        // ignore when no messages.
        if (count <= 0) {
            continue;
        }

        if ((err = sdk->send_and_free_messages(msgs.msgs, count)) != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }

    return err;
}

(13) proxy() receives the data that the publishing client sends to the edge and puts it into the queue. The do_cycle loop above then reads the queue and sends the messages to the origin.

srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    if (send_error_code != ERROR_SUCCESS) {
        return srs_error_new(send_error_code, "edge forwarder");
    }

    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    if (msg->size <= 0
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement()) {
        return err;
    }

    SrsSharedPtrMessage copy;
    if ((err = copy.create(msg)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    copy.stream_id = sdk->sid();
    // Put the received message into the queue.
    if ((err = queue->enqueue(copy.copy())) != srs_success) {
        return srs_error_wrap(err, "enqueue message");
    }

    return err;
}
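The hand-off between proxy() and do_cycle() is a producer and consumer sharing one queue: one side appends copies of incoming messages, the other drains them in bounded batches. A minimal sketch of that pattern, assuming plain strings in place of RTMP messages; ForwardQueueSketch is a hypothetical name and this is not the SRS queue class.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for the queue shared by proxy() and do_cycle().
class ForwardQueueSketch {
public:
    // Producer side, as in proxy(): keep a copy of the incoming message.
    void enqueue(const std::string& msg) { pending_.push_back(msg); }

    // Consumer side, as in do_cycle(): take at most max messages in arrival order.
    std::vector<std::string> dump_packets(std::size_t max) {
        assert(max > 0);
        std::vector<std::string> batch;
        while (!pending_.empty() && batch.size() < max) {
            batch.push_back(pending_.front());
            pending_.pop_front();
        }
        return batch;
    }

private:
    std::deque<std::string> pending_;
};
```

An empty batch corresponds to the "ignore when no messages" branch in do_cycle(): the loop simply goes around again instead of sending.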

(14) Set a breakpoint where messages are enqueued and continue:

b SrsEdgeForwarder::proxy

c

(15) Print the call stack:

bt

The flow behind this call stack is: data from the publisher is handed to SrsEdgeForwarder and queued, and the forwarder's coroutine then picks it up in do_cycle and sends it.

0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
1 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
2 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
3 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
4 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
5 SrsRtmpConn::publishing(SrsSource* source)  at src/app/srs_app_rtmp_conn.cpp:822
6 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
7 SrsRtmpConn::service_cycle()  at src/app/srs_app_rtmp_conn.cpp:388
8 SrsRtmpConn::do_cycle()  at src/app/srs_app_rtmp_conn.cpp:209
9 SrsConnection::cycle()  at src/app/srs_app_recv_thread.cpp:198
10 SrsRecvThread::cycle()  at src/app/srs_app_st.cpp:198
11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
12 _st_thread_main at sched.c:337
13 st_thread_create at sched.c:616

As readers of the previous articles will know, RTMP publishing takes different paths on an edge node and on a non-edge node:

srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    // for edge, directly proxy message to origin.
    if (info->edge) {
        // Edge node.
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }

    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume aggregate");
        }
        return err;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "rtmp: decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume metadata");
            }
            return err;
        }
        return err;
    }

    return err;
}

3. Edge-mode pull: how a single URL is served from the origins

(1) Set breakpoints on the following functions and continue:

b SrsEdgeRtmpUpstream::connect

b SrsEdgeIngester::SrsEdgeIngester

b SrsEdgeIngester::do_cycle

b SrsPlayEdge::on_client_play

c

(2) Starting the pull coroutine:

srs_error_t SrsPlayEdge::on_client_play()
{
    srs_error_t err = srs_success;

    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        // Start the pull coroutine.
        err = ingester->start();
    }

    return err;
}

(3) Before going further, make sure the publisher is still running normally, then start a player.

Print the call stack of on_client_play:

bt

The call stack is:

0 SrsPlayEdge::on_client_play() at src/app/srs_app_edge.cpp:677
1 SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) at src/app/srs_app_source.cpp:2558
2 SrsRtmpConn::playing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:649
3 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
4 SrsRtmpConn::service_cycle()  at src/app/srs_app_rtmp_conn.cpp:388
5 SrsRtmpConn::do_cycle()  at src/app/srs_app_rtmp_conn.cpp:209
6 SrsConnection::cycle()  at src/app/srs_app_conn.cpp:171
7 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
8 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
9 _st_thread_main at sched.c:337
10 st_thread_create at sched.c:616

The stack shows that SrsSource::create_consumer checks whether the vhost is an edge. If it is, it calls on_client_play, which starts pulling from the origin:

srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
    srs_error_t err = srs_success;

    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);

    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    consumer->set_queue_size(queue_size);

    // if atc, update the sequence header to gop cache time.
    if (atc && !gop_cache->empty()) {
        if (meta->data()) {
            meta->data()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->vsh()) {
            meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->ash()) {
            meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
        }
    }

    // If stream is publishing, dumps the sequence header and gop cache.
    if (hub->active()) {
        // Copy metadata and sequence header to consumer.
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }

        // copy gop cache to client.
        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }

    // print status.
    if (dg) {
        srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm);
    } else {
        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
    }

    // for edge, when play edge stream, check the state
    // On an edge node, start pulling from the origin.
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }

    return err;
}

(4) on_client_play() starts the ingester only while the state is SrsEdgeStateInit, so a source starts the origin-to-edge coroutine once, however many players join afterwards. The guard is this block from the listing in step (2):

    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        // Start the origin-to-edge pull coroutine; a source starts it only once.
        err = ingester->start();
    }

(5) When all players have disconnected, the state has to change back:

void SrsPlayEdge::on_all_client_stop()
{
    // when all client disconnected,
    // and edge is ingesting origin stream, abort it.
    if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateIngestStopping;
        ingester->stop();
        state = SrsEdgeStateInit;
        srs_trace("edge change from %d to %d then %d (init).", pstate, SrsEdgeStateIngestStopping, state);

        return;
    }
}
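Taken together, on_client_play() and on_all_client_stop() mean the first player starts the ingester, later players reuse it, and the last player leaving resets the state so the next player starts it again. A minimal sketch of that lifecycle follows. PlayEdgeSketch is a hypothetical name, and folding the player count into the same class is an assumption of this sketch; in the listings above the caller decides when all clients have stopped.

```cpp
#include <cassert>

enum class PlayStateSketch { Init, Play };

// Hypothetical reduction of the play-side edge lifecycle.
class PlayEdgeSketch {
public:
    void on_client_play() {
        ++clients_;
        if (state_ == PlayStateSketch::Init) {
            state_ = PlayStateSketch::Play;
            ++ingester_starts_;  // stands in for ingester->start()
        }
    }

    void on_client_stop() {
        assert(clients_ > 0);
        --clients_;
        if (clients_ == 0 && state_ == PlayStateSketch::Play) {
            state_ = PlayStateSketch::Init;  // stands in for ingester->stop()
        }
    }

    int ingester_starts() const { return ingester_starts_; }
    PlayStateSketch state() const { return state_; }

private:
    int clients_ = 0;
    int ingester_starts_ = 0;
    PlayStateSketch state_ = PlayStateSketch::Init;
};
```

The practical effect is that the edge holds at most one connection to the origin per stream, regardless of how many players are attached.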

(6) The call stack of SrsEdgeRtmpUpstream::connect:

0 SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) at src/app/srs_app_edge.cpp:76
1 SrsEdgeIngester::do_cycle()   at src/app/srs_app_edge.cpp:271
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5 _st_thread_main at sched.c:337
6 st_thread_create at sched.c:616

An upstream is created in do_cycle:

srs_error_t SrsEdgeIngester::do_cycle()
{
    srs_error_t err = srs_success;

    std::string redirect;
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }

        srs_freep(upstream);
        upstream = new SrsEdgeRtmpUpstream(redirect);

        if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }

        if ((err = upstream->connect(req, lb)) != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }

        if ((err = edge->on_ingest_play()) != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }

        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);

        err = ingest(redirect);

        // retry for rtmp 302 immediately.
        if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
            int port;
            string server;
            upstream->selected(server, port);

            string url = req->get_stream_url();
            srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str());
            srs_error_reset(err);
            continue;
        }

        if (srs_is_client_gracefully_close(err)) {
            srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }
        break;
    }

    return err;
}
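The redirect handling in this loop can be reduced to a few lines: try the configured origin, and if the attempt ends with a redirect, reconnect at once to the redirect target instead. The sketch below is illustrative only. IngestResultSketch and ingest_with_redirect are hypothetical names, the ingest attempt is a callback, and max_hops is an assumption added here to bound the loop; the listing above has no such limit.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Outcome of one ingest attempt in this sketch.
struct IngestResultSketch {
    bool redirected;       // true when the origin answered with a redirect
    std::string redirect;  // target to use on the next attempt
};

// Hypothetical reduction of the do_cycle() retry loop.
// Returns the last server that was tried.
inline std::string ingest_with_redirect(
    const std::string& configured_origin,
    const std::function<IngestResultSketch(const std::string&)>& ingest_once,
    int max_hops) {
    assert(max_hops > 0);
    std::string redirect;
    std::string server;
    for (int hop = 0; hop < max_hops; ++hop) {
        // A pending redirect overrides the origin taken from the configuration.
        server = redirect.empty() ? configured_origin : redirect;
        redirect.clear();  // a redirect target is used only once
        IngestResultSketch result = ingest_once(server);
        if (!result.redirected) {
            break;
        }
        redirect = result.redirect;
    }
    return server;
}
```

Clearing the redirect before each attempt mirrors the comment in ingest() that the redirect is used only once, since the origin may have changed by the next connection.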

(7) In the source above, upstream is the client that pulls from the origin. The call to focus on in that function is SrsEdgeIngester::ingest. Set a breakpoint on it, continue, and print the call stack:

b SrsEdgeIngester::ingest

c

bt

(8) SrsEdgeIngester::ingest is the main ingest loop: it keeps receiving messages from the origin and hands each one to process_publish_message, which feeds it into the source on the edge.

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    srs_error_t err = srs_success;

    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    // we only use the redict once.
    // reset the redirect to empty, for maybe the origin changed.
    redirect = "";

    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }

        pprint->elapse();

        // pithy print
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }

        // read from client.
        SrsCommonMessage* msg = NULL;
        // Receive data from the origin.
        if ((err = upstream->recv_message(&msg)) != srs_success) {
            return srs_error_wrap(err, "recv message");
        }

        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);

        // Key call: hand the message to the source.
        if ((err = process_publish_message(msg, redirect)) != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }

    return err;
}

(9) Video, audio, and metadata are all handled here, and each ends up in the corresponding SrsSource handler, so this function deserves a close look.

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;

    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }

    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume aggregate");
        }
        return err;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "source consume metadata");
            }
            return err;
        }

        return err;
    }

    // call messages, for example, reject, redirect.
    if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
        SrsPacket* pkt = NULL;
        if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "decode message");
        }
        SrsAutoFree(SrsPacket, pkt);

        // RTMP 302 redirect
        if (dynamic_cast<SrsCallPacket*>(pkt)) {
            SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
            if (!call->arguments->is_object()) {
                return err;
            }

            SrsAmf0Any* prop = NULL;
            SrsAmf0Object* evt = call->arguments->to_object();

            if ((prop = evt->ensure_property_string("level")) == NULL) {
                return err;
            } else if (prop->to_str() != StatusLevelError) {
                return err;
            }

            if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
                return err;
            }
            SrsAmf0Object* ex = prop->to_object();

            // The redirect is tcUrl while redirect2 is RTMP URL.
            // https://github.com/ossrs/srs/issues/1575#issuecomment-574999798
            if ((prop = ex->ensure_property_string("redirect2")) == NULL) {
                // TODO: FIXME: Remove it when SRS3 released, it's temporarily support for SRS3 alpha versions(a0 to a8).
                if ((prop = ex->ensure_property_string("redirect")) == NULL) {
                    return err;
                }
            }
            redirect = prop->to_str();

            return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());
        }
    }

    return err;
}

(10) Taking SrsSource::on_video as the example, set a breakpoint, continue, and print the call stack:

b SrsSource::on_video

c

bt

The call stack, from SrsEdgeIngester::ingest downwards:

0 srs_error_t SrsEdgeIngester::ingest(string& redirect) at src/app/srs_app_edge.cpp:339
1 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:282
2 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
3 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
4 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
5 _st_thread_main at sched.c:337
6 st_thread_create at sched.c:616

The call chain shows that whatever is pulled back from the origin is still distributed through SrsSource.

srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;

    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
        }
    }
    last_packet_time = shared_video->header.timestamp;

    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }

        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return err;
    }

    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }

    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);

    return err;
}

(11) As mentioned earlier, players reach the stream through a single URL. When several origins are configured, the edge selects one of them by calling lb->select(conf->args):

srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
{
    srs_error_t err = srs_success;

    SrsRequest* req = r;

    std::string url;
    if (true) {
        // Read the origin list from the configuration file.
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);

        // @see https://github.com/ossrs/srs/issues/79
        // when origin is error, for instance, server is shutdown,
        // then user remove the vhost then reload, the conf is empty.
        if (!conf) {
            return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
        }

        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
        srs_parse_hostport(server, server, port);

        // override the origin info by redirect.
        if (!redirect.empty()) {
            int _port;
            string _schema, _vhost, _app, _stream, _param, _host;
            srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param);

            server = _host;
            port = _port;
        }

        // Remember the current selected server.
        selected_ip = server;
        selected_port = port;

        // support vhost transform for edge,
        // @see https://github.com/ossrs/srs/issues/372
        std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
        vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

        url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
    }

    srs_freep(sdk);
    srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
    srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
    sdk = new SrsSimpleRtmpClient(url, cto, sto);

    if ((err = sdk->connect()) != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
    }

    if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
        return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
    }

    return err;
}
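One step in connect() that is easy to overlook is the vhost transform: the configured template has its "[vhost]" placeholder replaced by the vhost of the incoming request before the upstream URL is built. The helper below sketches only that substitution. replace_all is a hypothetical name written for this illustration, and the template value used in the test is made up.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Replaces every occurrence of from in text with to.
inline std::string replace_all(std::string text, const std::string& from, const std::string& to) {
    assert(!from.empty());
    std::size_t pos = 0;
    while ((pos = text.find(from, pos)) != std::string::npos) {
        text.replace(pos, from.size(), to);
        pos += to.size();  // skip past the inserted text to avoid rescanning it
    }
    return text;
}
```

When the template is exactly "[vhost]", the substitution yields the request's own vhost, so the edge asks the origin for the same vhost the player asked for.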

4. Summary

This article traced the Edge-mode push and pull paths through the source code and in gdb, which should give a clearer picture of how Edge mode works.

About the author:

生活百科常识网