diff options
-rw-r--r-- | daemon/dest_file.c | 78 | ||||
-rw-r--r-- | daemon/syslogd.c | 89 | ||||
-rw-r--r-- | daemon/syslogd.h | 17 | ||||
-rw-r--r-- | daemon/udp_listener.c | 27 |
4 files changed, 135 insertions, 76 deletions
diff --git a/daemon/dest_file.c b/daemon/dest_file.c index 3925499..ef28062 100644 --- a/daemon/dest_file.c +++ b/daemon/dest_file.c @@ -39,7 +39,8 @@ struct file_writer struct destination *destination; gchar *file_name; struct fifo *message_queue; - HANDLE fifo_semaphore; + CRITICAL_SECTION fifo_cs; + HANDLE fifo_event; /* manual-reset event */ HANDLE shutdown_event; /* manual-reset event */ HANDLE fd; struct message *first_msg, *second_msg, *current_msg; @@ -235,8 +236,9 @@ static void flush_coalescer( struct file_writer* writer ) static void destroy_file_writer( struct file_writer* writer ) { TRACE_ENTER( "%p\n", writer ); - if( writer->fifo_semaphore ) CloseHandle( writer->fifo_semaphore ); + if( writer->fifo_event ) CloseHandle( writer->fifo_event ); if( writer->shutdown_event ) CloseHandle( writer->shutdown_event ); + DeleteCriticalSection( &writer->fifo_cs ); fifo_destroy( writer->message_queue ); g_free( writer->file_name ); g_free( writer ); @@ -258,10 +260,10 @@ static struct file_writer* create_file_writer( gchar* file_name, ret = g_malloc0( sizeof(struct file_writer) ); ret->file_name = g_strdup( file_name ); ret->message_queue = fifo_create(); - ret->fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); - if( !ret->fifo_semaphore ) + ret->fifo_event = CreateEvent( NULL, TRUE, FALSE, NULL ); + if( !ret->fifo_event ) { - ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + ERR( "Cannot create event; error %lu\n", GetLastError() ); goto error; } ret->shutdown_event = CreateEvent( NULL, TRUE, FALSE, NULL ); @@ -279,6 +281,8 @@ static struct file_writer* create_file_writer( gchar* file_name, goto error; } + InitializeCriticalSection( &ret->fifo_cs ); + /* add writer to destination */ extra = destination->extra; extra->file_writers = g_list_append( extra->file_writers, ret ); @@ -318,6 +322,37 @@ static void detach_writer_from_destination( struct file_writer* writer ) } /****************************************************************************** + * pop_messages_from_queue + * + * Helper function for writer_thread_proc. + * Extract messages from queue and write them to file. + */ +static void pop_messages_from_queue( struct file_writer* writer ) +{ + EnterCriticalSection( &writer->fifo_cs ); + for(;;) + { + writer->current_msg = fifo_pop( writer->message_queue ); + if( !writer->current_msg ) + break; + if( coalesce( writer ) ) + { + if( writer->current_msg ) + { + write_message_to_logfile( writer, &writer->current_msg ); + TRACE_2( "%p written current message\n", writer ); + } + } + else + { + flush_coalescer( writer ); + } + } + ResetEvent( writer->fifo_event ); + LeaveCriticalSection( &writer->fifo_cs ); +} + +/****************************************************************************** * writer_thread_proc * * Open file, extract messages from queue and write them to file. @@ -326,7 +361,7 @@ static void detach_writer_from_destination( struct file_writer* writer ) static unsigned __stdcall writer_thread_proc( void* arg ) { struct file_writer *writer = arg; - HANDLE wait_objects[2] = { writer->fifo_semaphore, writer->shutdown_event }; + HANDLE wait_objects[2] = { writer->fifo_event, writer->shutdown_event }; gchar *pathname; TRACE_ENTER( "writer=%p\n", writer ); @@ -370,31 +405,20 @@ static unsigned __stdcall writer_thread_proc( void* arg ) detach writer from destination and continue to write any pending messages */ detach_writer_from_destination( writer ); /* from now no new messages will be put into queue */ - SetEvent( writer->shutdown_event ); + break; } - writer->current_msg = fifo_pop( writer->message_queue ); - if( !writer->current_msg ) - { + if( WAIT_OBJECT_0 + 1 == w ) /* shutdown */ - goto done; - } + break; - if( coalesce( writer ) ) - { - if( writer->current_msg ) - { - write_message_to_logfile( writer, &writer->current_msg ); - TRACE_2( "%p written current message\n", writer ); - } - } - else - { - flush_coalescer( writer ); - } + if( WAIT_OBJECT_0 == w ) + pop_messages_from_queue( writer ); } + done: detach_writer_from_destination( writer ); + pop_messages_from_queue( writer ); flush_coalescer( writer ); if( writer->fd != INVALID_HANDLE_VALUE ) CloseHandle( writer->fd ); destroy_file_writer( writer ); @@ -481,8 +505,10 @@ static void put_message_to_file_dest( struct destination* destination, struct me } /* put message into queue */ reference_message( msg ); - fifo_push( writer->message_queue, msg ); - ReleaseSemaphore( writer->fifo_semaphore, 1, NULL ); + EnterCriticalSection( &writer->fifo_cs ); + if( fifo_push( writer->message_queue, msg ) ) + SetEvent( writer->fifo_event ); + LeaveCriticalSection( &writer->fifo_cs ); done: LeaveCriticalSection( &extra->cs_file_writers ); diff --git a/daemon/syslogd.c b/daemon/syslogd.c index 439152b..dc947b3 100644 --- a/daemon/syslogd.c +++ b/daemon/syslogd.c @@ -29,7 +29,8 @@ /* internal source data */ static struct fifo *internal_message_queue = NULL; -static HANDLE internal_queue_semaphore = NULL; +static HANDLE internal_queue_event = NULL; +static CRITICAL_SECTION internal_queue_cs; static int internal_source_count = 0; static struct source** internal_source_references = NULL; @@ -576,15 +577,17 @@ void log_internal( int pri, char* fmt, ... ) LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ), &stm, hostname, program, msg ); + EnterCriticalSection( &internal_queue_cs ); for( i = 1;; i++ ) { - fifo_push( internal_message_queue, message ); - ReleaseSemaphore( internal_queue_semaphore, 1, NULL ); + if( fifo_push( internal_message_queue, message ) ) + SetEvent( internal_queue_event ); if( i == internal_source_count ) break; message = duplicate_message( message ); message->source = internal_source_references[ i ]; } + LeaveCriticalSection( &internal_queue_cs ); done: TRACE_LEAVE( "done\n" ); @@ -593,7 +596,7 @@ done: /****************************************************************************** * shutdown_internal_sources * - * dispose all data except message queue and semaphore + * dispose all data except message queue and event */ static void shutdown_internal_sources() { @@ -625,10 +628,11 @@ static void fini_internal_sources() internal_message_queue = NULL; } - if( internal_queue_semaphore ) + if( internal_queue_event ) { - CloseHandle( internal_queue_semaphore ); - internal_queue_semaphore = NULL; + DeleteCriticalSection( &internal_queue_cs ); + CloseHandle( internal_queue_event ); + internal_queue_event = NULL; } TRACE_LEAVE( "done\n" ); @@ -646,12 +650,13 @@ static gboolean init_internal_sources() TRACE_ENTER( "\n" ); internal_message_queue = fifo_create(); - internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); - if( !internal_queue_semaphore ) + internal_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL ); + if( !internal_queue_event ) { - ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + ERR( "Cannot create event; error %lu\n", GetLastError() ); goto done; } + InitializeCriticalSection( &internal_queue_cs ); internal_source_count = (int) number_of_sources( ST_INTERNAL ); if( 0 == internal_source_count ) @@ -734,8 +739,8 @@ void syslogd_main() /* get messages from queues */ for(;;) { - HANDLE wait_handles[3] = { udp_queue_semaphore, - internal_queue_semaphore, + HANDLE wait_handles[3] = { udp_queue_event, + internal_queue_event, service_stop_event }; DWORD t; struct raw_message *raw_msg; @@ -749,15 +754,31 @@ void syslogd_main() switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) ) { case WAIT_OBJECT_0: - raw_msg = fifo_pop( udp_message_queue ); - TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); - mux_message( parse_raw_message( raw_msg ) ); + EnterCriticalSection( &udp_queue_cs ); + for(;;) + { + raw_msg = fifo_pop( udp_message_queue ); + if( !raw_msg ) + break; + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + } + ResetEvent( udp_queue_event ); + LeaveCriticalSection( &udp_queue_cs ); break; case WAIT_OBJECT_0 + 1: - msg = fifo_pop( internal_message_queue ); - TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); - mux_message( msg ); + EnterCriticalSection( &internal_queue_cs ); + for(;;) + { + msg = fifo_pop( internal_message_queue ); + if( !msg ) + break; + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); + } + ResetEvent( internal_queue_event ); + LeaveCriticalSection( &internal_queue_cs ); break; case WAIT_OBJECT_0 + 2: @@ -783,23 +804,39 @@ shutdown: /* flush queues */ for(;;) { - HANDLE wait_handles[2] = { udp_queue_semaphore, - internal_queue_semaphore }; + HANDLE wait_handles[2] = { udp_queue_event, + internal_queue_event }; struct raw_message *raw_msg; struct message *msg; switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) ) { case WAIT_OBJECT_0: - raw_msg = fifo_pop( udp_message_queue ); - TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); - mux_message( parse_raw_message( raw_msg ) ); + EnterCriticalSection( &udp_queue_cs ); + for(;;) + { + raw_msg = fifo_pop( udp_message_queue ); + if( !raw_msg ) + break; + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + } + ResetEvent( udp_queue_event ); + LeaveCriticalSection( &udp_queue_cs ); break; case WAIT_OBJECT_0 + 1: - msg = fifo_pop( internal_message_queue ); - TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); - mux_message( msg ); + EnterCriticalSection( &internal_queue_cs ); + for(;;) + { + msg = fifo_pop( internal_message_queue ); + if( !msg ) + break; + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); + } + ResetEvent( internal_queue_event ); + LeaveCriticalSection( &internal_queue_cs ); break; case WAIT_TIMEOUT: diff --git a/daemon/syslogd.h b/daemon/syslogd.h index f061f7b..0bcd1fb 100644 --- a/daemon/syslogd.h +++ b/daemon/syslogd.h @@ -62,7 +62,8 @@ struct raw_message }; extern struct fifo *udp_message_queue; -extern HANDLE udp_queue_semaphore; +extern HANDLE udp_queue_event; +extern CRITICAL_SECTION udp_queue_cs; extern gboolean init_udp_listener(); extern void shutdown_udp_listener(); @@ -189,21 +190,11 @@ extern gboolean init_destination_file( struct destination* destination ); extern gboolean init_destination_relay( struct destination* destination ); /* queue */ -struct fifo_item -{ - struct fifo_item *next; /* queue is a single-linked list */ - void *payload; -}; - -struct fifo -{ - struct fifo_item *first; /* first pushed item */ - struct fifo_item *last; /* last pushed item */ -}; +struct fifo; extern struct fifo* fifo_create(); extern void fifo_destroy( struct fifo* queue ); -extern void fifo_push( struct fifo* queue, void* data ); +extern gboolean fifo_push( struct fifo* queue, void* data ); extern void* fifo_pop( struct fifo* queue ); /* logrotate */ diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index c039049..f9c7c1a 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -40,7 +40,8 @@ static gchar *datagram_buffer = NULL; static HANDLE udp_listener_thread_handle = NULL; struct fifo *udp_message_queue = NULL; -HANDLE udp_queue_semaphore = NULL; +HANDLE udp_queue_event = NULL; +CRITICAL_SECTION udp_queue_cs; /****************************************************************************** * udp_listener_thread @@ -102,8 +103,10 @@ static unsigned __stdcall udp_listener_thread( void* arg ) /* allocate raw message and add it to the queue */ msg = g_malloc( sizeof(struct raw_message) ); memcpy( msg, &message, sizeof(struct raw_message) ); - fifo_push( udp_message_queue, msg ); - ReleaseSemaphore( udp_queue_semaphore, 1, NULL ); + EnterCriticalSection( &udp_queue_cs ); + if( fifo_push( udp_message_queue, msg ) ) + SetEvent( udp_queue_event ); + LeaveCriticalSection( &udp_queue_cs ); } done: @@ -115,7 +118,7 @@ done: * shutdown_udp_listener * * stop listening thread and dispose all objects - * except message queue and semaphore + * except message queue and event */ void shutdown_udp_listener() { @@ -184,10 +187,11 @@ void fini_udp_listener() udp_message_queue = NULL; } - if( udp_queue_semaphore ) + if( udp_queue_event ) { - CloseHandle( udp_queue_semaphore ); - udp_queue_semaphore = NULL; + DeleteCriticalSection( &udp_queue_cs ); + CloseHandle( udp_queue_event ); + udp_queue_event = NULL; } TRACE_LEAVE( "done\n" ); @@ -207,17 +211,18 @@ gboolean init_udp_listener() TRACE_ENTER( "\n" ); - /* create message queue and semaphore; + /* create message queue and event; * we should do this first because of possible early returns * from this function when no sources are defined */ udp_message_queue = fifo_create(); - udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); - if( !udp_queue_semaphore ) + udp_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL ); + if( !udp_queue_event ) { - ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + ERR( "Cannot create event; error %lu\n", GetLastError() ); goto done; } + InitializeCriticalSection( &udp_queue_cs ); n = number_of_sources( ST_UDP ); if( 0 == n ) |