D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
php71
/
usr
/
include
/
php
/
ext
/
swoole
/
include
/
Filename :
swoole.h
back
Copy
/* +----------------------------------------------------------------------+ | Swoole | +----------------------------------------------------------------------+ | This source file is subject to version 2.0 of the Apache license, | | that is bundled with this package in the file LICENSE, and is | | available through the world-wide-web at the following url: | | http://www.apache.org/licenses/LICENSE-2.0.html | | If you did not receive a copy of the Apache2.0 license and are unable| | to obtain it through the world-wide-web, please send a note to | | license@swoole.com so we can mail you a copy immediately. | +----------------------------------------------------------------------+ | Author: Tianfeng Han <mikan.tenny@gmail.com> | +----------------------------------------------------------------------+ */ #ifndef SWOOLE_H_ #define SWOOLE_H_ #if defined(HAVE_CONFIG_H) && !defined(COMPILE_DL_SWOOLE) #include "config.h" #elif defined(PHP_ATOM_INC) || defined(ZEND_SIGNALS) #include "php_config.h" #endif #ifdef __cplusplus #define SW_EXTERN_C_BEGIN extern "C" { #define SW_EXTERN_C_END } #else #define SW_EXTERN_C_BEGIN #define SW_EXTERN_C_END #endif SW_EXTERN_C_BEGIN #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif /*--- C standard library ---*/ #include <assert.h> #include <ctype.h> #include <errno.h> #include <inttypes.h> #include <limits.h> #include <math.h> #include <stddef.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdarg.h> #include <signal.h> #include <time.h> #include <fcntl.h> #include <unistd.h> #include <pthread.h> #include <poll.h> #include <arpa/inet.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <netdb.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/select.h> #include <sys/mman.h> #include <sys/ipc.h> #include <sys/wait.h> #include <sys/un.h> #include <sys/types.h> #include <sys/utsname.h> #include <sys/stat.h> #if defined(HAVE_CPU_AFFINITY) #ifdef __FreeBSD__ #include <sys/types.h> #include <sys/cpuset.h> 
#include <pthread_np.h>
typedef cpuset_t cpu_set_t;
#else
#include <sched.h>
#endif
#endif

#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach_time.h>
#include <sys/sysctl.h>

#define ORWL_NANO (+1.0E-9)
#define ORWL_GIGA UINT64_C(1000000000)

static double orwl_timebase = 0.0;
static uint64_t orwl_timestart = 0;
#ifndef HAVE_CLOCK_GETTIME
int clock_gettime(clock_id_t which_clock, struct timespec *t);
#endif
#endif

/* On Apple platforms daemon() is redirected to swoole_daemon(). */
#ifdef __APPLE__
#define daemon(nochdir, noclose) swoole_daemon(nochdir, noclose)
#endif

/*----------------------------------------------------------------------------*/

#define SWOOLE_MAJOR_VERSION      4
#define SWOOLE_MINOR_VERSION      4
#define SWOOLE_RELEASE_VERSION    12
#define SWOOLE_EXTRA_VERSION      ""
#define SWOOLE_VERSION            "4.4.12"
#define SWOOLE_VERSION_ID         40412

#define SWOOLE_BUG_REPORT \
    "A bug occurred in Swoole-v" SWOOLE_VERSION ", please report it.\n"\
    "The Swoole developers probably don't know about it,\n"\
    "and unless you report it, chances are it won't be fixed.\n"\
    "You can read How to report a bug doc before submitting any bug reports:\n"\
    ">> https://github.com/swoole/swoole-src/blob/master/.github/ISSUE.md \n"\
    "Please do not send bug reports in the mailing list or personal letters.\n"\
    "The issue page is also suitable to submit feature requests.\n"

/*----------------------------------------------------------------------------*/

#ifndef ulong
#define ulong unsigned long
#endif
typedef unsigned long ulong_t;

/* Fallbacks for toolchains whose <inttypes.h> lacks the 64-bit format macros. */
#ifndef PRIu64
#define PRIu64 "llu"
#endif

#ifndef PRIx64
#define PRIx64 "llx"
#endif

/* sw_inline: the strongest "please inline" spelling the compiler offers. */
#if defined(__GNUC__)
#if __GNUC__ >= 3
#define sw_inline inline __attribute__((always_inline))
#else
#define sw_inline inline
#endif
#elif defined(_MSC_VER)
#define sw_inline __forceinline
#else
#define sw_inline inline
#endif

/* SW_API: marks a symbol as exported (default visibility) on GCC >= 4. */
#if defined(__GNUC__) && __GNUC__ >= 4
#define SW_API __attribute__ ((visibility("default")))
#else
#define SW_API
#endif

#if defined(MAP_ANON) && !defined(MAP_ANONYMOUS)
#define MAP_ANONYMOUS MAP_ANON
#endif

#if defined(MAP_HUGETLB) || defined(MAP_ALIGNED_SUPER)
#define MAP_HUGE_PAGE 1
#endif

#ifndef SOCK_NONBLOCK
#define SOCK_NONBLOCK O_NONBLOCK
#endif

#ifndef CLOCK_REALTIME
#define CLOCK_REALTIME 0
#endif

/* Without GCC >= 3 there is no __builtin_expect; degrade it to the bare expression. */
#if !defined(__GNUC__) || __GNUC__ < 3
#define __builtin_expect(x, expected_value) (x)
#endif

#define sw_likely(x)        __builtin_expect(!!(x), 1)
#define sw_unlikely(x)      __builtin_expect(!!(x), 0)

#define SW_START_LINE  "-------------------------START----------------------------"
#define SW_END_LINE    "--------------------------END-----------------------------"

/* printf-style format strings wrapping one %s in an ANSI colour escape sequence. */
#define SW_ECHO_RED               "\e[31m%s\e[0m"
#define SW_ECHO_GREEN             "\e[32m%s\e[0m"
#define SW_ECHO_YELLOW            "\e[33m%s\e[0m"
#define SW_ECHO_BLUE              "\e[34m%s\e[0m"
#define SW_ECHO_MAGENTA           "\e[35m%s\e[0m"
#define SW_ECHO_CYAN              "\e[36m%s\e[0m"
#define SW_ECHO_WHITE             "\e[37m%s\e[0m"

#define SW_COLOR_RED              1
#define SW_COLOR_GREEN            2
#define SW_COLOR_YELLOW           3
#define SW_COLOR_BLUE             4
#define SW_COLOR_MAGENTA          5
#define SW_COLOR_CYAN             6
#define SW_COLOR_WHITE            7

#define SW_SPACE                  ' '
#define SW_CRLF                   "\r\n"
#define SW_CRLF_LEN               2
#define SW_ASCII_CODE_0           64
#define SW_ASCII_CODE_Z           106
/*----------------------------------------------------------------------------*/

#include "swoole_config.h"
#include "atomic.h"
#include "hashmap.h"
#include "list.h"
#include "heap.h"
#include "ring_queue.h"
#include "array.h"
#include "error.h"

/* NOTE: both macros evaluate each argument more than once; avoid side effects in A and B. */
#define SW_MAX(A, B)           ((A) > (B) ? (A) : (B))
#define SW_MIN(A, B)           ((A) < (B) ? (A) : (B))

#ifndef MAX
#define MAX(A, B)              SW_MAX(A, B)
#endif
#ifndef MIN
#define MIN(A, B)              SW_MIN(A, B)
#endif

/*
 * Debug-only assertions; both expand to nothing unless SW_DEBUG is defined.
 * SW_ASSERT_1BYTE asserts that exactly one of the lowest sizeof(v) bits of v is set.
 * NOTE(review): the loop bound is sizeof(v) (a byte count), not the bit width of v.
 * Confirm whether sizeof(v) * CHAR_BIT was intended.
 */
#ifdef SW_DEBUG
#define SW_ASSERT(e)           assert(e)
#define SW_ASSERT_1BYTE(v)     do { \
    size_t i = 0, n = 0; \
    for (; i < sizeof(v); i++) { \
        n += ((v >> i) & 1) ? 1 : 0; \
    } \
    assert(n == 1); \
} while (0)
#else
#define SW_ASSERT(e)
#define SW_ASSERT_1BYTE(v)
#endif

// sleep 100ms (100000 microseconds), wait for fork and pthread_create
#define SW_START_SLEEP         usleep(100000)

/*-----------------------------------Memory------------------------------------*/

// Evaluates to the number of elements in 'array' (must be a real array, not a pointer)
#define SW_ARRAY_SIZE(array)   (sizeof(array) / sizeof((array)[0]))

#define SW_DEFAULT_ALIGNMENT   sizeof(unsigned long)
#define SW_MEM_ALIGNED_SIZE(size) \
        SW_MEM_ALIGNED_SIZE_EX(size, SW_DEFAULT_ALIGNMENT)
/* Rounds size up to a multiple of alignment; the mask trick requires a power-of-two alignment. */
#define SW_MEM_ALIGNED_SIZE_EX(size, alignment) \
        (((size) + ((alignment) - 1LL)) & ~((alignment) - 1LL))

/* Allocator selection: the e* allocator family, jemalloc, or the C library. */
#ifdef SW_USE_EMALLOC
#define sw_malloc              emalloc
#define sw_free                efree
#define sw_calloc              ecalloc
#define sw_realloc             erealloc
#else
#ifdef SW_USE_JEMALLOC
#include <jemalloc/jemalloc.h>
#define sw_malloc              je_malloc
#define sw_free                je_free
#define sw_calloc              je_calloc
#define sw_realloc             je_realloc
#else
#define sw_malloc              malloc
#define sw_free                free
#define sw_calloc              calloc
#define sw_realloc             realloc
#endif
#endif

/**
 * Returns non-zero when both buffers have the same length and identical bytes.
 */
static sw_inline int sw_mem_equal(const void *v1, size_t s1, const void *v2, size_t s2)
{
    return s1 == s2 && memcmp(v1, v2, s2) == 0;
}

/*----------------------------------String-------------------------------------*/

/* Expand a string literal into "pointer, size" argument pairs (with / without the trailing NUL). */
#define SW_STRS(s)             s, sizeof(s)
#define SW_STRL(s)             s, sizeof(s)-1

/* With a custom allocator, duplicate strings through sw_malloc so sw_free can release them. */
#if defined(SW_USE_JEMALLOC) || defined(SW_USE_TCMALLOC)
#define sw_strdup              swoole_strdup
#define sw_strndup             swoole_strndup
#else
#define sw_strdup              strdup
#define sw_strndup             strndup
#endif

#define SW_Z_BEST_SPEED 1

/** always return less than size, zero termination  */
size_t sw_snprintf(char *buf, size_t size, const char *format, ...);
size_t sw_vsnprintf(char *buf, size_t size, const char *format, va_list args);

/**
 * Duplicates the NUL-terminated string s into memory obtained from sw_malloc.
 * @return the copy, or NULL when allocation fails
 */
static sw_inline char* swoole_strdup(const char *s)
{
    size_t l = strlen(s) + 1;
    char *p = (char *) sw_malloc(l);
    if (sw_likely(p))
    {
        memcpy(p, s, l);
    }
    return p;
}

/**
 * Duplicates at most n bytes of s into an (n + 1)-byte buffer from sw_malloc.
 * The result is always NUL-terminated.
 * @return the copy, or NULL when allocation fails
 */
static sw_inline char* swoole_strndup(const char *s, size_t n)
{
    char *p = (char *) sw_malloc(n + 1);
    if (sw_likely(p))
    {
        /* strncpy zero-pads when s is shorter than n; p[n] covers the case where it is not */
        strncpy(p, s, n)[n] = '\0';
    }
    return p;
}

/*--------------------------------Constants------------------------------------*/
enum swResult_code
{
    SW_OK = 0,
    SW_ERR = -1,
};

enum swReturn_code
{
    SW_CONTINUE = 1,
    SW_WAIT     = 2,
    SW_CLOSE    = 3,
    SW_ERROR    = 4,
    SW_READY    = 5,
};

enum swFd_type
{
    SW_FD_SESSION,       //server stream session
    SW_FD_STREAM_SERVER, //server stream port
    SW_FD_DGRAM_SERVER,  //server dgram port
    SW_FD_PIPE,
    SW_FD_STREAM,
    SW_FD_AIO,
    /**
     * Coroutine Socket
     */
    SW_FD_CORO_SOCKET,
    /**
     * socket poll fd [coroutine::socket_poll]
     */
    SW_FD_CORO_POLL,
    SW_FD_SIGNAL,        //signalfd
    SW_FD_DNS_RESOLVER,  //dns resolver
    /**
     * c-ares
     */
    SW_FD_ARES,
    /**
     * SW_FD_USER or SW_FD_USER+n: for custom event
     */
    SW_FD_USER = 16,
    SW_FD_STREAM_CLIENT,
    SW_FD_DGRAM_CLIENT,
};

enum swBool_type
{
    SW_TRUE = 1,
    SW_FALSE = 0,
};

enum swEvent_type
{
    SW_EVENT_NULL   = 0,
    SW_EVENT_DEAULT = 1u << 8,
    SW_EVENT_READ   = 1u << 9,
    SW_EVENT_WRITE  = 1u << 10,
    SW_EVENT_RDWR   = SW_EVENT_READ | SW_EVENT_WRITE,
    SW_EVENT_ERROR  = 1u << 11,
    SW_EVENT_ONCE   = 1u << 12,
};

enum swPipe_type
{
    SW_PIPE_READ = 0,
    SW_PIPE_WRITE = 1,
};

enum swGlobal_hook_type
{
    SW_GLOBAL_HOOK_BEFORE_SERVER_START,
    SW_GLOBAL_HOOK_BEFORE_CLIENT_START,
    SW_GLOBAL_HOOK_BEFORE_WORKER_START,
    SW_GLOBAL_HOOK_ON_CORO_START,
    SW_GLOBAL_HOOK_ON_CORO_STOP,
    SW_GLOBAL_HOOK_ON_REACTOR_CREATE,
};

enum swFork_type
{
    SW_FORK_SPAWN = 0,
    SW_FORK_EXEC  = 1u << 2,
};

//-------------------------------------------------------------------------------
enum swServer_mode
{
    SW_MODE_BASE          =  1,
    SW_MODE_PROCESS       =  2,
};
//-------------------------------------------------------------------------------
enum swSocket_type
{
    SW_SOCK_TCP          =  1,
    SW_SOCK_UDP          =  2,
    SW_SOCK_TCP6         =  3,
    SW_SOCK_UDP6         =  4,
    SW_SOCK_UNIX_STREAM  =  5,  //unix sock stream
    SW_SOCK_UNIX_DGRAM   =  6,  //unix sock dgram
};
#define SW_SOCK_SSL            (1u << 9)
//-------------------------------------------------------------------------------
enum swLog_level
{
    SW_LOG_DEBUG = 0,
    SW_LOG_TRACE,
    SW_LOG_INFO,
    SW_LOG_NOTICE,
    SW_LOG_WARNING,
    SW_LOG_ERROR,
    SW_LOG_NONE,
};
//-------------------------------------------------------------------------------
enum swWorker_status
{
    SW_WORKER_BUSY = 1,
    SW_WORKER_IDLE = 2,
};
//-------------------------------------------------------------------------------

/*
 * Logging macros. Each formats into the sw_error buffer with sw_snprintf and hands
 * the text to SwooleG.write_log when its level passes SwooleG.log_level.
 *
 * NOTE(review): swInfo, swNotice and swDebug expand to a bare `if { }` block rather
 * than `do { } while (0)`, so they can capture a following `else` at the call site.
 * Left unchanged because existing callers may rely on the current expansion.
 */
#define swInfo(str,...) \
    if (SW_LOG_INFO >= SwooleG.log_level) {\
        size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,str,##__VA_ARGS__);\
        SwooleG.write_log(SW_LOG_INFO, sw_error, _sw_error_len);\
    }

#define swNotice(str,...) \
    if (SW_LOG_NOTICE >= SwooleG.log_level) {\
        size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,str,##__VA_ARGS__);\
        SwooleG.write_log(SW_LOG_NOTICE, sw_error, _sw_error_len);\
    }

/*
 * swSysNotice / swSysWarn record errno in SwooleG.error and append strerror(errno).
 * NOTE(review): both gate on SW_LOG_ERROR but write at NOTICE / WARNING level.
 * Confirm that threshold is intentional.
 */
#define swSysNotice(str,...) \
    do{\
        SwooleG.error = errno;\
        if (SW_LOG_ERROR >= SwooleG.log_level) {\
            size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,"%s(:%d): " str ", Error: %s[%d]",__func__,__LINE__,##__VA_ARGS__,strerror(errno),errno);\
            SwooleG.write_log(SW_LOG_NOTICE, sw_error, _sw_error_len);\
        }\
    } while(0)

#define swWarn(str,...) \
    do{\
        if (SW_LOG_WARNING >= SwooleG.log_level) {\
            size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,"%s: " str,__func__,##__VA_ARGS__);\
            SwooleG.write_log(SW_LOG_WARNING, sw_error, _sw_error_len);\
        }\
    } while(0)

#define swSysWarn(str,...) \
    do{\
        SwooleG.error = errno;\
        if (SW_LOG_ERROR >= SwooleG.log_level) {\
            size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,"%s(:%d): " str ", Error: %s[%d]",__func__,__LINE__,##__VA_ARGS__,strerror(errno),errno);\
            SwooleG.write_log(SW_LOG_WARNING, sw_error, _sw_error_len);\
        }\
    } while(0)

/* swError / swSysError always log (no level check) and then terminate the process with exit(1). */
#define swError(str,...) \
    do{\
        size_t _sw_error_len = sw_snprintf(sw_error, SW_ERROR_MSG_SIZE, str, ##__VA_ARGS__);\
        SwooleG.write_log(SW_LOG_ERROR, sw_error, _sw_error_len);\
        exit(1);\
    } while(0)

#define swSysError(str,...) \
    do{\
        size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,"%s(:%d): " str ", Error: %s[%d]",__func__,__LINE__,##__VA_ARGS__,strerror(errno),errno);\
        SwooleG.write_log(SW_LOG_ERROR, sw_error, _sw_error_len);\
        exit(1);\
    } while(0)

#define swFatalError(code, str,...) \
    SwooleG.fatal_error(code, str, ##__VA_ARGS__)

/* Stores __errno in SwooleG.error, then logs at the given level if it passes the threshold. */
#define swoole_error_log(level, __errno, str, ...) \
    do{\
        SwooleG.error = __errno;\
        if (level >= SwooleG.log_level){\
            size_t _sw_error_len = sw_snprintf(sw_error, SW_ERROR_MSG_SIZE, "%s (ERRNO %d): " str,__func__,__errno,##__VA_ARGS__);\
            SwooleG.write_log(level, sw_error, _sw_error_len);\
        }\
    } while(0)

/*
 * Debug helpers, compiled in only with SW_DEBUG.
 * swHexDump prints `length` bytes of `data`, 16 per row, as offset / hex / printable text.
 * The offset is cast to unsigned int to match its %08x conversion, and each byte is
 * cast to unsigned char before isprint() because passing a negative char is undefined.
 */
#ifdef SW_DEBUG
#define swDebug(str,...) \
    if (SW_LOG_DEBUG >= SwooleG.log_level) {\
        size_t _sw_error_len = sw_snprintf(sw_error, SW_ERROR_MSG_SIZE, "%s(:%d): " str, __func__, __LINE__, ##__VA_ARGS__);\
        SwooleG.write_log(SW_LOG_DEBUG, sw_error, _sw_error_len);\
    }

#define swHexDump(data, length) \
    do { \
        const char *__data = (data); \
        size_t __length = (length); \
        swDebug("+----------+------------+-----------+-----------+------------+------------------+"); \
        for (size_t of = 0; of < __length; of += 16) \
        { \
            char hex[16 * 3 + 1]; \
            char str[16 + 1]; \
            size_t i, hof = 0, sof = 0; \
            for (i = of; i < of + 16 && i < __length; i++) \
            { \
                hof += sprintf(hex + hof, "%02x ", (__data)[i] & 0xff); \
                sof += sprintf(str + sof, "%c", isprint((unsigned char) (__data)[i]) ? (__data)[i] : '.'); \
            } \
            swDebug("| %08x | %-48s| %-16s |", (unsigned int) of, hex, str); \
        } \
        swDebug("+----------+------------+-----------+-----------+------------+------------------+"); \
    } while (0)
#else
#define swDebug(str,...)
#define swHexDump(data, length) #endif enum swTrace_type { /** * Server */ SW_TRACE_SERVER = 1u << 1, SW_TRACE_CLIENT = 1u << 2, SW_TRACE_BUFFER = 1u << 3, SW_TRACE_CONN = 1u << 4, SW_TRACE_EVENT = 1u << 5, SW_TRACE_WORKER = 1u << 6, SW_TRACE_MEMORY = 1u << 7, SW_TRACE_REACTOR = 1u << 8, SW_TRACE_PHP = 1u << 9, SW_TRACE_HTTP = 1u << 10, SW_TRACE_HTTP2 = 1u << 11, SW_TRACE_EOF_PROTOCOL = 1u << 12, SW_TRACE_LENGTH_PROTOCOL = 1u << 13, SW_TRACE_CLOSE = 1u << 14, SW_TRACE_WEBSOCEKT = 1u << 15, /** * Client */ SW_TRACE_REDIS_CLIENT = 1u << 16, SW_TRACE_MYSQL_CLIENT = 1u << 17, SW_TRACE_HTTP_CLIENT = 1u << 18, SW_TRACE_AIO = 1u << 19, SW_TRACE_SSL = 1u << 20, SW_TRACE_NORMAL = 1u << 21, /** * Coroutine */ SW_TRACE_CHANNEL = 1u << 22, SW_TRACE_TIMER = 1u << 23, SW_TRACE_SOCKET = 1u << 24, SW_TRACE_COROUTINE = 1u << 25, SW_TRACE_CONTEXT = 1u << 26, SW_TRACE_CO_HTTP_SERVER = 1u << 27, SW_TRACE_ALL = 0xffffffff }; #ifdef SW_LOG_TRACE_OPEN #define swTraceLog(what,str,...) \ if (SW_LOG_TRACE >= SwooleG.log_level && (what & SwooleG.trace_flags)) {\ size_t _sw_error_len = sw_snprintf(sw_error,SW_ERROR_MSG_SIZE,"%s(:%d): " str, __func__, __LINE__, ##__VA_ARGS__);\ SwooleG.write_log(SW_LOG_TRACE, sw_error, _sw_error_len);\ } #else #define swTraceLog(what,str,...) #endif #define swTrace(str,...) 
swTraceLog(SW_TRACE_NORMAL, str, ##__VA_ARGS__) #define swYield() sched_yield() //or usleep(1) #define SW_MAX_FDTYPE 32 //32 kinds of event //------------------------------Base-------------------------------- #ifndef uchar typedef unsigned char uchar; #endif #ifdef SW_USE_OPENSSL #include <openssl/ssl.h> #endif typedef void (*swDestructor)(void *data); typedef void (*swCallback)(void *data); typedef struct { uint32_t id; uint32_t fd :24; uint32_t reactor_id :8; } swSession; typedef struct _swString { size_t length; size_t size; off_t offset; char *str; } swString; typedef struct _swLinkedList_node { struct _swLinkedList_node *prev; struct _swLinkedList_node *next; ulong_t priority; void *data; } swLinkedList_node; typedef struct { uint32_t num; uint8_t type; swLinkedList_node *head; swLinkedList_node *tail; swDestructor dtor; } swLinkedList; typedef struct { union { struct sockaddr_in inet_v4; struct sockaddr_in6 inet_v6; struct sockaddr_un un; } addr; socklen_t len; } swSocketAddress; typedef struct _swSocket { int fd; enum swFd_type fdtype; enum swSocket_type socket_type; int events; uint8_t removed :1; uint8_t nonblock :1; uint8_t direct_send :1; #ifdef SW_USE_OPENSSL uint8_t ssl_send :1; uint8_t ssl_want_read :1; uint8_t ssl_want_write :1; uint8_t ssl_renegotiation :1; uint8_t ssl_handshake_buffer_set :1; #endif uint8_t dontwait :1; uint8_t close_wait :1; uint8_t send_wait :1; uint8_t listen_wait :1; uint8_t tcp_nopush :1; uint8_t tcp_nodelay :1; uint8_t skip_recv :1; uint8_t recv_wait :1; /** * memory buffer size; */ uint32_t buffer_size; void *object; #ifdef SW_USE_OPENSSL SSL *ssl; uint32_t ssl_state; #endif swSocketAddress info; struct _swBuffer *out_buffer; struct _swBuffer *in_buffer; swString *recv_buffer; #ifdef SW_DEBUG size_t total_recv_bytes; size_t total_send_bytes; #endif } swSocket; typedef struct _swConnection { /** * file descript */ int fd; /** * session id */ uint32_t session_id; /** * socket type, SW_SOCK_TCP or SW_SOCK_UDP */ enum 
swSocket_type socket_type; //-------------------------------------------------------------- /** * is active * system fd must be 0. en: signalfd, listen socket */ uint8_t active; #ifdef SW_USE_OPENSSL uint8_t ssl; uint8_t ssl_ready; #endif //-------------------------------------------------------------- uint8_t overflow; uint8_t high_watermark; //-------------------------------------------------------------- uint8_t http_upgrade; #ifdef SW_USE_HTTP2 uint8_t http2_stream; #endif #ifdef SW_HAVE_ZLIB uint8_t websocket_compression; #endif //-------------------------------------------------------------- /** * server is actively close the connection */ uint8_t close_actively; uint8_t closed; uint8_t close_queued; uint8_t closing; uint8_t close_reset; uint8_t peer_closed; /** * protected connection, cannot be closed by heartbeat thread. */ uint8_t protect; //-------------------------------------------------------------- uint8_t close_notify; uint8_t close_force; //-------------------------------------------------------------- /** * ReactorThread id */ uint16_t reactor_id; /** * close error code */ uint16_t close_errno; /** * from which socket fd */ sw_atomic_t server_fd; /** * socket address */ swSocketAddress info; /** * link any thing, for kernel, do not use with application. 
*/ void *object; /** * socket info */ swSocket *socket; /** * connect time(seconds) */ time_t connect_time; /** * received time with last data */ time_t last_time; #ifdef SW_BUFFER_RECV_TIME /** * received time(microseconds) with last data */ double last_time_usec; #endif /** * bind uid */ uint32_t uid; /** * upgarde websocket */ uint8_t websocket_status; /** * unfinished data frame */ swString *websocket_buffer; #ifdef SW_USE_OPENSSL swString *ssl_client_cert; uint16_t ssl_client_cert_pid; #endif sw_atomic_t lock; } swConnection; typedef struct _swProtocol { /* one package: eof check */ uint8_t split_by_eof; uint8_t package_eof_len; char package_eof[SW_DATA_EOF_MAXLEN + 1]; char package_length_type; uint8_t package_length_size; uint16_t package_length_offset; uint16_t package_body_offset; uint32_t package_max_length; void *private_data; void *private_data_2; uint16_t real_header_length; int (*onPackage)(struct _swProtocol *, swSocket *, char *, uint32_t); ssize_t (*get_package_length)(struct _swProtocol *, swSocket *, char *, uint32_t); uint8_t (*get_package_length_size)(swSocket *); } swProtocol; typedef ssize_t (*swProtocol_length_function)(struct _swProtocol *, swSocket *, char *, uint32_t); //------------------------------String-------------------------------- #define swoole_tolower(c) (uchar) ((c >= 'A' && c <= 'Z') ? (c | 0x20) : c) #define swoole_toupper(c) (uchar) ((c >= 'a' && c <= 'z') ? 
(c & ~0x20) : c) uint32_t swoole_utf8_decode(uchar **p, size_t n); size_t swoole_utf8_length(uchar *p, size_t n); void swoole_random_string(char *buf, size_t size); static sw_inline char *swoole_strlchr(char *p, char *last, char c) { while (p < last) { if (*p == c) { return p; } p++; } return NULL; } static sw_inline size_t swoole_size_align(size_t size, int pagesize) { return size + (pagesize - (size % pagesize)); } #define SW_STRINGL(s) s->str, s->length #define SW_STRINGS(s) s->str, s->size #define SW_STRINGCVL(s) s->str + s->offset, s->length - s->offset swString *swString_new(size_t size); swString *swString_dup(const char *src_str, size_t length); swString *swString_dup2(swString *src); void swString_print(swString *str); int swString_append(swString *str, swString *append_str); int swString_append_ptr(swString *str, const char *append_str, size_t length); int swString_write(swString *str, off_t offset, swString *write_str); int swString_write_ptr(swString *str, off_t offset, char *write_str, size_t length); int swString_extend(swString *str, size_t new_size); char* swString_alloc(swString *str, size_t __size); static sw_inline void swString_clear(swString *str) { str->length = 0; str->offset = 0; } static sw_inline void swString_free(swString *str) { sw_free(str->str); sw_free(str); } static sw_inline int swString_extend_align(swString *str, size_t _new_size) { size_t align_size = SW_MEM_ALIGNED_SIZE(str->size * 2); while (align_size < _new_size) { align_size *= 2; } return swString_extend(str, align_size); } static sw_inline int swString_grow(swString *str, size_t incr_value) { str->length += incr_value; if (str->length == str->size && swString_extend(str, str->size * 2) < 0) { return SW_ERR; } else { return SW_OK; } } /** * migrate data to head, [offset, length - offset] -> [0, length - offset] */ static sw_inline void swString_pop_front(swString *str, off_t offset) { assert(offset >= 0 && (size_t ) offset <= str->length); if (sw_unlikely(offset == 0)) 
return; str->length = str->length - offset; str->offset = 0; if (str->length == 0) return; memmove(str->str, str->str + offset, str->length); } static sw_inline void swString_sub(swString *str, off_t start, size_t length) { char *from = str->str + start + (start >= 0 ? 0 : str->length); str->length = length != 0 ? length : str->length - start; str->offset = 0; if (sw_likely(str->length > 0)) { memmove(str->str, from, str->length); } } //------------------------------Base-------------------------------- enum _swEventData_flag { SW_EVENT_DATA_NORMAL, SW_EVENT_DATA_PTR = 1u << 1, SW_EVENT_DATA_CHUNK = 1u << 2, SW_EVENT_DATA_END = 1u << 3, }; typedef struct _swDataHead { int fd; uint32_t len; int16_t reactor_id; uint8_t type; uint8_t flags; uint16_t server_fd; #ifdef SW_BUFFER_RECV_TIME double time; #endif } swDataHead; void swDataHead_dump(const swDataHead *data); typedef struct _swEvent { int fd; int16_t reactor_id; enum swFd_type type; swSocket *socket; } swEvent; typedef struct { swDataHead info; char data[SW_IPC_BUFFER_SIZE]; } swEventData; typedef struct { swDataHead info; char data[0]; } swPipeBuffer; typedef struct _swDgramPacket { int socket_type; swSocketAddress socket_addr; uint32_t length; char data[0]; } swDgramPacket; typedef struct _swSendData { swDataHead info; char *data; } swSendData; typedef struct { off_t offset; size_t length; char filename[0]; } swSendFile_request; typedef void (*swSignalHandler)(int); typedef struct _swReactor swReactor; typedef int (*swReactor_handler)(swReactor *reactor, swEvent *event); //------------------Pipe-------------------- typedef struct _swPipe { void *object; int blocking; double timeout; int (*read)(struct _swPipe *, void *recv, int length); int (*write)(struct _swPipe *, void *send, int length); int (*getFd)(struct _swPipe *, int master); int (*close)(struct _swPipe *); } swPipe; enum swPipe_close_which { SW_PIPE_CLOSE_MASTER = 1, SW_PIPE_CLOSE_WORKER = 2, SW_PIPE_CLOSE_READ = 3, SW_PIPE_CLOSE_WRITE = 4, 
SW_PIPE_CLOSE_BOTH = 0, }; int swPipeBase_create(swPipe *p, int blocking); int swPipeEventfd_create(swPipe *p, int blocking, int semaphore, int timeout); int swPipeUnsock_create(swPipe *p, int blocking, int protocol); int swPipeUnsock_close_ext(swPipe *p, int which); static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore) { #ifdef HAVE_EVENTFD return swPipeEventfd_create(p, blocking, semaphore, 0); #else return swPipeBase_create(p, blocking); #endif } //------------------Queue-------------------- typedef struct _swQueue_Data { long mtype; /* type of received/sent message */ char mdata[sizeof(swEventData)]; /* text of the message */ } swQueue_data; typedef struct _swMsgQueue { int blocking; int msg_id; int flags; int perms; } swMsgQueue; int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, int perms); void swMsgQueue_set_blocking(swMsgQueue *q, uint8_t blocking); int swMsgQueue_set_capacity(swMsgQueue *q, int queue_bytes); int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int data_length); int swMsgQueue_pop(swMsgQueue *q, swQueue_data *out, int buffer_length); int swMsgQueue_stat(swMsgQueue *q, int *queue_num, int *queue_bytes); int swMsgQueue_free(swMsgQueue *q); //------------------Lock-------------------------------------- enum SW_LOCKS { SW_RWLOCK = 1, SW_FILELOCK = 2, SW_MUTEX = 3, SW_SEM = 4, SW_SPINLOCK = 5, SW_ATOMLOCK = 6, }; enum swDNSLookup_cache_type { SW_DNS_LOOKUP_RANDOM = (1u << 11), }; typedef struct { const char *hostname; const char *service; int family; int socktype; int protocol; int error; void *result; int count; } swRequest_getaddrinfo; typedef struct _swMutex { pthread_mutex_t _lock; pthread_mutexattr_t attr; } swMutex; typedef struct _swFileLock { struct flock lock_t; int fd; } swFileLock; #ifdef HAVE_RWLOCK typedef struct _swRWLock { pthread_rwlock_t _lock; pthread_rwlockattr_t attr; } swRWLock; #ifdef HAVE_SPINLOCK typedef struct _swSpinLock { pthread_spinlock_t lock_t; } swSpinLock; #endif typedef 
struct _swAtomicLock { sw_atomic_t lock_t; uint32_t spin; } swAtomicLock; typedef struct _swSem { key_t key; int semid; } swSem; #endif typedef struct _swLock { int type; union { swMutex mutex; #ifdef HAVE_RWLOCK swRWLock rwlock; #endif #ifdef HAVE_SPINLOCK swSpinLock spinlock; #endif swFileLock filelock; swSem sem; swAtomicLock atomlock; } object; int (*lock_rd)(struct _swLock *); int (*lock)(struct _swLock *); int (*unlock)(struct _swLock *); int (*trylock_rd)(struct _swLock *); int (*trylock)(struct _swLock *); int (*free)(struct _swLock *); } swLock; //Thread Condition typedef struct _swCond { swLock _lock; pthread_cond_t _cond; int (*wait)(struct _swCond *object); int (*timewait)(struct _swCond *object, long, long); int (*notify)(struct _swCond *object); int (*broadcast)(struct _swCond *object); void (*free)(struct _swCond *object); int (*lock)(struct _swCond *object); int (*unlock)(struct _swCond *object); } swCond; #define SW_SHM_MMAP_FILE_LEN 64 typedef struct _swShareMemory_mmap { size_t size; char mapfile[SW_SHM_MMAP_FILE_LEN]; int tmpfd; int key; int shmid; void *mem; } swShareMemory; void *swShareMemory_mmap_create(swShareMemory *object, size_t size, char *mapfile); void *swShareMemory_sysv_create(swShareMemory *object, size_t size, int key); int swShareMemory_sysv_free(swShareMemory *object, int rm); int swShareMemory_mmap_free(swShareMemory *object); //-------------------memory manager------------------------- typedef struct _swMemoryPool { void *object; void* (*alloc)(struct _swMemoryPool *pool, uint32_t size); void (*free)(struct _swMemoryPool *pool, void *ptr); void (*destroy)(struct _swMemoryPool *pool); } swMemoryPool; typedef struct _swFixedPool_slice { uint8_t lock; struct _swFixedPool_slice *next; struct _swFixedPool_slice *pre; char data[0]; } swFixedPool_slice; typedef struct _swFixedPool { void *memory; size_t size; swFixedPool_slice *head; swFixedPool_slice *tail; /** * total memory size */ uint32_t slice_num; /** * memory usage */ 
uint32_t slice_use; /** * Fixed slice size, not include the memory used by swFixedPool_slice */ uint32_t slice_size; /** * use shared memory */ uint8_t shared; } swFixedPool; /** * FixedPool, random alloc/free fixed size memory */ swMemoryPool* swFixedPool_new(uint32_t slice_num, uint32_t slice_size, uint8_t shared); swMemoryPool* swFixedPool_new2(uint32_t slice_size, void *memory, size_t size); swMemoryPool* swMalloc_new(); /** * RingBuffer, In order for malloc / free */ swMemoryPool *swRingBuffer_new(uint32_t size, uint8_t shared); /** * Global memory, the program life cycle only malloc / free one time */ swMemoryPool* swMemoryGlobal_new(uint32_t pagesize, uint8_t shared); void swFixedPool_debug(swMemoryPool *pool); /** * alloc shared memory */ void* sw_shm_malloc(size_t size); void sw_shm_free(void *ptr); void* sw_shm_calloc(size_t num, size_t _size); int sw_shm_protect(void *addr, int flags); void* sw_shm_realloc(void *ptr, size_t new_size); #ifdef HAVE_RWLOCK int swRWLock_create(swLock *lock, int use_in_process); #endif #ifdef SEM_UNDO int swSem_create(swLock *lock, key_t key); #endif int swFileLock_create(swLock *lock, int fd); #ifdef HAVE_SPINLOCK int swSpinLock_create(swLock *object, int spin); #endif int swAtomicLock_create(swLock *object, int spin); int swMutex_create(swLock *lock, int use_in_process); int swMutex_lockwait(swLock *lock, int timeout_msec); int swCond_create(swCond *cond); typedef struct _swThreadParam { void *object; int pti; } swThreadParam; #ifdef __MACH__ char* sw_error_(); #define sw_error sw_error_() #else extern __thread char sw_error[SW_ERROR_MSG_SIZE]; #endif enum swProcess_type { SW_PROCESS_MASTER = 1, SW_PROCESS_WORKER = 2, SW_PROCESS_MANAGER = 3, SW_PROCESS_TASKWORKER = 4, SW_PROCESS_USERWORKER = 5, }; #define swIsMaster() (SwooleG.process_type==SW_PROCESS_MASTER) #define swIsWorker() (SwooleG.process_type==SW_PROCESS_WORKER) #define swIsTaskWorker() (SwooleG.process_type==SW_PROCESS_TASKWORKER) #define swIsManager() 
(SwooleG.process_type==SW_PROCESS_MANAGER) #define swIsUserWorker() (SwooleG.process_type==SW_PROCESS_USERWORKER) //----------------------Logger--------------------- int swLog_init(char *logfile); void swLog_put(int level, char *content, size_t length); void swLog_reopen(enum swBool_type redirect); void swLog_free(void); //----------------------Tool Function--------------------- uint64_t swoole_hash_key(char *str, int str_len); uint32_t swoole_common_multiple(uint32_t u, uint32_t v); uint32_t swoole_common_divisor(uint32_t u, uint32_t v); extern void swoole_sha1(const char *str, int _len, unsigned char *digest); extern void swoole_sha256(const char *str, int _len, unsigned char *digest); static sw_inline uint16_t swoole_swap_endian16(uint16_t x) { return (((x & 0xff) << 8) | ((x & 0xff00) >> 8)); } static sw_inline uint32_t swoole_swap_endian32(uint32_t x) { return (((x & 0xff) << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | ((x & 0xff000000) >> 24)); } static sw_inline int32_t swoole_unpack(char type, const void *data) { switch(type) { /*-------------------------16bit-----------------------------*/ case 'c': return *((int8_t *) data); case 'C': return *((uint8_t *) data); /*-------------------------16bit-----------------------------*/ /** * signed short (always 16 bit, machine byte order) */ case 's': return *((int16_t *) data); /** * unsigned short (always 16 bit, machine byte order) */ case 'S': return *((uint16_t *) data); /** * unsigned short (always 16 bit, big endian byte order) */ case 'n': return ntohs(*((uint16_t *) data)); /** * unsigned short (always 32 bit, little endian byte order) */ case 'v': return swoole_swap_endian16(ntohs(*((uint16_t *) data))); /*-------------------------32bit-----------------------------*/ /** * unsigned long (always 32 bit, machine byte order) */ case 'L': return *((uint32_t *) data); /** * signed long (always 32 bit, machine byte order) */ case 'l': return *((int *) data); /** * unsigned long (always 32 bit, big 
endian byte order) */ case 'N': return ntohl(*((uint32_t *) data)); /** * unsigned short (always 32 bit, little endian byte order) */ case 'V': return swoole_swap_endian32(ntohl(*((uint32_t *) data))); default: return *((uint32_t *) data); } } static inline char* swoole_strnstr(const char *haystack, const char *needle, uint32_t length) { int i; uint32_t needle_length = strlen(needle); assert(needle_length > 0); for (i = 0; i < (int) (length - needle_length + 1); i++) { if ((haystack[0] == needle[0]) && (0 == memcmp(haystack, needle, needle_length))) { return (char *) haystack; } haystack++; } return NULL; } static inline int swoole_strnpos(const char *haystack, uint32_t haystack_length, const char *needle, uint32_t needle_length) { assert(needle_length > 0); uint32_t i; if (sw_likely(needle_length <= haystack_length)) { for (i = 0; i < haystack_length - needle_length + 1; i++) { if ((haystack[0] == needle[0]) && (0 == memcmp(haystack, needle, needle_length))) { return i; } haystack++; } } return -1; } static inline int swoole_strrnpos(const char *haystack, const char *needle, uint32_t length) { uint32_t needle_length = strlen(needle); assert(needle_length > 0); uint32_t i; haystack += (length - needle_length); for (i = length - needle_length; i > 0; i--) { if ((haystack[0] == needle[0]) && (0 == memcmp(haystack, needle, needle_length))) { return i; } haystack--; } return -1; } static inline void swoole_strtolower(char *str, int length) { char *c, *e; c = str; e = c + length; while (c < e) { *c = tolower(*c); c++; } } int swoole_itoa(char *buf, long value); void swoole_dump_ascii(const char *data, size_t size); void swoole_dump_bin(const char *data, char type, size_t size); void swoole_dump_hex(const char *data, size_t outlen); int swoole_type_size(char type); int swoole_mkdir_recursive(const char *dir); char* swoole_dirname(char *file); size_t swoole_sync_writefile(int fd, const void *data, size_t len); size_t swoole_sync_readfile(int fd, void *buf, size_t len); 
swString* swoole_sync_readfile_eof(int fd);
int swoole_rand(int min, int max);
int swoole_system_random(int min, int max);
long swoole_file_get_size(FILE *fp);
int swoole_tmpfile(char *filename);
swString* swoole_file_get_contents(const char *filename);
int swoole_file_put_contents(const char *filename, const char *content, size_t length);
long swoole_file_size(const char *filename);
char *swoole_dec2hex(int value, int base);
int swoole_version_compare(const char *version1, const char *version2);
#ifdef HAVE_EXECINFO
void swoole_print_trace(void);
#endif
int swoole_ioctl_set_block(int sock, int nonblock);
int swoole_fcntl_set_option(int sock, int nonblock, int cloexec);
int swoole_gethostbyname(int type, const char *name, char *addr);
int swoole_getaddrinfo(swRequest_getaddrinfo *req);
char* swoole_string_format(size_t n, const char *format, ...);
//----------------------core function---------------------
int swSocket_set_timeout(int sock, double timeout);
int swSocket_create_server(int type, const char *address, int port, int backlog);
//----------------------------------------Socket---------------------------------------
/**
 * Non-zero when `type` is one of the datagram socket types
 * (SW_SOCK_UDP, SW_SOCK_UDP6, SW_SOCK_UNIX_DGRAM).
 */
static sw_inline int swSocket_is_dgram(uint8_t type)
{
    return (type == SW_SOCK_UDP || type == SW_SOCK_UDP6 || type == SW_SOCK_UNIX_DGRAM);
}

/**
 * Non-zero when `type` is one of the stream socket types
 * (SW_SOCK_TCP, SW_SOCK_TCP6, SW_SOCK_UNIX_STREAM).
 */
static sw_inline int swSocket_is_stream(uint8_t type)
{
    return (type == SW_SOCK_TCP || type == SW_SOCK_TCP6 || type == SW_SOCK_UNIX_STREAM);
}

void swoole_init(void);
void swoole_clean(void);
pid_t swoole_fork(int flags);
double swoole_microtime(void);
void swoole_rtrim(char *str, int len);
void swoole_redirect_stdout(int new_fd);
int swoole_shell_exec(const char *command, pid_t *pid, uint8_t get_error_stream);
int swoole_daemon(int nochdir, int noclose);

SW_API int swoole_add_function(const char *name, void* func);
SW_API void* swoole_get_function(const char *name, uint32_t length);
SW_API int swoole_add_hook(enum swGlobal_hook_type type, swCallback func, int push_back);
SW_API void swoole_call_hook(enum swGlobal_hook_type type, void *arg);

/**
 * Convert a 64-bit value from host byte order to network (big endian) order.
 *
 * The value is serialised most-significant byte first, so the result is
 * correct on both little- and big-endian hosts. The former implementation
 * always swapped the two 32-bit halves, which is only right on little endian.
 */
static sw_inline uint64_t swoole_hton64(uint64_t host)
{
    uint8_t bytes[sizeof(uint64_t)];
    uint64_t net;
    unsigned int i;

    for (i = 0; i < sizeof(bytes); i++)
    {
        /* bytes[0] receives bits 63..56, bytes[7] receives bits 7..0 */
        bytes[i] = (uint8_t) (host >> (56 - 8 * i));
    }
    memcpy(&net, bytes, sizeof(net));
    return net;
}

/**
 * Convert a 64-bit value from network (big endian) order to host byte order.
 *
 * Inverse of swoole_hton64(); independent of the host's endianness.
 */
static sw_inline uint64_t swoole_ntoh64(uint64_t net)
{
    uint8_t bytes[sizeof(uint64_t)];
    uint64_t host = 0;
    unsigned int i;

    memcpy(bytes, &net, sizeof(bytes));
    for (i = 0; i < sizeof(bytes); i++)
    {
        /* the first byte in memory is the most significant one */
        host = (host << 8) | bytes[i];
    }
    return host;
}

int swSocket_create(int type);
int swSocket_bind(int sock, int type, const char *host, int *port);
int swSocket_accept(int fd, swSocketAddress *sa);
int swSocket_wait(int fd, int timeout_ms, int events);
int swSocket_wait_multi(int *list_of_fd, int n_fd, int timeout_ms, int events);
void swSocket_clean(int fd);
ssize_t swSocket_sendto_blocking(int fd, const void *buf, size_t n, int flag, struct sockaddr *addr, socklen_t addr_len);
int swSocket_set_buffer_size(int fd, uint32_t buffer_size);
ssize_t swSocket_udp_sendto(int server_sock, const char *dst_ip, int dst_port, const char *data, uint32_t len);
ssize_t swSocket_udp_sendto6(int server_sock, const char *dst_ip, int dst_port, const char *data, uint32_t len);
ssize_t swSocket_unix_sendto(int server_sock, const char *dst_path, const char *data, uint32_t len);
int swSocket_sendfile_sync(int sock, const char *filename, off_t offset, size_t length, double timeout);
int swSocket_write_blocking(int __fd, const void *__data, int __len);
int swSocket_recv_blocking(int fd, void *__data, size_t __len, int flags);

/**
 * Calls swoole_fcntl_set_option(sock, 1, -1), i.e. requests non-blocking mode.
 */
static sw_inline int swSocket_set_nonblock(int sock)
{
    return swoole_fcntl_set_option(sock, 1, -1);
}

/**
 * Calls swoole_fcntl_set_option(sock, 0, -1), i.e. requests blocking mode.
 */
static sw_inline int swSocket_set_blocking(int sock)
{
    return swoole_fcntl_set_option(sock, 0, -1);
}

/**
 * waitpid() wrapper that retries for as long as the call fails with EINTR.
 *
 * @return the final waitpid() result
 */
static sw_inline int swoole_waitpid(pid_t __pid, int *__stat_loc, int __options)
{
    int ret;
    do
    {
        ret = waitpid(__pid, __stat_loc, __options);
    } while (ret < 0 && errno == EINTR);
    return ret;
}

/**
 * kill() wrapper that refuses pid <= 0 and returns -1 for it without
 * calling kill().
 */
static sw_inline int swoole_kill(pid_t __pid, int __sig)
{
    if (__pid <= 0)
    {
        return -1;
    }
    return kill(__pid, __sig);
}

#ifdef TCP_CORK
#define HAVE_TCP_NOPUSH
/**
 * Set the TCP_CORK option on `sock` to `nopush`; returns setsockopt()'s result.
 */
static sw_inline int swSocket_tcp_nopush(int sock, int nopush)
{
    return setsockopt(sock, IPPROTO_TCP, TCP_CORK, (const void *) &nopush, sizeof(int));
}
#else
/* expands to nothing: only usable as a statement; test HAVE_TCP_NOPUSH before using the result */
#define swSocket_tcp_nopush(sock, nopush)
#endif

swSignalHandler swSignal_set(int sig, swSignalHandler func, int restart, int mask);
void swSignal_add(int signo, swSignalHandler func);
void swSignal_callback(int signo);
swSignalHandler swSignal_get_handler(int signo);
void swSignal_clear(void);
void swSignal_none(void);
char* swSignal_str(int sig);

#ifdef HAVE_SIGNALFD
void swSignalfd_init();
int swSignalfd_setup(swReactor *reactor);
#endif

typedef struct _swDefer_callback
{
    struct _swDefer_callback *next, *prev;
    swCallback callback;
    void *data;
} swDefer_callback;

struct _swReactor
{
    void *object;
    void *ptr;  //reserve

    /**
     * last signal number
     */
    int singal_no;

    uint32_t event_num;
    uint32_t max_event_num;
    uint32_t signal_listener_num;

    uint32_t check_timer :1;
    uint32_t running :1;
    uint32_t start :1;
    uint32_t once :1;
    uint32_t wait_exit :1;
    /**
     * disable accept new connection
     */
    uint32_t disable_accept :1;
    /**
     * callback signal
     */
    uint32_t check_signalfd :1;

    /**
     * reactor->wait timeout (millisecond) or -1
     */
    int32_t timeout_msec;

    uint16_t id; //Reactor ID
    uint16_t flag; //flag

    uint32_t max_socket;

    swArray *socket_array;

#ifdef SW_USE_MALLOC_TRIM
    time_t last_malloc_trim_time;
#endif

    /* per-fd-type handlers, indexed by enum swFd_type */
    swReactor_handler read_handler[SW_MAX_FDTYPE];
    swReactor_handler write_handler[SW_MAX_FDTYPE];
    swReactor_handler error_handler[SW_MAX_FDTYPE];

    /* fallbacks returned by swReactor_get_handler() when no per-type handler is set */
    swReactor_handler default_write_handler;
    swReactor_handler default_error_handler;

    struct _swTimer *timer;

    int (*add)(swReactor *, int fd, int fdtype);
    int (*set)(swReactor *, int fd, int fdtype);
    int (*del)(swReactor *, int fd);
    int (*wait)(swReactor *, struct timeval *);
    void (*free)(swReactor *);

    void *defer_tasks;
    void *destroy_callbacks;

    swDefer_callback idle_task;
    swDefer_callback future_task;

    void (*onTimeout)(swReactor *);
    void (*onFinish)(swReactor *);
    void (*onBegin)(swReactor *);

    void (*enable_accept)(swReactor *);
    int (*is_empty)(swReactor *);

    int (*write)(swReactor *, int, const void *, int);
    int (*close)(swReactor *, int);
    void (*defer)(swReactor *, swCallback, void *);
};

typedef struct _swWorker swWorker;
typedef struct _swThread swThread;
typedef struct _swProcessPool swProcessPool;

struct _swWorker
{
    /**
     * worker process
     */
    pid_t pid;

    /**
     * worker thread
     */
    pthread_t tid;

    swProcessPool *pool;

    swMemoryPool *pool_output;

    swMsgQueue *queue;

    /**
     * redirect stdout to pipe_master
     */
    uint8_t redirect_stdout :1;

    /**
     * redirect stdin to pipe_worker
     */
    uint8_t redirect_stdin :1;

    /**
     * redirect stderr to pipe_worker
     */
    uint8_t redirect_stderr :1;

    /**
     * worker status, IDLE or BUSY
     */
    uint8_t status;
    uint8_t type;
    uint8_t ipc_mode;
    uint8_t child_process;

    /**
     * tasking num
     */
    sw_atomic_t tasking_num;

    time_t start_time;

    long dispatch_count;
    long request_count;

    /**
     * worker id
     */
    uint32_t id;

    swLock lock;

    swPipe *pipe_object;

    int pipe_master;
    int pipe_worker;

    int pipe;
    void *ptr;
    void *ptr2;
};

typedef struct
{
    int socket;
    int last_connection;
    char *socket_file;
    swString *response_buffer;
} swStreamInfo;

struct _swProcessPool
{
    /**
     * reloading
     */
    uint8_t reloading;
    uint8_t reload_init;
    uint8_t dispatch_mode;
    uint8_t ipc_mode;
    uint8_t started;
    uint32_t reload_worker_i;
    uint32_t max_wait_time;
    swWorker *reload_workers;

    /**
     * process type
     */
    uint8_t type;

    /**
     * worker->id = start_id + i
     */
    uint16_t start_id;

    /**
     * use message queue IPC
     */
    uint8_t use_msgqueue;

    /**
     * use stream socket IPC
     */
    uint8_t use_socket;

    char *packet_buffer;
    uint32_t max_packet_size;

    /**
     * message queue key
     */
    key_t msgqueue_key;

    uint32_t worker_num;
    uint32_t max_request;
    uint32_t max_request_grace;

    int (*onTask)(struct _swProcessPool *pool, swEventData *task);

    void (*onWorkerStart)(struct _swProcessPool *pool, int worker_id);
    void (*onMessage)(struct _swProcessPool *pool, char *data, uint32_t length);
    void (*onWorkerStop)(struct _swProcessPool *pool, int worker_id);

    int (*main_loop)(struct _swProcessPool *pool, swWorker *worker);
    int (*onWorkerNotFound)(struct _swProcessPool *pool, pid_t pid, int status);

    sw_atomic_t round_id;

    swWorker *workers;
    swPipe *pipes;
    swHashMap *map;
    swReactor *reactor;
    swMsgQueue *queue;
    swStreamInfo *stream;

    void *ptr;
    void *ptr2;
};

//----------------------------------------Reactor---------------------------------------
/**
 * Classify the current errno after a reactor wait failure.
 *
 * @return SW_OK when errno is EINTR, SW_ERR otherwise
 */
static sw_inline int swReactor_error(swReactor *reactor)
{
    switch (errno)
    {
    case EINTR:
        return SW_OK;
    }
    return SW_ERR;
}

/**
 * Non-zero when `fdtype` is below SW_EVENT_DEAULT (no event bits given) or
 * has SW_EVENT_READ set.
 */
static sw_inline int swReactor_event_read(int fdtype)
{
    return (fdtype < SW_EVENT_DEAULT) || (fdtype & SW_EVENT_READ);
}

/**
 * Non-zero when `fdtype` has SW_EVENT_WRITE set.
 */
static sw_inline int swReactor_event_write(int fdtype)
{
    return fdtype & SW_EVENT_WRITE;
}

/**
 * Non-zero when `fdtype` has SW_EVENT_ERROR set.
 */
static sw_inline int swReactor_event_error(int fdtype)
{
    return fdtype & SW_EVENT_ERROR;
}

/**
 * Strip the READ/WRITE/ERROR/ONCE event bits from `flags`, leaving the fd type.
 */
static sw_inline enum swFd_type swReactor_fdtype(int flags)
{
    return (enum swFd_type) (flags & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR) & (~SW_EVENT_ONCE));
}

/**
 * Extract the event mask from `flags`; the counterpart of swReactor_fdtype().
 * SW_EVENT_READ is included whenever swReactor_event_read() is true.
 */
static sw_inline int swReactor_events(int flags)
{
    int events = 0;
    if (swReactor_event_read(flags))
    {
        events |= SW_EVENT_READ;
    }
    if (swReactor_event_write(flags))
    {
        events |= SW_EVENT_WRITE;
    }
    if (swReactor_event_error(flags))
    {
        events |= SW_EVENT_ERROR;
    }
    if (flags & SW_EVENT_ONCE)
    {
        events |= SW_EVENT_ONCE;
    }
    return events;
}

int swReactor_create(swReactor *reactor, int max_event);
void swReactor_destroy(swReactor *reactor);
void swReactor_add_destroy_callback(swReactor *reactor, swCallback cb, void *data);

/**
 * Mark the reactor as running and started.
 */
static inline void swReactor_before_wait(swReactor *reactor)
{
    reactor->running = 1;
    reactor->start = 1;
}

/* must be used inside a loop with a variable named `reactor` in scope */
#define SW_REACTOR_CONTINUE   if (reactor->once) {break;} else {continue;}

int swReactor_empty(swReactor *reactor);
swSocket* swReactor_get(swReactor *reactor, int fd);

/**
 * Non-zero when a read handler is registered for `fdtype`.
 */
static sw_inline int swReactor_isset_handler(swReactor *reactor, int fdtype)
{
    return reactor->read_handler[fdtype] != NULL;
}

/**
 * Record `fd` in the reactor's socket table with the type and events encoded
 * in `fdtype`, and increment event_num.
 */
static sw_inline void swReactor_add(swReactor *reactor, int fd, int fdtype)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    _socket->fd = fd;
    _socket->fdtype = swReactor_fdtype(fdtype);
    _socket->events = swReactor_events(fdtype);
    _socket->removed = 0;
    reactor->event_num++;
}

/**
 * Replace the recorded event mask of `fd` with the events encoded in `type`.
 */
static sw_inline void swReactor_set(swReactor *reactor, int fd, int type)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    _socket->events = swReactor_events(type);
}

/**
 * Clear the recorded events of `fd`, flag it as removed and decrement event_num.
 */
static sw_inline void swReactor_del(swReactor *reactor, int fd)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    _socket->events = 0;
    _socket->removed = 1;
    reactor->event_num--;
}

/**
 * Non-zero when `fd` is not flagged as removed and has a non-empty event mask.
 */
static sw_inline int swReactor_exists(swReactor *reactor, int fd)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    return !_socket->removed && _socket->events;
}

/**
 * Returns 0 when defer_tasks is set, otherwise reactor->timeout_msec.
 */
static sw_inline int swReactor_get_timeout_msec(swReactor *reactor)
{
    return reactor->defer_tasks ? 0 : reactor->timeout_msec;
}

int swReactor_onWrite(swReactor *reactor, swEvent *ev);
int swReactor_close(swReactor *reactor, int fd);
int swReactor_write(swReactor *reactor, int fd, const void *buf, int n);
int swReactor_wait_write_buffer(swReactor *reactor, int fd);
void swReactor_activate_future_task(swReactor *reactor);

/**
 * Add `event_type` to the events of `fd` through reactor->set().
 *
 * @return SW_OK when the event is already present, otherwise the result of
 *         reactor->set()
 */
static sw_inline int swReactor_add_event(swReactor *reactor, int fd, enum swEvent_type event_type)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    if (!(_socket->events & event_type))
    {
        return reactor->set(reactor, fd, _socket->fdtype | _socket->events | event_type);
    }
    return SW_OK;
}

/**
 * Remove `event_type` from the events of `fd` through reactor->set().
 *
 * @return SW_OK when the event is not present, otherwise the result of
 *         reactor->set()
 */
static sw_inline int swReactor_del_event(swReactor *reactor, int fd, enum swEvent_type event_type)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    if (_socket->events & event_type)
    {
        return reactor->set(reactor, fd, _socket->fdtype | (_socket->events & (~event_type)));
    }
    return SW_OK;
}

/**
 * Stop watching `fd` for reads: keep it registered if it is still watched for
 * writes, otherwise delete it from the reactor.
 */
static sw_inline int swReactor_remove_read_event(swReactor *reactor, int fd)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    if (_socket->events & SW_EVENT_WRITE)
    {
        _socket->events &= (~SW_EVENT_READ);
        return reactor->set(reactor, fd, _socket->fdtype | _socket->events);
    }
    else
    {
        return reactor->del(reactor, fd);
    }
}

/**
 * Stop watching `fd` for writes: keep it registered if it is still watched for
 * reads, otherwise delete it from the reactor.
 */
static sw_inline int swReactor_remove_write_event(swReactor *reactor, int fd)
{
    swSocket *_socket = swReactor_get(reactor, fd);
    if (_socket->events & SW_EVENT_READ)
    {
        _socket->events &= (~SW_EVENT_WRITE);
        return reactor->set(reactor, fd, _socket->fdtype | _socket->events);
    }
    else
    {
        return reactor->del(reactor, fd);
    }
}

/**
 * Look up the handler for an event on a given fd type.
 *
 * Write and error events fall back to the reactor's default handlers when no
 * per-type handler is registered; read events have no fallback. Any other
 * event type aborts the process.
 */
static sw_inline swReactor_handler swReactor_get_handler(swReactor *reactor, enum swEvent_type event_type, enum swFd_type fdtype)
{
    switch(event_type)
    {
    case SW_EVENT_READ:
        return reactor->read_handler[fdtype];
    case SW_EVENT_WRITE:
        return (reactor->write_handler[fdtype] != NULL) ? reactor->write_handler[fdtype] : reactor->default_write_handler;
    case SW_EVENT_ERROR:
        return (reactor->error_handler[fdtype] != NULL) ? reactor->error_handler[fdtype] : reactor->default_error_handler;
    default:
        abort();
        break;
    }
    return NULL;
}

int swReactor_set_handler(swReactor *, int, swReactor_handler);

/**
 * Invoke the reactor's default error handler for `event`.
 */
static sw_inline int swReactor_trigger_close_event(swReactor *reactor, swEvent *event)
{
    return reactor->default_error_handler(reactor, event);
}

int swReactorEpoll_create(swReactor *reactor, int max_event_num);
int swReactorPoll_create(swReactor *reactor, int max_event_num);
int swReactorKqueue_create(swReactor *reactor, int max_event_num);
int swReactorSelect_create(swReactor *reactor);

/*----------------------------Process Pool-------------------------------*/
int swProcessPool_create(swProcessPool *pool, uint32_t worker_num, key_t msgqueue_key, int ipc_mode);
int swProcessPool_create_unix_socket(swProcessPool *pool, char *socket_file, int blacklog);
int swProcessPool_create_tcp_socket(swProcessPool *pool, char *host, int port, int blacklog);
int swProcessPool_set_protocol(swProcessPool *pool, int task_protocol, uint32_t max_packet_size);
void swProcessPool_set_max_request(swProcessPool *pool, uint32_t max_request, uint32_t max_request_grace);
int swProcessPool_wait(swProcessPool *pool);
int swProcessPool_start(swProcessPool *pool);
void swProcessPool_shutdown(swProcessPool *pool);
pid_t swProcessPool_spawn(swProcessPool *pool, swWorker *worker);
int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *worker_id);
int swProcessPool_response(swProcessPool *pool, char *data, int length);
int swProcessPool_dispatch_blocking(swProcessPool *pool, swEventData *data, int *dst_worker_id);
int swProcessPool_add_worker(swProcessPool *pool, swWorker *worker);
int swProcessPool_del_worker(swProcessPool *pool, swWorker *worker);
int swProcessPool_get_max_request(swProcessPool *pool);

/**
 * Set the pool's first worker id and renumber every worker as start_id + index.
 */
static sw_inline void swProcessPool_set_start_id(swProcessPool *pool, int start_id)
{
    uint32_t i;
    pool->start_id = start_id;
    for (i = 0; i < pool->worker_num; i++)
    {
        pool->workers[i].id = pool->start_id + i;
    }
}

/**
 * Set the process type on the pool and on every worker it holds.
 */
static sw_inline void swProcessPool_set_type(swProcessPool *pool, int type)
{
    uint32_t i;
    pool->type = type;
    for (i = 0; i < pool->worker_num; i++)
    {
        pool->workers[i].type = type;
    }
}

/**
 * Map a worker id to its slot in pool->workers (index worker_id - start_id).
 * No range check is performed.
 */
static sw_inline swWorker* swProcessPool_get_worker(swProcessPool *pool, int worker_id)
{
    return &(pool->workers[worker_id - pool->start_id]);
}

//-----------------------------Channel---------------------------
enum swChannel_flag
{
    SW_CHAN_LOCK = 1u << 1,
    SW_CHAN_NOTIFY = 1u << 2,
    SW_CHAN_SHM = 1u << 3,
};

typedef struct _swChannel
{
    off_t head;
    off_t tail;
    size_t size;
    char head_tag;
    char tail_tag;
    int num;
    int max_num;
    /**
     * Data length, excluding structure
     */
    size_t bytes;
    int flag;
    int maxlen;
    /**
     * memory point
     */
    void *mem;
    swLock lock;
    swPipe notify_fd;
} swChannel;

swChannel* swChannel_new(size_t size, int maxlen, int flag);
/* arguments are parenthesized so any pointer expression can be passed */
#define swChannel_empty(ch) ((ch)->num == 0)
#define swChannel_full(ch) (((ch)->head == (ch)->tail && (ch)->tail_tag != (ch)->head_tag) || ((ch)->bytes + sizeof(int) * (ch)->num == (ch)->size))
int swChannel_pop(swChannel *object, void *out, int buffer_length);
int swChannel_push(swChannel *object, void *in, int data_length);
int swChannel_out(swChannel *object, void *out, int buffer_length);
int swChannel_in(swChannel *object, void *in, int data_length);
int swChannel_peek(swChannel *object, void *out, int buffer_length);
int swChannel_wait(swChannel *object);
int swChannel_notify(swChannel *object);
void swChannel_free(swChannel *object);
void swChannel_print(swChannel *);

/*----------------------------LinkedList-------------------------------*/
swLinkedList* swLinkedList_new(uint8_t type, swDestructor dtor);
int swLinkedList_append(swLinkedList *ll, void *data);
void swLinkedList_remove_node(swLinkedList *ll, swLinkedList_node *remove_node);
int swLinkedList_prepend(swLinkedList *ll, void *data);
void* swLinkedList_pop(swLinkedList *ll);
void* swLinkedList_shift(swLinkedList *ll);
swLinkedList_node* swLinkedList_find(swLinkedList *ll, void *data);
void swLinkedList_free(swLinkedList *ll);
/* note: `ll` is evaluated twice, so pass an expression without side effects */
#define swLinkedList_remove(ll, data) (swLinkedList_remove_node((ll), swLinkedList_find((ll), (data))))

/*----------------------------Thread Pool-------------------------------*/
enum swThread_type
{
    SW_THREAD_MASTER = 1,
    SW_THREAD_REACTOR = 2,
    SW_THREAD_WORKER = 3,
    SW_THREAD_UDP = 4,
    SW_THREAD_UNIX_DGRAM = 5,
    SW_THREAD_HEARTBEAT = 6,
};

typedef struct _swThreadPool
{
    swCond cond;

    swThread *threads;
    swThreadParam *params;

    void *ptr1;
    void *ptr2;

#ifdef SW_THREADPOOL_USE_CHANNEL
    swChannel *chan;
#else
    swRingQueue queue;
#endif

    int thread_num;
    int shutdown;
    sw_atomic_t task_num;

    void (*onStart)(struct _swThreadPool *pool, int id);
    void (*onStop)(struct _swThreadPool *pool, int id);
    int (*onTask)(struct _swThreadPool *pool, void *task, int task_len);

} swThreadPool;

struct _swThread
{
    pthread_t tid;
    int id;
    swThreadPool *pool;
};

int swThreadPool_dispatch(swThreadPool *pool, void *task, int task_len);
int swThreadPool_create(swThreadPool *pool, int max_num);
int swThreadPool_run(swThreadPool *pool);
int swThreadPool_free(swThreadPool *pool);
//--------------------------------protocol------------------------------ ssize_t swProtocol_get_package_length(swProtocol *protocol, swSocket *conn, char *data, uint32_t size); int swProtocol_recv_check_length(swProtocol *protocol, swSocket *conn, swString *buffer); int swProtocol_recv_check_eof(swProtocol *protocol, swSocket *conn, swString *buffer); //--------------------------------timer------------------------------ #define SW_TIMER_MIN_MS 1 #define SW_TIMER_MIN_SEC 0.001 #define SW_TIMER_MAX_MS LONG_MAX #define SW_TIMER_MAX_SEC ((double) (LONG_MAX / 1000)) typedef struct _swTimer swTimer; typedef struct _swTimer_node swTimer_node; typedef void (*swTimerCallback)(swTimer *, swTimer_node *); typedef void (*swTimerDtor)(swTimer_node *); enum swTimer_type { SW_TIMER_TYPE_KERNEL, SW_TIMER_TYPE_PHP, }; struct _swTimer_node { /*----------------properties--------------*/ long id; enum swTimer_type type; int64_t exec_msec; int64_t interval; uint64_t round; uint8_t removed; swHeap_node *heap_node; /*-----------------callback---------------*/ swTimerCallback callback; void *data; /*-----------------destructor-------------*/ swTimerDtor dtor; }; struct _swTimer { /*--------------signal timer--------------*/ swReactor *reactor; swHeap *heap; swHashMap *map; uint32_t num; uint64_t round; long _next_id; long _current_id; long _next_msec; /*---------------event timer--------------*/ struct timeval basetime; /*---------------system timer-------------*/ long lasttime; /*----------------------------------------*/ int (*set)(swTimer *timer, long exec_msec); void (*close)(swTimer *timer); }; int swTimer_init(swTimer *timer, long msec); void swTimer_reinit(swTimer *timer, swReactor *reactor); swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback); enum swBool_type swTimer_del(swTimer *timer, swTimer_node *node); void swTimer_free(swTimer *timer); int swTimer_select(swTimer *timer); int swTimer_now(struct timeval *time); static 
sw_inline swTimer_node* swTimer_get(swTimer *timer, long id) { return (swTimer_node*) swHashMap_find_int(timer->map, id); } static sw_inline swTimer_node* swTimer_get_ex(swTimer *timer, long id, const enum swTimer_type type) { swTimer_node* tnode = swTimer_get(timer, id); return (tnode && tnode->type == type) ? tnode : NULL; } int swSystemTimer_init(swTimer *timer, long msec); void swSystemTimer_signal_handler(int sig); //-------------------------------------------------------------- //Worker process global Variable typedef struct { /** * Always run */ uint8_t run_always; /** * for timer with block io */ uint8_t signal_alarm; /** * Current Proccess Worker's id */ uint32_t id; /** * pipe_worker */ int pipe_used; uint32_t shutdown :1; uint32_t max_request; swString **buffer_input; swString **buffer_output; swWorker *worker; time_t exit_time; } swWorkerGlobal_t; typedef struct { uint16_t id; uint8_t type; uint8_t update_time; swString *buffer_stack; swReactor *reactor; swTimer *timer; uint8_t aio_init; uint8_t aio_schedule; uint32_t aio_task_num; swPipe aio_pipe; int aio_pipe_read; int aio_pipe_write; #ifdef SW_AIO_WRITE_LOCK swLock aio_lock; #endif } swThreadGlobal_t; typedef struct { union { char v4[INET_ADDRSTRLEN]; char v6[INET6_ADDRSTRLEN]; } address; } swDNS_server; typedef struct _swServer swServer; typedef struct _swFactory swFactory; typedef struct { uint8_t init :1; uint8_t running :1; uint8_t enable_coroutine :1; uint8_t use_signalfd :1; uint8_t enable_signalfd :1; uint8_t reuse_port :1; uint8_t socket_dontwait :1; uint8_t dns_lookup_random :1; uint8_t use_async_resolver :1; int error; int process_type; pid_t pid; int signal_fd; int log_fd; int null_fd; /** * worker(worker and task_worker) process chroot / user / group */ char *chroot; char *user; char *group; uint32_t log_level; char *log_file; uint32_t trace_flags; void (*write_log)(int level, char *content, size_t len); void (*fatal_error)(int code, const char *str, ...); 
//-----------------------[System]-------------------------- uint16_t cpu_num; uint32_t pagesize; struct utsname uname; //-----------------------[Socket]-------------------------- uint32_t max_sockets; /** * tcp socket default buffer size */ uint32_t socket_buffer_size; swArray *socket_array; double socket_send_timeout; swServer *serv; swMemoryPool *memory_pool; swLock lock; char *task_tmpdir; uint16_t task_tmpdir_len; char *dns_server_v4; char *dns_server_v6; double dns_cache_refresh_time; /** * aio-threads */ uint32_t aio_core_worker_num; uint32_t aio_worker_num; double aio_max_wait_time; double aio_max_idle_time; int aio_default_pipe_fd; swHashMap *functions; swLinkedList *hooks[SW_MAX_HOOK_TYPE]; } swGlobal_t; extern swGlobal_t SwooleG; //Local Global Variable extern swWorkerGlobal_t SwooleWG; //Worker Global Variable extern __thread swThreadGlobal_t SwooleTG; //Thread Global Variable #define SW_CPU_NUM (SwooleG.cpu_num) //----------------------------------------------- //OS Feature #if defined(HAVE_KQUEUE) || !defined(HAVE_SENDFILE) int swoole_sendfile(int out_fd, int in_fd, off_t *offset, size_t size); #else #include <sys/sendfile.h> #define swoole_sendfile(out_fd, in_fd, offset, limit) sendfile(out_fd, in_fd, offset, limit) #endif static sw_inline void sw_spinlock(sw_atomic_t *lock) { uint32_t i, n; while (1) { if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1)) { return; } if (SW_CPU_NUM > 1) { for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1) { for (i = 0; i < n; i++) { sw_atomic_cpu_pause(); } if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1)) { return; } } } swYield(); } } static sw_inline int64_t swTimer_get_relative_msec() { struct timeval now; if (!SwooleTG.timer) { return SW_ERR; } if (swTimer_now(&now) < 0) { return SW_ERR; } int64_t msec1 = (now.tv_sec - SwooleTG.timer->basetime.tv_sec) * 1000; int64_t msec2 = (now.tv_usec - SwooleTG.timer->basetime.tv_usec) / 1000; return msec1 + msec2; } static sw_inline int64_t swTimer_get_absolute_msec() { struct timeval 
now; if (swTimer_now(&now) < 0) { return SW_ERR; } int64_t msec1 = (now.tv_sec) * 1000; int64_t msec2 = (now.tv_usec) / 1000; return msec1 + msec2; } SW_EXTERN_C_END #endif /* SWOOLE_H_ */