From: hyc Date: Wed, 6 Jan 2010 10:29:47 +0000 (+0000) Subject: Relay data in chunks instead of full packets, to reduce latency. Support X-Git-Tag: v2.4~311 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e7faab2112146bb100b377923f37fc6e988e9024;p=rtmpdump Relay data in chunks instead of full packets, to reduce latency. Support multiple outstanding Play requests. git-svn-id: svn://svn.mplayerhq.hu/rtmpdump/trunk@200 400ebc74-4327-4243-bc38-086b20814532 --- diff --git a/rtmpsuck.c b/rtmpsuck.c index 523e760..abb92c5 100644 --- a/rtmpsuck.c +++ b/rtmpsuck.c @@ -70,6 +70,13 @@ enum STREAMING_STOPPED }; +typedef struct Flist +{ + struct Flist *f_next; + FILE *f_file; + AVal f_path; +} Flist; + typedef struct Plist { struct Plist *p_next; @@ -85,7 +92,8 @@ typedef struct RTMP rc; Plist *rs_pkt[2]; /* head, tail */ Plist *rc_pkt[2]; /* head, tail */ - FILE *out; + Flist *f_head, *f_tail; + Flist *f_cur; } STREAMING_SERVER; @@ -129,9 +137,9 @@ SAVC(fmsVer); SAVC(mode); SAVC(level); SAVC(code); -SAVC(description); SAVC(secureToken); SAVC(onStatus); +SAVC(details); SAVC(close); static const AVal av_NetStream_Failed = AVC("NetStream.Failed"); static const AVal av_NetStream_Play_Failed = AVC("NetStream.Play.Failed"); @@ -190,13 +198,7 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b if (cobj.o_props[i].p_type == AMF_STRING) { pval = cobj.o_props[i].p_vu.p_aval; - if (pval.av_val) - { - pval.av_val = malloc(pval.av_len+1); - memcpy(pval.av_val, cobj.o_props[i].p_vu.p_aval.av_val, pval.av_len); - pval.av_val[pval.av_len] = '\0'; - } - LogPrintf("%.*s: %s\n", pname.av_len, pname.av_val, pval.av_val); + LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val); } if (AVMATCH(&pname, &av_app)) { @@ -289,12 +291,7 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b server->rc.Link.authflag = AMFProp_GetBoolean(&obj.o_props[3]); if (obj.o_num > 4) { - AVal tmp; - AMFProp_GetString(&obj.o_props[4], &tmp); - server->rc.Link.auth.av_len = tmp.av_len; - server->rc.Link.auth.av_val = malloc(tmp.av_len+1); - memcpy(server->rc.Link.auth.av_val, tmp.av_val, tmp.av_len); - server->rc.Link.auth.av_val[tmp.av_len] = '\0'; + AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth); } } @@ -307,6 +304,7 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b else if (AVMATCH(&method, &av_play)) { AVal av; + FILE *out; char *file, *p, *q; char flvHeader[] = { 'F', 'L', 'V', 0x01, 0x05, // video + audio, we finalize later if the value is different @@ -317,7 +315,8 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b server->rc.m_stream_id = pack->m_nInfoField2; AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av); server->rc.Link.playpath = av; - q = strchr(av.av_val, '?'); + + q = memchr(av.av_val, '?', av.av_len); if (q) av.av_len = q - av.av_val; for (p=av.av_val+av.av_len-1; p>=av.av_val; p--) @@ -337,20 +336,38 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b LogPrintf("Playpath: %.*s\nSaving as: %s\n", server->rc.Link.playpath.av_len, server->rc.Link.playpath.av_val, file); - server->out = fopen(file, "wb"); - if (!server->out) + out = fopen(file, "wb"); + free(file); + if (!out) ret = 1; else - fwrite(flvHeader, 1, sizeof(flvHeader), server->out); - free(file); + { + Flist *fl; + fwrite(flvHeader, 1, sizeof(flvHeader), out); + av = server->rc.Link.playpath; + fl = malloc(sizeof(Flist)+av.av_len+1); + fl->f_file = out; + fl->f_path.av_len = av.av_len; + fl->f_path.av_val = (char *)(fl+1); + memcpy(fl->f_path.av_val, av.av_val, av.av_len); + fl->f_path.av_val[av.av_len] = '\0'; + fl->f_next = NULL; + if (server->f_tail) + server->f_tail->f_next = fl; + else + server->f_head = fl; + server->f_tail = fl; + server->f_cur = fl; + } } else if (AVMATCH(&method, &av_onStatus)) { AMFObject obj2; - AVal code, level; + AVal code, level, details; AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2); AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code); AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level); + AMFProp_GetString(AMF_GetProp(&obj2, &av_details, -1), &details); Log(LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); if (AVMATCH(&code, &av_NetStream_Failed) @@ -363,6 +380,15 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b if (AVMATCH(&code, &av_NetStream_Play_Start)) { + Flist *fl; + /* If multiple streams were queued up, find the one + to make current. */ + for (fl = server->f_head; fl; fl=fl->f_next) + if (AVMATCH(&fl->f_path, &details) && fl->f_file) + { + server->f_cur = fl; + break; + } server->rc.m_bPlaying = true; } @@ -370,6 +396,18 @@ ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *b if (AVMATCH(&code, &av_NetStream_Play_Complete) || AVMATCH(&code, &av_NetStream_Play_Stop)) { + Flist **fl; + /* Remove this file from the play queue */ + for (fl = &server->f_head; *fl; fl = &(*fl)->f_next) + if (*fl == server->f_cur) + { + Flist *f = *fl; + *fl = f->f_next; + f->f_file = NULL; + free(f); + server->f_cur = NULL; + break; + } ret = 1; } } @@ -643,6 +681,7 @@ controlServerThread(void *unused) case 'q': LogPrintf("Exiting\n"); stopStreaming(rtmpServer); + free(rtmpServer); exit(0); break; default: @@ -657,7 +696,7 @@ void doServe(STREAMING_SERVER * server, // server socket and state (our listenin ) { RTMPPacket pc = { 0 }, ps = { 0 }; - char *buf; + char *buf, hbuf[RTMP_MAX_HEADER_SIZE]; unsigned int buflen = 131072; bool paused = false; @@ -703,6 +742,8 @@ void doServe(STREAMING_SERVER * server, // server socket and state (our listenin break; } + pc.m_header = hbuf; + /* We have our own timeout in select() */ server->rc.Link.timeout = 10; server->rs.Link.timeout = 10; @@ -729,12 +770,12 @@ void doServe(STREAMING_SERVER * server, // server socket and state (our listenin FD_SET(server->rc.m_socket, &rfds); /* give more time to start up if we're not playing yet */ - tv.tv_sec = server->out ? 30 : 60; + tv.tv_sec = server->f_cur ? 30 : 60; tv.tv_usec = 0; if (select(n + 1, &rfds, NULL, NULL, &tv) <= 0) { - if (server->out && server->rc.m_mediaChannel && !paused) + if (server->f_cur && server->rc.m_mediaChannel && !paused) { server->rc.m_pauseStamp = server->rc.m_channelTimestamp[server->rc.m_mediaChannel]; if (RTMP_ToggleStream(&server->rc)) @@ -795,10 +836,11 @@ void doServe(STREAMING_SERVER * server, // server socket and state (our listenin } } else if (ps.m_packetType == 0x11 || ps.m_packetType == 0x14) - if (ServePacket(server, 0, &ps) && server->out) + if (ServePacket(server, 0, &ps) && server->f_cur) { - fclose(server->out); - server->out = NULL; + fclose(server->f_cur->f_file); + server->f_cur->f_file = NULL; + server->f_cur = NULL; } RTMP_SendPacket(&server->rc, &ps, false); RTMPPacket_Free(&ps); @@ -807,98 +849,107 @@ void doServe(STREAMING_SERVER * server, // server socket and state (our listenin } if (cr) { + int n = pc.m_nBytesRead; while (RTMP_ReadPacket(&server->rc, &pc)) - if (RTMPPacket_IsReady(&pc)) - { - int sendit = 1; - if (paused) - { - if (pc.m_nTimeStamp <= server->rc.m_mediaStamp) - continue; - paused = 0; - server->rc.m_pausing = 0; - } - /* change chunk size */ - if (pc.m_packetType == 0x01) - { - if (pc.m_nBodySize >= 4) - { - server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body); - Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__, - server->rc.m_inChunkSize); - server->rs.m_outChunkSize = server->rc.m_inChunkSize; - } - } - else if (pc.m_packetType == 0x04) - { - short nType = AMF_DecodeInt16(pc.m_body); - /* SWFverification */ - if (nType == 0x1a) -#ifdef CRYPTO - if (server->rc.Link.SWFHash.av_len) - { - RTMP_SendCtrl(&server->rc, 0x1b, 0, 0); - sendit = 0; - } -#else - /* The session will certainly fail right after this */ - Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__); -#endif - } - else if (server->out && ( - pc.m_packetType == 0x08 || - pc.m_packetType == 0x09 || - pc.m_packetType == 0x12 || - pc.m_packetType == 0x16) && - RTMP_ClientPacket(&server->rc, &pc)) - { - int len = WriteStream(&buf, &buflen, &server->stamp, &pc); - if (len > 0 && fwrite(buf, 1, len, server->out) != len) - goto cleanup; - } - else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14) - { - if (ServePacket(server, 1, &pc) && server->out) - { - fclose(server->out); - server->out = NULL; - } - } - - if (sendit && RTMP_IsConnected(&server->rs)) - RTMP_SendPacket(&server->rs, &pc, false); - RTMPPacket_Free(&pc); - break; - } + { + int sendit = 1; + if (RTMPPacket_IsReady(&pc)) + { + if (paused) + { + if (pc.m_nTimeStamp <= server->rc.m_mediaStamp) + continue; + paused = 0; + server->rc.m_pausing = 0; + } + /* change chunk size */ + if (pc.m_packetType == 0x01) + { + if (pc.m_nBodySize >= 4) + { + server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body); + Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__, + server->rc.m_inChunkSize); + server->rs.m_outChunkSize = server->rc.m_inChunkSize; + } + } + else if (pc.m_packetType == 0x04) + { + short nType = AMF_DecodeInt16(pc.m_body); + /* SWFverification */ + if (nType == 0x1a) + #ifdef CRYPTO + if (server->rc.Link.SWFHash.av_len) + { + RTMP_SendCtrl(&server->rc, 0x1b, 0, 0); + sendit = 0; + } + #else + /* The session will certainly fail right after this */ + Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__); + #endif + } + else if (server->f_cur && ( + pc.m_packetType == 0x08 || + pc.m_packetType == 0x09 || + pc.m_packetType == 0x12 || + pc.m_packetType == 0x16) && + RTMP_ClientPacket(&server->rc, &pc)) + { + int len = WriteStream(&buf, &buflen, &server->stamp, &pc); + if (len > 0 && fwrite(buf, 1, len, server->f_cur->f_file) != len) + goto cleanup; + } + else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14) + { + if (ServePacket(server, 1, &pc) && server->f_cur) + { + fclose(server->f_cur->f_file); + server->f_cur->f_file = NULL; + server->f_cur = NULL; + } + } + } + if (sendit && RTMP_IsConnected(&server->rs)) + RTMP_SendChunk(&server->rs, &pc, n); + if (RTMPPacket_IsReady(&pc)) + { + RTMPPacket_Free(&pc); + pc.m_nBytesRead = 0; + } + break; + } } if (!RTMP_IsConnected(&server->rs) && RTMP_IsConnected(&server->rc) - && !server->out) + && !server->f_cur) RTMP_Close(&server->rc); } cleanup: LogPrintf("Closing connection... "); RTMP_Close(&server->rs); - if (server->out) + while (server->f_head) { - fclose(server->out); - server->out = NULL; + Flist *fl = server->f_head; + server->f_head = fl->f_next; + if (fl->f_file) + fclose(fl->f_file); + free(fl); } + free(buf); /* Should probably be done by RTMP_Close() ... */ free((void *)server->rc.Link.hostname); server->rc.Link.hostname = NULL; - free(server->rc.Link.tcUrl.av_val); server->rc.Link.tcUrl.av_val = NULL; - free(server->rc.Link.swfUrl.av_val); server->rc.Link.swfUrl.av_val = NULL; - free(server->rc.Link.pageUrl.av_val); server->rc.Link.pageUrl.av_val = NULL; - free(server->rc.Link.app.av_val); server->rc.Link.app.av_val = NULL; - free(server->rc.Link.auth.av_val); server->rc.Link.auth.av_val = NULL; - free(server->rc.Link.flashVer.av_val); server->rc.Link.flashVer.av_val = NULL; +#ifdef CRYPTO + free(server->rc.Link.SWFHash.av_val); + server->rc.Link.SWFHash.av_val = NULL; +#endif LogPrintf("done!\n\n"); quit: @@ -1044,7 +1095,7 @@ main(int argc, char **argv) LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_VERSION); LogPrintf("(c) 2010 Andrej Stepanchuk, Howard Chu; license: GPL\n\n"); - debuglevel = LOGINFO; + debuglevel = LOGDEBUG; if (argc > 1 && !strcmp(argv[1], "-z")) debuglevel = LOGALL; @@ -1080,6 +1131,8 @@ main(int argc, char **argv) } Log(LOGDEBUG, "Done, exiting..."); + free(rtmpServer); + CleanupSockets(); #ifdef _DEBUG