/*
* syslogd.c - syslogd implementation for windows
*
* Created by Alexander Yaworsky
*
* THIS SOFTWARE IS NOT COPYRIGHTED
*
* This source code is offered for use in the public domain. You may
* use, modify or distribute it freely.
*
* This code is distributed in the hope that it will be useful but
* WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
* DISCLAIMED. This includes but is not limited to warranties of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
*/
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <winsock2.h>
#include <glib.h>
#include <syslog.h>
#include <syslogd.h>
/* internal source data */
/* Queue of messages produced by log_internal(); popped by syslogd_main(). */
static struct fifo *internal_message_queue = NULL;
/* Manual-reset event (created in init_internal_sources()); set by
 * log_internal() when fifo_push() returns non-zero and reset by
 * syslogd_main() after the queue has been drained. */
static HANDLE internal_queue_event = NULL;
/* Entered around pushes to and pops from internal_message_queue. */
static CRITICAL_SECTION internal_queue_cs;
/* Number of sources of type ST_INTERNAL; while it is zero log_internal()
 * does nothing. */
static int internal_source_count = 0;
/* Array of internal_source_count pointers to the ST_INTERNAL sources. */
static struct source** internal_source_references = NULL;
/* cache for source hostnames */
struct hostname
{
struct sockaddr_in addr; /* sender address; lookups compare sin_addr only */
struct string *host; /* resolved host name, or the address as dotted-quad text */
time_t top_age; /* time() value after which the entry is dropped; zero prevents aging */
};
/* list of struct hostname*; get_hostname() keeps the entry it used last at
 * the head of the list */
static GList *hostnames = NULL;
#define HOSTNAME_LIFETIME 60 /* seconds */
/* FIXME: is this value correct? maybe we should make it configurable? */
/* month abbreviations; create_message() indexes this array with wMonth - 1 */
char *str_month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
/******************************************************************************
 * create_message
 *
 * Allocate a message and fill it in from the given parts; the returned
 * message has refcount=1.
 *
 * The message takes its own reference on sender, hostname, program and
 * message, so the caller keeps (and must release) the references it passed.
 * The timestamp is rendered once into a "Mmm dd hh:mm:ss" string;
 * timestamp->wMonth must be in 1..12 because it indexes str_month.
 * hostname_in_locale initially refers to the same string as hostname;
 * separator and end_of_line refer to the shared `space' and `line_feed'
 * strings.
 */
struct message* create_message( struct source* source,
                                struct string* sender,
                                int facility, int priority,
                                LPSYSTEMTIME timestamp,
                                struct string* hostname,
                                struct string* program,
                                struct string* message )
{
    struct message *new_msg;
    gchar *stamp_text;

    TRACE_ENTER( "\n" );

    /* render the timestamp text first; it is only needed to build the string */
    stamp_text = g_strdup_printf( "%s %2d %02d:%02d:%02d",
                                  str_month[ timestamp->wMonth - 1 ],
                                  timestamp->wDay, timestamp->wHour,
                                  timestamp->wMinute, timestamp->wSecond );

    new_msg = g_malloc( sizeof *new_msg );

    /* plain values */
    new_msg->refcount = 1;
    new_msg->source = source;
    new_msg->facility = facility;
    new_msg->priority = priority;

    /* string parts; each one is referenced by the message */
    new_msg->sender = string_addref( sender );
    new_msg->timestamp = string_new( stamp_text );
    new_msg->hostname = string_addref( hostname );
    new_msg->hostname_in_locale = string_addref( hostname );
    new_msg->program = string_addref( program );
    new_msg->message = string_addref( message );
    new_msg->separator = string_addref( space );
    new_msg->end_of_line = string_addref( line_feed );

    g_free( stamp_text );

    TRACE_LEAVE( "message=%p\n", new_msg );
    return new_msg;
}
/******************************************************************************
 * duplicate_message
 *
 * Make a shallow copy of a message: the copy has its own refcount (1) and
 * holds an additional reference on every string of the original, so no string
 * data is copied.
 */
struct message* duplicate_message( struct message* msg )
{
    struct message *copy;

    TRACE_ENTER( "message=%p\n", msg );

    copy = g_malloc( sizeof *copy );

    /* plain values */
    copy->refcount = 1;
    copy->source = msg->source;
    copy->facility = msg->facility;
    copy->priority = msg->priority;

    /* shared strings */
    copy->sender = string_addref( msg->sender );
    copy->timestamp = string_addref( msg->timestamp );
    copy->hostname = string_addref( msg->hostname );
    copy->hostname_in_locale = string_addref( msg->hostname_in_locale );
    copy->program = string_addref( msg->program );
    copy->message = string_addref( msg->message );
    copy->separator = string_addref( msg->separator );
    copy->end_of_line = string_addref( msg->end_of_line );

    TRACE_LEAVE( "new message=%p\n", copy );
    return copy;
}
/******************************************************************************
 * reference_message
 *
 * Add one reference to the message; the count is bumped with
 * InterlockedIncrement.  Every call must be paired with release_message.
 */
void reference_message( struct message* msg )
{
    TRACE_ENTER( "message=%p\n", msg );

    /* the new count is not needed by anybody */
    (void) InterlockedIncrement( &msg->refcount );
}
/******************************************************************************
 * release_message
 *
 * Drop one reference; when the last reference goes away, release every
 * string held by the message and free the message structure itself.
 */
void release_message( struct message* msg )
{
    TRACE_ENTER( "message=%p\n", msg );

    if( 0 == InterlockedDecrement( &msg->refcount ) )
    {
        /* that was the last reference: tear the message down */
        string_release( msg->sender );
        string_release( msg->timestamp );
        string_release( msg->hostname );
        string_release( msg->hostname_in_locale );
        string_release( msg->program );
        string_release( msg->message );
        string_release( msg->separator );
        string_release( msg->end_of_line );
        g_free( msg );
        TRACE_LEAVE( "done\n" );
        return;
    }

    TRACE_LEAVE( "done; still referenced\n" );
}
/******************************************************************************
 * filter_message
 *
 * Decide whether a message passes a filter.
 * A NULL filter accepts everything; otherwise both the facility and the
 * priority of the message must be enabled in the filter tables.
 *
 * return: TRUE - accepted message, FALSE - rejected message
 */
static gboolean filter_message( struct message* msg, struct filter* filter )
{
    gboolean accepted;

    TRACE_ENTER( "message=%p, filter=%s\n", msg, filter? filter->name : "NULL" );

    /* evaluated left to right: the tables are consulted only for a real
       filter, and the priority only when the facility is enabled */
    accepted = !filter
               || ( filter->facilities[ msg->facility ]
                    && filter->priorities[ msg->priority ] );

    TRACE_LEAVE( "done; ret=%d\n", accepted );
    return accepted;
}
/******************************************************************************
* try_convert_string
*
* helper function for mux_message;
* try to convert the bytes of s with the conversion descriptor cd and return
* the result as a new string;
* input bytes rejected with EILSEQ are skipped, and EINVAL ends the conversion
* keeping whatever has been converted so far;
* on any other conversion error return a new reference to the original string.
* An empty input yields a new empty string.
*/
static struct string* try_convert_string( struct string* s, GIConv cd )
{
struct string *ret;
gchar *converted_str;
gchar *inbuf, *outbuf;
gsize inbytes_left, outbytes, outbytes_left;
gsize converted_str_len, converted_str_allocated;
/* first guess for the output buffer: input length plus one byte */
converted_str_allocated = s->gstr->len + /* at least */ 1;
converted_str = g_malloc( converted_str_allocated );
inbuf = s->gstr->str;
inbytes_left = s->gstr->len;
/* outbytes is the room offered to the next g_iconv call */
outbytes = converted_str_allocated;
outbuf = converted_str;
converted_str_len = 0;
/* nothing to convert: fall through to build an empty result */
if( 0 == inbytes_left )
goto done;
for(;;)
{
size_t r;
outbytes_left = outbytes;
r = g_iconv( cd, &inbuf, &inbytes_left, &outbuf, &outbytes_left );
/* account for what this call produced, whether it succeeded or not */
converted_str_len += outbytes - outbytes_left;
outbytes = outbytes_left;
/* -1 is converted to size_t for this comparison, i.e. (size_t) -1 */
if( r != -1 )
goto done;
switch( errno )
{
case E2BIG:
/* output buffer is full: enlarge it and retry from the current position */
/* guess the right output length */
if( s->gstr->len > inbytes_left )
outbytes = inbytes_left *
/* average size of destination char: */
converted_str_len / (s->gstr->len - inbytes_left)
+ 16; /* if the result of above expression too small */
else
/* cannot convert the first character; choose some initial value */
outbytes = 32;
/* the estimate is both the growth increment and the room offered to the
next g_iconv call; space that was still free before is not counted */
converted_str_allocated += outbytes;
converted_str = g_realloc( converted_str, converted_str_allocated );
/* the buffer may have moved: recompute the write position */
outbuf = converted_str + converted_str_len;
break;
case EILSEQ:
/* step over one input byte and go on with the rest */
inbuf++;
inbytes_left--;
/* FIXME: skip invalid characters? or replace with some fallback? */
break;
case EINVAL:
/* presumably an incomplete multibyte sequence at the end of input (iconv
semantics -- confirm); keep what has been converted */
goto done;
default:
TRACE( "unknown conversion error %d\n", errno );
g_free( converted_str );
return string_addref( s );
}
}
done:
/* copy the converted bytes into a string object and drop the work buffer */
ret = string_new_len( converted_str, converted_str_len );
g_free( converted_str );
return ret;
}
/******************************************************************************
* mux_message
*
* filter and multiplex message to destinations;
* release message
*/
static void mux_message( struct message* msg )
{
GList *item;
TRACE_ENTER( "message=%p\n", msg );
for( item = logpaths; item; item = item->next )
{
struct logpath *logpath = item->data;
gboolean need_dest_iconv, need_locale_iconv;
struct message *converted_msg;
if( logpath->source != msg->source )
continue;
if( !filter_message( msg, logpath->filter ) )
continue;
/* convert encoding if needed */
need_dest_iconv = (logpath->src2dest_cd != (GIConv) -1) && (logpath->ascii2dest_cd != (GIConv) -1);
need_locale_iconv = logpath->src2locale_cd != (GIConv) -1;
if( (!need_dest_iconv) && (!need_locale_iconv) )
{
converted_msg = msg;
reference_message( msg );
}
else
{
converted_msg = duplicate_message( msg );
if( need_dest_iconv )
{
string_release( converted_msg->timestamp );
converted_msg->timestamp = try_convert_string( msg->timestamp, logpath->ascii2dest_cd );
string_release( converted_msg->hostname );
converted_msg->hostname = try_convert_string( msg->hostname, logpath->src2dest_cd );
string_release( converted_msg->message );
converted_msg->message = try_convert_string( msg->message, logpath->src2dest_cd );
string_release( converted_msg->separator );
converted_msg->separator = try_convert_string( msg->separator, logpath->ascii2dest_cd );
string_release( converted_msg->end_of_line );
converted_msg->end_of_line = try_convert_string( msg->end_of_line, logpath->ascii2dest_cd );
}
if( need_locale_iconv )
{
string_release( converted_msg->hostname_in_locale );
converted_msg->hostname_in_locale = try_convert_string( msg->hostname_in_locale,
logpath->src2locale_cd );
string_release( converted_msg->program );
converted_msg->program = try_convert_string( msg->program, logpath->src2locale_cd );
}
}
/* put message to destination */
logpath->destination->put( logpath->destination, converted_msg );
release_message( converted_msg );
}
release_message( msg );
TRACE_LEAVE( "done\n" );
}
/******************************************************************************
* get_hostname
*
* convert addr to string and return it;
* string should be released after use
*/
static struct string* get_hostname( struct sockaddr_in* addr )
{
struct string *ret;
time_t current_time;
GList *item;
struct hostname *new_hostname;
char buf[16];
TRACE_ENTER( "%d.%d.%d.%d\n",
addr->sin_addr.S_un.S_un_b.s_b1, addr->sin_addr.S_un.S_un_b.s_b2,
addr->sin_addr.S_un.S_un_b.s_b3, addr->sin_addr.S_un.S_un_b.s_b4 );
current_time = time(NULL);
/* at first, try to find a cached entry */
item = hostnames;
while( item )
{
struct hostname *h = item->data;
if( h->top_age && h->top_age < current_time )
{
GList *next_item = item->next;
TRACE_2( "delete old entry %s\n", h->host->gstr->str );
string_release( h->host );
g_free( h );
hostnames = g_list_delete_link( hostnames, item );
item = next_item;
continue;
}
if( h->addr.sin_addr.S_un.S_addr == addr->sin_addr.S_un.S_addr )
{
/* found in cache */
ret = string_addref( h->host );
/* move entry to the beginning of the list */
item->data = hostnames->data;
hostnames->data = h;
TRACE_LEAVE( "done; found cached entry: %s\n", ret->gstr->str );
return ret;
}
item = item->next;
}
/* add new entry */
new_hostname = g_malloc( sizeof(struct hostname) );
memcpy( &new_hostname->addr, addr, sizeof(struct sockaddr_in) );
if( use_dns )
{
struct hostent *he = gethostbyaddr( (char*) &addr->sin_addr.S_un.S_addr,
sizeof(addr->sin_addr.S_un.S_addr), AF_INET );
new_hostname->top_age = time(NULL) + HOSTNAME_LIFETIME;
if( !he )
goto use_addr;
new_hostname->host = string_new( he->h_name );
}
else
{
new_hostname->top_age = 0;
use_addr:
sprintf( buf, "%d.%d.%d.%d",
addr->sin_addr.S_un.S_un_b.s_b1, addr->sin_addr.S_un.S_un_b.s_b2,
addr->sin_addr.S_un.S_un_b.s_b3, addr->sin_addr.S_un.S_un_b.s_b4 );
new_hostname->host = string_new( buf );
}
hostnames = g_list_prepend( hostnames, new_hostname );
ret = string_addref( new_hostname->host );
TRACE_LEAVE( "done; ret=%s\n", ret->gstr->str );
return ret;
}
/******************************************************************************
 * free_hostnames
 *
 * Dispose the whole hostname cache: release the name string of every entry,
 * free the entries and the list links, and leave the list empty.
 */
static void free_hostnames()
{
    GList *node = hostnames;

    while( node )
    {
        struct hostname *entry = node->data;

        node = node->next;
        string_release( entry->host );
        g_free( entry );
    }

    g_list_free( hostnames );
    hostnames = NULL;
}
/******************************************************************************
 * parse_PRI
 *
 * parse PRI part of message ("<" 1..3 decimal digits ">");
 * set facility and priority;
 * return pointer to the next char after PRI
 *
 * When the message does not start with a well-formed PRI, msg is returned
 * unchanged and facility/priority default to user.notice.
 * A well-formed PRI whose value does not name an existing facility (that is,
 * value >= LOG_NFACILITIES * 8) is consumed, but facility/priority also
 * default to user.notice.  The bound must be exclusive: values from
 * LOG_NFACILITIES * 8 upwards decode to facility numbers >= LOG_NFACILITIES,
 * which are outside the per-facility tables used for filtering.
 */
static gchar* parse_PRI( gchar* msg, int* facility, int* priority )
{
    int i, pri;

    TRACE_ENTER( "\n" );

    if( *msg != '<' )
        goto no_pri;

    pri = 0;
    for( i = 1; i < 5; i++ )
    {
        if( msg[ i ] == '>' )
            break;
        /* explicit range check: locale independent and well defined for
           bytes with the high bit set (unlike isdigit on a plain char);
           it also stops at the terminating NUL */
        if( msg[ i ] < '0' || msg[ i ] > '9' )
            goto no_pri;
        pri = (pri * 10) + (msg[ i ] - '0');
    }
    if( i == 5 )
        /* more than three digits, no closing bracket */
        goto no_pri;
    if( i == 1 )
        /* FIXME: is this right? or "<>" is an unidentifiable PRI? */
        goto no_pri;

    msg += i + 1;
    if( pri >= LOG_NFACILITIES * 8 )
    {
        TRACE_2( "Unidentifiable PRI %d\n", pri );
        *facility = LOG_FAC( LOG_USER );
        *priority = LOG_PRI( LOG_NOTICE );
    }
    else
    {
        TRACE_2( "PRI=%d\n", pri );
        *facility = LOG_FAC( pri );
        *priority = LOG_PRI( pri );
    }
    TRACE_LEAVE( "done; facility=%d, priority=%d\n", *facility, *priority );
    return msg;

no_pri:
    *facility = LOG_FAC( LOG_USER );
    *priority = LOG_PRI( LOG_NOTICE );
    TRACE_LEAVE( "done; message contains no PRI\n" );
    return msg;
}
/******************************************************************************
 * str2month
 *
 * Recognize a three-letter English month abbreviation at the start of s.
 * The comparison ignores the case of ASCII letters; whatever follows the
 * third character is not examined.
 *
 * Reading stops at the terminating NUL, so strings shorter than three
 * characters are rejected without looking past their end.
 *
 * return month (1..12) or 0 if error
 */
static int str2month( char* s )
{
    static const char names[12][4] = {
        "jan", "feb", "mar", "apr", "may", "jun",
        "jul", "aug", "sep", "oct", "nov", "dec"
    };
    char key[3];
    int i;

    /* fetch and lower-case exactly three characters */
    for( i = 0; i < 3; i++ )
    {
        char c = s[ i ];

        if( c == 0 )
            return 0;
        if( 'A' <= c && c <= 'Z' )
            c += 'a' - 'A';
        key[ i ] = c;
    }

    for( i = 0; i < 12; i++ )
    {
        if( key[0] == names[ i ][0]
            && key[1] == names[ i ][1]
            && key[2] == names[ i ][2] )
            return i + 1;
    }
    return 0;
}
/******************************************************************************
 * two_digits_at
 *
 * helper for parse_timestamp;
 * return the value of the two decimal digits at p, or -1 if either character
 * is not a digit.  The second character is examined only when the first one
 * is a digit, so the helper never reads past a terminating NUL.
 * Explicit range checks are used instead of isdigit: they are locale
 * independent and well defined for bytes with the high bit set.
 */
static int two_digits_at( const gchar* p )
{
    if( p[0] < '0' || p[0] > '9' )
        return -1;
    if( p[1] < '0' || p[1] > '9' )
        return -1;
    return (p[0] - '0') * 10 + (p[1] - '0');
}

/******************************************************************************
 * parse_timestamp
 *
 * parse TIMESTAMP part of message, which must have the fixed layout
 * "Mmm dd hh:mm:ss " (a one-digit day is preceded by a space);
 * return pointer to the next char after TIMESTAMP
 *
 * On success wMonth, wDay, wHour, wMinute and wSecond are set; the other
 * fields of *timestamp are left untouched.  The numeric fields are checked
 * for being digits only, not for their range.
 * If the text does not match, *timestamp is filled with the current local
 * time and msg is returned unchanged.
 */
static gchar* parse_timestamp( gchar* msg, LPSYSTEMTIME timestamp )
{
    int value;

    TRACE_ENTER( "\n" );

    timestamp->wMonth = str2month( msg );
    if( 0 == timestamp->wMonth )
        goto no_timestamp;
    if( msg[3] != ' ' )
        goto no_timestamp;

    /* day of month: two digits, or a space followed by one digit */
    if( msg[4] != ' ' )
    {
        value = two_digits_at( msg + 4 );
        if( value < 0 )
            goto no_timestamp;
        timestamp->wDay = (WORD) value;
    }
    else
    {
        if( msg[5] < '0' || msg[5] > '9' )
            goto no_timestamp;
        timestamp->wDay = (WORD) (msg[5] - '0');
    }
    if( msg[6] != ' ' )
        goto no_timestamp;

    /* hh: */
    value = two_digits_at( msg + 7 );
    if( value < 0 || msg[9] != ':' )
        goto no_timestamp;
    timestamp->wHour = (WORD) value;

    /* mm: */
    value = two_digits_at( msg + 10 );
    if( value < 0 || msg[12] != ':' )
        goto no_timestamp;
    timestamp->wMinute = (WORD) value;

    /* ss followed by a space */
    value = two_digits_at( msg + 13 );
    if( value < 0 || msg[15] != ' ' )
        goto no_timestamp;
    timestamp->wSecond = (WORD) value;

    msg += 16;
    goto done;

no_timestamp:
    TRACE_2( "no timestamp\n" );
    GetLocalTime( timestamp );

done:
    TRACE_LEAVE( "done\n" );
    return msg;
}
/******************************************************************************
 * parse_raw_message
 *
 * parse raw message and make a new message;
 * destroy raw message
 *
 * The raw text is split into PRI, TIMESTAMP, HOSTNAME and the remaining
 * message text.  If there is no valid timestamp, or the word after it is not
 * a valid host name followed by a space, the sender's name (derived from the
 * sender address) is used as host name and the text after PRI/TIMESTAMP is
 * taken as the message.  The program name is whatever precedes the first ':'
 * or '[' of the message text, or empty if a space or the end of the text
 * comes first.
 *
 * The returned message has refcount=1; raw_msg and its text buffer are freed.
 */
static struct message* parse_raw_message( struct raw_message* raw_msg )
{
    gchar *current_part, *next_part;
    struct string *sender, *hostname, *program, *message;
    int facility, priority;
    SYSTEMTIME timestamp;
    struct message *msg;

    TRACE_ENTER( "raw message=%p\n", raw_msg );

    /* get sender's hostname */
    sender = get_hostname( &raw_msg->sender_addr );

    current_part = raw_msg->msg;
    next_part = parse_PRI( current_part, &facility, &priority );

    current_part = next_part;
    next_part = parse_timestamp( current_part, &timestamp );
    if( next_part == current_part )
    {
        /* no valid timestamp */
        TRACE_2( "no valid timestamp: msg=%s\n", current_part );
        hostname = string_addref( sender );
        message = string_new( current_part );
    }
    else
    {
        /* have valid timestamp, go ahead */
        current_part = next_part;
        /* the cast keeps isalnum well defined for bytes with the high bit set */
        while( isalnum( (unsigned char) *next_part )
               || *next_part == '-' || *next_part == '.' )
            next_part++;
        if( *next_part != ' ' )
        {
            /* invalid hostname */
            hostname = string_addref( sender );
            message = string_new( current_part );
            TRACE_2( "invalid hostname; set sender (%s); msg=%s\n",
                     hostname->gstr->str, message->gstr->str );
        }
        else
        {
            hostname = string_new_len( current_part, next_part - current_part );
            /* skip the blanks between hostname and message text */
            while( *next_part == ' ' )
                next_part++;
            message = string_new( next_part );
            TRACE_2( "hostname=%s; msg=%s\n", hostname->gstr->str, message->gstr->str );
        }
    }

    /* try to find program name */
    current_part = message->gstr->str;
    next_part = current_part;
    while( *next_part != ' ' && *next_part != ':' && *next_part != '[' && *next_part != 0 )
        next_part++;
    if( *next_part == ' ' || *next_part == 0 )
        program = string_new("");
    else
        program = string_new_len( current_part, next_part - current_part );

    /* create message; it takes its own references on the strings, so the
       local ones are released right after */
    msg = create_message( raw_msg->source, sender, facility, priority,
                          &timestamp, hostname, program, message );
    string_release( sender );
    string_release( hostname );
    string_release( program );
    string_release( message );

    /* destroy raw message */
    g_free( raw_msg->msg );
    g_free( raw_msg );

    TRACE_LEAVE( "done; message=%p\n", msg );
    return msg;
}
/******************************************************************************
 * number_of_sources
 *
 * Walk the global list of sources and count the entries whose type equals
 * the requested one.
 */
unsigned number_of_sources( enum source_type type )
{
    unsigned count = 0;
    GList *node = sources;

    while( node )
    {
        struct source *candidate = node->data;

        if( candidate->type == type )
            count++;
        node = node->next;
    }
    return count;
}
/******************************************************************************
 * log_internal
 *
 * Generate internal log message.
 * This function should be called only from the main thread because there's no
 * access serialization to the writing end of the queue.
 *
 * pri is the syslog priority of the message; fmt and the following arguments
 * are formatted printf-style into the message text.  The message is issued
 * with facility LOG_SYSLOG, the local host name as sender and host, and the
 * daemon's own program name.  One message is queued per internal source: the
 * first one is created here, the others are duplicates with the source
 * replaced.  Nothing is done when there are no internal sources.
 */
void log_internal( int pri, char* fmt, ... )
{
    struct string *sender, *hostname, *program, *msg;
    va_list args;
    SYSTEMTIME stm;
    int i;
    struct message *message;

    TRACE_ENTER( "\n" );

    if( 0 == internal_source_count )
        goto done;

    sender = string_addref( local_hostname );
    hostname = string_addref( local_hostname );
    program = string_addref( self_program_name );

    va_start( args, fmt );
    msg = string_vprintf( fmt, args );
    va_end( args );

    GetLocalTime( &stm );

    message = create_message( internal_source_references[0],
                              sender,
                              LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ),
                              &stm,
                              hostname, program, msg );

    /* create_message took its own references; drop the local ones, otherwise
       the message text and the shared strings are never freed */
    string_release( sender );
    string_release( hostname );
    string_release( program );
    string_release( msg );

    EnterCriticalSection( &internal_queue_cs );
    for( i = 1;; i++ )
    {
        if( fifo_push( internal_message_queue, message ) )
            SetEvent( internal_queue_event );
        if( i == internal_source_count )
            break;
        /* next internal source gets its own copy of the message */
        message = duplicate_message( message );
        message->source = internal_source_references[ i ];
    }
    LeaveCriticalSection( &internal_queue_cs );

done:
    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
 * shutdown_internal_sources
 *
 * Forget the internal sources: reset the counter (which turns log_internal
 * into a no-op) and free the reference array.  The message queue and its
 * event are left alone so that pending messages can still be flushed.
 */
static void shutdown_internal_sources()
{
    TRACE_ENTER( "\n" );

    internal_source_count = 0;

    /* g_free accepts NULL, so no guard is needed */
    g_free( internal_source_references );
    internal_source_references = NULL;

    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
 * fini_internal_sources
 *
 * Release everything that init_internal_sources may have set up: the source
 * references, the message queue, and the event together with the critical
 * section.  Each resource is tested first, so the function may be called
 * after a partial or missing initialization.
 */
static void fini_internal_sources()
{
    TRACE_ENTER( "\n" );

    shutdown_internal_sources();

    if( internal_message_queue != NULL )
    {
        fifo_destroy( internal_message_queue );
        internal_message_queue = NULL;
    }

    /* the critical section is initialized only after the event has been
       created, so the event handle tells whether it exists */
    if( internal_queue_event != NULL )
    {
        DeleteCriticalSection( &internal_queue_cs );
        CloseHandle( internal_queue_event );
        internal_queue_event = NULL;
    }

    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
 * init_internal_sources
 *
 * Create the internal message queue, its manual-reset event and critical
 * section, and collect references to all sources of type ST_INTERNAL.
 * Having no internal sources at all is not an error.
 *
 * return: TRUE on success; FALSE if the event cannot be created, in which
 * case everything set up so far is disposed again.
 */
static gboolean init_internal_sources()
{
    gboolean ok = FALSE;

    TRACE_ENTER( "\n" );

    internal_message_queue = fifo_create();
    internal_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL );
    if( !internal_queue_event )
    {
        ERR( "Cannot create event; error %lu\n", GetLastError() );
    }
    else
    {
        InitializeCriticalSection( &internal_queue_cs );

        internal_source_count = (int) number_of_sources( ST_INTERNAL );
        if( internal_source_count != 0 )
        {
            GList *node;
            int filled = 0;

            internal_source_references =
                g_malloc( internal_source_count * sizeof(struct source*) );
            for( node = sources; node; node = node->next )
            {
                struct source *candidate = node->data;

                if( ST_INTERNAL == candidate->type )
                    internal_source_references[ filled++ ] = candidate;
            }
        }
        ok = TRUE;
    }

    if( !ok )
        fini_internal_sources();

    TRACE_LEAVE( "done; internal_sources=%d, ret=%d\n", internal_source_count, (int) ok );
    return ok;
}
/******************************************************************************
 * fini_destinations
 *
 * Invoke the fini method of every configured destination, in list order.
 */
static void fini_destinations()
{
    GList *node = destinations;

    TRACE_ENTER( "\n" );

    while( node )
    {
        struct destination *target = node->data;

        target->fini( target );
        node = node->next;
    }

    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
* main syslogd function
*/
void syslogd_main()
{
TRACE_ENTER( "\n" );
if( !read_configuration() )
goto done;
if( !init_purger() )
goto done;
purge_log_dirs();
if( !init_internal_sources() )
goto done;
if( !init_udp_listener() )
goto done;
log_internal( LOG_NOTICE, "Syslog daemon started" );
/* get messages from queues */
for(;;)
{
HANDLE wait_handles[3] = { udp_queue_event,
internal_queue_event,
service_stop_event };
DWORD t;
struct raw_message *raw_msg;
struct message *msg;
if( !mark_interval )
t = INFINITE;
else
t = mark_interval * 1000;
switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) )
{
case WAIT_OBJECT_0:
EnterCriticalSection( &udp_queue_cs );
for(;;)
{
raw_msg = fifo_pop( udp_message_queue );
if( !raw_msg )
break;
TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
mux_message( parse_raw_message( raw_msg ) );
}
ResetEvent( udp_queue_event );
LeaveCriticalSection( &udp_queue_cs );
break;
case WAIT_OBJECT_0 + 1:
EnterCriticalSection( &internal_queue_cs );
for(;;)
{
msg = fifo_pop( internal_message_queue );
if( !msg )
break;
TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
mux_message( msg );
}
ResetEvent( internal_queue_event );
LeaveCriticalSection( &internal_queue_cs );
break;
case WAIT_OBJECT_0 + 2:
goto shutdown;
case WAIT_TIMEOUT:
/* issue mark message */
log_internal( LOG_NOTICE, "%s", mark_message );
break;
default:
ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
SetEvent( service_stop_event );
goto shutdown;
}
}
shutdown:
log_internal( LOG_NOTICE, "Syslog daemon is shutting down" );
shutdown_udp_listener();
shutdown_internal_sources();
/* flush queues */
for(;;)
{
HANDLE wait_handles[2] = { udp_queue_event,
internal_queue_event };
struct raw_message *raw_msg;
struct message *msg;
switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) )
{
case WAIT_OBJECT_0:
EnterCriticalSection( &udp_queue_cs );
for(;;)
{
raw_msg = fifo_pop( udp_message_queue );
if( !raw_msg )
break;
TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
mux_message( parse_raw_message( raw_msg ) );
}
ResetEvent( udp_queue_event );
LeaveCriticalSection( &udp_queue_cs );
break;
case WAIT_OBJECT_0 + 1:
EnterCriticalSection( &internal_queue_cs );
for(;;)
{
msg = fifo_pop( internal_message_queue );
if( !msg )
break;
TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
mux_message( msg );
}
ResetEvent( internal_queue_event );
LeaveCriticalSection( &internal_queue_cs );
break;
case WAIT_TIMEOUT:
goto done;
default:
ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
goto done;
}
}
done:
/* signal to all possibly running threads */
SetEvent( service_stop_event );
fini_udp_listener();
fini_internal_sources();
fini_destinations();
fini_purger();
free_hostnames();
TRACE_LEAVE( "done\n" );
}