aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryaworsky <yaworsky>2005-11-10 02:39:55 +0000
committeryaworsky <yaworsky>2005-11-10 02:39:55 +0000
commita7085384027284f3eb55e48b4c0d4a93bea131b4 (patch)
tree81f594b44223eac8dc27001c3291210ca9289a17
parentd53437f45b1e600ca010883b31bf4061601d2850 (diff)
downloadsyslog-win32-a7085384027284f3eb55e48b4c0d4a93bea131b4.tar.gz
syslog-win32-a7085384027284f3eb55e48b4c0d4a93bea131b4.tar.xz
syslog-win32-a7085384027284f3eb55e48b4c0d4a93bea131b4.zip
Improved performance.
-rw-r--r--daemon/dest_file.c78
-rw-r--r--daemon/syslogd.c89
-rw-r--r--daemon/syslogd.h17
-rw-r--r--daemon/udp_listener.c27
4 files changed, 135 insertions, 76 deletions
diff --git a/daemon/dest_file.c b/daemon/dest_file.c
index 3925499..ef28062 100644
--- a/daemon/dest_file.c
+++ b/daemon/dest_file.c
@@ -39,7 +39,8 @@ struct file_writer
struct destination *destination;
gchar *file_name;
struct fifo *message_queue;
- HANDLE fifo_semaphore;
+ CRITICAL_SECTION fifo_cs;
+ HANDLE fifo_event; /* manual-reset event */
HANDLE shutdown_event; /* manual-reset event */
HANDLE fd;
struct message *first_msg, *second_msg, *current_msg;
@@ -235,8 +236,9 @@ static void flush_coalescer( struct file_writer* writer )
static void destroy_file_writer( struct file_writer* writer )
{
TRACE_ENTER( "%p\n", writer );
- if( writer->fifo_semaphore ) CloseHandle( writer->fifo_semaphore );
+ if( writer->fifo_event ) CloseHandle( writer->fifo_event );
if( writer->shutdown_event ) CloseHandle( writer->shutdown_event );
+ DeleteCriticalSection( &writer->fifo_cs );
fifo_destroy( writer->message_queue );
g_free( writer->file_name );
g_free( writer );
@@ -258,10 +260,10 @@ static struct file_writer* create_file_writer( gchar* file_name,
ret = g_malloc0( sizeof(struct file_writer) );
ret->file_name = g_strdup( file_name );
ret->message_queue = fifo_create();
- ret->fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !ret->fifo_semaphore )
+ ret->fifo_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !ret->fifo_event )
{
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
goto error;
}
ret->shutdown_event = CreateEvent( NULL, TRUE, FALSE, NULL );
@@ -279,6 +281,8 @@ static struct file_writer* create_file_writer( gchar* file_name,
goto error;
}
+ InitializeCriticalSection( &ret->fifo_cs );
+
/* add writer to destination */
extra = destination->extra;
extra->file_writers = g_list_append( extra->file_writers, ret );
@@ -318,6 +322,37 @@ static void detach_writer_from_destination( struct file_writer* writer )
}
/******************************************************************************
+ * pop_messages_from_queue
+ *
+ * Helper function for writer_thread_proc.
+ * Extract messages from queue and write them to file.
+ */
+static void pop_messages_from_queue( struct file_writer* writer )
+{
+ EnterCriticalSection( &writer->fifo_cs );
+ for(;;)
+ {
+ writer->current_msg = fifo_pop( writer->message_queue );
+ if( !writer->current_msg )
+ break;
+ if( coalesce( writer ) )
+ {
+ if( writer->current_msg )
+ {
+ write_message_to_logfile( writer, &writer->current_msg );
+ TRACE_2( "%p written current message\n", writer );
+ }
+ }
+ else
+ {
+ flush_coalescer( writer );
+ }
+ }
+ ResetEvent( writer->fifo_event );
+ LeaveCriticalSection( &writer->fifo_cs );
+}
+
+/******************************************************************************
* writer_thread_proc
*
* Open file, extract messages from queue and write them to file.
@@ -326,7 +361,7 @@ static void detach_writer_from_destination( struct file_writer* writer )
static unsigned __stdcall writer_thread_proc( void* arg )
{
struct file_writer *writer = arg;
- HANDLE wait_objects[2] = { writer->fifo_semaphore, writer->shutdown_event };
+ HANDLE wait_objects[2] = { writer->fifo_event, writer->shutdown_event };
gchar *pathname;
TRACE_ENTER( "writer=%p\n", writer );
@@ -370,31 +405,20 @@ static unsigned __stdcall writer_thread_proc( void* arg )
detach writer from destination and continue to write any pending messages */
detach_writer_from_destination( writer );
/* from now no new messages will be put into queue */
- SetEvent( writer->shutdown_event );
+ break;
}
- writer->current_msg = fifo_pop( writer->message_queue );
- if( !writer->current_msg )
- {
+ if( WAIT_OBJECT_0 + 1 == w )
/* shutdown */
- goto done;
- }
+ break;
- if( coalesce( writer ) )
- {
- if( writer->current_msg )
- {
- write_message_to_logfile( writer, &writer->current_msg );
- TRACE_2( "%p written current message\n", writer );
- }
- }
- else
- {
- flush_coalescer( writer );
- }
+ if( WAIT_OBJECT_0 == w )
+ pop_messages_from_queue( writer );
}
+
done:
detach_writer_from_destination( writer );
+ pop_messages_from_queue( writer );
flush_coalescer( writer );
if( writer->fd != INVALID_HANDLE_VALUE ) CloseHandle( writer->fd );
destroy_file_writer( writer );
@@ -481,8 +505,10 @@ static void put_message_to_file_dest( struct destination* destination, struct me
}
/* put message into queue */
reference_message( msg );
- fifo_push( writer->message_queue, msg );
- ReleaseSemaphore( writer->fifo_semaphore, 1, NULL );
+ EnterCriticalSection( &writer->fifo_cs );
+ if( fifo_push( writer->message_queue, msg ) )
+ SetEvent( writer->fifo_event );
+ LeaveCriticalSection( &writer->fifo_cs );
done:
LeaveCriticalSection( &extra->cs_file_writers );
diff --git a/daemon/syslogd.c b/daemon/syslogd.c
index 439152b..dc947b3 100644
--- a/daemon/syslogd.c
+++ b/daemon/syslogd.c
@@ -29,7 +29,8 @@
/* internal source data */
static struct fifo *internal_message_queue = NULL;
-static HANDLE internal_queue_semaphore = NULL;
+static HANDLE internal_queue_event = NULL;
+static CRITICAL_SECTION internal_queue_cs;
static int internal_source_count = 0;
static struct source** internal_source_references = NULL;
@@ -576,15 +577,17 @@ void log_internal( int pri, char* fmt, ... )
LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ),
&stm,
hostname, program, msg );
+ EnterCriticalSection( &internal_queue_cs );
for( i = 1;; i++ )
{
- fifo_push( internal_message_queue, message );
- ReleaseSemaphore( internal_queue_semaphore, 1, NULL );
+ if( fifo_push( internal_message_queue, message ) )
+ SetEvent( internal_queue_event );
if( i == internal_source_count )
break;
message = duplicate_message( message );
message->source = internal_source_references[ i ];
}
+ LeaveCriticalSection( &internal_queue_cs );
done:
TRACE_LEAVE( "done\n" );
@@ -593,7 +596,7 @@ done:
/******************************************************************************
* shutdown_internal_sources
*
- * dispose all data except message queue and semaphore
+ * dispose all data except message queue and event
*/
static void shutdown_internal_sources()
{
@@ -625,10 +628,11 @@ static void fini_internal_sources()
internal_message_queue = NULL;
}
- if( internal_queue_semaphore )
+ if( internal_queue_event )
{
- CloseHandle( internal_queue_semaphore );
- internal_queue_semaphore = NULL;
+ DeleteCriticalSection( &internal_queue_cs );
+ CloseHandle( internal_queue_event );
+ internal_queue_event = NULL;
}
TRACE_LEAVE( "done\n" );
@@ -646,12 +650,13 @@ static gboolean init_internal_sources()
TRACE_ENTER( "\n" );
internal_message_queue = fifo_create();
- internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !internal_queue_semaphore )
+ internal_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !internal_queue_event )
{
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
goto done;
}
+ InitializeCriticalSection( &internal_queue_cs );
internal_source_count = (int) number_of_sources( ST_INTERNAL );
if( 0 == internal_source_count )
@@ -734,8 +739,8 @@ void syslogd_main()
/* get messages from queues */
for(;;)
{
- HANDLE wait_handles[3] = { udp_queue_semaphore,
- internal_queue_semaphore,
+ HANDLE wait_handles[3] = { udp_queue_event,
+ internal_queue_event,
service_stop_event };
DWORD t;
struct raw_message *raw_msg;
@@ -749,15 +754,31 @@ void syslogd_main()
switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) )
{
case WAIT_OBJECT_0:
- raw_msg = fifo_pop( udp_message_queue );
- TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
- mux_message( parse_raw_message( raw_msg ) );
+ EnterCriticalSection( &udp_queue_cs );
+ for(;;)
+ {
+ raw_msg = fifo_pop( udp_message_queue );
+ if( !raw_msg )
+ break;
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ }
+ ResetEvent( udp_queue_event );
+ LeaveCriticalSection( &udp_queue_cs );
break;
case WAIT_OBJECT_0 + 1:
- msg = fifo_pop( internal_message_queue );
- TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
- mux_message( msg );
+ EnterCriticalSection( &internal_queue_cs );
+ for(;;)
+ {
+ msg = fifo_pop( internal_message_queue );
+ if( !msg )
+ break;
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
+ }
+ ResetEvent( internal_queue_event );
+ LeaveCriticalSection( &internal_queue_cs );
break;
case WAIT_OBJECT_0 + 2:
@@ -783,23 +804,39 @@ shutdown:
/* flush queues */
for(;;)
{
- HANDLE wait_handles[2] = { udp_queue_semaphore,
- internal_queue_semaphore };
+ HANDLE wait_handles[2] = { udp_queue_event,
+ internal_queue_event };
struct raw_message *raw_msg;
struct message *msg;
switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) )
{
case WAIT_OBJECT_0:
- raw_msg = fifo_pop( udp_message_queue );
- TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
- mux_message( parse_raw_message( raw_msg ) );
+ EnterCriticalSection( &udp_queue_cs );
+ for(;;)
+ {
+ raw_msg = fifo_pop( udp_message_queue );
+ if( !raw_msg )
+ break;
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ }
+ ResetEvent( udp_queue_event );
+ LeaveCriticalSection( &udp_queue_cs );
break;
case WAIT_OBJECT_0 + 1:
- msg = fifo_pop( internal_message_queue );
- TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
- mux_message( msg );
+ EnterCriticalSection( &internal_queue_cs );
+ for(;;)
+ {
+ msg = fifo_pop( internal_message_queue );
+ if( !msg )
+ break;
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
+ }
+ ResetEvent( internal_queue_event );
+ LeaveCriticalSection( &internal_queue_cs );
break;
case WAIT_TIMEOUT:
diff --git a/daemon/syslogd.h b/daemon/syslogd.h
index f061f7b..0bcd1fb 100644
--- a/daemon/syslogd.h
+++ b/daemon/syslogd.h
@@ -62,7 +62,8 @@ struct raw_message
};
extern struct fifo *udp_message_queue;
-extern HANDLE udp_queue_semaphore;
+extern HANDLE udp_queue_event;
+extern CRITICAL_SECTION udp_queue_cs;
extern gboolean init_udp_listener();
extern void shutdown_udp_listener();
@@ -189,21 +190,11 @@ extern gboolean init_destination_file( struct destination* destination );
extern gboolean init_destination_relay( struct destination* destination );
/* queue */
-struct fifo_item
-{
- struct fifo_item *next; /* queue is a single-linked list */
- void *payload;
-};
-
-struct fifo
-{
- struct fifo_item *first; /* first pushed item */
- struct fifo_item *last; /* last pushed item */
-};
+struct fifo;
extern struct fifo* fifo_create();
extern void fifo_destroy( struct fifo* queue );
-extern void fifo_push( struct fifo* queue, void* data );
+extern gboolean fifo_push( struct fifo* queue, void* data );
extern void* fifo_pop( struct fifo* queue );
/* logrotate */
diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c
index c039049..f9c7c1a 100644
--- a/daemon/udp_listener.c
+++ b/daemon/udp_listener.c
@@ -40,7 +40,8 @@ static gchar *datagram_buffer = NULL;
static HANDLE udp_listener_thread_handle = NULL;
struct fifo *udp_message_queue = NULL;
-HANDLE udp_queue_semaphore = NULL;
+HANDLE udp_queue_event = NULL;
+CRITICAL_SECTION udp_queue_cs;
/******************************************************************************
* udp_listener_thread
@@ -102,8 +103,10 @@ static unsigned __stdcall udp_listener_thread( void* arg )
/* allocate raw message and add it to the queue */
msg = g_malloc( sizeof(struct raw_message) );
memcpy( msg, &message, sizeof(struct raw_message) );
- fifo_push( udp_message_queue, msg );
- ReleaseSemaphore( udp_queue_semaphore, 1, NULL );
+ EnterCriticalSection( &udp_queue_cs );
+ if( fifo_push( udp_message_queue, msg ) )
+ SetEvent( udp_queue_event );
+ LeaveCriticalSection( &udp_queue_cs );
}
done:
@@ -115,7 +118,7 @@ done:
* shutdown_udp_listener
*
* stop listening thread and dispose all objects
- * except message queue and semaphore
+ * except message queue and event
*/
void shutdown_udp_listener()
{
@@ -184,10 +187,11 @@ void fini_udp_listener()
udp_message_queue = NULL;
}
- if( udp_queue_semaphore )
+ if( udp_queue_event )
{
- CloseHandle( udp_queue_semaphore );
- udp_queue_semaphore = NULL;
+ DeleteCriticalSection( &udp_queue_cs );
+ CloseHandle( udp_queue_event );
+ udp_queue_event = NULL;
}
TRACE_LEAVE( "done\n" );
@@ -207,17 +211,18 @@ gboolean init_udp_listener()
TRACE_ENTER( "\n" );
- /* create message queue and semaphore;
+ /* create message queue and event;
* we should do this first because of possible early returns
* from this function when no sources are defined
*/
udp_message_queue = fifo_create();
- udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !udp_queue_semaphore )
+ udp_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !udp_queue_event )
{
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
goto done;
}
+ InitializeCriticalSection( &udp_queue_cs );
n = number_of_sources( ST_UDP );
if( 0 == n )