Skip to content

Commit

Permalink
fix #248, improve about 15% performance for fast buffer. 2.0.49
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Dec 4, 2014
1 parent 8423974 commit 29324fa
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 110 deletions.
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 48
#define VERSION_REVISION 49
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
Expand Down
1 change: 1 addition & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_OpenSslSha256DigestSize 2037
#define ERROR_OpenSslGetPeerPublicKey 2038
#define ERROR_OpenSslComputeSharedKey 2039
#define ERROR_RTMP_BUFFER_OVERFLOW 2040
//
// system control message,
// not an error, but special control logic.
Expand Down
93 changes: 63 additions & 30 deletions trunk/src/rtmp/srs_protocol_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>

// the max header size,
// @see SrsProtocol::read_message_header().
#define SRS_RTMP_MAX_MESSAGE_HEADER 11

SrsSimpleBuffer::SrsSimpleBuffer()
{
}
Expand Down Expand Up @@ -81,46 +85,45 @@ SrsFastBuffer::SrsFastBuffer()
merged_read = false;
_handler = NULL;

nb_buffer = SOCKET_READ_SIZE;
buffer = new char[nb_buffer];
p = end = buffer = NULL;
nb_buffer = 0;

reset_buffer(SOCKET_READ_SIZE);
}

SrsFastBuffer::~SrsFastBuffer()
{
srs_freep(buffer);
}

int SrsFastBuffer::length()
char SrsFastBuffer::read_1byte()
{
int len = (int)data.size();
srs_assert(len >= 0);
return len;
}

char* SrsFastBuffer::bytes()
{
return (length() == 0)? NULL : &data.at(0);
srs_assert(end - p >= 1);
return *p++;
}

void SrsFastBuffer::erase(int size)
char* SrsFastBuffer::read_slice(int size)
{
if (size <= 0) {
return;
}
srs_assert(end - p >= size);
srs_assert(p + size > buffer);

if (size >= length()) {
data.clear();
return;
}
char* ptr = p;
p += size;

data.erase(data.begin(), data.begin() + size);
// reset when consumed all.
if (p == end) {
p = end = buffer;
srs_verbose("all consumed, reset fast buffer");
}

return ptr;
}

void SrsFastBuffer::append(const char* bytes, int size)
void SrsFastBuffer::skip(int size)
{
srs_assert(size > 0);

data.insert(data.end(), bytes, bytes + size);
srs_assert(end - p >= size);
srs_assert(p + size > buffer);
p += size;
}

int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
Expand All @@ -133,9 +136,27 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
return ret;
}

while (length() < required_size) {
// when read payload and need to grow, reset buffer.
if (end - p < required_size && required_size > SRS_RTMP_MAX_MESSAGE_HEADER) {
int nb_cap = end - p;
srs_verbose("move fast buffer %d bytes", nb_cap);
buffer = (char*)memmove(buffer, p, nb_cap);
p = buffer;
end = p + nb_cap;
}

while (end - p < required_size) {
// the max to read is the left bytes.
size_t max_to_read = buffer + nb_buffer - end;

if (max_to_read <= 0) {
ret = ERROR_RTMP_BUFFER_OVERFLOW;
srs_error("buffer overflow, required=%d, max=%d, ret=%d", required_size, nb_buffer, ret);
return ret;
}

ssize_t nread;
if ((ret = reader->read(buffer, nb_buffer, &nread)) != ERROR_SUCCESS) {
if ((ret = reader->read(end, max_to_read, &nread)) != ERROR_SUCCESS) {
return ret;
}

Expand All @@ -149,8 +170,9 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
_handler->on_read(nread);
}

// we just move the ptr to next.
srs_assert((int)nread > 0);
append(buffer, (int)nread);
end += nread;
}

return ret;
Expand Down Expand Up @@ -198,8 +220,19 @@ int SrsFastBuffer::buffer_size()

void SrsFastBuffer::reset_buffer(int size)
{
// remember the cap.
int nb_cap = end - p;

// atleast to put the old data.
nb_buffer = srs_max(nb_cap, size);

// copy old data to buf.
char* buf = new char[nb_buffer];
if (nb_cap > 0) {
memcpy(buf, p, nb_cap);
}

srs_freep(buffer);

nb_buffer = size;
buffer = new char[nb_buffer];
p = buffer = buf;
end = p + nb_cap;
}
43 changes: 22 additions & 21 deletions trunk/src/rtmp/srs_protocol_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,39 +116,40 @@ class SrsFastBuffer
// the merged handler
bool merged_read;
IMergeReadHandler* _handler;
// data and socket buffer
std::vector<char> data;
// the user-space buffer to fill by reader,
// which use fast index and reset when chunk body read ok.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/248
// ptr to the current read position.
char* p;
// ptr to the content end.
char* end;
// ptr to the buffer.
// buffer <= p <= end <= buffer+nb_buffer
char* buffer;
// the max size of buffer.
int nb_buffer;
public:
SrsFastBuffer();
virtual ~SrsFastBuffer();
public:
/**
* get the length of buffer. empty if zero.
* @remark assert length() is not negative.
*/
virtual int length();
/**
* get the buffer bytes.
* @return the bytes, NULL if empty.
* read 1byte from buffer, move to next bytes.
* @remark assert buffer already grow(1).
*/
virtual char* bytes();
public:
virtual char read_1byte();
/**
* erase size of bytes from begin.
* @param size to erase size of bytes.
* clear if size greater than or equals to length()
* @remark ignore size is not positive.
* read a slice in size bytes, move to next bytes.
* user can use this char* ptr directly, and should never free it.
* @remark assert buffer already grow(size).
* @remark the ptr returned maybe invalid after grow(x).
*/
virtual void erase(int size);
private:
virtual char* read_slice(int size);
/**
* append specified bytes to buffer.
* @param size the size of bytes
* @remark assert size is positive.
* skip some bytes in buffer.
* @param size the bytes to skip. positive to next; negative to previous.
* @remark assert buffer already grow(size).
*/
virtual void append(const char* bytes, int size);
virtual void skip(int size);
public:
/**
* grow buffer to the required size, loop to read from skt to fill.
Expand Down
Loading

0 comments on commit 29324fa

Please sign in to comment.