本篇文章,主要讲解Edege模式推拉流的调试和源码分析。
1.Edege推拉流相关类介绍
(1) 从这里的源码可以看出,Edege模式的拉流和推流的管理,都是由SrsSource这个类来管理,后面的源码和函数调用,也会体现出来。
SrsSource::SrsSource(){ req = NULL; jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); _can_publish = true; _pre_source_id = _source_id = -1; die_at = 0; //控制边缘节点的拉流 ,源站origin到边缘节点Edge play_edge = new SrsPlayEdge(); //控制边缘节点的推流,边缘节点Edge到源站origin publish_edge = new SrsPublishEdge(); //控制gop的cache gop_cache = new SrsGopCache(); //控制源站的路由 hub = new SrsOriginHub(); //控制Meta的cache meta = new SrsMetaCache(); is_monotonically_increase = false; last_packet_time = 0; _srs_config->subscribe(this); atc = false;}
(2) SrsEdgeUpstream就是从源站里面去拉流。
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost){ SrsConfDirective* conf = get_vhost(vhost); if (!conf) { return NULL; } conf = conf->get("cluster"); if (!conf) { return NULL; } return conf->get("origin");}
(3)Rtmp源站拉流类SrsEdgeRtmpUpstream,基础上面的类。源码如下:
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost){ SrsConfDirective* conf = get_vhost(vhost); if (!conf) { return NULL; } conf = conf->get("cluster"); if (!conf) { return NULL; } return conf->get("origin");}
(4)拉流,播放相关:
class SrsPublishEdge{private: SrsEdgeState state; SrsEdgeForwarder* forwarder;public: SrsPublishEdge(); virtual ~SrsPublishEdge();public: virtual void set_queue_size(srs_utime_t queue_size);public: virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual bool can_publish(); // When client publish stream on edge. virtual srs_error_t on_client_publish(); // Proxy publish stream to edge virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg); // Proxy unpublish stream to edge. virtual void on_proxy_unpublish();};
// The edge used to ingest stream from origin.class SrsEdgeIngester : public ISrsCoroutineHandler{private: SrsSource* source; SrsPlayEdge* edge; SrsRequest* req; SrsCoroutine* trd; SrsLbRoundRobin* lb; SrsEdgeUpstream* upstream;public: SrsEdgeIngester(); virtual ~SrsEdgeIngester();public: virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r); virtual srs_error_t start(); virtual void stop(); virtual std::string get_curr_origin();// Interface ISrsReusableThread2Handlerpublic: virtual srs_error_t cycle();private: virtual srs_error_t do_cycle();private: virtual srs_error_t ingest(std::string& redirect); virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);};
(5)源站推流类SrsEdgeForwarder,源码如下:
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost){ SrsConfDirective* conf = get_vhost(vhost); if (!conf) { return NULL; } conf = conf->get("cluster"); if (!conf) { return NULL; } return conf->get("origin");}
(6)推流相关类:
class SrsPublishEdge{private: SrsEdgeState state; SrsEdgeForwarder* forwarder;public: SrsPublishEdge(); virtual ~SrsPublishEdge();public: virtual void set_queue_size(srs_utime_t queue_size);public: virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual bool can_publish(); // When client publish stream on edge. virtual srs_error_t on_client_publish(); // Proxy publish stream to edge virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg); // Proxy unpublish stream to edge. virtual void on_proxy_unpublish();};
2.Edge模式推流源码分析
(1)推流源码分析,从配置文件,开始读取。
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost){ SrsConfDirective* conf = get_vhost(vhost); if (!conf) { return NULL; } conf = conf->get("cluster"); if (!conf) { return NULL; } return conf->get("origin");}
(2)前面的文章已经讲过,在Edge模式下,当推流端推流时,首先是推到源站。拉流时,如果边缘节点有缓存,就直接从边缘节点拉取,否则还是要需要从源站去拉取。当有多个源站origin时,
推流,首先推到源站。
srs_error_t SrsEdgeForwarder::start(){ srs_error_t err = srs_success; // reset the error code. send_error_code = ERROR_SUCCESS; std::string url; if (true) { SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); srs_assert(conf); // select the origin. std::string server = lb->select(conf->args); int port = SRS_CONSTS_RTMP_DEFAULT_PORT; srs_parse_hostport(server, server, port); // support vhost tranform for edge, // @see https://github.com/ossrs/srs/issues/372 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost); url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param); } // open socket. srs_freep(sdk); srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT; sdk = new SrsSimpleRtmpClient(url, cto, sto); if ((err = sdk->connect()) != srs_success) { return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); } if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) { return srs_error_wrap(err, "sdk publish"); } srs_freep(trd); trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id()); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "coroutine"); } srs_trace("edge-fwr publish url %s", url.c_str()); return err;}
(3)如果这个时候有一个源站断开,当再开启推流时,就会推送到另外一个源站。如果有多个源站,就是按照配置文件的配置,从左到右,这样一个顺序,去一个接一个去推。
(4)开启调试SRS,输入命令:
gdb ./objs/srs
界面如下:
(5)调试配置文件,输入命令:
set args -c conf/edge1.conf
b main
c
r
界面如下:
打印断点,输入命令:
b SrsConfig::get_vhost_edge_origin
界面如下:
SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost){ SrsConfDirective* conf = get_vhost(vhost); if (!conf) { return NULL; } conf = conf->get("cluster"); if (!conf) { return NULL; } return conf->get("origin");}
判断当前节点,输入命令:
b SrsConfig::get_vhost_is_edge
界面如下:
(6)判断当前节点是否为边缘节点的源码。
bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost){ static bool DEFAULT = false; SrsConfDirective* conf = vhost; if (!conf) { return DEFAULT; } conf = conf->get("cluster"); if (!conf) { return DEFAULT; } conf = conf->get("mode"); if (!conf || conf->arg0().empty()) { return DEFAULT; } return "remote" == conf->arg0();}
运行起来,执行命令:
c
跑起来,如下界面:
到这里,应该要开启推流,注意这里是推流到边缘节点(这里举例以19350),如果不知道怎么推流,可以参考前面一篇文章。
(7)这个时候,就会运行到断点这里,然后输入命令:
bt
查看调用栈,如下界面:
0 SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost) at src/app/srs_app_config.cpp:50631 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:4722 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:3883 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:2094 SrsConnection::cycle() at src/app/srs_app_conn.cpp:1715 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:1986 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:2137 _st_thread_main at sched.c:3378 st_thread_create at sched.c:616
继续输入调试命令:
c
最主要是看get_vhost_edge_origin,如下界面:
(8)查看调用栈,输入命令:
bt
如下界面:
0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:50911 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:4822 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:7773 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:25924 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:9365 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:8226 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:5347 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:3888 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:2099 SrsConnection::cycle() at src/app/srs_app_conn.cpp:17110 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:19811 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:21312 _st_thread_main at sched.c:33713 st_thread_create at sched.c:616
(9)从上面调用栈来看,最大的区别就是该函数下判断是否是边缘节点。源码如下:
srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source){ srs_error_t err = srs_success; SrsRequest* req = info->req; if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } // when edge, ignore the publish event, directly proxy it. if (info->edge) { //是边缘节点 if ((err = source->on_edge_start_publish()) != srs_success) { return srs_error_wrap(err, "rtmp: edge start publish"); } } else { //不是边缘节点 if ((err = source->on_publish()) != srs_success) { return srs_error_wrap(err, "rtmp: source publish"); } } return err;}
(10)看看边缘节点的分支,源码如下:
srs_error_t SrsSource::on_edge_start_publish(){ //推流到源站 return publish_edge->on_client_publish();}
调用SrsPublishEdge::on_client_publish()。该函数的功能是从边缘节点推送到源站。从源码可以看出是调用forwarder去推送。
srs_error_t SrsPublishEdge::on_client_publish(){ srs_error_t err = srs_success; // error when not init state. if (state != SrsEdgeStateInit) { return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state"); } // @see https://github.com/ossrs/srs/issues/180 // to avoid multiple publish the same stream on the same edge, // directly enter the publish stage. if (true) { SrsEdgeState pstate = state; state = SrsEdgeStatePublish; srs_trace("edge change from %d to state %d (push).", pstate, state); } // start to forward stream to origin. err = forwarder->start(); // @see https://github.com/ossrs/srs/issues/180 // when failed, revert to init if (err != srs_success) { SrsEdgeState pstate = state; state = SrsEdgeStateInit; srs_trace("edge revert from %d to state %d (push), error %s", pstate, state, srs_error_desc(err).c_str()); } return err;}
(11)每一路边缘节点推流到源站,都是用forwarder,开启forwarder去推送数据。这时候会开启一个协程。源码如下:
srs_error_t SrsEdgeForwarder::start(){ srs_error_t err = srs_success; // reset the error code. send_error_code = ERROR_SUCCESS; std::string url; if (true) { SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); srs_assert(conf); // select the origin. std::string server = lb->select(conf->args); int port = SRS_CONSTS_RTMP_DEFAULT_PORT; srs_parse_hostport(server, server, port); // support vhost tranform for edge, // @see https://github.com/ossrs/srs/issues/372 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost); url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param); } // open socket. srs_freep(sdk); srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT; sdk = new SrsSimpleRtmpClient(url, cto, sto); if ((err = sdk->connect()) != srs_success) { return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); } if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) { return srs_error_wrap(err, "sdk publish"); } srs_freep(trd); //开启协程 trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id()); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "coroutine"); } srs_trace("edge-fwr publish url %s", url.c_str()); return err;}
(12)每一个协程,都必定有一个do_cycle(),这里的sdk代表的是客户端(指的是sdk edge到origin的rtmp客户端),源码如下:
srs_error_t SrsEdgeForwarder::do_cycle(){ srs_error_t err = srs_success; sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE); SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "edge forward pull"); } if (send_error_code != ERROR_SUCCESS) { srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT); continue; } // read from client. if (true) { SrsCommonMessage* msg = NULL; //sdk代表的是客户端 err = sdk->recv_message(&msg); srs_verbose("edge loop recv message. ret=%d", ret); if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) { srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str()); send_error_code = srs_error_code(err); srs_error_reset(err); continue; } srs_error_reset(err); srs_freep(msg); } // forward all messages. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; //从队列里读取数据 if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) { return srs_error_wrap(err, "queue dumps packets"); } pprint->elapse(); // pithy print if (pprint->can_print()) { sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count); } // ignore when no messages. if (count send_and_free_messages(msgs.msgs, count)) != srs_success) { return srs_error_wrap(err, "send messages"); } } return err;}
(13)这个函数是接收来自推流客户端到edge边缘节点的数据。也就是把数据放到队列里面去。然后把这些数据通过上面的do_cycle去读取和发送到origin。源码如下:
srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg){ srs_error_t err = srs_success; if (send_error_code != ERROR_SUCCESS) { return srs_error_new(send_error_code, "edge forwarder"); } // the msg is auto free by source, // so we just ignore, or copy then send it. if (msg->size header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) { return err; } SrsSharedPtrMessage copy; if ((err = copy.create(msg)) != srs_success) { return srs_error_wrap(err, "create message"); } copy.stream_id = sdk->sid(); //接收数据,放入队列 if ((err = queue->enqueue(copy.copy())) != srs_success) { return srs_error_wrap(err, "enqueue message"); } return err;}
(14)在入队列,这里打印断点,输入命令:
b SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
c
如下界面:
(15)查看调用栈,输入命令:
bt
如下界面:
这个调用栈的流程,就是从从推流队列里去读取数据,给SrsEdgeForwarder,最后再给对应的协程执行do_cycle去执行。
0 SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:50911 SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:4822 SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:7773 SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:25924 SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:9365 SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:8226 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:5347 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:3888 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:2099 SrsConnection::cycle() at src/app/srs_app_recv_thread.cpp:19810SrsRecvThread::cycle() at src/app/srs_app_st.cpp:19811 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:21312 _st_thread_main at sched.c:33713 st_thread_create at sched.c:616
如果看过前面文章的朋友,应该知道,在RTMP推流时,如果是边缘节点和非边缘节点,走的流程是不一样。源码如下:
srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg){ srs_error_t err = srs_success; // for edge, directly proxy message to origin. //边缘节点 if (info->edge) { if ((err = source->on_edge_proxy_publish(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: proxy publish"); } return err; } // process audio packet if (msg->header.is_audio()) { if ((err = source->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume audio"); } return err; } // process video packet if (msg->header.is_video()) { if ((err = source->on_video(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume video"); } return err; } // process aggregate packet if (msg->header.is_aggregate()) { if ((err = source->on_aggregate(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume aggregate"); } return err; } // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) { return srs_error_wrap(err, "rtmp: decode message"); } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); if ((err = source->on_meta_data(msg, metadata)) != srs_success) { return srs_error_wrap(err, "rtmp: consume metadata"); } return err; } return err; } return err;}
3.接下来分析下Edge模式url实现单点登录,拉流的源码。
(1)在该函数下,打印断点,输入命令:
b SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
b SrsEdgeIngester::SrsEdgeIngester()
b srs_error_t SrsEdgeIngester::do_cycle()
b srs_error_t SrsPlayEdge::on_client_play()
c
界面如下:
(2)启动拉流协程,源码如下:
srs_error_t SrsPlayEdge::on_client_play(){ srs_error_t err = srs_success; // start ingest when init state. if (state == SrsEdgeStateInit) { state = SrsEdgeStatePlay; //启动拉流协程 err = ingester->start(); } return err;}
(3)运行到这里,要保证推流依然运行正常。然后再去拉流。
查看on_client_play的调用栈,输入命令:
bt
界面如下:
0 SrsPlayEdge::on_client_play() at src/app/srs_app_edge.cpp:6771SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) at src/app/srs_app_source.cpp:25582 SrsRtmpConn::playing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:6493 SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:5344 SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:3885 SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:2096 SrsConnection::cycle() at src/app/srs_app_conn.cpp:1717 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:1988 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:2139_st_thread_main at sched.c:33710 st_thread_create at sched.c:616
从调用栈的关系可以看出,这里可以看出,SrsSource::create_consumer这里需要判断是否是源站,如果有源站,那么就从源站这里去拉流,并启动on_client_play,源码如下图:
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg){ srs_error_t err = srs_success; consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); consumer->set_queue_size(queue_size); // if atc, update the sequence header to gop cache time. if (atc && !gop_cache->empty()) { if (meta->data()) { meta->data()->timestamp = srsu2ms(gop_cache->start_time()); } if (meta->vsh()) { meta->vsh()->timestamp = srsu2ms(gop_cache->start_time()); } if (meta->ash()) { meta->ash()->timestamp = srsu2ms(gop_cache->start_time()); } } // If stream is publishing, dumps the sequence header and gop cache. if (hub->active()) { // Copy metadata and sequence header to consumer. if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) { return srs_error_wrap(err, "meta dumps"); } // copy gop cache to client. if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) { return srs_error_wrap(err, "gop cache dumps"); } } // print status. if (dg) { srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm); } else { srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm); } // for edge, when play edge stream, check the state //如果是源站,就从源站去拉流 if (_srs_config->get_vhost_is_edge(req->vhost)) { // notice edge to start for the first client. if ((err = play_edge->on_client_play()) != srs_success) { return srs_error_wrap(err, "play edge"); } } return err;}
(4)启动on_client_play(),从这里可以看出,这里只可以启动一次,源码如下:
srs_error_t SrsPlayEdge::on_client_play(){ srs_error_t err = srs_success; // start ingest when init state. if (state == SrsEdgeStateInit) { state = SrsEdgeStatePlay; //要启动拉流origin-edge协程,一个source只会start一次 err = ingester->start(); } return err;}
(5)如果所有的拉流端都断开,那么需要有一个状态变更。源码如下:
void SrsPlayEdge::on_all_client_stop(){ // when all client disconnected, // and edge is ingesting origin stream, abort it. if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) { SrsEdgeState pstate = state; state = SrsEdgeStateIngestStopping; ingester->stop(); state = SrsEdgeStateInit; srs_trace("edge change from %d to %d then %d (init).", pstate, SrsEdgeStateIngestStopping, state); return; }}
(6)查看SrsEdgeRtmpUpstream的调用栈
0 SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) at src/app/srs_app_edge.cpp:761 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:2712 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:2433 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:1984 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:2135_st_thread_main at sched.c:3376 st_thread_create at sched.c:616
这里会创建一个upstream。源码如下:
srs_error_t SrsEdgeIngester::do_cycle(){ srs_error_t err = srs_success; std::string redirect; while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "do cycle pull"); } srs_freep(upstream); upstream = new SrsEdgeRtmpUpstream(redirect); if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) { return srs_error_wrap(err, "on source id changed"); } if ((err = upstream->connect(req, lb)) != srs_success) { return srs_error_wrap(err, "connect upstream"); } if ((err = edge->on_ingest_play()) != srs_success) { return srs_error_wrap(err, "notify edge play"); } // set to larger timeout to read av data from origin. upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT); err = ingest(redirect); // retry for rtmp 302 immediately. if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) { int port; string server; upstream->selected(server, port); string url = req->get_stream_url(); srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str()); srs_error_reset(err); continue; } if (srs_is_client_gracefully_close(err)) { srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str()); srs_error_reset(err); } break; } return err;}
(7) 在前面的源码分析中,upstream对应的就是一个拉流客户端。如下界面:
上面的函数源码,重点关注SrsEdgeIngester::ingest,打印断点,调试,输入命令:
b SrsEdgeIngester::ingest(string& redirect)
bt
界面如下:
查看调用栈,如下界面:
(8) SysEdgeIngester::do_cycle是一个推流主循环,主要是从边缘节点推流到源站的推流主循环工作。
srs_error_t SrsEdgeIngester::ingest(string& redirect){ srs_error_t err = srs_success; SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); // we only use the redict once. // reset the redirect to empty, for maybe the origin changed. redirect = ""; while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "thread quit"); } pprint->elapse(); // pithy print if (pprint->can_print()) { upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age()); } // read from client. SrsCommonMessage* msg = NULL; //接收数据 if ((err = upstream->recv_message(&msg)) != srs_success) { return srs_error_wrap(err, "recv message"); } srs_assert(msg); SrsAutoFree(SrsCommonMessage, msg); //重要的函数,推流 if ((err = process_publish_message(msg, redirect)) != srs_success) { return srs_error_wrap(err, "process message"); } } return err;}
(9)视频、音频、Metadata都是在这里处理,最终都是使用SrsSource里面对应的数据,所以这是一个重点分析的函数。
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect){ srs_error_t err = srs_success; // process audio packet if (msg->header.is_audio()) { if ((err = source->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "source consume audio"); } } // process video packet if (msg->header.is_video()) { if ((err = source->on_video(msg)) != srs_success) { return srs_error_wrap(err, "source consume video"); } } // process aggregate packet if (msg->header.is_aggregate()) { if ((err = source->on_aggregate(msg)) != srs_success) { return srs_error_wrap(err, "source consume aggregate"); } return err; } // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((err = upstream->decode_message(msg, &pkt)) != srs_success) { return srs_error_wrap(err, "decode message"); } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); if ((err = source->on_meta_data(msg, metadata)) != srs_success) { return srs_error_wrap(err, "source consume metadata"); } return err; } return err; } // call messages, for example, reject, redirect. if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { SrsPacket* pkt = NULL; if ((err = upstream->decode_message(msg, &pkt)) != srs_success) { return srs_error_wrap(err, "decode message"); } SrsAutoFree(SrsPacket, pkt); // RTMP 302 redirect if (dynamic_cast(pkt)) { SrsCallPacket* call = dynamic_cast(pkt); if (!call->arguments->is_object()) { return err; } SrsAmf0Any* prop = NULL; SrsAmf0Object* evt = call->arguments->to_object(); if ((prop = evt->ensure_property_string("level")) == NULL) { return err; } else if (prop->to_str() != StatusLevelError) { return err; } if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) { return err; } SrsAmf0Object* ex = prop->to_object(); // The redirect is tcUrl while redirect2 is RTMP URL. // https://github.com/ossrs/srs/issues/1575#issuecomment-574999798 if ((prop = ex->ensure_property_string("redirect2")) == NULL) { // TODO: FIXME: Remove it when SRS3 released, it's temporarily support for SRS3 alpha versions(a0 to a8). if ((prop = ex->ensure_property_string("redirect")) == NULL) { return err; } } redirect = prop->to_str(); return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str()); } } return err;}
(10) 这里以SrsSource::on_video举例,打印断点,并查看调用栈,输入命令:
b SrsSource::on_video(SrsCommonMessage* shared_video)
bt
调用栈界面如下:
srs_error_t SrsEdgeIngester::ingest(string& redirect) at src/app/srs_app_edge.cpp:3391 SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:2822 SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:2433 srs_error_t SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:1984 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:2135_st_thread_main at sched.c:3376 st_thread_create at sched.c:616
通过这里的函数调用关系,可以知道,从源站拉回来后,最终还是通过SrsSource去分发。
srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video){ srs_error_t err = srs_success; // monotically increase detect. if (!mix_correct && is_monotonically_increase) { if (last_packet_time > 0 && shared_video->header.timestamp header.timestamp; // drop any unknown header video. // @see https://github.com/ossrs/srs/issues/421 if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { char b0 = 0x00; if (shared_video->size > 0) { b0 = shared_video->payload[0]; } srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); return err; } // convert shared_video to msg, user should not use shared_video again. // the payload is transfer to msg, and set to NULL in shared_video. SrsSharedPtrMessage msg; if ((err = msg.create(shared_video)) != srs_success) { return srs_error_wrap(err, "create message"); } // directly process the audio message. if (!mix_correct) { return on_video_imp(&msg); } // insert msg to the queue. mix_queue->push(msg.copy()); // fetch someone from mix queue. SrsSharedPtrMessage* m = mix_queue->pop(); if (!m) { return err; } // consume the monotonically increase message. if (m->is_audio()) { err = on_audio_imp(m); } else { err = on_video_imp(m); } srs_freep(m); return err;}
(11)前面讲过url实现单点登录,当有多个源站时,会选择一个正在运行的源站,通过函数调用lb->select(conf->args),体现如下:
srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb){ srs_error_t err = srs_success; SrsRequest* req = r; std::string url; if (true) { //读取配置文件源站 SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); // @see https://github.com/ossrs/srs/issues/79 // when origin is error, for instance, server is shutdown, // then user remove the vhost then reload, the conf is empty. if (!conf) { return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str()); } // select the origin. //选择源站 std::string server = lb->select(conf->args); int port = SRS_CONSTS_RTMP_DEFAULT_PORT; srs_parse_hostport(server, server, port); // override the origin info by redirect. if (!redirect.empty()) { int _port; string _schema, _vhost, _app, _stream, _param, _host; srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param); server = _host; port = _port; } // Remember the current selected server. selected_ip = server; selected_port = port; // support vhost tranform for edge, // @see https://github.com/ossrs/srs/issues/372 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost); url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param); } srs_freep(sdk); srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; sdk = new SrsSimpleRtmpClient(url, cto, sto); if ((err = sdk->connect()) != srs_success) { return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); } if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) { return srs_error_wrap(err, "edge pull %s stream failed", url.c_str()); } return err;}
4.总结
本篇文章重点分析了Edege模式的推拉流源码及调试过程,可以更加清楚认识Edege模式。希望能够帮助到大家。欢迎关注,转发,点赞,收藏,分享,评论区讨论。
本文到此结束,希望对大家有所帮助!