D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
td-agent
/
embedded
/
lib
/
ruby
/
gems
/
2.1.0
/
gems
/
msgpack-1.1.0
/
ext
/
msgpack
/
Filename :
buffer.c
back
Copy
/*
 * MessagePack for Ruby
 *
 * Copyright (C) 2008-2013 Sadayuki Furuhashi
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

#include "buffer.h"
#include "rmem.h"

#ifndef HAVE_RB_STR_REPLACE
static ID s_replace;
#endif

#ifdef COMPAT_HAVE_ENCODING  /* see compat.h*/
int msgpack_rb_encindex_utf8;
int msgpack_rb_encindex_usascii;
int msgpack_rb_encindex_ascii8bit;
#endif

#ifndef DISABLE_RMEM
static msgpack_rmem_t s_rmem;
#endif

/*
 * One-time initialization of file-level state: caches the encoding
 * indexes (when encodings are available), initializes the shared rmem
 * allocator (unless disabled) and interns the "replace" method ID when
 * rb_str_replace is not available.
 */
void msgpack_buffer_static_init()
{
#ifdef COMPAT_HAVE_ENCODING
    msgpack_rb_encindex_utf8 = rb_utf8_encindex();
    msgpack_rb_encindex_usascii = rb_usascii_encindex();
    msgpack_rb_encindex_ascii8bit = rb_ascii8bit_encindex();
#endif

#ifndef DISABLE_RMEM
    msgpack_rmem_init(&s_rmem);
#endif

#ifndef HAVE_RB_STR_REPLACE
    s_replace = rb_intern("replace");
#endif
}

/*
 * Counterpart of msgpack_buffer_static_init: tears down the shared rmem
 * allocator (unless disabled).
 */
void msgpack_buffer_static_destroy()
{
#ifndef DISABLE_RMEM
    msgpack_rmem_destroy(&s_rmem);
#endif
}

/*
 * Initializes a buffer in place.
 *
 * The structure is zero-filled, the chunk list is made to consist of the
 * embedded tail chunk only, the reference thresholds and IO buffer size
 * get their compile-time defaults, and no IO / IO buffer is attached.
 */
void msgpack_buffer_init(msgpack_buffer_t* b)
{
    memset(b, 0, sizeof(msgpack_buffer_t));

    b->head = &b->tail;
    b->write_reference_threshold = MSGPACK_BUFFER_STRING_WRITE_REFERENCE_DEFAULT;
    b->read_reference_threshold = MSGPACK_BUFFER_STRING_READ_REFERENCE_DEFAULT;
    b->io_buffer_size = MSGPACK_BUFFER_IO_BUFFER_SIZE_DEFAULT;
    b->io = Qnil;
    b->io_buffer = Qnil;
}

/*
 * Releases the memory owned by a chunk (if any) and resets its
 * first/last/mem pointers to NULL. The chunk structure itself is not
 * freed.
 */
static void _msgpack_buffer_chunk_destroy(msgpack_buffer_chunk_t* c)
{
    if(c->mem != NULL) {
#ifndef DISABLE_RMEM
        /* fall back to xfree() when rmem reports it did not free c->mem */
        if(!msgpack_rmem_free(&s_rmem, c->mem)) {
            xfree(c->mem);
        }
        /* no need to update rmem_owner because chunks will not be
         * free()ed (left in free_list) and thus *rmem_owner is
         * always valid. */
#else
        xfree(c->mem);
#endif
    }
    c->first = NULL;
    c->last = NULL;
    c->mem = NULL;
}

/*
 * Releases everything owned by the buffer: the memory of every chunk in
 * the head..tail list, the chunk structures themselves (except the
 * embedded tail) and the chunk structures parked on the free list.
 */
void msgpack_buffer_destroy(msgpack_buffer_t* b)
{
    /* head is always available */
    msgpack_buffer_chunk_t* c = b->head;
    while(c != &b->tail) {
        msgpack_buffer_chunk_t* n = c->next;
        _msgpack_buffer_chunk_destroy(c);
        xfree(c);
        c = n;
    }
    /* c is now the embedded tail: release its memory but not the struct */
    _msgpack_buffer_chunk_destroy(c);

    c = b->free_list;
    while(c != NULL) {
        msgpack_buffer_chunk_t* n = c->next;
        xfree(c);
        c = n;
    }
}

/*
 * GC mark function: marks the mapped string of every chunk from head to
 * tail, plus the buffer's io, io_buffer and owner values.
 */
void msgpack_buffer_mark(msgpack_buffer_t* b)
{
    /* head is always available */
    msgpack_buffer_chunk_t* c = b->head;
    while(c != &b->tail) {
        rb_gc_mark(c->mapped_string);
        c = c->next;
    }
    rb_gc_mark(c->mapped_string);

    rb_gc_mark(b->io);
    rb_gc_mark(b->io_buffer);

    rb_gc_mark(b->owner);
}

/*
 * Drops the head chunk and advances to the next one.
 *
 * Returns true when a following chunk became the new head, false when
 * the head was the tail (the buffer is now empty; tail_buffer_end and
 * read_buffer are reset to NULL).
 */
bool _msgpack_buffer_shift_chunk(msgpack_buffer_t* b)
{
    _msgpack_buffer_chunk_destroy(b->head);

    if(b->head == &b->tail) {
        /* list becomes empty. don't add head to free_list
         * because head should be always available */
        b->tail_buffer_end = NULL;
        b->read_buffer = NULL;
        return false;
    }

    /* add head to free_list */
    msgpack_buffer_chunk_t* next_head = b->head->next;
    b->head->next = b->free_list;
    b->free_list = b->head;

    b->head = next_head;
    b->read_buffer = next_head->first;

    return true;
}

/*
 * Discards all buffered data by shifting chunks until the list is empty.
 */
void msgpack_buffer_clear(msgpack_buffer_t* b)
{
    while(_msgpack_buffer_shift_chunk(b)) {
        ;
    }
}

/*
 * Moves up to `length` bytes from the buffer into the Ruby string
 * `string`, without reading from any IO.
 *
 * Returns the number of bytes actually transferred, which is smaller
 * than `length` when the buffer runs out of data.
 */
size_t msgpack_buffer_read_to_string_nonblock(msgpack_buffer_t* b, VALUE string, size_t length)
{
    size_t avail = msgpack_buffer_top_readable_size(b);

#ifndef DISABLE_BUFFER_READ_REFERENCE_OPTIMIZE
    /* optimize: when the destination is empty and the head chunk maps a
     * Ruby string, hand out a reference instead of copying bytes */
    if(length <= avail && RSTRING_LEN(string) == 0 &&
            b->head->mapped_string != NO_MAPPED_STRING &&
            length >= b->read_reference_threshold) {
        VALUE s = _msgpack_buffer_refer_head_mapped_string(b, length);
#ifndef HAVE_RB_STR_REPLACE
        /* TODO MRI 1.8 */
        rb_funcall(string, s_replace, 1, s);
#else
        rb_str_replace(string, s);
#endif
        /* here doesn't have to call ENCODING_SET because
         * encoding of s is always ASCII-8BIT */
        _msgpack_buffer_consumed(b, length);
        return length;
    }
#endif

    size_t const length_orig = length;

    while(true) {
        if(length <= avail) {
            /* the head chunk satisfies the rest of the request */
            rb_str_buf_cat(string, b->read_buffer, length);
            _msgpack_buffer_consumed(b, length);
            return length_orig;
        }

        /* take everything from the head chunk and move on */
        rb_str_buf_cat(string, b->read_buffer, avail);
        length -= avail;

        if(!_msgpack_buffer_shift_chunk(b)) {
            /* buffer exhausted: report the partial amount */
            return length_orig - length;
        }

        avail = msgpack_buffer_top_readable_size(b);
    }
}

/*
 * Copies up to `length` bytes from the buffer into `buffer`, without
 * reading from any IO. Passing buffer == NULL skips the bytes instead of
 * copying them.
 *
 * Returns the number of bytes consumed, which is smaller than `length`
 * when the buffer runs out of data.
 */
size_t msgpack_buffer_read_nonblock(msgpack_buffer_t* b, char* buffer, size_t length)
{
    /* buffer == NULL means skip */
    size_t const length_orig = length;

    while(true) {
        size_t avail = msgpack_buffer_top_readable_size(b);

        if(length <= avail) {
            if(buffer != NULL) {
                memcpy(buffer, b->read_buffer, length);
            }
            _msgpack_buffer_consumed(b, length);
            return length_orig;
        }

        if(buffer != NULL) {
            memcpy(buffer, b->read_buffer, avail);
            buffer += avail;
        }
        length -= avail;

        if(!_msgpack_buffer_shift_chunk(b)) {
            return length_orig - length;
        }
    }
}

/*
 * Returns the total number of readable bytes: the readable part of the
 * head chunk plus the full contents of every following chunk up to and
 * including the tail.
 */
size_t msgpack_buffer_all_readable_size(const msgpack_buffer_t* b)
{
    size_t sz = msgpack_buffer_top_readable_size(b);

    if(b->head == &b->tail) {
        return sz;
    }

    msgpack_buffer_chunk_t* c = b->head->next;

    while(true) {
        sz += c->last - c->first;
        if(c == &b->tail) {
            return sz;
        }
        c = c->next;
    }
}

/*
 * Reads exactly `length` bytes into `buffer`.
 *
 * Returns false (consuming nothing here) when
 * msgpack_buffer_ensure_readable reports that `length` bytes cannot be
 * made available; returns true after the bytes have been read.
 */
bool _msgpack_buffer_read_all2(msgpack_buffer_t* b, char* buffer, size_t length)
{
    if(!msgpack_buffer_ensure_readable(b, length)) {
        return false;
    }

    msgpack_buffer_read_nonblock(b, buffer, length);
    return true;
}

/*
 * Returns a chunk structure for reuse: the first entry of the free list
 * when there is one, otherwise a freshly xmalloc()ed structure. The
 * returned structure's contents are not initialized here.
 */
static inline msgpack_buffer_chunk_t* _msgpack_buffer_alloc_new_chunk(msgpack_buffer_t* b)
{
    msgpack_buffer_chunk_t* reuse = b->free_list;
    if(reuse == NULL) {
        return xmalloc(sizeof(msgpack_buffer_chunk_t));
    }
    b->free_list = b->free_list->next;
    return reuse;
}

/*
 * Moves the current contents of the embedded tail chunk into a separate
 * chunk linked just before the tail, so the tail can be rebuilt by the
 * caller. Does nothing when the buffer is completely empty.
 */
static inline void _msgpack_buffer_add_new_chunk(msgpack_buffer_t* b)
{
    if(b->head == &b->tail) {
        if(b->tail.first == NULL) {
            /* empty buffer */
            return;
        }

        msgpack_buffer_chunk_t* nc = _msgpack_buffer_alloc_new_chunk(b);

        *nc = b->tail;
        b->head = nc;
        nc->next = &b->tail;

    } else {
        /* search node before tail */
        msgpack_buffer_chunk_t* before_tail = b->head;
        while(before_tail->next != &b->tail) {
            before_tail = before_tail->next;
        }

        msgpack_buffer_chunk_t* nc = _msgpack_buffer_alloc_new_chunk(b);

#ifndef DISABLE_RMEM
#ifndef DISABLE_RMEM_REUSE_INTERNAL_FRAGMENT
        if(b->rmem_last == b->tail_buffer_end) {
            /* reuse unused rmem space */
            size_t unused = b->tail_buffer_end - b->tail.last;
            b->rmem_last -= unused;
        }
#endif
#endif

        /* rebuild tail */
        *nc = b->tail;
        before_tail->next = nc;
        nc->next = &b->tail;
    }
}

/*
 * Appends `string` to the buffer by reference: a duplicate of the string
 * (re-tagged as ASCII-8BIT when encodings are available) becomes the
 * mapped string of a new tail chunk, and the chunk points directly at
 * the duplicate's bytes.
 */
static inline void _msgpack_buffer_append_reference(msgpack_buffer_t* b, VALUE string)
{
    VALUE mapped_string = rb_str_dup(string);
#ifdef COMPAT_HAVE_ENCODING
    ENCODING_SET(mapped_string, msgpack_rb_encindex_ascii8bit);
#endif

    _msgpack_buffer_add_new_chunk(b);

    char* data = RSTRING_PTR(mapped_string);
    size_t length = RSTRING_LEN(mapped_string);

    b->tail.first = (char*) data;
    b->tail.last = (char*) data + length;
    b->tail.mapped_string = mapped_string;
    b->tail.mem = NULL;

    /* msgpack_buffer_writable_size should return 0 for mapped chunk */
    b->tail_buffer_end = b->tail.last;

    /* consider read_buffer */
    if(b->head == &b->tail) {
        b->read_buffer = b->tail.first;
    }
}

/*
 * Appends a (long) Ruby string to the buffer.
 *
 * With an IO attached, the buffer is flushed first and the string is
 * written to the IO directly where possible; otherwise the string is
 * either appended by reference or copied, depending on
 * STR_DUP_LIKELY_DOES_COPY.
 */
void _msgpack_buffer_append_long_string(msgpack_buffer_t* b, VALUE string)
{
    size_t length = RSTRING_LEN(string);

    if(b->io != Qnil) {
        msgpack_buffer_flush(b);
#ifdef COMPAT_HAVE_ENCODING
        if (ENCODING_GET(string) == msgpack_rb_encindex_ascii8bit) {
            rb_funcall(b->io, b->io_write_all_method, 1, string);
        } else if(!STR_DUP_LIKELY_DOES_COPY(string)) {
            /* write a duplicate re-tagged as ASCII-8BIT */
            VALUE s = rb_str_dup(string);
            ENCODING_SET(s, msgpack_rb_encindex_ascii8bit);
            rb_funcall(b->io, b->io_write_all_method, 1, s);
        } else {
            msgpack_buffer_append(b, RSTRING_PTR(string), length);
        }
#else
        rb_funcall(b->io, b->io_write_all_method, 1, string);
#endif

    } else if(!STR_DUP_LIKELY_DOES_COPY(string)) {
        _msgpack_buffer_append_reference(b, string);

    } else {
        msgpack_buffer_append(b, RSTRING_PTR(string), length);
    }
}

/*
 * Allocates backing memory for chunk `c` holding at least
 * `required_size` bytes.
 *
 * With rmem enabled, requests of at most MSGPACK_RMEM_PAGE_SIZE are
 * served from an rmem page (a new page, or the unused remainder of the
 * buffer's current page when fragment reuse is enabled); larger requests
 * use xmalloc(). With rmem disabled, xmalloc() is always used with a
 * minimum size of 72 bytes.
 *
 * The usable size is stored in *allocated_size, c->mem is updated to the
 * memory the chunk is responsible for, and the start of the usable
 * region is returned.
 */
static inline void* _msgpack_buffer_chunk_malloc(
        msgpack_buffer_t* b, msgpack_buffer_chunk_t* c,
        size_t required_size, size_t* allocated_size)
{
#ifndef DISABLE_RMEM
    if(required_size <= MSGPACK_RMEM_PAGE_SIZE) {
#ifndef DISABLE_RMEM_REUSE_INTERNAL_FRAGMENT
        if((size_t)(b->rmem_end - b->rmem_last) < required_size) {
#endif
            /* alloc new rmem page */
            *allocated_size = MSGPACK_RMEM_PAGE_SIZE;
            char* buffer = msgpack_rmem_alloc(&s_rmem);
            c->mem = buffer;

            /* update rmem owner */
            b->rmem_owner = &c->mem;
            b->rmem_last = b->rmem_end = buffer + MSGPACK_RMEM_PAGE_SIZE;

            return buffer;

#ifndef DISABLE_RMEM_REUSE_INTERNAL_FRAGMENT
        } else {
            /* reuse unused rmem */
            *allocated_size = (size_t)(b->rmem_end - b->rmem_last);
            char* buffer = b->rmem_last;
            b->rmem_last = b->rmem_end;

            /* update rmem owner: ownership of the page moves from the
             * previous owner chunk to c */
            c->mem = *b->rmem_owner;
            *b->rmem_owner = NULL;
            b->rmem_owner = &c->mem;

            return buffer;
        }
#endif
    }
#else
    if(required_size < 72) {
        required_size = 72;
    }
#endif

    /* TODO alignment? */
    *allocated_size = required_size;
    void* mem = xmalloc(required_size);
    c->mem = mem;
    return mem;
}

/*
 * Grows the memory of chunk `c` so that it can hold at least
 * `required_size` bytes.
 *
 * When `mem` is NULL this is a plain allocation through
 * _msgpack_buffer_chunk_malloc. Otherwise *current_size is doubled until
 * it reaches `required_size` and `mem` is xrealloc()ed to that size.
 * *current_size receives the new capacity and the (possibly moved)
 * memory is returned and stored in c->mem.
 */
static inline void* _msgpack_buffer_chunk_realloc(
        msgpack_buffer_t* b, msgpack_buffer_chunk_t* c,
        void* mem, size_t required_size, size_t* current_size)
{
    if(mem == NULL) {
        return _msgpack_buffer_chunk_malloc(b, c, required_size, current_size);
    }

    size_t next_size = *current_size * 2;
    while(next_size < required_size) {
        next_size *= 2;
    }
    *current_size = next_size;
    mem = xrealloc(mem, next_size);

    c->mem = mem;
    return mem;
}

/*
 * Makes room for `length` more bytes at the end of the buffer and, when
 * `data` is not NULL, appends those bytes. data == NULL means "ensure
 * writable" only.
 *
 * When `flush_to_io` is set and an IO is attached, the buffer is flushed
 * first; if the tail then has enough room the data is copied in place
 * and the function returns. Otherwise the remaining room of the tail is
 * filled and the rest goes either into a newly allocated tail chunk
 * (mapped chunks and rmem pages cannot be realloc()ed) or into the
 * realloc()ed tail.
 */
void _msgpack_buffer_expand(msgpack_buffer_t* b, const char* data, size_t length, bool flush_to_io)
{
    if(flush_to_io && b->io != Qnil) {
        msgpack_buffer_flush(b);
        if(msgpack_buffer_writable_size(b) >= length) {
            /* data == NULL means ensure_writable */
            if(data != NULL) {
                memcpy(b->tail.last, data, length);
                /* advance by the number of bytes actually copied; the
                 * rest of the writable area stays available */
                b->tail.last += length;
            }
            return;
        }
    }

    /* data == NULL means ensure_writable */
    if(data != NULL) {
        /* fill what is left of the current tail first */
        size_t tail_avail = msgpack_buffer_writable_size(b);
        memcpy(b->tail.last, data, tail_avail);
        b->tail.last += tail_avail;
        data += tail_avail;
        length -= tail_avail;
    }

    size_t capacity = b->tail.last - b->tail.first;

    /* can't realloc mapped chunk or rmem page */
    if(b->tail.mapped_string != NO_MAPPED_STRING
#ifndef DISABLE_RMEM
            || capacity <= MSGPACK_RMEM_PAGE_SIZE
#endif
            ) {
        /* allocate new chunk */
        _msgpack_buffer_add_new_chunk(b);

        char* mem = _msgpack_buffer_chunk_malloc(b, &b->tail, length, &capacity);

        char* last = mem;
        if(data != NULL) {
            memcpy(mem, data, length);
            last += length;
        }

        /* rebuild tail chunk */
        b->tail.first = mem;
        b->tail.last = last;
        b->tail.mapped_string = NO_MAPPED_STRING;
        b->tail_buffer_end = mem + capacity;

        /* consider read_buffer */
        if(b->head == &b->tail) {
            b->read_buffer = b->tail.first;
        }

    } else {
        /* realloc malloc()ed chunk or NULL */
        size_t tail_filled = b->tail.last - b->tail.first;
        char* mem = _msgpack_buffer_chunk_realloc(b, &b->tail,
                b->tail.first, tail_filled+length, &capacity);

        char* last = mem + tail_filled;
        if(data != NULL) {
            memcpy(last, data, length);
            last += length;
        }

        /* consider read_buffer: keep the same offset in the moved memory */
        if(b->head == &b->tail) {
            size_t read_offset = b->read_buffer - b->head->first;
            b->read_buffer = mem + read_offset;
        }

        /* rebuild tail chunk */
        b->tail.first = mem;
        b->tail.last = last;
        b->tail_buffer_end = mem + capacity;
    }
}

/*
 * Returns the unread part of the head chunk as a Ruby string: an empty
 * string when nothing is left, a reference into the mapped string when
 * the chunk maps one, otherwise a copy of the bytes.
 */
static inline VALUE _msgpack_buffer_head_chunk_as_string(msgpack_buffer_t* b)
{
    size_t length = b->head->last - b->read_buffer;
    if(length == 0) {
        return rb_str_buf_new(0);
    }

    if(b->head->mapped_string != NO_MAPPED_STRING) {
        return _msgpack_buffer_refer_head_mapped_string(b, length);
    }

    return rb_str_new(b->read_buffer, length);
}

/*
 * Returns the whole contents of chunk `c` as a Ruby string: an empty
 * string for an empty chunk, a duplicate of the mapped string when the
 * chunk maps one, otherwise a copy of the bytes.
 */
static inline VALUE _msgpack_buffer_chunk_as_string(msgpack_buffer_chunk_t* c)
{
    size_t chunk_size = c->last - c->first;
    if(chunk_size == 0) {
        return rb_str_buf_new(0);
    }

    if(c->mapped_string != NO_MAPPED_STRING) {
        return rb_str_dup(c->mapped_string);
    }

    return rb_str_new(c->first, chunk_size);
}

/*
 * Returns all readable data as a single Ruby string without consuming
 * it. A single-chunk buffer is converted directly; otherwise a string of
 * the total readable size is allocated and every chunk is copied into it.
 */
VALUE msgpack_buffer_all_as_string(msgpack_buffer_t* b)
{
    if(b->head == &b->tail) {
        return _msgpack_buffer_head_chunk_as_string(b);
    }

    size_t length = msgpack_buffer_all_readable_size(b);
    VALUE string = rb_str_new(NULL, length);
    char* buffer = RSTRING_PTR(string);

    /* unread part of the head chunk */
    size_t avail = msgpack_buffer_top_readable_size(b);
    memcpy(buffer, b->read_buffer, avail);
    buffer += avail;
    length -= avail;

    msgpack_buffer_chunk_t* c = b->head->next;

    while(true) {
        avail = c->last - c->first;
        memcpy(buffer, c->first, avail);

        if(length <= avail) {
            /* everything counted by all_readable_size has been copied */
            return string;
        }
        buffer += avail;
        length -= avail;

        c = c->next;
    }
}

/*
 * Returns all readable data as a Ruby array of strings, one per chunk
 * (the first element holds only the unread part of the head chunk),
 * without consuming it.
 */
VALUE msgpack_buffer_all_as_string_array(msgpack_buffer_t* b)
{
    if(b->head == &b->tail) {
        VALUE s = msgpack_buffer_all_as_string(b);
        VALUE ary = rb_ary_new3(1, s);
        return ary;
    }

    /* TODO optimize ary construction */
    VALUE ary = rb_ary_new();

    VALUE s = _msgpack_buffer_head_chunk_as_string(b);
    rb_ary_push(ary, s);

    msgpack_buffer_chunk_t* c = b->head->next;

    while(true) {
        s = _msgpack_buffer_chunk_as_string(c);
        rb_ary_push(ary, s);
        if(c == &b->tail) {
            return ary;
        }
        c = c->next;
    }
}

/*
 * Writes the buffered data to `io` by calling `write_method` once per
 * chunk. Returns the total length of the strings written, or 0 without
 * writing when the head chunk has nothing readable.
 *
 * With `consume` set, chunks are dropped from the buffer as they are
 * written; otherwise the buffer is left untouched.
 */
size_t msgpack_buffer_flush_to_io(msgpack_buffer_t* b, VALUE io, ID write_method, bool consume)
{
    if(msgpack_buffer_top_readable_size(b) == 0) {
        return 0;
    }

    VALUE s = _msgpack_buffer_head_chunk_as_string(b);
    rb_funcall(io, write_method, 1, s);
    size_t sz = RSTRING_LEN(s);

    if(consume) {
        while(_msgpack_buffer_shift_chunk(b)) {
            s = _msgpack_buffer_chunk_as_string(b->head);
            rb_funcall(io, write_method, 1, s);
            sz += RSTRING_LEN(s);
        }
        return sz;

    } else {
        if(b->head == &b->tail) {
            return sz;
        }
        msgpack_buffer_chunk_t* c = b->head->next;
        while(true) {
            s = _msgpack_buffer_chunk_as_string(c);
            rb_funcall(io, write_method, 1, s);
            sz += RSTRING_LEN(s);
            if(c == &b->tail) {
                return sz;
            }
            c = c->next;
        }
    }
}

/*
 * Reads up to io_buffer_size bytes from the attached IO through the
 * partial-read method and appends them to the buffer.
 *
 * Returns the number of bytes appended. Raises EOFError when the read
 * method returns nil or yields an empty string.
 */
size_t _msgpack_buffer_feed_from_io(msgpack_buffer_t* b)
{
    if(b->io_buffer == Qnil) {
        /* first read: let the IO allocate the string, then keep it */
        b->io_buffer = rb_funcall(b->io, b->io_partial_read_method, 1, LONG2NUM(b->io_buffer_size));
        if(b->io_buffer == Qnil) {
            rb_raise(rb_eEOFError, "IO reached end of file");
        }
        StringValue(b->io_buffer);
    } else {
        /* later reads: pass the kept string as the output buffer */
        VALUE ret = rb_funcall(b->io, b->io_partial_read_method, 2, LONG2NUM(b->io_buffer_size), b->io_buffer);
        if(ret == Qnil) {
            rb_raise(rb_eEOFError, "IO reached end of file");
        }
    }

    size_t len = RSTRING_LEN(b->io_buffer);
    if(len == 0) {
        rb_raise(rb_eEOFError, "IO reached end of file");
    }

    /* TODO zero-copy optimize? */
    msgpack_buffer_append_nonblock(b, RSTRING_PTR(b->io_buffer), len);

    return len;
}

/*
 * Reads up to `length` bytes from the attached IO into `string`.
 *
 * An empty `string` is passed to the read method directly as the output
 * buffer; otherwise the data is read into io_buffer and appended to
 * `string`. Returns the number of bytes read, or 0 when the read method
 * returns nil.
 */
size_t _msgpack_buffer_read_from_io_to_string(msgpack_buffer_t* b, VALUE string, size_t length)
{
    if(RSTRING_LEN(string) == 0) {
        /* direct read */
        VALUE ret = rb_funcall(b->io, b->io_partial_read_method, 2, LONG2NUM(length), string);
        if(ret == Qnil) {
            return 0;
        }
        return RSTRING_LEN(string);
    }

    /* copy via io_buffer */
    if(b->io_buffer == Qnil) {
        b->io_buffer = rb_str_buf_new(0);
    }

    VALUE ret = rb_funcall(b->io, b->io_partial_read_method, 2, LONG2NUM(length), b->io_buffer);
    if(ret == Qnil) {
        return 0;
    }
    size_t rl = RSTRING_LEN(b->io_buffer);

    rb_str_buf_cat(string, (const void*)RSTRING_PTR(b->io_buffer), rl);
    return rl;
}

/*
 * Reads up to `length` bytes from the attached IO into io_buffer and
 * discards them. Returns the number of bytes read, or 0 when the read
 * method returns nil.
 */
size_t _msgpack_buffer_skip_from_io(msgpack_buffer_t* b, size_t length)
{
    if(b->io_buffer == Qnil) {
        b->io_buffer = rb_str_buf_new(0);
    }

    VALUE ret = rb_funcall(b->io, b->io_partial_read_method, 2, LONG2NUM(length), b->io_buffer);
    if(ret == Qnil) {
        return 0;
    }
    return RSTRING_LEN(b->io_buffer);
}