From: hyc Date: Wed, 10 Mar 2010 19:23:10 +0000 (+0000) Subject: Cache initial packets, don't write header till we know the dataType X-Git-Tag: v2.4~207 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b94823a1bb9f07db8a48e5697793ca1f9b1b98e2;p=rtmpdump Cache initial packets, don't write header till we know the dataType git-svn-id: svn://svn.mplayerhq.hu/rtmpdump/trunk@328 400ebc74-4327-4243-bc38-086b20814532 --- diff --git a/rtmpgw.c b/rtmpgw.c index a4a8efa..b41dff7 100644 --- a/rtmpgw.c +++ b/rtmpgw.c @@ -105,7 +105,6 @@ typedef struct uint32_t dStartOffset; uint32_t dStopOffset; - uint32_t nTimeStamp; unsigned char hash[HASHLEN]; } RTMP_REQUEST; @@ -292,21 +291,30 @@ WriteHeader(char **buf, // target pointer, maybe preallocated return size; } +typedef struct WSargs { + RTMP *rtmp; + char *buf; // target pointer, maybe preallocated + unsigned int buflen; // length of buffer if preallocated + uint32_t nTimeStamp; // timestamp of last packet returned + uint8_t dataType; // type of stream for FLV header +} WSargs; + int -WriteStream(RTMP * rtmp, char **buf, // target pointer, maybe preallocated - unsigned int len, // length of buffer if preallocated - uint32_t * nTimeStamp) +WriteStream(WSargs *ws) { uint32_t prevTagSize = 0; int rtnGetNextMediaPacket = 0, ret = -1; RTMPPacket packet = { 0 }; - rtnGetNextMediaPacket = RTMP_GetNextMediaPacket(rtmp, &packet); + rtnGetNextMediaPacket = RTMP_GetNextMediaPacket(ws->rtmp, &packet); while (rtnGetNextMediaPacket) { char *packetBody = packet.m_body; unsigned int nPacketLen = packet.m_nBodySize; + // set data type + ws->dataType |= (((packet.m_packetType == 0x08)<<2)|(packet.m_packetType == 0x09)); + // skip video info/command packets if (packet.m_packetType == 0x09 && nPacketLen == 2 && ((*packetBody & 0xf0) == 0x50)) @@ -343,33 +351,31 @@ WriteStream(RTMP * rtmp, char **buf, // target pointer, maybe preallocated || packet.m_packetType == 0x12) ? 11 : 0) + (packet.m_packetType != 0x16 ? 4 : 0); - if (size + 4 > len) + if (size + 4 > ws->buflen) { // the extra 4 is for the case of an FLV stream without a last prevTagSize (we need extra 4 bytes to append it) - *buf = (char *) realloc(*buf, size + 4); - if (*buf == 0) + ws->buf = realloc(ws->buf, size + 4); + if (ws->buf == 0) { Log(LOGERROR, "Couldn't reallocate memory!"); ret = -1; // fatal error break; } + ws->buflen = size + 4; } - char *ptr = *buf, *pend = ptr + size+4; + char *ptr = ws->buf, *pend = ptr + size+4; // audio (0x08), video (0x09) or metadata (0x12) packets : // construct 11 byte header then add rtmp packet's data if (packet.m_packetType == 0x08 || packet.m_packetType == 0x09 || packet.m_packetType == 0x12) { - // set data type - //*dataType |= (((packet.m_packetType == 0x08)<<2)|(packet.m_packetType == 0x09)); - - (*nTimeStamp) = packet.m_nTimeStamp; + ws->nTimeStamp = packet.m_nTimeStamp; prevTagSize = 11 + nPacketLen; *ptr++ = packet.m_packetType; ptr = AMF_EncodeInt24(ptr, pend, nPacketLen); - ptr = AMF_EncodeInt24(ptr, pend, *nTimeStamp); - *ptr = (char) (((*nTimeStamp) & 0xFF000000) >> 24); + ptr = AMF_EncodeInt24(ptr, pend, ws->nTimeStamp); + *ptr = (char) (((ws->nTimeStamp) & 0xFF000000) >> 24); ptr++; // stream id @@ -387,11 +393,11 @@ WriteStream(RTMP * rtmp, char **buf, // target pointer, maybe preallocated while (pos + 11 < nPacketLen) { uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1); // size without header (11) and without prevTagSize (4) - *nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4); - *nTimeStamp |= (packetBody[pos + 7] << 24); + ws->nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4); + ws->nTimeStamp |= (packetBody[pos + 7] << 24); // set data type - //*dataType |= (((*(packetBody+pos) == 0x08)<<2)|(*(packetBody+pos) == 0x09)); + ws->dataType |= (((*(packetBody+pos) == 0x08)<<2)|(*(packetBody+pos) == 0x09)); if (pos + 11 + dataSize + 4 > nPacketLen) { @@ -420,7 +426,7 @@ WriteStream(RTMP * rtmp, char **buf, // target pointer, maybe preallocated Log(LOGDEBUG, "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms", (unsigned char) packetBody[pos], dataSize, prevTagSize, - *nTimeStamp); + ws->nTimeStamp); #endif if (prevTagSize != (dataSize + 11)) @@ -532,6 +538,7 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou char *ptr = NULL; // header pointer size_t nRead = 0; + int doHeader = 1; char srvhead[] = "\r\nServer:HTTP-RTMP Stream Server \r\nContent-Type: Video/MPEG \r\n\r\n"; @@ -540,6 +547,7 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou RTMP rtmp = { 0 }; uint32_t dSeek = 0; // can be used to start from a later point in the stream + WSargs ws; // reset RTMP options to defaults specified upon invokation of streams RTMP_REQUEST req; @@ -720,6 +728,11 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou // send the packets buffer = (char *) calloc(PACKET_SIZE, 1); + ws.rtmp = &rtmp; + ws.buf = buffer; + ws.buflen = PACKET_SIZE; + ws.nTimeStamp = dSeek; + ws.dataType = 0; // User defined seek offset if (req.dStartOffset > 0) @@ -733,7 +746,7 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou if (dSeek != 0) { - LogPrintf("Starting at TS: %d ms\n", req.nTimeStamp); + LogPrintf("Starting at TS: %d ms\n", ws.nTimeStamp); } Log(LOGDEBUG, "Setting buffer time to: %dms", req.bufferTime); @@ -767,14 +780,8 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou nRead = WriteHeader(&buffer, PACKET_SIZE); if (nRead > 0) { - nWritten = send(sockfd, buffer, nRead, 0); - if (nWritten < 0) - { - Log(LOGERROR, "%s, sending failed, error: %d", __FUNCTION__, - GetSockError()); - goto cleanup; // we are in STREAMING_IN_PROGRESS, so we'll go to STREAMING_ACCEPTING - } - + ws.buf += nRead; + ws.buflen -= nRead; size += nRead; } else @@ -787,13 +794,28 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou // get the rest of the stream do { - nRead = WriteStream(&rtmp, &buffer, PACKET_SIZE, &req.nTimeStamp); + nRead = WriteStream(&ws); if (nRead > 0) { - nWritten = send(sockfd, buffer, nRead, 0); - //Log(LOGDEBUG, "written: %d", nWritten); - if (nWritten < 0) + if (doHeader) + { + if (ws.nTimeStamp > dSeek) + { + doHeader = 0; + nRead += size; + size = 0; + ws.buf = buffer; + ws.buflen = PACKET_SIZE; + buffer[4] = ws.dataType; + } + else + { + ws.buf += nRead; + ws.buflen -= nRead; + } + } + if (!doHeader && (nWritten = send(sockfd, buffer, nRead, 0)) < 0) { Log(LOGERROR, "%s, sending failed, error: %d", __FUNCTION__, GetSockError()); @@ -809,17 +831,17 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou if (duration > 0) { percent = - ((double) (dSeek + req.nTimeStamp)) / (duration * + ((double) (dSeek + ws.nTimeStamp)) / (duration * 1000.0) * 100.0; percent = ((double) (int) (percent * 10.0)) / 10.0; LogStatus("\r%.3f KB / %.2f sec (%.1f%%)", (double) size / 1024.0, - (double) (req.nTimeStamp) / 1000.0, percent); + (double) (ws.nTimeStamp) / 1000.0, percent); } else { LogStatus("\r%.3f KB / %.2f sec", (double) size / 1024.0, - (double) (req.nTimeStamp) / 1000.0); + (double) (ws.nTimeStamp) / 1000.0); } } #ifdef _DEBUG @@ -830,7 +852,7 @@ void processTCPrequest(STREAMING_SERVER * server, // server socket and state (ou #endif // Force clean close if a specified stop offset is reached - if (req.dStopOffset && req.nTimeStamp >= req.dStopOffset) + if (req.dStopOffset && ws.nTimeStamp >= req.dStopOffset) { LogPrintf("\nStop offset has been reached at %.2f seconds\n", (double) req.dStopOffset / 1000.0);