From 74b7b6121179f8a82a2f96812c9a33e3f650eaed Mon Sep 17 00:00:00 2001 From: yaworsky Date: Mon, 31 Oct 2005 13:52:24 +0000 Subject: Refactored listener --- daemon/syslogd.c | 429 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 318 insertions(+), 111 deletions(-) (limited to 'daemon/syslogd.c') diff --git a/daemon/syslogd.c b/daemon/syslogd.c index 4c64a83..af50793 100644 --- a/daemon/syslogd.c +++ b/daemon/syslogd.c @@ -16,7 +16,6 @@ */ #include -#include #include #include #include @@ -28,8 +27,11 @@ #include #include -static struct fifo *raw_message_queue = NULL; -static HANDLE fifo_semaphore = NULL; +/* internal source data */ +static struct fifo *internal_message_queue = NULL; +static HANDLE internal_queue_semaphore = NULL; +static int internal_source_count = 0; +static struct source** internal_source_references = NULL; /* cache for source hostnames */ struct hostname @@ -48,6 +50,70 @@ static GIConv conversion_descriptor = (GIConv) -1; char *str_month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; +/****************************************************************************** + * create_message + * + * Create a new message with refcount=1. + * The caller should allocate sender, hostname, program and message strings. + * The function simply copies these pointers to the structure and the caller + * should not free allocated strings. + */ +struct message* create_message( struct source* source, + gchar* sender, + int facility, int priority, + LPSYSTEMTIME timestamp, + gchar* hostname, + gchar* program, + gchar* message ) +{ + struct message *msg; + + TRACE_ENTER( "\n" ); + + msg = g_malloc( sizeof(struct message) ); + msg->refcount = 1; + msg->source = source; + msg->sender = sender; + msg->facility = facility; + msg->priority = priority; + msg->timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d", + str_month[ timestamp->wMonth - 1 ], + timestamp->wDay, timestamp->wHour, + timestamp->wMinute, timestamp->wSecond ); + msg->hostname = hostname; + msg->program = program; + msg->message = message; + + TRACE_LEAVE( "message=%p\n", msg ); + return msg; +} + +/****************************************************************************** + * duplicate_message + * + * Make a copy of message. + */ +struct message* duplicate_message( struct message* msg ) +{ + struct message *new_msg; + + TRACE_ENTER( "message=%p\n", msg ); + + new_msg = g_malloc( sizeof(struct message) ); + new_msg->refcount = 1; + new_msg->source = msg->source; + new_msg->sender = g_strdup( msg->sender ); + new_msg->facility = msg->facility; + new_msg->priority = msg->priority; + new_msg->timestamp = g_strdup( msg->timestamp ); + new_msg->hostname = g_strdup( msg->hostname ); + new_msg->program = g_strdup( msg->program ); + new_msg->message = g_strdup( msg->message ); + + TRACE_LEAVE( "new message=%p\n", new_msg ); + return new_msg; +} + /****************************************************************************** * refrence_message * @@ -322,13 +388,11 @@ no_pri: * parse_timestamp * * parse TIMESTAMP part of message; - * allocate string; * return pointer to the next char after TIMESTAMP */ -static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) +static gchar* parse_timestamp( gchar* msg, LPSYSTEMTIME timestamp ) { int i; - SYSTEMTIME stm; TRACE_ENTER( "\n" ); @@ -337,7 +401,7 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) break; if( i == 12 ) goto no_timestamp; - stm.wMonth = i + 1; + timestamp->wMonth = i + 1; if( msg[3] != ' ' ) goto no_timestamp; @@ -346,13 +410,13 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) { if( (!isdigit( msg[4] )) || (!isdigit( msg[5] )) ) goto no_timestamp; - stm.wDay = (msg[4] - '0') * 10 + (msg[5] - '0'); + timestamp->wDay = (msg[4] - '0') * 10 + (msg[5] - '0'); } else { if( !isdigit( msg[5] ) ) goto no_timestamp; - stm.wDay = msg[5] - '0'; + timestamp->wDay = msg[5] - '0'; } if( msg[6] != ' ' ) @@ -360,26 +424,23 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp ) if( (!isdigit( msg[7] )) || (!isdigit( msg[8] )) || msg[9] != ':' ) goto no_timestamp; - stm.wHour = (msg[7] - '0') * 10 + (msg[8] - '0'); + timestamp->wHour = (msg[7] - '0') * 10 + (msg[8] - '0'); if( (!isdigit( msg[10] )) || (!isdigit( msg[11] )) || msg[12] != ':' ) goto no_timestamp; - stm.wMinute = (msg[10] - '0') * 10 + (msg[11] - '0'); + timestamp->wMinute = (msg[10] - '0') * 10 + (msg[11] - '0'); if( (!isdigit( msg[13] )) || (!isdigit( msg[14] )) || msg[15] != ' ' ) goto no_timestamp; - stm.wSecond = (msg[13] - '0') * 10 + (msg[14] - '0'); + timestamp->wSecond = (msg[13] - '0') * 10 + (msg[14] - '0'); msg += 16; goto done; no_timestamp: TRACE_2( "no timestamp\n" ); - GetLocalTime( &stm ); + GetLocalTime( timestamp ); done: - *timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d", - str_month[ stm.wMonth - 1 ], - stm.wDay, stm.wHour, stm.wMinute, stm.wSecond ); TRACE_LEAVE( "done\n" ); return msg; } @@ -392,30 +453,28 @@ done: */ static struct message* parse_raw_message( struct raw_message* raw_msg ) { - struct message* msg; gchar *current_part, *next_part; + gchar *sender, *hostname, *program, *message; + int facility, priority; + SYSTEMTIME timestamp; + struct message *msg; TRACE_ENTER( "raw message=%p\n", raw_msg ); - /* allocate and initialize message structure */ - msg = g_malloc( sizeof(struct message) ); - msg->refcount = 1; - msg->source = raw_msg->source; - /* get sender's hostname */ - msg->sender = get_hostname( &raw_msg->sender_addr ); + sender = get_hostname( &raw_msg->sender_addr ); current_part = raw_msg->msg; - next_part = parse_PRI( current_part, &msg->facility, &msg->priority ); + next_part = parse_PRI( current_part, &facility, &priority ); current_part = next_part; - next_part = parse_timestamp( current_part, &msg->timestamp ); + next_part = parse_timestamp( current_part, ×tamp ); if( next_part == current_part ) { /* no valid timestamp */ TRACE_2( "no valid timestamp: msg=%s\n", current_part ); - msg->hostname = g_strdup( msg->sender ); - msg->message = g_strdup( current_part ); + hostname = g_strdup( sender ); + message = g_strdup( current_part ); } else { @@ -426,29 +485,33 @@ static struct message* parse_raw_message( struct raw_message* raw_msg ) if( *next_part != ' ' ) { /* invalid hostname */ - msg->hostname = g_strdup( msg->sender ); - msg->message = g_strdup( current_part ); - TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", msg->hostname, msg->message ); + hostname = g_strdup( sender ); + message = g_strdup( current_part ); + TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", hostname, message ); } else { - msg->hostname = g_strndup( current_part, next_part - current_part ); + hostname = g_strndup( current_part, next_part - current_part ); while( *next_part == ' ' && *next_part != 0 ) next_part++; - msg->message = g_strdup( next_part ); - TRACE_2( "hostname=%s; msg=%s\n", msg->hostname, msg->message ); + message = g_strdup( next_part ); + TRACE_2( "hostname=%s; msg=%s\n", hostname, message ); } } /* try to find program name */ - current_part = msg->message; + current_part = message; next_part = current_part; while( *next_part != ' ' && *next_part != ':' && *next_part != '[' && *next_part != 0 ) next_part++; if( *next_part == ' ' || *next_part == 0 ) - msg->program = g_strdup(""); + program = g_strdup(""); else - msg->program = g_strndup( current_part, next_part - current_part ); + program = g_strndup( current_part, next_part - current_part ); + + /* create message */ + msg = create_message( raw_msg->source, sender, facility, priority, + ×tamp, hostname, program, message ); /* destroy raw message */ g_free( raw_msg->msg ); @@ -459,39 +522,151 @@ static struct message* parse_raw_message( struct raw_message* raw_msg ) } /****************************************************************************** - * message_processor + * number_of_sources * - * main function; extract raw messages from queue and parse them + * return the number of sources of the specified type */ -static unsigned __stdcall message_processor( void* arg ) +unsigned number_of_sources( enum source_type type ) { - HANDLE wait_handles[2] = { fifo_semaphore, service_stop_event }; + unsigned ret = 0; + GList *item; - TRACE_ENTER( "\n" ); - for(;;) + for( item = sources; item; item = item->next ) { - DWORD w; - struct raw_message *raw_msg; + struct source *src = item->data; - w = WaitForMultipleObjects( 2, wait_handles, FALSE, INFINITE ); - if( WAIT_OBJECT_0 + 1 == w ) - /* shutdown */ - break; - if( w != WAIT_OBJECT_0 ) - { - ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); - SetEvent( service_stop_event ); + if( src->type == type ) + ret++; + } + return ret; +} + +/****************************************************************************** + * log_internal + * + * Generate internal log message. + * This function should be called only from the main thread because there's no + * access serialization to the writing end of the queue. + */ +void log_internal( int pri, char* fmt, ... ) +{ + gchar *sender, *hostname, *program, *msg; + va_list args; + SYSTEMTIME stm; + int i; + struct message *message; + + TRACE_ENTER( "\n" ); + + if( 0 == internal_source_count ) + goto done; + + sender = g_strdup( local_hostname ); + hostname = g_strdup( local_hostname ); + program = g_strdup( "syslog" ); + + va_start( args, fmt ); + msg = g_strdup_vprintf( fmt, args ); + va_end( args ); + + GetLocalTime( &stm ); + + message = create_message( internal_source_references[0], + sender, + LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ), + &stm, + hostname, program, msg ); + for( i = 1;; i++ ) + { + fifo_push( internal_message_queue, message ); + ReleaseSemaphore( internal_queue_semaphore, 1, NULL ); + if( i == internal_source_count ) break; - } - /* extract raw message from queue */ - raw_msg = fifo_pop( raw_message_queue ); + message = duplicate_message( message ); + message->source = internal_source_references[ i ]; + } - TRACE_2( "got message %p from queue\n", raw_msg ); +done: + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * shutdown_internal_sources + * + * dispose all data except message queue and semaphore + */ +static void shutdown_internal_sources() +{ + TRACE_ENTER( "\n" ); + + internal_source_count = 0; + + if( internal_source_references ) + g_free( internal_source_references ); - mux_message( parse_raw_message( raw_msg ) ); - } TRACE_LEAVE( "done\n" ); - return 0; +} + +/****************************************************************************** + * fini_internal_sources + */ +static void fini_internal_sources() +{ + TRACE_ENTER( "\n" ); + + shutdown_internal_sources(); + + if( internal_message_queue ) + fifo_destroy( internal_message_queue ); + + if( internal_queue_semaphore ) + CloseHandle( internal_queue_semaphore ); + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * init_internal_sources + */ +static gboolean init_internal_sources() +{ + gboolean ret = FALSE; + GList *item; + int i; + + TRACE_ENTER( "\n" ); + + internal_message_queue = fifo_create(); + internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); + if( !internal_queue_semaphore ) + { + ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + goto done; + } + + internal_source_count = (int) number_of_sources( ST_INTERNAL ); + if( 0 == internal_source_count ) + { + ret = TRUE; + goto done; + } + + internal_source_references = g_malloc( internal_source_count * sizeof(struct source*) ); + + for( i = 0, item = sources; item; item = item->next ) + { + struct source *src = item->data; + if( ST_INTERNAL == src->type ) + internal_source_references[ i++ ] = src; + } + ret = TRUE; + +done: + if( !ret ) + fini_internal_sources(); + + TRACE_LEAVE( "done; internal_sources=%d, ret=%d\n", internal_source_count, (int) ret ); + return ret; } /****************************************************************************** @@ -502,34 +677,38 @@ static unsigned __stdcall message_processor( void* arg ) static void fini_destinations() { GList *dest; + + TRACE_ENTER( "\n" ); + for( dest = destinations; dest; dest = dest->next ) - ((struct destination*) dest)->fini( (struct destination*) dest ); + { + struct destination *d = dest->data; + d->fini( d ); + } + + TRACE_LEAVE( "done\n" ); } /****************************************************************************** * main syslogd function - * - * global initialization; invoke listener */ void syslogd_main() { - HANDLE message_processor_thread = NULL; - unsigned tid; - TRACE_ENTER( "\n" ); - SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL ); - if( !read_configuration() ) goto done; if( !init_purger() ) goto done; - if( !init_listener() ) + purge_log_dirs(); + + if( !init_internal_sources() ) goto done; - purge_log_dirs(); + if( !init_udp_listener() ) + goto done; if( source_encoding && destination_encoding ) { @@ -541,47 +720,85 @@ void syslogd_main() } } - /* create message queue and semaphore; - * avoid using Glib's asynchronous queues and threading support - * because synchronization capabilities are very limited; - */ - raw_message_queue = fifo_create(); - fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); - if( !fifo_semaphore ) - { - ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); - goto done; - } + log_internal( LOG_NOTICE, "Syslog daemon started" ); - message_processor_thread = (HANDLE) _beginthreadex( NULL, 0, message_processor, NULL, 0, &tid ); - if( !message_processor_thread ) + /* get messages from queues */ + for(;;) { - ERR( "Cannot create thread; error %lu\n", GetLastError() ); - goto done; + HANDLE wait_handles[3] = { udp_queue_semaphore, + internal_queue_semaphore, + service_stop_event }; + DWORD t; + struct raw_message *raw_msg; + struct message *msg; + + if( !mark_interval ) + t = INFINITE; + else + t = mark_interval * 1000; + + switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) ) + { + case WAIT_OBJECT_0: + raw_msg = fifo_pop( udp_message_queue ); + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + break; + + case WAIT_OBJECT_0 + 1: + msg = fifo_pop( internal_message_queue ); + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); + break; + + case WAIT_OBJECT_0 + 2: + goto shutdown; + + case WAIT_TIMEOUT: + /* issue mark message */ + log_internal( LOG_NOTICE, "%s", mark_message ); + break; + + default: + ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); + SetEvent( service_stop_event ); + goto shutdown; + } } - /* get messages from the listener */ +shutdown: + log_internal( LOG_NOTICE, "Syslog daemon is shutting down" ); + shutdown_udp_listener(); + shutdown_internal_sources(); + + /* flush queues */ for(;;) { + HANDLE wait_handles[2] = { udp_queue_semaphore, + internal_queue_semaphore }; struct raw_message *raw_msg; + struct message *msg; - switch( listener( &raw_msg ) ) + switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) ) { - case LSNR_ERROR: - case LSNR_SHUTDOWN: - goto done; + case WAIT_OBJECT_0: + raw_msg = fifo_pop( udp_message_queue ); + TRACE_2( "got raw message %p from UDP listener\n", raw_msg ); + mux_message( parse_raw_message( raw_msg ) ); + break; - case LSNR_GOT_MESSAGE: - TRACE_2( "got message from %d.%d.%d.%d; source name %s; ptr=%p: %s\n", - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b1, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b2, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b3, - raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b4, - raw_msg->source->name, raw_msg, raw_msg->msg ); - /* add raw message to queue */ - fifo_push( raw_message_queue, raw_msg ); - ReleaseSemaphore( fifo_semaphore, 1, NULL ); + case WAIT_OBJECT_0 + 1: + msg = fifo_pop( internal_message_queue ); + TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name ); + mux_message( msg ); break; + + case WAIT_TIMEOUT: + goto done; + + default: + ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() ); + goto done; } } @@ -589,24 +806,14 @@ done: /* signal to all possibly running threads */ SetEvent( service_stop_event ); - if( message_processor_thread ) - { - /* wait for message processor shutdown */ - WaitForSingleObject( message_processor_thread, INFINITE ); - CloseHandle( message_processor_thread ); - } - + fini_udp_listener(); + fini_internal_sources(); fini_destinations(); fini_purger(); free_hostnames(); - if( raw_message_queue ) fifo_destroy( raw_message_queue ); if( conversion_descriptor != (GIConv) -1 ) g_iconv_close( conversion_descriptor ); - fini_listener(); - - if( fifo_semaphore ) CloseHandle( fifo_semaphore ); - TRACE_LEAVE( "done\n" ); } -- cgit v1.2.3