aboutsummaryrefslogtreecommitdiffstats
path: root/daemon/dest_file.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/dest_file.c')
-rw-r--r--daemon/dest_file.c78
1 files changed, 52 insertions, 26 deletions
diff --git a/daemon/dest_file.c b/daemon/dest_file.c
index 3925499..ef28062 100644
--- a/daemon/dest_file.c
+++ b/daemon/dest_file.c
@@ -39,7 +39,8 @@ struct file_writer
struct destination *destination;
gchar *file_name;
struct fifo *message_queue;
- HANDLE fifo_semaphore;
+ CRITICAL_SECTION fifo_cs;
+ HANDLE fifo_event; /* manual-reset event */
HANDLE shutdown_event; /* manual-reset event */
HANDLE fd;
struct message *first_msg, *second_msg, *current_msg;
@@ -235,8 +236,9 @@ static void flush_coalescer( struct file_writer* writer )
static void destroy_file_writer( struct file_writer* writer )
{
TRACE_ENTER( "%p\n", writer );
- if( writer->fifo_semaphore ) CloseHandle( writer->fifo_semaphore );
+ if( writer->fifo_event ) CloseHandle( writer->fifo_event );
if( writer->shutdown_event ) CloseHandle( writer->shutdown_event );
+ DeleteCriticalSection( &writer->fifo_cs );
fifo_destroy( writer->message_queue );
g_free( writer->file_name );
g_free( writer );
@@ -258,10 +260,10 @@ static struct file_writer* create_file_writer( gchar* file_name,
ret = g_malloc0( sizeof(struct file_writer) );
ret->file_name = g_strdup( file_name );
ret->message_queue = fifo_create();
- ret->fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !ret->fifo_semaphore )
+ ret->fifo_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !ret->fifo_event )
{
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
goto error;
}
ret->shutdown_event = CreateEvent( NULL, TRUE, FALSE, NULL );
@@ -279,6 +281,8 @@ static struct file_writer* create_file_writer( gchar* file_name,
goto error;
}
+ InitializeCriticalSection( &ret->fifo_cs );
+
/* add writer to destination */
extra = destination->extra;
extra->file_writers = g_list_append( extra->file_writers, ret );
@@ -318,6 +322,37 @@ static void detach_writer_from_destination( struct file_writer* writer )
}
/******************************************************************************
+ * pop_messages_from_queue
+ *
+ * Helper function for writer_thread_proc.
+ * Extract messages from queue and write them to file.
+ */
+static void pop_messages_from_queue( struct file_writer* writer )
+{
+ EnterCriticalSection( &writer->fifo_cs );
+ for(;;)
+ {
+ writer->current_msg = fifo_pop( writer->message_queue );
+ if( !writer->current_msg )
+ break;
+ if( coalesce( writer ) )
+ {
+ if( writer->current_msg )
+ {
+ write_message_to_logfile( writer, &writer->current_msg );
+ TRACE_2( "%p written current message\n", writer );
+ }
+ }
+ else
+ {
+ flush_coalescer( writer );
+ }
+ }
+ ResetEvent( writer->fifo_event );
+ LeaveCriticalSection( &writer->fifo_cs );
+}
+
+/******************************************************************************
* writer_thread_proc
*
* Open file, extract messages from queue and write them to file.
@@ -326,7 +361,7 @@ static void detach_writer_from_destination( struct file_writer* writer )
static unsigned __stdcall writer_thread_proc( void* arg )
{
struct file_writer *writer = arg;
- HANDLE wait_objects[2] = { writer->fifo_semaphore, writer->shutdown_event };
+ HANDLE wait_objects[2] = { writer->fifo_event, writer->shutdown_event };
gchar *pathname;
TRACE_ENTER( "writer=%p\n", writer );
@@ -370,31 +405,20 @@ static unsigned __stdcall writer_thread_proc( void* arg )
detach writer from destination and continue to write any pending messages */
detach_writer_from_destination( writer );
/* from now no new messages will be put into queue */
- SetEvent( writer->shutdown_event );
+ break;
}
- writer->current_msg = fifo_pop( writer->message_queue );
- if( !writer->current_msg )
- {
+ if( WAIT_OBJECT_0 + 1 == w )
/* shutdown */
- goto done;
- }
+ break;
- if( coalesce( writer ) )
- {
- if( writer->current_msg )
- {
- write_message_to_logfile( writer, &writer->current_msg );
- TRACE_2( "%p written current message\n", writer );
- }
- }
- else
- {
- flush_coalescer( writer );
- }
+ if( WAIT_OBJECT_0 == w )
+ pop_messages_from_queue( writer );
}
+
done:
detach_writer_from_destination( writer );
+ pop_messages_from_queue( writer );
flush_coalescer( writer );
if( writer->fd != INVALID_HANDLE_VALUE ) CloseHandle( writer->fd );
destroy_file_writer( writer );
@@ -481,8 +505,10 @@ static void put_message_to_file_dest( struct destination* destination, struct me
}
/* put message into queue */
reference_message( msg );
- fifo_push( writer->message_queue, msg );
- ReleaseSemaphore( writer->fifo_semaphore, 1, NULL );
+ EnterCriticalSection( &writer->fifo_cs );
+ if( fifo_push( writer->message_queue, msg ) )
+ SetEvent( writer->fifo_event );
+ LeaveCriticalSection( &writer->fifo_cs );
done:
LeaveCriticalSection( &extra->cs_file_writers );