diff options
author | yaworsky <yaworsky> | 2005-10-31 13:52:24 +0000 |
---|---|---|
committer | yaworsky <yaworsky> | 2005-10-31 13:52:24 +0000 |
commit | 74b7b6121179f8a82a2f96812c9a33e3f650eaed (patch) | |
tree | a051354cef72555130299fee3f17aa5948b6b3f4 /daemon | |
parent | 10379ee810238a765bf94a4ba48e4fff04fc7b6f (diff) | |
download | syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.gz syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.xz syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.zip |
Refactored listener
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/Makefile.am | 5 | ||||
-rw-r--r-- | daemon/listener.c | 335 | ||||
-rw-r--r-- | daemon/syslogd.c | 429 | ||||
-rw-r--r-- | daemon/syslogd.h | 29 | ||||
-rw-r--r-- | daemon/udp_listener.c | 341 |
5 files changed, 680 insertions, 459 deletions
diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 06c1ccc..5e2a426 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -13,8 +13,9 @@ endif AM_CPPFLAGS += -I. -I../include $(GLIB_CFLAGS) bin_PROGRAMS = syslogd -syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c listener.c logrotate.c \ - main.c names.c pathnames.c purger.c syslogd.c syslogd.h +syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c logrotate.c \ + main.c names.c pathnames.c purger.c syslogd.c syslogd.h \ + udp_listener.c syslogd_LDADD = -lws2_32 $(GLIB_LIBS) endif diff --git a/daemon/listener.c b/daemon/listener.c deleted file mode 100644 index 32c6297..0000000 --- a/daemon/listener.c +++ /dev/null @@ -1,335 +0,0 @@ -/* - * listener.c - syslogd implementation for windows, listener for UDP - * and "internal" sources - * - * Created by Alexander Yaworsky - * - * THIS SOFTWARE IS NOT COPYRIGHTED - * - * This source code is offered for use in the public domain. You may - * use, modify or distribute it freely. - * - * This code is distributed in the hope that it will be useful but - * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY - * DISCLAIMED. This includes but is not limited to warranties of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - */ - -#include <stdio.h> -#include <winsock2.h> - -#include <glib.h> - -#include <syslog.h> -#include <syslogd.h> - -static SOCKET *socket_array = NULL; -static int socket_count = 0; - -static struct source **source_references = NULL; - -static HANDLE *event_array = NULL; -static int event_count = 0; - -/* message data */ -static unsigned max_datagram_size = 1024; -static gchar *datagram_buffer = NULL; -static struct raw_message message; - -static CRITICAL_SECTION cs_internal_message; -static HANDLE internal_message_accepted = NULL; -static gchar internal_message_buffer[ 1024 ]; - -/****************************************************************************** - * init_listener - * - * create sockets and synchronization objects including ones for "internal" - * source - */ -gboolean init_listener() -{ - gboolean ret = FALSE; - unsigned n; - GList *item; - int i; - struct source *internal_src; - - TRACE_ENTER( "\n" ); - - /* create critical section and event for the access serialization to internal message buffer - */ - InitializeCriticalSection( &cs_internal_message ); - internal_message_accepted = CreateEvent( NULL, FALSE, FALSE, NULL ); - if( !internal_message_accepted ) - { - ERR( "Cannot create event; error %lu\n", GetLastError() ); - goto done; - } - - /* allocate memory for sockets and events; - * the number of sockets is not greater than number of sources - */ - n = g_list_length( sources ); - socket_array = g_malloc( n * sizeof(SOCKET) ); - - /* number of source references is greater by one because of inclusion the event - * for "internal" source - * FIXME: how about multiple internal sources? - */ - source_references = g_malloc( (n + 1) * sizeof(struct source*) ); - - /* number of events is greater by two because of inclusion the event - * for "internal" source and the service_stop_event - */ - event_array = g_malloc( (n + 2) * sizeof(HANDLE) ); - - /* create sockets */ - for( item = sources; item; item = item->next ) - { - struct source *src = item->data; - SOCKET sock; - unsigned dgram_size; - int size; - - if( src->type == ST_INTERNAL ) - { - internal_src = src; - continue; - } - - if( src->type != ST_UDP ) - continue; - - sock = socket( AF_INET, SOCK_DGRAM, 0 ); - if( INVALID_SOCKET == sock ) - { - ERR( "socket() error %lu\n", WSAGetLastError() ); - goto done; - } - - if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) ) - { - ERR( "bind() error %lu\n", WSAGetLastError() ); - closesocket( sock ); - goto done; - } - - size = sizeof(dgram_size); - if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) ) - { - ERR( "getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", WSAGetLastError() ); - closesocket( sock ); - goto done; - } - TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n", - src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2, - src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4, - ntohs( src->udp.sin_port ), dgram_size ); - if( dgram_size > max_datagram_size ) - max_datagram_size = dgram_size; - - source_references[ socket_count ] = src; - socket_array[ socket_count++ ] = sock; - } - source_references[ socket_count ] = internal_src; - - /* create events; - * service_stop_event is added to the array - */ - while( event_count <= socket_count ) - { - HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL ); - if( !evt ) - { - ERR( "Cannot create event; error %lu\n", GetLastError() ); - goto done; - } - event_array[ event_count++ ] = evt; - } - event_array[ event_count++ ] = service_stop_event; - - /* bind events to sockets */ - for( i = 0; i < socket_count; i++ ) - { - if( WSAEventSelect( socket_array[ i ], event_array[ i ], FD_READ ) ) - { - ERR( "WSAEventSelect() error %lu\n", WSAGetLastError() ); - goto done; - } - } - - /* allocate datagram buffer */ - datagram_buffer = g_malloc( max_datagram_size ); - - ret = TRUE; - -done: - if( !ret ) - fini_listener(); - - TRACE_LEAVE( "done; socket_count=%d, event_count=%d, max_datagram_size=%d, ret=%d\n", - socket_count, event_count, max_datagram_size, (int) ret ); - return ret; -} - -/****************************************************************************** - * fini_listener - */ -void fini_listener() -{ - int i; - - TRACE_ENTER( "\n" ); - - for( i = 0; i < socket_count; i++ ) - closesocket( socket_array[ i ] ); - g_free( socket_array ); - socket_array = NULL; - socket_count = 0; - - g_free( source_references ); - source_references = NULL; - - /* note that the last event is the service_stop_event - * and should not be destroyed - */ - for( i = 0; i < event_count - 1; i++ ) - CloseHandle( event_array[ i ] ); - g_free( event_array ); - event_array = NULL; - event_count = 0; - - g_free( datagram_buffer ); - datagram_buffer = NULL; - - if( internal_message_accepted ) - { - CloseHandle( internal_message_accepted ); - internal_message_accepted = NULL; - } - DeleteCriticalSection( &cs_internal_message ); - TRACE_LEAVE( "done\n" ); -} - -/****************************************************************************** - * listener - * - * wait for a message; generate mark message; - * allocates a new raw_message structure and assigns its pointer to *msg - */ -enum listener_status listener( struct raw_message** msg ) -{ - enum listener_status ret = LSNR_ERROR; - DWORD t, w; - int r; - int addrlen; - - TRACE_ENTER( "\n" ); - - for(;;) - { - if( !mark_interval ) - t = INFINITE; - else - t = mark_interval * 1000; - w = WaitForMultipleObjects( event_count, event_array, FALSE, t ); - if( WAIT_TIMEOUT == w ) - { - /* issue mark message */ - log_internal( LOG_NOTICE, "%s", mark_message ); - continue; - } - if( WAIT_FAILED == w ) - { - ERR( "Wait error %lu\n", GetLastError() ); - goto done; - } - if( w >= event_count ) - { - ERR( "Unknown wait error\n" ); - goto done; - } - if( w == event_count - 1 ) - { - /* shut down */ - ret = LSNR_SHUTDOWN; - goto done; - } - if( w == event_count - 2 ) - { - /* got "internal" message */ - message.source = source_references[ socket_count ]; - if( !message.source ) - { - /* internal source is not defined, cannot handle message */ - SetEvent( internal_message_accepted ); - continue; - } - message.msg = g_strdup( internal_message_buffer ); - SetEvent( internal_message_accepted ); - memset( &message.sender_addr, 0, sizeof(message.sender_addr) ); - goto alloc_msg; - } - /* got UDP message, read it */ - addrlen = sizeof(message.sender_addr); - r = recvfrom( socket_array[ w ], datagram_buffer, max_datagram_size, - 0, (struct sockaddr*) &message.sender_addr, &addrlen ); - if( r < 0 ) - { - ERR( "recvfrom() error %lu\n", WSAGetLastError() ); - goto done; - } - if( !r ) - continue; - - message.msg = g_strndup( datagram_buffer, r ); - message.source = source_references[ w ]; - - alloc_msg: - *msg = g_malloc( sizeof(struct raw_message) ); - memcpy( *msg, &message, sizeof(struct raw_message) ); - - ret = LSNR_GOT_MESSAGE; - goto done; - } - -done: - TRACE_LEAVE( "done; ret=%d\n", (int) ret ); - return ret; -} - -/****************************************************************************** - * log_internal - * - * generate internal log message - */ -void log_internal( int pri, char* fmt, ... ) -{ - va_list args; - SYSTEMTIME stm; - int len; - char *p; - - TRACE_ENTER( "\n" ); - EnterCriticalSection( &cs_internal_message ); - - GetLocalTime( &stm ); - len = sprintf( internal_message_buffer, "<%d>%s %2d %02d:%02d:%02d %s syslog: ", - LOG_SYSLOG | pri, - str_month[ stm.wMonth - 1 ], stm.wDay, stm.wHour, stm.wMinute, stm.wSecond, - local_hostname ); - va_start( args, fmt ); - vsnprintf( internal_message_buffer + len, sizeof(internal_message_buffer) - len, fmt, args ); - va_end( args ); - p = strchr( internal_message_buffer, '\n' ); - if( p ) - *p = 0; - p = strchr( internal_message_buffer, '\r' ); - if( p ) - *p = 0; - - SetEvent( event_array[ event_count - 2 ] ); - LeaveCriticalSection( &cs_internal_message ); - TRACE_LEAVE( "done\n" ); -} diff --git a/daemon/syslogd.c b/daemon/syslogd.c index 4c64a83..af50793 100644 --- a/daemon/syslogd.c +++ b/daemon/syslogd.c @@ -16,7 +16,6 @@ */ #include <ctype.h> -#include <process.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -28,8 +27,11 @@ #include <syslog.h> #include <syslogd.h> -static struct fifo *raw_message_queue = NULL; -static HANDLE fifo_semaphore = NULL; +/* internal source data */ +static struct fifo *internal_message_queue = NULL; +static HANDLE internal_queue_semaphore = NULL; +static int internal_source_count = 0; +static struct source** internal_source_references = NULL; /* cache for source hostnames */ struct hostname @@ -49,6 +51,70 @@ char *str_month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; /****************************************************************************** + * create_message + * + * Create a new message with refcount=1. + * The caller should allocate sender, hostname, program and message strings. + * The function simply copies these pointers to the structure and the caller + * should not free allocated strings. + */ +struct message* create_message( struct source* source, + gchar* sender, + int facility, int priority, + LPSYSTEMTIME timestamp, + gchar* hostname, + gchar* program, + gchar* message ) +{ + struct message *msg; + + TRACE_ENTER( "\n" ); + + msg = g_malloc( sizeof(struct message) ); + msg->refcount = 1; + msg->source = source; + msg->sender = sender; + msg->facility = facility; + msg->priority = priority; + msg->timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d", + str_month[ timestamp->wMonth - 1 ], + timestamp->wDay, timestamp->wHour, + timestamp->wMinute, timestamp->wSecond ); + msg->hostname = hostname; + msg->program = program; + msg->message = message; + + TRACE_LEAVE( "message=%p\n", msg ); + return msg; +} + +/****************************************************************************** + * duplicate_message + * + * Make a copy of message. + */ +struct message* duplicate_message( struct message* msg ) +{ + struct message *new_msg; + + TRACE_ENTER( "message=%p\n", msg ); + + new_msg = g_malloc( sizeof(struct message) ); + new_msg->refcount = 1; + new_msg->source = msg->source; + new_msg->sender = g_strdup( msg->sender ); + new_msg->facility = msg->facility; + new_msg->priority = msg->priority; + new_msg->timestamp = g_strdup( msg->timestamp ); + new_msg->hostname = g_strdup( msg->hostname ); + new_msg->program = g_strdup( msg->program ); + new_msg->message = g_strdup( msg->message ); + + TRACE_LEAVE( "new message=%p\n", new_msg ); + return new_msg; +} + +/****************************************************************************** * refrence_message * * increment reference count @@ -322,13 +388,11 @@ no_pri: * parse_timestamp * * parse TIMESTAMP part of message; - * allocate string; * return pointer to the next char after TIMESTAMP */ -static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) +static gchar* parse_timestamp( gchar* msg, LPSYSTEMTIME timestamp ) { int i; - SYSTEMTIME stm; TRACE_ENTER( "\n" ); @@ -337,7 +401,7 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) break; if( i == 12 ) goto no_timestamp; - stm.wMonth = i + 1; + timestamp->wMonth = i + 1; if( msg[3] != ' ' ) goto no_timestamp; @@ -346,13 +410,13 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) { if( (!isdigit( msg[4] )) || (!isdigit( msg[5] )) ) goto no_timestamp; - stm.wDay = (msg[4] - '0') * 10 + (msg[5] - '0'); + timestamp->wDay = (msg[4] - '0') * 10 + (msg[5] - '0'); } else { if( !isdigit( msg[5] ) ) goto no_timestamp; - stm.wDay = msg[5] - '0'; + timestamp->wDay = msg[5] - '0'; } if( msg[6] != ' ' ) @@ -360,26 +424,23 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) if( (!isdigit( msg[7] )) || (!isdigit( msg[8] )) || msg[9] != ':' ) goto no_timestamp; - stm.wHour = (msg[7] - '0') * 10 + (msg[8] - '0'); + timestamp->wHour = (msg[7] - '0') * 10 + (msg[8] - '0'); if( (!isdigit( msg[10] )) || (!isdigit( msg[11] )) || msg[12] != ':' ) goto no_timestamp; - stm.wMinute = (msg[10] - '0') * 10 + (msg[11] - '0'); + timestamp->wMinute = (msg[10] - '0') * 10 + (msg[11] - '0'); if( (!isdigit( msg[13] )) || (!isdigit( msg[14] )) || msg[15] != ' ' ) goto no_timestamp; - stm.wSecond = (msg[13] - '0') * 10 + (msg[14] - '0'); + timestamp->wSecond = (msg[13] - '0') * 10 + (msg[14] - '0'); msg += 16; goto done; no_timestamp: TRACE_2( "no timestamp\n" ); - GetLocalTime( &stm ); + GetLocalTime( timestamp ); done: - *timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d", - str_month[ stm.wMonth - 1 ], - stm.wDay, stm.wHour, stm.wMinute, stm.wSecond ); TRACE_LEAVE( "done\n" ); return msg; } @@ -392,30 +453,28 @@ done: */ static struct message* parse_raw_message( struct raw_message* raw_msg ) { - struct message* msg; gchar *current_part, *next_part; + gchar *sender, *hostname, *program, *message; + int facility, priority; + SYSTEMTIME timestamp; + struct message *msg; TRACE_ENTER( "raw message=%p\n", raw_msg ); - /* allocate and initialize message structure */ - msg = g_malloc( sizeof(struct message) ); - msg->refcount = 1; - msg->source = raw_msg->source; - /* get sender's hostname */ - msg->sender = get_hostname( &raw_msg->sender_addr ); + sender = get_hostname( &raw_msg->sender_addr ); current_part = raw_msg->msg; - next_part = parse_PRI( current_part, &msg->facility, &msg->priority ); + next_part = parse_PRI( current_part, &facility, &priority ); current_part = next_part; - next_part = parse_timestamp( current_part, &msg->timestamp ); + next_part = parse_timestamp( current_part, ×tamp ); if( next_part == current_part ) { /* no valid timestamp */ TRACE_2( "no valid timestamp: msg=%s\n", current_part ); - msg->hostname = g_strdup( msg->sender ); - msg->message = g_strdup( current_part ); + hostname = g_strdup( sender ); + message = g_strdup( current_part ); } else { @@ -426,29 +485,33 @@ static struct message* parse_raw_message( struct raw_message* raw_msg ) if( *next_part != ' ' ) { /* invalid hostname */ - msg->hostname = g_strdup( msg->sender ); - msg->message = g_strdup( current_part ); - TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", msg->hostname, msg->message ); + hostname = g_strdup( sender ); + message = g_strdup( current_part ); + TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", hostname, message ); } else { - msg->hostname = g_strndup( current_part, next_part - current_part ); + hostname = g_strndup( current_part, next_part - current_part ); while( *next_part == ' ' && *next_part != 0 ) next_part++; - msg->message = g_strdup( next_part ); - TRACE_2( "hostname=%s; msg=%s\n", msg->hostname, msg->message ); + message = g_strdup( next_part ); + TRACE_2( "hostname=%s; msg=%s\n", hostname, message ); } } /* try to find program name */ - current_part = msg->message; + current_part = message; next_part = current_part; while( *next_part != ' ' && *next_part != ':' && *next_part != '[' && *next_part != 0 ) next_part++; if( *next_part == ' ' || *next_part == 0 ) - msg->program = g_strdup(""); + program = g_strdup(""); else - msg->program = g_strndup( current_part, next_part - current_part ); + program = g_strndup( current_part, next_part - current_part ); + + /* create message */ + msg = create_message( raw_msg->source, sender, facility, priority, + ×tamp, hostname, program, message ); /* destroy raw message */ g_free( raw_msg->msg ); @@ -459,39 +522,151 @@ static struct message* parse_raw_message( struct raw_message* raw_msg ) } /****************************************************************************** - * message_processor + * number_of_sources * - * main function; extract raw messages from queue and parse them + * return the number of sources of the specified type */ -static unsigned __stdcall message_processor( void* arg ) +unsigned number_of_sources( enum source_type type ) { - HANDLE wait_handles[2] = { fifo_semaphore, service_stop_event }; + unsigned ret = 0; + GList *item; - TRACE_ENTER( "\n" ); - for(;;) + for( item = sources; item; item = item->next ) { - DWORD w; - struct raw_message *raw_msg; + struct source *src = item->data; - w = WaitForMultipleObjects( 2, wait_handles, FALSE, INFINITE ); - if( WAIT_OBJECT_0 + 1 == w ) - /* shutdown */ - break; - if( w != WAIT_OBJECT_0 ) - { - ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); - SetEvent( service_stop_event ); + if( src->type == type ) + ret++; + } + return ret; +} + +/****************************************************************************** + * log_internal + * + * Generate internal log message. + * This function should be called only from the main thread because there's no + * access serialization to the writing end of the queue. + */ +void log_internal( int pri, char* fmt, ... ) +{ + gchar *sender, *hostname, *program, *msg; + va_list args; + SYSTEMTIME stm; + int i; + struct message *message; + + TRACE_ENTER( "\n" ); + + if( 0 == internal_source_count ) + goto done; + + sender = g_strdup( local_hostname ); + hostname = g_strdup( local_hostname ); + program = g_strdup( "syslog" ); + + va_start( args, fmt ); + msg = g_strdup_vprintf( fmt, args ); + va_end( args ); + + GetLocalTime( &stm ); + + message = create_message( internal_source_references[0], + sender, + LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ), + &stm, + hostname, program, msg ); + for( i = 1;; i++ ) + { + fifo_push( internal_message_queue, message ); + ReleaseSemaphore( internal_queue_semaphore, 1, NULL ); + if( i == internal_source_count ) break; - } - /* extract raw message from queue */ - raw_msg = fifo_pop( raw_message_queue ); + message = duplicate_message( message ); + message->source = internal_source_references[ i ]; + } - TRACE_2( "got message %p from queue\n", raw_msg ); +done: + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * shutdown_internal_sources + * + * dispose all data except message queue and semaphore + */ +static void shutdown_internal_sources() +{ + TRACE_ENTER( "\n" ); + + internal_source_count = 0; + + if( internal_source_references ) + g_free( internal_source_references ); - mux_message( parse_raw_message( raw_msg ) ); - } TRACE_LEAVE( "done\n" ); - return 0; +} + +/****************************************************************************** + * fini_internal_sources + */ +static void fini_internal_sources() +{ + TRACE_ENTER( "\n" ); + + shutdown_internal_sources(); + + if( internal_message_queue ) + fifo_destroy( internal_message_queue ); + + if( internal_queue_semaphore ) + CloseHandle( internal_queue_semaphore ); + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * init_internal_sources + */ +static gboolean init_internal_sources() +{ + gboolean ret = FALSE; + GList *item; + int i; + + TRACE_ENTER( "\n" ); + + internal_message_queue = fifo_create(); + internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); + if( !internal_queue_semaphore ) + { + ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + goto done; + } + + internal_source_count = (int) number_of_sources( ST_INTERNAL ); + if( 0 == internal_source_count ) + { + ret = TRUE; + goto done; + } + + internal_source_references = g_malloc( internal_source_count * sizeof(struct source*) ); + + for( i = 0, item = sources; item; item = item->next ) + { + struct source *src = item->data; + if( ST_INTERNAL == src->type ) + internal_source_references[ i++ ] = src; + } + ret = TRUE; + +done: + if( !ret ) + fini_internal_sources(); + + TRACE_LEAVE( "done; internal_sources=%d, ret=%d\n", internal_source_count, (int) ret ); + return ret; } /****************************************************************************** @@ -502,34 +677,38 @@ static unsigned __stdcall message_processor( void* arg ) static void fini_destinations() { GList *dest; + + TRACE_ENTER( "\n" ); + for( dest = destinations; dest; dest = dest->next ) - ((struct destination*) dest)->fini( (struct destination*) dest ); + { + struct destination *d = dest->data; + d->fini( d ); + } + + TRACE_LEAVE( "done\n" ); } /****************************************************************************** * main syslogd function - * - * global initialization; invoke listener */ void syslogd_main() { - HANDLE message_processor_thread = NULL; - unsigned tid; - TRACE_ENTER( "\n" ); - SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL ); - if( !read_configuration() ) goto done; if( !init_purger() ) goto done; - if( !init_listener() ) + purge_log_dirs(); + + if( !init_internal_sources() ) goto done; - purge_log_dirs(); + if( !init_udp_listener() ) + goto done; if( source_encoding && destination_encoding ) { @@ -541,47 +720,85 @@ void syslogd_main() } } - /* create message queue and semaphore; - * avoid using Glib's asynchronous queues and threading support - * because synchronization capabilities are very limited; - */ - raw_message_queue = fifo_create(); - fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); - if( !fifo_semaphore ) - { - ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); - goto done; - } + log_internal( LOG_NOTICE, "Syslog daemon started" ); - message_processor_thread = (HANDLE) _beginthreadex( NULL, 0, message_processor, NULL, 0, &tid ); - if( !message_processor_thread ) + /* get messages from queues */ + for(;;) { - ERR( "Cannot create thread; error %lu\n", GetLastError() ); - goto done; + HANDLE wait_handles[3] = { udp_queue_semaphore, + internal_queue_semaphore, + service_stop_event }; + DWORD t; + struct raw_message *raw_msg; + struct message *msg; + + if( !mark_interval ) + t = INFINITE; + else + t = mark_interval * 1000; + + switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) ) + { + case WAIT_OBJECT_0: + raw_msg = fifo_pop( udp_message_queue ); + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + break; + + case WAIT_OBJECT_0 + 1: + msg = fifo_pop( internal_message_queue ); + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); + break; + + case WAIT_OBJECT_0 + 2: + goto shutdown; + + case WAIT_TIMEOUT: + /* issue mark message */ + log_internal( LOG_NOTICE, "%s", mark_message ); + break; + + default: + ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); + SetEvent( service_stop_event ); + goto shutdown; + } } - /* get messages from the listener */ +shutdown: + log_internal( LOG_NOTICE, "Syslog daemon is shutting down" ); + shutdown_udp_listener(); + shutdown_internal_sources(); + + /* flush queues */ for(;;) { + HANDLE wait_handles[2] = { udp_queue_semaphore, + internal_queue_semaphore }; struct raw_message *raw_msg; + struct message *msg; - switch( listener( &raw_msg ) ) + switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) ) { - case LSNR_ERROR: - case LSNR_SHUTDOWN: - goto done; + case WAIT_OBJECT_0: + raw_msg = fifo_pop( udp_message_queue ); + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + break; - case LSNR_GOT_MESSAGE: - TRACE_2( "got message from %d.%d.%d.%d; source name %s; ptr=%p: %s\n", - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b1, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b2, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b3, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b4, - raw_msg->source->name, raw_msg, raw_msg->msg ); - /* add raw message to queue */ - fifo_push( raw_message_queue, raw_msg ); - ReleaseSemaphore( fifo_semaphore, 1, NULL ); + case WAIT_OBJECT_0 + 1: + msg = fifo_pop( internal_message_queue ); + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); break; + + case WAIT_TIMEOUT: + goto done; + + default: + ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); + goto done; } } @@ -589,24 +806,14 @@ done: /* signal to all possibly running threads */ SetEvent( service_stop_event ); - if( message_processor_thread ) - { - /* wait for message processor shutdown */ - WaitForSingleObject( message_processor_thread, INFINITE ); - CloseHandle( message_processor_thread ); - } - + fini_udp_listener(); + fini_internal_sources(); fini_destinations(); fini_purger(); free_hostnames(); - if( raw_message_queue ) fifo_destroy( raw_message_queue ); if( conversion_descriptor != (GIConv) -1 ) g_iconv_close( conversion_descriptor ); - fini_listener(); - - if( fifo_semaphore ) CloseHandle( fifo_semaphore ); - TRACE_LEAVE( "done\n" ); } diff --git a/daemon/syslogd.h b/daemon/syslogd.h index 20dfa00..f061f7b 100644 --- a/daemon/syslogd.h +++ b/daemon/syslogd.h @@ -42,6 +42,8 @@ extern char *str_month[]; extern void syslogd_main(); +extern void log_internal( int pri, char* fmt, ... ); + /* options and their default values */ extern gboolean use_dns; extern gchar *source_encoding; @@ -52,13 +54,6 @@ extern int hold; extern gchar *logdir; /* listener */ -enum listener_status -{ - LSNR_ERROR, - LSNR_SHUTDOWN, - LSNR_GOT_MESSAGE -}; - struct raw_message { gchar *msg; @@ -66,10 +61,12 @@ struct raw_message struct source *source; }; -extern gboolean init_listener(); -extern void fini_listener(); -extern enum listener_status listener( struct raw_message** msg ); -extern void log_internal( int pri, char* fmt, ... ); +extern struct fifo *udp_message_queue; +extern HANDLE udp_queue_semaphore; + +extern gboolean init_udp_listener(); +extern void shutdown_udp_listener(); +extern void fini_udp_listener(); /* message */ struct message @@ -85,6 +82,14 @@ struct message gchar *message; }; +extern struct message* create_message( struct source* source, + gchar* sender, + int facility, int priority, + LPSYSTEMTIME timestamp, + gchar* hostname, + gchar* program, + gchar* message ); +extern struct message* duplicate_message( struct message* msg ); extern void reference_message( struct message* msg ); extern void release_message( struct message* msg ); @@ -103,6 +108,8 @@ struct source struct sockaddr_in udp; }; +extern unsigned number_of_sources( enum source_type type ); + enum destination_type { DT_UNDEFINED, diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c new file mode 100644 index 0000000..c039049 --- /dev/null +++ b/daemon/udp_listener.c @@ -0,0 +1,341 @@ +/* + * udp_listener.c - syslogd implementation for windows, UDP listener + * + * Created by Alexander Yaworsky + * + * THIS SOFTWARE IS NOT COPYRIGHTED + * + * This source code is offered for use in the public domain. You may + * use, modify or distribute it freely. + * + * This code is distributed in the hope that it will be useful but + * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY + * DISCLAIMED. This includes but is not limited to warranties of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + */ + +#include <process.h> +#include <stdio.h> +#include <winsock2.h> + +#include <glib.h> + +#include <syslog.h> +#include <syslogd.h> + +/* udp listener data */ +static SOCKET *udp_socket_array = NULL; +static int udp_socket_count = 0; + +static struct source **udp_source_references = NULL; + +static HANDLE *udp_event_array = NULL; +static int udp_event_count = 0; +static HANDLE udp_listener_stop_event = NULL; + +static unsigned max_datagram_size = 1024; +static gchar *datagram_buffer = NULL; + +static HANDLE udp_listener_thread_handle = NULL; + +struct fifo *udp_message_queue = NULL; +HANDLE udp_queue_semaphore = NULL; + +/****************************************************************************** + * udp_listener_thread + */ +static unsigned __stdcall udp_listener_thread( void* arg ) +{ + TRACE_ENTER( "\n" ); + + SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL ); + + for(;;) + { + DWORD w; + int r; + int addrlen; + struct raw_message message; + struct raw_message *msg; + + w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE ); + if( WAIT_FAILED == w ) + { + ERR( "Wait error %lu\n", GetLastError() ); + goto done; + } + if( w >= udp_event_count ) + { + ERR( "Unknown wait error\n" ); + goto done; + } + if( w == udp_event_count - 1 ) + { + TRACE_2( "shutdown\n" ); + goto done; + } + /* got UDP message, read it */ + addrlen = sizeof(message.sender_addr); + r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size, + 0, (struct sockaddr*) &message.sender_addr, &addrlen ); + if( r < 0 ) + { + ERR( "recvfrom() error %lu\n", WSAGetLastError() ); + goto done; + } + if( !r ) + { + TRACE_2( "empty datagram\n" ); + continue; + } + message.msg = g_strndup( datagram_buffer, r ); + message.source = udp_source_references[ w ]; + + TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n", + message.sender_addr.sin_addr.S_un.S_un_b.s_b1, + message.sender_addr.sin_addr.S_un.S_un_b.s_b2, + message.sender_addr.sin_addr.S_un.S_un_b.s_b3, + message.sender_addr.sin_addr.S_un.S_un_b.s_b4, + message.source->name, message.msg ); + + /* allocate raw message and add it to the queue */ + msg = g_malloc( sizeof(struct raw_message) ); + memcpy( msg, &message, sizeof(struct raw_message) ); + fifo_push( udp_message_queue, msg ); + ReleaseSemaphore( udp_queue_semaphore, 1, NULL ); + } + +done: + TRACE_LEAVE( "done\n" ); + return 0; +} + +/****************************************************************************** + * shutdown_udp_listener + * + * stop listening thread and dispose all objects + * except message queue and semaphore + */ +void shutdown_udp_listener() +{ + TRACE_ENTER( "\n" ); + + if( udp_listener_thread_handle ) + { + SetEvent( udp_listener_stop_event ); + TRACE_2( "wait for shutdown of udp listener thread\n" ); + WaitForSingleObject( udp_listener_thread_handle, INFINITE ); + CloseHandle( udp_listener_thread_handle ); + udp_listener_thread_handle = NULL; + } + + if( udp_socket_array ) + { + int i; + + for( i = 0; i < udp_socket_count; i++ ) + closesocket( udp_socket_array[ i ] ); + g_free( udp_socket_array ); + udp_socket_array = NULL; + udp_socket_count = 0; + } + + if( udp_source_references ) + { + g_free( udp_source_references ); + udp_source_references = NULL; + } + + if( udp_event_array ) + { + int i; + + for( i = 0; i < udp_event_count; i++ ) + CloseHandle( udp_event_array[ i ] ); + g_free( udp_event_array ); + udp_event_array = NULL; + udp_event_count = 0; + udp_listener_stop_event = NULL; + } + + if( datagram_buffer ) + { + g_free( datagram_buffer ); + datagram_buffer = NULL; + } + max_datagram_size = 1024; + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * fini_udp_listener + */ +void fini_udp_listener() +{ + TRACE_ENTER( "\n" ); + + shutdown_udp_listener(); + + if( udp_message_queue ) + { + fifo_destroy( udp_message_queue ); + udp_message_queue = NULL; + } + + if( udp_queue_semaphore ) + { + CloseHandle( udp_queue_semaphore ); + udp_queue_semaphore = NULL; + } + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * init_udp_listener + * + * create sockets, queue and thread + */ +gboolean init_udp_listener() +{ + gboolean ret = FALSE; + unsigned n; + GList *item; + int i; + + TRACE_ENTER( "\n" ); + + /* create message queue and semaphore; + * we should do this first because of possible early returns + * from this function when no sources are defined + */ + udp_message_queue = fifo_create(); + udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); + if( !udp_queue_semaphore ) + { + ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + goto done; + } + + n = number_of_sources( ST_UDP ); + if( 0 == n ) + { + ret = TRUE; + goto done; + } + + /* allocate memory for sockets, source references and events; + * the number of sockets is not greater than number of sources + */ + udp_socket_array = g_malloc( n * sizeof(SOCKET) ); + udp_source_references = g_malloc( n * sizeof(struct source*) ); + + /* number of events is greater by one because of + * included udp_listener_stop_event + */ + udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) ); + + /* create sockets */ + for( item = sources; item; item = item->next ) + { + struct source *src = item->data; + SOCKET sock; + unsigned dgram_size; + int size; + + if( src->type != ST_UDP ) + continue; + + sock = socket( AF_INET, SOCK_DGRAM, 0 ); + if( INVALID_SOCKET == sock ) + { + ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() ); + continue; + } + + if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) ) + { + ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() ); + closesocket( sock ); + continue; + } + + size = sizeof(dgram_size); + if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) ) + { + ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", + src->name, WSAGetLastError() ); + closesocket( sock ); + continue; + } + TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n", + src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2, + src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4, + ntohs( src->udp.sin_port ), dgram_size ); + if( dgram_size > max_datagram_size ) + max_datagram_size = dgram_size; + + udp_source_references[ udp_socket_count ] = src; + udp_socket_array[ udp_socket_count++ ] = sock; + } + + if( 0 == udp_socket_count ) + { + ret = TRUE; + goto done; + } + + /* create events */ + while( udp_event_count < udp_socket_count ) + { + HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL ); + if( !evt ) + { + ERR( "Cannot initialize source %s: CreateEvent error %lu\n", + udp_source_references[ udp_event_count ]->name, GetLastError() ); + goto done; + } + udp_event_array[ udp_event_count++ ] = evt; + } + udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL ); + if( !udp_listener_stop_event ) + { + ERR( "Cannot create event; error %lu\n", GetLastError() ); + goto done; + } + udp_event_array[ udp_event_count++ ] = udp_listener_stop_event; + + /* bind events to sockets */ + for( i = 0; i < udp_socket_count; i++ ) + { + if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) ) + { + ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n", + udp_source_references[ i ]->name, WSAGetLastError() ); + goto done; + } + } + + /* allocate datagram buffer */ + datagram_buffer = g_malloc( max_datagram_size ); + + /* create thread */ + udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n ); + if( !udp_listener_thread_handle ) + { + ERR( "Cannot create thread; error %lu\n", GetLastError() ); + goto done; + } + + ret = TRUE; + +done: + if( !ret ) + fini_udp_listener(); + + TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n", + udp_socket_count, udp_event_count, max_datagram_size, (int) ret ); + return ret; +} |