From: hyc Date: Tue, 16 Mar 2010 04:06:22 +0000 (+0000) Subject: Partial RTMP_Write implementation X-Git-Tag: v2.4~182 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=a6a41ce55363f77bc99d559f47e860def5503c1a;p=rtmpdump Partial RTMP_Write implementation git-svn-id: svn://svn.mplayerhq.hu/rtmpdump/trunk@356 400ebc74-4327-4243-bc38-086b20814532 --- diff --git a/librtmp/rtmp.c b/librtmp/rtmp.c index 4a8cfa7..7dc17a8 100644 --- a/librtmp/rtmp.c +++ b/librtmp/rtmp.c @@ -275,9 +275,7 @@ RTMP_SetupStream(RTMP *r, double dTime, uint32_t dLength, bool bLiveStream, long int timeout) { - assert(protocol < 9); - - Log(LOGDEBUG, "Protocol : %s", RTMPProtocolStrings[protocol]); + Log(LOGDEBUG, "Protocol : %s", RTMPProtocolStrings[protocol&7]); Log(LOGDEBUG, "Hostname : %s", hostname); Log(LOGDEBUG, "Port : %d", port); Log(LOGDEBUG, "Playpath : %s", playpath->av_val); @@ -598,7 +596,7 @@ RTMP_ReconnectStream(RTMP *r, int bufferTime, double seekTime, { RTMP_DeleteStream(r); - RTMP_SendCreateStream(r, 2.0); + RTMP_SendCreateStream(r); RTMP_SetBufferMS(r, bufferTime); @@ -1009,6 +1007,8 @@ SAVC(videoFunction); SAVC(objectEncoding); SAVC(secureToken); SAVC(secureTokenResponse); +SAVC(type); +SAVC(nonprivate); static bool SendConnectPacket(RTMP *r, RTMPPacket *cp) @@ -1029,12 +1029,15 @@ SendConnectPacket(RTMP *r, RTMPPacket *cp) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_connect); - enc = AMF_EncodeNumber(enc, pend, 1.0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_OBJECT; - if (r->Link.app.av_len) + enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app); + if (!enc) + return false; + if (r->Link.protocol & RTMP_FEATURE_WRITE) { - enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app); + enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate); if (!enc) return false; } @@ -1056,30 +1059,33 @@ SendConnectPacket(RTMP *r, RTMPPacket *cp) if (!enc) return false; } - enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, false); - if (!enc) - return false; - enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0); - if (!enc) - return false; - enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs); - if (!enc) - return false; - enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs); - if (!enc) - return false; - enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0); - if (!enc) - return false; - if (r->Link.pageUrl.av_len) + if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) { - enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl); + enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, false); if (!enc) return false; + enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0); + if (!enc) + return false; + enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs); + if (!enc) + return false; + enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs); + if (!enc) + return false; + enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0); + if (!enc) + return false; + if (r->Link.pageUrl.av_len) + { + enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl); + if (!enc) + return false; + } } if (r->m_fEncoding != 0.0 || r->m_bSendEncoding) - { - enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding); // AMF0, AMF3 not supported yet + { // AMF0, AMF3 not fully supported yet + enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding); if (!enc) return false; } @@ -1149,7 +1155,7 @@ SendBGHasStream(RTMP *r, double dId, AVal *playpath) SAVC(createStream); bool -RTMP_SendCreateStream(RTMP *r, double dCmdID) +RTMP_SendCreateStream(RTMP *r) { RTMPPacket packet; char pbuf[256], *pend = pbuf + sizeof(pbuf); @@ -1164,7 +1170,7 @@ RTMP_SendCreateStream(RTMP *r, double dCmdID) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_createStream); - enc = AMF_EncodeNumber(enc, pend, dCmdID); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; // NULL packet.m_nBodySize = enc - packet.m_body; @@ -1190,7 +1196,7 @@ SendFCSubscribe(RTMP *r, AVal *subscribepath) Log(LOGDEBUG, "FCSubscribe: %s", subscribepath->av_val); char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_FCSubscribe); - enc = AMF_EncodeNumber(enc, pend, 4.0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; enc = AMF_EncodeString(enc, pend, subscribepath); @@ -1202,6 +1208,98 @@ SendFCSubscribe(RTMP *r, AVal *subscribepath) return RTMP_SendPacket(r, &packet, true); } +SAVC(releaseStream); + +static bool +SendReleaseStream(RTMP *r) +{ + RTMPPacket packet; + char pbuf[1024], *pend = pbuf + sizeof(pbuf); + + packet.m_nChannel = 0x03; // control channel (invoke) + packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; + packet.m_packetType = 0x14; // INVOKE + packet.m_nInfoField1 = 0; + packet.m_nInfoField2 = 0; + packet.m_hasAbsTimestamp = 0; + packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; + + char *enc = packet.m_body; + enc = AMF_EncodeString(enc, pend, &av_releaseStream); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); + *enc++ = AMF_NULL; + enc = AMF_EncodeString(enc, pend, &r->Link.playpath); + if (!enc) + return false; + + packet.m_nBodySize = enc - packet.m_body; + + return RTMP_SendPacket(r, &packet, true); +} + +SAVC(FCPublish); + +static bool +SendFCPublish(RTMP *r) +{ + RTMPPacket packet; + char pbuf[1024], *pend = pbuf + sizeof(pbuf); + + packet.m_nChannel = 0x03; // control channel (invoke) + packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; + packet.m_packetType = 0x14; // INVOKE + packet.m_nInfoField1 = 0; + packet.m_nInfoField2 = 0; + packet.m_hasAbsTimestamp = 0; + packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; + + char *enc = packet.m_body; + enc = AMF_EncodeString(enc, pend, &av_FCPublish); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); + *enc++ = AMF_NULL; + enc = AMF_EncodeString(enc, pend, &r->Link.playpath); + if (!enc) + return false; + + packet.m_nBodySize = enc - packet.m_body; + + return RTMP_SendPacket(r, &packet, true); +} + +SAVC(publish); +SAVC(live); + +static bool +SendPublish(RTMP *r) +{ + RTMPPacket packet; + char pbuf[1024], *pend = pbuf + sizeof(pbuf); + + packet.m_nChannel = 0x04; // source channel (invoke) + packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; + packet.m_packetType = 0x14; // INVOKE + packet.m_nInfoField1 = 0; + packet.m_nInfoField2 = 0; + packet.m_hasAbsTimestamp = 0; + packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; + + char *enc = packet.m_body; + enc = AMF_EncodeString(enc, pend, &av_publish); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); + *enc++ = AMF_NULL; + enc = AMF_EncodeString(enc, pend, &r->Link.playpath); + if (!enc) + return false; + + enc = AMF_EncodeString(enc, pend, &av_live); + if (!enc) + return false; + + packet.m_nBodySize = enc - packet.m_body; + + return RTMP_SendPacket(r, &packet, true); +} + SAVC(deleteStream); static bool @@ -1220,7 +1318,7 @@ SendDeleteStream(RTMP *r, double dStreamId) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_deleteStream); - enc = AMF_EncodeNumber(enc, pend, 0.0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; enc = AMF_EncodeNumber(enc, pend, dStreamId); @@ -1248,7 +1346,7 @@ RTMP_SendPause(RTMP *r, bool DoPause, double dTime) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_pause); - enc = AMF_EncodeNumber(enc, pend, 0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; enc = AMF_EncodeBoolean(enc, pend, DoPause); enc = AMF_EncodeNumber(enc, pend, (double)dTime); @@ -1277,7 +1375,7 @@ RTMP_SendSeek(RTMP *r, double dTime) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_seek); - enc = AMF_EncodeNumber(enc, pend, 0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; enc = AMF_EncodeNumber(enc, pend, dTime); @@ -1350,7 +1448,7 @@ SendCheckBW(RTMP *r) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av__checkbw); - enc = AMF_EncodeNumber(enc, pend, 0); + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; packet.m_nBodySize = enc - packet.m_body; @@ -1404,7 +1502,7 @@ SendPlay(RTMP *r) char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_play); - enc = AMF_EncodeNumber(enc, pend, 0.0); // stream id?? + enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; Log(LOGDEBUG, "%s, seekTime=%.2f, dLength=%d, sending play: %s", @@ -1648,23 +1746,40 @@ HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize) SendSecureTokenResponse(r, &p.p_vu.p_aval); } } - RTMP_SendServerBW(r); - RTMP_SendCtrl(r, 3, 0, 300); - - RTMP_SendCreateStream(r, 2.0); + if (r->Link.protocol & RTMP_FEATURE_WRITE) + { + SendReleaseStream(r); + SendFCPublish(r); + } + else + { + RTMP_SendServerBW(r); + RTMP_SendCtrl(r, 3, 0, 300); + } + RTMP_SendCreateStream(r); - /* Send the FCSubscribe if live stream or if subscribepath is set */ - if (r->Link.subscribepath.av_len) - SendFCSubscribe(r, &r->Link.subscribepath); - else if (r->Link.bLiveStream) - SendFCSubscribe(r, &r->Link.playpath); + if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) + { + /* Send the FCSubscribe if live stream or if subscribepath is set */ + if (r->Link.subscribepath.av_len) + SendFCSubscribe(r, &r->Link.subscribepath); + else if (r->Link.bLiveStream) + SendFCSubscribe(r, &r->Link.playpath); + } } else if (AVMATCH(&methodInvoked, &av_createStream)) { r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); - SendPlay(r); - RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); + if (r->Link.protocol & RTMP_FEATURE_WRITE) + { + SendPublish(r); + } + else + { + SendPlay(r); + RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); + } } else if (AVMATCH(&methodInvoked, &av_play)) { @@ -2614,6 +2729,7 @@ RTMP_Close(RTMP *r) AV_clear(r->m_methodCalls, r->m_numCalls); r->m_methodCalls = NULL; r->m_numCalls = 0; + r->m_numInvokes = 0; r->m_bPlaying = false; r->m_sb.sb_size = 0; @@ -3408,3 +3524,66 @@ RTMP_Read(RTMP *r, char *buf, int size) total += size; return total; } + +static const AVal av_setDataFrame = AVC("@setDataFrame"); + +int +RTMP_Write(RTMP *r, char *buf, int size) +{ + RTMPPacket packet; + char *pend, *enc; + int s2 = size, ret; + + if (size < 11) { + /* FLV pkt too small */ + return 0; + } + + packet.m_nChannel = 0x04; // source channel + packet.m_nInfoField2 = 0; + + if (buf[0] == 'F' && buf[1] == 'L' && buf[2] == 'V') + { + buf += 13; + s2 -= 13; + } + + packet.m_packetType = *buf++; + packet.m_nBodySize = AMF_DecodeInt24(buf); + buf += 3; + packet.m_nInfoField1 = AMF_DecodeInt24(buf); + buf += 3; + packet.m_nInfoField1 |= *buf++ << 24; + buf += 3; + s2 -= 11; + + if (((packet.m_packetType == 0x08 || packet.m_packetType == 0x09) && + !packet.m_nInfoField1) || packet.m_packetType == 0x12) + { + packet.m_headerType = RTMP_PACKET_SIZE_LARGE; + packet.m_hasAbsTimestamp = 1; + if (packet.m_packetType == 0x12) + packet.m_nBodySize += 16; + } + else + { + packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; + packet.m_hasAbsTimestamp = 0; + } + + if (!RTMPPacket_Alloc(&packet, packet.m_nBodySize)) + { + Log(LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__); + return false; + } + enc = packet.m_body; + pend = enc + packet.m_nBodySize; + if (packet.m_packetType == 0x12) + enc = AMF_EncodeString(enc, pend, &av_setDataFrame); + memcpy(enc, buf, packet.m_nBodySize); + ret = RTMP_SendPacket(r, &packet, false); + RTMPPacket_Free(&packet); + if (!ret) + return -1; + return size; +} diff --git a/librtmp/rtmp.h b/librtmp/rtmp.h index 29e974d..62520a6 100644 --- a/librtmp/rtmp.h +++ b/librtmp/rtmp.h @@ -39,7 +39,9 @@ extern "C" #define RTMP_FEATURE_HTTP 0x01 #define RTMP_FEATURE_ENC 0x02 #define RTMP_FEATURE_SSL 0x04 -#define RTMP_FEATURE_MFP 0x08 // not yet supported +#define RTMP_FEATURE_MFP 0x08 /* not yet supported */ +#define RTMP_FEATURE_WRITE 0x10 /* publish, not play */ +#define RTMP_FEATURE_HTTP2 0x20 /* server-side rtmpt */ #define RTMP_PROTOCOL_UNDEFINED -1 #define RTMP_PROTOCOL_RTMP 0 @@ -205,8 +207,9 @@ extern "C" uint8_t m_bSendEncoding; uint8_t m_bSendCounter; - AVal *m_methodCalls; /* remote method calls queue */ + int m_numInvokes; int m_numCalls; + AVal *m_methodCalls; /* remote method calls queue */ RTMP_LNK Link; RTMPPacket *m_vecChannelsIn[RTMP_CHANNELS]; @@ -286,7 +289,7 @@ extern "C" int RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len); int RTMPSockBuf_Close(RTMPSockBuf *sb); - bool RTMP_SendCreateStream(RTMP *r, double dCmdID); + bool RTMP_SendCreateStream(RTMP *r); bool RTMP_SendSeek(RTMP *r, double dTime); bool RTMP_SendServerBW(RTMP *r); void RTMP_DropRequest(RTMP *r, int i, bool freeit);