aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryaworsky <yaworsky>2005-10-31 13:52:24 +0000
committeryaworsky <yaworsky>2005-10-31 13:52:24 +0000
commit74b7b6121179f8a82a2f96812c9a33e3f650eaed (patch)
treea051354cef72555130299fee3f17aa5948b6b3f4
parent10379ee810238a765bf94a4ba48e4fff04fc7b6f (diff)
downloadsyslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.gz
syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.xz
syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.zip
Refactored listener
-rw-r--r--daemon/Makefile.am5
-rw-r--r--daemon/listener.c335
-rw-r--r--daemon/syslogd.c429
-rw-r--r--daemon/syslogd.h29
-rw-r--r--daemon/udp_listener.c341
-rw-r--r--doc/src/internals.xml41
6 files changed, 705 insertions, 475 deletions
diff --git a/daemon/Makefile.am b/daemon/Makefile.am
index 06c1ccc..5e2a426 100644
--- a/daemon/Makefile.am
+++ b/daemon/Makefile.am
@@ -13,8 +13,9 @@ endif
AM_CPPFLAGS += -I. -I../include $(GLIB_CFLAGS)
bin_PROGRAMS = syslogd
-syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c listener.c logrotate.c \
- main.c names.c pathnames.c purger.c syslogd.c syslogd.h
+syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c logrotate.c \
+ main.c names.c pathnames.c purger.c syslogd.c syslogd.h \
+ udp_listener.c
syslogd_LDADD = -lws2_32 $(GLIB_LIBS)
endif
diff --git a/daemon/listener.c b/daemon/listener.c
deleted file mode 100644
index 32c6297..0000000
--- a/daemon/listener.c
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * listener.c - syslogd implementation for windows, listener for UDP
- * and "internal" sources
- *
- * Created by Alexander Yaworsky
- *
- * THIS SOFTWARE IS NOT COPYRIGHTED
- *
- * This source code is offered for use in the public domain. You may
- * use, modify or distribute it freely.
- *
- * This code is distributed in the hope that it will be useful but
- * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
- * DISCLAIMED. This includes but is not limited to warranties of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- */
-
-#include <stdio.h>
-#include <winsock2.h>
-
-#include <glib.h>
-
-#include <syslog.h>
-#include <syslogd.h>
-
-static SOCKET *socket_array = NULL;
-static int socket_count = 0;
-
-static struct source **source_references = NULL;
-
-static HANDLE *event_array = NULL;
-static int event_count = 0;
-
-/* message data */
-static unsigned max_datagram_size = 1024;
-static gchar *datagram_buffer = NULL;
-static struct raw_message message;
-
-static CRITICAL_SECTION cs_internal_message;
-static HANDLE internal_message_accepted = NULL;
-static gchar internal_message_buffer[ 1024 ];
-
-/******************************************************************************
- * init_listener
- *
- * create sockets and synchronization objects including ones for "internal"
- * source
- */
-gboolean init_listener()
-{
- gboolean ret = FALSE;
- unsigned n;
- GList *item;
- int i;
- struct source *internal_src;
-
- TRACE_ENTER( "\n" );
-
- /* create critical section and event for the access serialization to internal message buffer
- */
- InitializeCriticalSection( &cs_internal_message );
- internal_message_accepted = CreateEvent( NULL, FALSE, FALSE, NULL );
- if( !internal_message_accepted )
- {
- ERR( "Cannot create event; error %lu\n", GetLastError() );
- goto done;
- }
-
- /* allocate memory for sockets and events;
- * the number of sockets is not greater than number of sources
- */
- n = g_list_length( sources );
- socket_array = g_malloc( n * sizeof(SOCKET) );
-
- /* number of source references is greater by one because of inclusion the event
- * for "internal" source
- * FIXME: how about multiple internal sources?
- */
- source_references = g_malloc( (n + 1) * sizeof(struct source*) );
-
- /* number of events is greater by two because of inclusion the event
- * for "internal" source and the service_stop_event
- */
- event_array = g_malloc( (n + 2) * sizeof(HANDLE) );
-
- /* create sockets */
- for( item = sources; item; item = item->next )
- {
- struct source *src = item->data;
- SOCKET sock;
- unsigned dgram_size;
- int size;
-
- if( src->type == ST_INTERNAL )
- {
- internal_src = src;
- continue;
- }
-
- if( src->type != ST_UDP )
- continue;
-
- sock = socket( AF_INET, SOCK_DGRAM, 0 );
- if( INVALID_SOCKET == sock )
- {
- ERR( "socket() error %lu\n", WSAGetLastError() );
- goto done;
- }
-
- if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
- {
- ERR( "bind() error %lu\n", WSAGetLastError() );
- closesocket( sock );
- goto done;
- }
-
- size = sizeof(dgram_size);
- if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
- {
- ERR( "getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", WSAGetLastError() );
- closesocket( sock );
- goto done;
- }
- TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
- src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
- src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
- ntohs( src->udp.sin_port ), dgram_size );
- if( dgram_size > max_datagram_size )
- max_datagram_size = dgram_size;
-
- source_references[ socket_count ] = src;
- socket_array[ socket_count++ ] = sock;
- }
- source_references[ socket_count ] = internal_src;
-
- /* create events;
- * service_stop_event is added to the array
- */
- while( event_count <= socket_count )
- {
- HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
- if( !evt )
- {
- ERR( "Cannot create event; error %lu\n", GetLastError() );
- goto done;
- }
- event_array[ event_count++ ] = evt;
- }
- event_array[ event_count++ ] = service_stop_event;
-
- /* bind events to sockets */
- for( i = 0; i < socket_count; i++ )
- {
- if( WSAEventSelect( socket_array[ i ], event_array[ i ], FD_READ ) )
- {
- ERR( "WSAEventSelect() error %lu\n", WSAGetLastError() );
- goto done;
- }
- }
-
- /* allocate datagram buffer */
- datagram_buffer = g_malloc( max_datagram_size );
-
- ret = TRUE;
-
-done:
- if( !ret )
- fini_listener();
-
- TRACE_LEAVE( "done; socket_count=%d, event_count=%d, max_datagram_size=%d, ret=%d\n",
- socket_count, event_count, max_datagram_size, (int) ret );
- return ret;
-}
-
-/******************************************************************************
- * fini_listener
- */
-void fini_listener()
-{
- int i;
-
- TRACE_ENTER( "\n" );
-
- for( i = 0; i < socket_count; i++ )
- closesocket( socket_array[ i ] );
- g_free( socket_array );
- socket_array = NULL;
- socket_count = 0;
-
- g_free( source_references );
- source_references = NULL;
-
- /* note that the last event is the service_stop_event
- * and should not be destroyed
- */
- for( i = 0; i < event_count - 1; i++ )
- CloseHandle( event_array[ i ] );
- g_free( event_array );
- event_array = NULL;
- event_count = 0;
-
- g_free( datagram_buffer );
- datagram_buffer = NULL;
-
- if( internal_message_accepted )
- {
- CloseHandle( internal_message_accepted );
- internal_message_accepted = NULL;
- }
- DeleteCriticalSection( &cs_internal_message );
- TRACE_LEAVE( "done\n" );
-}
-
-/******************************************************************************
- * listener
- *
- * wait for a message; generate mark message;
- * allocates a new raw_message structure and assigns its pointer to *msg
- */
-enum listener_status listener( struct raw_message** msg )
-{
- enum listener_status ret = LSNR_ERROR;
- DWORD t, w;
- int r;
- int addrlen;
-
- TRACE_ENTER( "\n" );
-
- for(;;)
- {
- if( !mark_interval )
- t = INFINITE;
- else
- t = mark_interval * 1000;
- w = WaitForMultipleObjects( event_count, event_array, FALSE, t );
- if( WAIT_TIMEOUT == w )
- {
- /* issue mark message */
- log_internal( LOG_NOTICE, "%s", mark_message );
- continue;
- }
- if( WAIT_FAILED == w )
- {
- ERR( "Wait error %lu\n", GetLastError() );
- goto done;
- }
- if( w >= event_count )
- {
- ERR( "Unknown wait error\n" );
- goto done;
- }
- if( w == event_count - 1 )
- {
- /* shut down */
- ret = LSNR_SHUTDOWN;
- goto done;
- }
- if( w == event_count - 2 )
- {
- /* got "internal" message */
- message.source = source_references[ socket_count ];
- if( !message.source )
- {
- /* internal source is not defined, cannot handle message */
- SetEvent( internal_message_accepted );
- continue;
- }
- message.msg = g_strdup( internal_message_buffer );
- SetEvent( internal_message_accepted );
- memset( &message.sender_addr, 0, sizeof(message.sender_addr) );
- goto alloc_msg;
- }
- /* got UDP message, read it */
- addrlen = sizeof(message.sender_addr);
- r = recvfrom( socket_array[ w ], datagram_buffer, max_datagram_size,
- 0, (struct sockaddr*) &message.sender_addr, &addrlen );
- if( r < 0 )
- {
- ERR( "recvfrom() error %lu\n", WSAGetLastError() );
- goto done;
- }
- if( !r )
- continue;
-
- message.msg = g_strndup( datagram_buffer, r );
- message.source = source_references[ w ];
-
- alloc_msg:
- *msg = g_malloc( sizeof(struct raw_message) );
- memcpy( *msg, &message, sizeof(struct raw_message) );
-
- ret = LSNR_GOT_MESSAGE;
- goto done;
- }
-
-done:
- TRACE_LEAVE( "done; ret=%d\n", (int) ret );
- return ret;
-}
-
-/******************************************************************************
- * log_internal
- *
- * generate internal log message
- */
-void log_internal( int pri, char* fmt, ... )
-{
- va_list args;
- SYSTEMTIME stm;
- int len;
- char *p;
-
- TRACE_ENTER( "\n" );
- EnterCriticalSection( &cs_internal_message );
-
- GetLocalTime( &stm );
- len = sprintf( internal_message_buffer, "<%d>%s %2d %02d:%02d:%02d %s syslog: ",
- LOG_SYSLOG | pri,
- str_month[ stm.wMonth - 1 ], stm.wDay, stm.wHour, stm.wMinute, stm.wSecond,
- local_hostname );
- va_start( args, fmt );
- vsnprintf( internal_message_buffer + len, sizeof(internal_message_buffer) - len, fmt, args );
- va_end( args );
- p = strchr( internal_message_buffer, '\n' );
- if( p )
- *p = 0;
- p = strchr( internal_message_buffer, '\r' );
- if( p )
- *p = 0;
-
- SetEvent( event_array[ event_count - 2 ] );
- LeaveCriticalSection( &cs_internal_message );
- TRACE_LEAVE( "done\n" );
-}
diff --git a/daemon/syslogd.c b/daemon/syslogd.c
index 4c64a83..af50793 100644
--- a/daemon/syslogd.c
+++ b/daemon/syslogd.c
@@ -16,7 +16,6 @@
*/
#include <ctype.h>
-#include <process.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -28,8 +27,11 @@
#include <syslog.h>
#include <syslogd.h>
-static struct fifo *raw_message_queue = NULL;
-static HANDLE fifo_semaphore = NULL;
+/* internal source data */
+static struct fifo *internal_message_queue = NULL;
+static HANDLE internal_queue_semaphore = NULL;
+static int internal_source_count = 0;
+static struct source** internal_source_references = NULL;
/* cache for source hostnames */
struct hostname
@@ -49,6 +51,70 @@ char *str_month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
/******************************************************************************
+ * create_message
+ *
+ * Create a new message with refcount=1.
+ * The caller should allocate sender, hostname, program and message strings.
+ * The function simply copies these pointers to the structure and the caller
+ * should not free allocated strings.
+ */
+struct message* create_message( struct source* source,
+ gchar* sender,
+ int facility, int priority,
+ LPSYSTEMTIME timestamp,
+ gchar* hostname,
+ gchar* program,
+ gchar* message )
+{
+ struct message *msg;
+
+ TRACE_ENTER( "\n" );
+
+ msg = g_malloc( sizeof(struct message) );
+ msg->refcount = 1;
+ msg->source = source;
+ msg->sender = sender;
+ msg->facility = facility;
+ msg->priority = priority;
+ msg->timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d",
+ str_month[ timestamp->wMonth - 1 ],
+ timestamp->wDay, timestamp->wHour,
+ timestamp->wMinute, timestamp->wSecond );
+ msg->hostname = hostname;
+ msg->program = program;
+ msg->message = message;
+
+ TRACE_LEAVE( "message=%p\n", msg );
+ return msg;
+}
+
+/******************************************************************************
+ * duplicate_message
+ *
+ * Make a copy of message.
+ */
+struct message* duplicate_message( struct message* msg )
+{
+ struct message *new_msg;
+
+ TRACE_ENTER( "message=%p\n", msg );
+
+ new_msg = g_malloc( sizeof(struct message) );
+ new_msg->refcount = 1;
+ new_msg->source = msg->source;
+ new_msg->sender = g_strdup( msg->sender );
+ new_msg->facility = msg->facility;
+ new_msg->priority = msg->priority;
+ new_msg->timestamp = g_strdup( msg->timestamp );
+ new_msg->hostname = g_strdup( msg->hostname );
+ new_msg->program = g_strdup( msg->program );
+ new_msg->message = g_strdup( msg->message );
+
+ TRACE_LEAVE( "new message=%p\n", new_msg );
+ return new_msg;
+}
+
+/******************************************************************************
* refrence_message
*
* increment reference count
@@ -322,13 +388,11 @@ no_pri:
* parse_timestamp
*
* parse TIMESTAMP part of message;
- * allocate string;
* return pointer to the next char after TIMESTAMP
*/
-static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
+static gchar* parse_timestamp( gchar* msg, LPSYSTEMTIME timestamp )
{
int i;
- SYSTEMTIME stm;
TRACE_ENTER( "\n" );
@@ -337,7 +401,7 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
break;
if( i == 12 )
goto no_timestamp;
- stm.wMonth = i + 1;
+ timestamp->wMonth = i + 1;
if( msg[3] != ' ' )
goto no_timestamp;
@@ -346,13 +410,13 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
{
if( (!isdigit( msg[4] )) || (!isdigit( msg[5] )) )
goto no_timestamp;
- stm.wDay = (msg[4] - '0') * 10 + (msg[5] - '0');
+ timestamp->wDay = (msg[4] - '0') * 10 + (msg[5] - '0');
}
else
{
if( !isdigit( msg[5] ) )
goto no_timestamp;
- stm.wDay = msg[5] - '0';
+ timestamp->wDay = msg[5] - '0';
}
if( msg[6] != ' ' )
@@ -360,26 +424,23 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
if( (!isdigit( msg[7] )) || (!isdigit( msg[8] )) || msg[9] != ':' )
goto no_timestamp;
- stm.wHour = (msg[7] - '0') * 10 + (msg[8] - '0');
+ timestamp->wHour = (msg[7] - '0') * 10 + (msg[8] - '0');
if( (!isdigit( msg[10] )) || (!isdigit( msg[11] )) || msg[12] != ':' )
goto no_timestamp;
- stm.wMinute = (msg[10] - '0') * 10 + (msg[11] - '0');
+ timestamp->wMinute = (msg[10] - '0') * 10 + (msg[11] - '0');
if( (!isdigit( msg[13] )) || (!isdigit( msg[14] )) || msg[15] != ' ' )
goto no_timestamp;
- stm.wSecond = (msg[13] - '0') * 10 + (msg[14] - '0');
+ timestamp->wSecond = (msg[13] - '0') * 10 + (msg[14] - '0');
msg += 16;
goto done;
no_timestamp:
TRACE_2( "no timestamp\n" );
- GetLocalTime( &stm );
+ GetLocalTime( timestamp );
done:
- *timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d",
- str_month[ stm.wMonth - 1 ],
- stm.wDay, stm.wHour, stm.wMinute, stm.wSecond );
TRACE_LEAVE( "done\n" );
return msg;
}
@@ -392,30 +453,28 @@ done:
*/
static struct message* parse_raw_message( struct raw_message* raw_msg )
{
- struct message* msg;
gchar *current_part, *next_part;
+ gchar *sender, *hostname, *program, *message;
+ int facility, priority;
+ SYSTEMTIME timestamp;
+ struct message *msg;
TRACE_ENTER( "raw message=%p\n", raw_msg );
- /* allocate and initialize message structure */
- msg = g_malloc( sizeof(struct message) );
- msg->refcount = 1;
- msg->source = raw_msg->source;
-
/* get sender's hostname */
- msg->sender = get_hostname( &raw_msg->sender_addr );
+ sender = get_hostname( &raw_msg->sender_addr );
current_part = raw_msg->msg;
- next_part = parse_PRI( current_part, &msg->facility, &msg->priority );
+ next_part = parse_PRI( current_part, &facility, &priority );
current_part = next_part;
- next_part = parse_timestamp( current_part, &msg->timestamp );
+ next_part = parse_timestamp( current_part, &timestamp );
if( next_part == current_part )
{
/* no valid timestamp */
TRACE_2( "no valid timestamp: msg=%s\n", current_part );
- msg->hostname = g_strdup( msg->sender );
- msg->message = g_strdup( current_part );
+ hostname = g_strdup( sender );
+ message = g_strdup( current_part );
}
else
{
@@ -426,29 +485,33 @@ static struct message* parse_raw_message( struct raw_message* raw_msg )
if( *next_part != ' ' )
{
/* invalid hostname */
- msg->hostname = g_strdup( msg->sender );
- msg->message = g_strdup( current_part );
- TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", msg->hostname, msg->message );
+ hostname = g_strdup( sender );
+ message = g_strdup( current_part );
+ TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", hostname, message );
}
else
{
- msg->hostname = g_strndup( current_part, next_part - current_part );
+ hostname = g_strndup( current_part, next_part - current_part );
while( *next_part == ' ' && *next_part != 0 )
next_part++;
- msg->message = g_strdup( next_part );
- TRACE_2( "hostname=%s; msg=%s\n", msg->hostname, msg->message );
+ message = g_strdup( next_part );
+ TRACE_2( "hostname=%s; msg=%s\n", hostname, message );
}
}
/* try to find program name */
- current_part = msg->message;
+ current_part = message;
next_part = current_part;
while( *next_part != ' ' && *next_part != ':' && *next_part != '[' && *next_part != 0 )
next_part++;
if( *next_part == ' ' || *next_part == 0 )
- msg->program = g_strdup("");
+ program = g_strdup("");
else
- msg->program = g_strndup( current_part, next_part - current_part );
+ program = g_strndup( current_part, next_part - current_part );
+
+ /* create message */
+ msg = create_message( raw_msg->source, sender, facility, priority,
+ &timestamp, hostname, program, message );
/* destroy raw message */
g_free( raw_msg->msg );
@@ -459,39 +522,151 @@ static struct message* parse_raw_message( struct raw_message* raw_msg )
}
/******************************************************************************
- * message_processor
+ * number_of_sources
*
- * main function; extract raw messages from queue and parse them
+ * return the number of sources of the specified type
*/
-static unsigned __stdcall message_processor( void* arg )
+unsigned number_of_sources( enum source_type type )
{
- HANDLE wait_handles[2] = { fifo_semaphore, service_stop_event };
+ unsigned ret = 0;
+ GList *item;
- TRACE_ENTER( "\n" );
- for(;;)
+ for( item = sources; item; item = item->next )
{
- DWORD w;
- struct raw_message *raw_msg;
+ struct source *src = item->data;
- w = WaitForMultipleObjects( 2, wait_handles, FALSE, INFINITE );
- if( WAIT_OBJECT_0 + 1 == w )
- /* shutdown */
- break;
- if( w != WAIT_OBJECT_0 )
- {
- ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
- SetEvent( service_stop_event );
+ if( src->type == type )
+ ret++;
+ }
+ return ret;
+}
+
+/******************************************************************************
+ * log_internal
+ *
+ * Generate internal log message.
+ * This function should be called only from the main thread because there's no
+ * access serialization to the writing end of the queue.
+ */
+void log_internal( int pri, char* fmt, ... )
+{
+ gchar *sender, *hostname, *program, *msg;
+ va_list args;
+ SYSTEMTIME stm;
+ int i;
+ struct message *message;
+
+ TRACE_ENTER( "\n" );
+
+ if( 0 == internal_source_count )
+ goto done;
+
+ sender = g_strdup( local_hostname );
+ hostname = g_strdup( local_hostname );
+ program = g_strdup( "syslog" );
+
+ va_start( args, fmt );
+ msg = g_strdup_vprintf( fmt, args );
+ va_end( args );
+
+ GetLocalTime( &stm );
+
+ message = create_message( internal_source_references[0],
+ sender,
+ LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ),
+ &stm,
+ hostname, program, msg );
+ for( i = 1;; i++ )
+ {
+ fifo_push( internal_message_queue, message );
+ ReleaseSemaphore( internal_queue_semaphore, 1, NULL );
+ if( i == internal_source_count )
break;
- }
- /* extract raw message from queue */
- raw_msg = fifo_pop( raw_message_queue );
+ message = duplicate_message( message );
+ message->source = internal_source_references[ i ];
+ }
- TRACE_2( "got message %p from queue\n", raw_msg );
+done:
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * shutdown_internal_sources
+ *
+ * dispose all data except message queue and semaphore
+ */
+static void shutdown_internal_sources()
+{
+ TRACE_ENTER( "\n" );
+
+ internal_source_count = 0;
+
+ if( internal_source_references )
+ g_free( internal_source_references );
- mux_message( parse_raw_message( raw_msg ) );
- }
TRACE_LEAVE( "done\n" );
- return 0;
+}
+
+/******************************************************************************
+ * fini_internal_sources
+ */
+static void fini_internal_sources()
+{
+ TRACE_ENTER( "\n" );
+
+ shutdown_internal_sources();
+
+ if( internal_message_queue )
+ fifo_destroy( internal_message_queue );
+
+ if( internal_queue_semaphore )
+ CloseHandle( internal_queue_semaphore );
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * init_internal_sources
+ */
+static gboolean init_internal_sources()
+{
+ gboolean ret = FALSE;
+ GList *item;
+ int i;
+
+ TRACE_ENTER( "\n" );
+
+ internal_message_queue = fifo_create();
+ internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
+ if( !internal_queue_semaphore )
+ {
+ ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ internal_source_count = (int) number_of_sources( ST_INTERNAL );
+ if( 0 == internal_source_count )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ internal_source_references = g_malloc( internal_source_count * sizeof(struct source*) );
+
+ for( i = 0, item = sources; item; item = item->next )
+ {
+ struct source *src = item->data;
+ if( ST_INTERNAL == src->type )
+ internal_source_references[ i++ ] = src;
+ }
+ ret = TRUE;
+
+done:
+ if( !ret )
+ fini_internal_sources();
+
+ TRACE_LEAVE( "done; internal_sources=%d, ret=%d\n", internal_source_count, (int) ret );
+ return ret;
}
/******************************************************************************
@@ -502,34 +677,38 @@ static unsigned __stdcall message_processor( void* arg )
static void fini_destinations()
{
GList *dest;
+
+ TRACE_ENTER( "\n" );
+
for( dest = destinations; dest; dest = dest->next )
- ((struct destination*) dest)->fini( (struct destination*) dest );
+ {
+ struct destination *d = dest->data;
+ d->fini( d );
+ }
+
+ TRACE_LEAVE( "done\n" );
}
/******************************************************************************
* main syslogd function
- *
- * global initialization; invoke listener
*/
void syslogd_main()
{
- HANDLE message_processor_thread = NULL;
- unsigned tid;
-
TRACE_ENTER( "\n" );
- SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );
-
if( !read_configuration() )
goto done;
if( !init_purger() )
goto done;
- if( !init_listener() )
+ purge_log_dirs();
+
+ if( !init_internal_sources() )
goto done;
- purge_log_dirs();
+ if( !init_udp_listener() )
+ goto done;
if( source_encoding && destination_encoding )
{
@@ -541,47 +720,85 @@ void syslogd_main()
}
}
- /* create message queue and semaphore;
- * avoid using Glib's asynchronous queues and threading support
- * because synchronization capabilities are very limited;
- */
- raw_message_queue = fifo_create();
- fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !fifo_semaphore )
- {
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
- goto done;
- }
+ log_internal( LOG_NOTICE, "Syslog daemon started" );
- message_processor_thread = (HANDLE) _beginthreadex( NULL, 0, message_processor, NULL, 0, &tid );
- if( !message_processor_thread )
+ /* get messages from queues */
+ for(;;)
{
- ERR( "Cannot create thread; error %lu\n", GetLastError() );
- goto done;
+ HANDLE wait_handles[3] = { udp_queue_semaphore,
+ internal_queue_semaphore,
+ service_stop_event };
+ DWORD t;
+ struct raw_message *raw_msg;
+ struct message *msg;
+
+ if( !mark_interval )
+ t = INFINITE;
+ else
+ t = mark_interval * 1000;
+
+ switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) )
+ {
+ case WAIT_OBJECT_0:
+ raw_msg = fifo_pop( udp_message_queue );
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ break;
+
+ case WAIT_OBJECT_0 + 1:
+ msg = fifo_pop( internal_message_queue );
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
+ break;
+
+ case WAIT_OBJECT_0 + 2:
+ goto shutdown;
+
+ case WAIT_TIMEOUT:
+ /* issue mark message */
+ log_internal( LOG_NOTICE, "%s", mark_message );
+ break;
+
+ default:
+ ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
+ SetEvent( service_stop_event );
+ goto shutdown;
+ }
}
- /* get messages from the listener */
+shutdown:
+ log_internal( LOG_NOTICE, "Syslog daemon is shutting down" );
+ shutdown_udp_listener();
+ shutdown_internal_sources();
+
+ /* flush queues */
for(;;)
{
+ HANDLE wait_handles[2] = { udp_queue_semaphore,
+ internal_queue_semaphore };
struct raw_message *raw_msg;
+ struct message *msg;
- switch( listener( &raw_msg ) )
+ switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) )
{
- case LSNR_ERROR:
- case LSNR_SHUTDOWN:
- goto done;
+ case WAIT_OBJECT_0:
+ raw_msg = fifo_pop( udp_message_queue );
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ break;
- case LSNR_GOT_MESSAGE:
- TRACE_2( "got message from %d.%d.%d.%d; source name %s; ptr=%p: %s\n",
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b1,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b2,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b3,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b4,
- raw_msg->source->name, raw_msg, raw_msg->msg );
- /* add raw message to queue */
- fifo_push( raw_message_queue, raw_msg );
- ReleaseSemaphore( fifo_semaphore, 1, NULL );
+ case WAIT_OBJECT_0 + 1:
+ msg = fifo_pop( internal_message_queue );
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
break;
+
+ case WAIT_TIMEOUT:
+ goto done;
+
+ default:
+ ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
+ goto done;
}
}
@@ -589,24 +806,14 @@ done:
/* signal to all possibly running threads */
SetEvent( service_stop_event );
- if( message_processor_thread )
- {
- /* wait for message processor shutdown */
- WaitForSingleObject( message_processor_thread, INFINITE );
- CloseHandle( message_processor_thread );
- }
-
+ fini_udp_listener();
+ fini_internal_sources();
fini_destinations();
fini_purger();
free_hostnames();
- if( raw_message_queue ) fifo_destroy( raw_message_queue );
if( conversion_descriptor != (GIConv) -1 )
g_iconv_close( conversion_descriptor );
- fini_listener();
-
- if( fifo_semaphore ) CloseHandle( fifo_semaphore );
-
TRACE_LEAVE( "done\n" );
}
diff --git a/daemon/syslogd.h b/daemon/syslogd.h
index 20dfa00..f061f7b 100644
--- a/daemon/syslogd.h
+++ b/daemon/syslogd.h
@@ -42,6 +42,8 @@ extern char *str_month[];
extern void syslogd_main();
+extern void log_internal( int pri, char* fmt, ... );
+
/* options and their default values */
extern gboolean use_dns;
extern gchar *source_encoding;
@@ -52,13 +54,6 @@ extern int hold;
extern gchar *logdir;
/* listener */
-enum listener_status
-{
- LSNR_ERROR,
- LSNR_SHUTDOWN,
- LSNR_GOT_MESSAGE
-};
-
struct raw_message
{
gchar *msg;
@@ -66,10 +61,12 @@ struct raw_message
struct source *source;
};
-extern gboolean init_listener();
-extern void fini_listener();
-extern enum listener_status listener( struct raw_message** msg );
-extern void log_internal( int pri, char* fmt, ... );
+extern struct fifo *udp_message_queue;
+extern HANDLE udp_queue_semaphore;
+
+extern gboolean init_udp_listener();
+extern void shutdown_udp_listener();
+extern void fini_udp_listener();
/* message */
struct message
@@ -85,6 +82,14 @@ struct message
gchar *message;
};
+extern struct message* create_message( struct source* source,
+ gchar* sender,
+ int facility, int priority,
+ LPSYSTEMTIME timestamp,
+ gchar* hostname,
+ gchar* program,
+ gchar* message );
+extern struct message* duplicate_message( struct message* msg );
extern void reference_message( struct message* msg );
extern void release_message( struct message* msg );
@@ -103,6 +108,8 @@ struct source
struct sockaddr_in udp;
};
+extern unsigned number_of_sources( enum source_type type );
+
enum destination_type
{
DT_UNDEFINED,
diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c
new file mode 100644
index 0000000..c039049
--- /dev/null
+++ b/daemon/udp_listener.c
@@ -0,0 +1,341 @@
+/*
+ * udp_listener.c - syslogd implementation for windows, UDP listener
+ *
+ * Created by Alexander Yaworsky
+ *
+ * THIS SOFTWARE IS NOT COPYRIGHTED
+ *
+ * This source code is offered for use in the public domain. You may
+ * use, modify or distribute it freely.
+ *
+ * This code is distributed in the hope that it will be useful but
+ * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
+ * DISCLAIMED. This includes but is not limited to warranties of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ */
+
+#include <process.h>
+#include <stdio.h>
+#include <winsock2.h>
+
+#include <glib.h>
+
+#include <syslog.h>
+#include <syslogd.h>
+
+/* udp listener data */
+static SOCKET *udp_socket_array = NULL;
+static int udp_socket_count = 0;
+
+static struct source **udp_source_references = NULL;
+
+static HANDLE *udp_event_array = NULL;
+static int udp_event_count = 0;
+static HANDLE udp_listener_stop_event = NULL;
+
+static unsigned max_datagram_size = 1024;
+static gchar *datagram_buffer = NULL;
+
+static HANDLE udp_listener_thread_handle = NULL;
+
+struct fifo *udp_message_queue = NULL;
+HANDLE udp_queue_semaphore = NULL;
+
+/******************************************************************************
+ * udp_listener_thread
+ */
+static unsigned __stdcall udp_listener_thread( void* arg )
+{
+ TRACE_ENTER( "\n" );
+
+ SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );
+
+ for(;;)
+ {
+ DWORD w;
+ int r;
+ int addrlen;
+ struct raw_message message;
+ struct raw_message *msg;
+
+ w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE );
+ if( WAIT_FAILED == w )
+ {
+ ERR( "Wait error %lu\n", GetLastError() );
+ goto done;
+ }
+ if( w >= udp_event_count )
+ {
+ ERR( "Unknown wait error\n" );
+ goto done;
+ }
+ if( w == udp_event_count - 1 )
+ {
+ TRACE_2( "shutdown\n" );
+ goto done;
+ }
+ /* got UDP message, read it */
+ addrlen = sizeof(message.sender_addr);
+ r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size,
+ 0, (struct sockaddr*) &message.sender_addr, &addrlen );
+ if( r < 0 )
+ {
+ ERR( "recvfrom() error %lu\n", WSAGetLastError() );
+ goto done;
+ }
+ if( !r )
+ {
+ TRACE_2( "empty datagram\n" );
+ continue;
+ }
+ message.msg = g_strndup( datagram_buffer, r );
+ message.source = udp_source_references[ w ];
+
+ TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n",
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b1,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b2,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b3,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b4,
+ message.source->name, message.msg );
+
+ /* allocate raw message and add it to the queue */
+ msg = g_malloc( sizeof(struct raw_message) );
+ memcpy( msg, &message, sizeof(struct raw_message) );
+ fifo_push( udp_message_queue, msg );
+ ReleaseSemaphore( udp_queue_semaphore, 1, NULL );
+ }
+
+done:
+ TRACE_LEAVE( "done\n" );
+ return 0;
+}
+
+/******************************************************************************
+ * shutdown_udp_listener
+ *
+ * stop listening thread and dispose all objects
+ * except message queue and semaphore
+ */
+void shutdown_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ if( udp_listener_thread_handle )
+ {
+ SetEvent( udp_listener_stop_event );
+ TRACE_2( "wait for shutdown of udp listener thread\n" );
+ WaitForSingleObject( udp_listener_thread_handle, INFINITE );
+ CloseHandle( udp_listener_thread_handle );
+ udp_listener_thread_handle = NULL;
+ }
+
+ if( udp_socket_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_socket_count; i++ )
+ closesocket( udp_socket_array[ i ] );
+ g_free( udp_socket_array );
+ udp_socket_array = NULL;
+ udp_socket_count = 0;
+ }
+
+ if( udp_source_references )
+ {
+ g_free( udp_source_references );
+ udp_source_references = NULL;
+ }
+
+ if( udp_event_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_event_count; i++ )
+ CloseHandle( udp_event_array[ i ] );
+ g_free( udp_event_array );
+ udp_event_array = NULL;
+ udp_event_count = 0;
+ udp_listener_stop_event = NULL;
+ }
+
+ if( datagram_buffer )
+ {
+ g_free( datagram_buffer );
+ datagram_buffer = NULL;
+ }
+ max_datagram_size = 1024;
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * fini_udp_listener
+ */
+void fini_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ shutdown_udp_listener();
+
+ if( udp_message_queue )
+ {
+ fifo_destroy( udp_message_queue );
+ udp_message_queue = NULL;
+ }
+
+ if( udp_queue_semaphore )
+ {
+ CloseHandle( udp_queue_semaphore );
+ udp_queue_semaphore = NULL;
+ }
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * init_udp_listener
+ *
+ * create sockets, queue and thread
+ */
+gboolean init_udp_listener()
+{
+ gboolean ret = FALSE;
+ unsigned n;
+ GList *item;
+ int i;
+
+ TRACE_ENTER( "\n" );
+
+ /* create message queue and semaphore;
+ * we should do this first because of possible early returns
+ * from this function when no sources are defined
+ */
+ udp_message_queue = fifo_create();
+ udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
+ if( !udp_queue_semaphore )
+ {
+ ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ n = number_of_sources( ST_UDP );
+ if( 0 == n )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* allocate memory for sockets, source references and events;
+ * the number of sockets is not greater than number of sources
+ */
+ udp_socket_array = g_malloc( n * sizeof(SOCKET) );
+ udp_source_references = g_malloc( n * sizeof(struct source*) );
+
+ /* number of events is greater by one because of
+ * included udp_listener_stop_event
+ */
+ udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) );
+
+ /* create sockets */
+ for( item = sources; item; item = item->next )
+ {
+ struct source *src = item->data;
+ SOCKET sock;
+ unsigned dgram_size;
+ int size;
+
+ if( src->type != ST_UDP )
+ continue;
+
+ sock = socket( AF_INET, SOCK_DGRAM, 0 );
+ if( INVALID_SOCKET == sock )
+ {
+ ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() );
+ continue;
+ }
+
+ if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
+ {
+ ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+
+ size = sizeof(dgram_size);
+ if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
+ {
+ ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n",
+ src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+ TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
+ src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
+ src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
+ ntohs( src->udp.sin_port ), dgram_size );
+ if( dgram_size > max_datagram_size )
+ max_datagram_size = dgram_size;
+
+ udp_source_references[ udp_socket_count ] = src;
+ udp_socket_array[ udp_socket_count++ ] = sock;
+ }
+
+ if( 0 == udp_socket_count )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* create events */
+ while( udp_event_count < udp_socket_count )
+ {
+ HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
+ if( !evt )
+ {
+ ERR( "Cannot initialize source %s: CreateEvent error %lu\n",
+ udp_source_references[ udp_event_count ]->name, GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = evt;
+ }
+ udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !udp_listener_stop_event )
+ {
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = udp_listener_stop_event;
+
+ /* bind events to sockets */
+ for( i = 0; i < udp_socket_count; i++ )
+ {
+ if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) )
+ {
+ ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n",
+ udp_source_references[ i ]->name, WSAGetLastError() );
+ goto done;
+ }
+ }
+
+ /* allocate datagram buffer */
+ datagram_buffer = g_malloc( max_datagram_size );
+
+ /* create thread */
+ udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n );
+ if( !udp_listener_thread_handle )
+ {
+ ERR( "Cannot create thread; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ ret = TRUE;
+
+done:
+ if( !ret )
+ fini_udp_listener();
+
+ TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n",
+ udp_socket_count, udp_event_count, max_datagram_size, (int) ret );
+ return ret;
+}
diff --git a/doc/src/internals.xml b/doc/src/internals.xml
index d64bf87..dafa160 100644
--- a/doc/src/internals.xml
+++ b/doc/src/internals.xml
@@ -1,5 +1,5 @@
<para>
-There are three basic parts of daemon: listener, message processor and
+There are three basic parts of daemon: UDP listener, message processor and
message writer.
All these parts run in separate threads:
the listener receives messages as fast as possible and passes them to the
@@ -8,13 +8,13 @@ the message processor performs time-consuming tasks and
message writer performs asynchronous output to files.
</para>
<para>
-Datagrams are received by the listener.
+Datagrams are received by the UDP listener.
The listener emits raw messages (struct raw_message) which contain content
of datagram, sender address and reference to a source described
in configuration file.
</para>
<para>
-Raw messages are passed to the processing thread via queue.
+Raw messages are passed to the main processing thread via queue.
Message processing involves the following tasks:
<itemizedlist>
<listitem>
@@ -55,20 +55,29 @@ Log rotation is initiated at process startup or at writing thread startup.
Old log files are deleted by the purger which is launched at process startup
or by the writing thread after file is closed.
<screen>
-+--------+ raw message +-----+ +------+ message +-----------------+
-|listener|------------>|queue|--->|parser|-------->|charset converter|--->
-+--------+ +-----+ +------+ +-----------------+
+ mark message
++-------------------------------------------------+
+| |
+| +----------------+ +-----+ message +--------------+
++--->|internal sources|--->|queue|-------->| - - - - - |
+ | (log_internal) | +-----+ | \ |
+ +----------------+ | main loop \ |
+ | \| message
++------------+ raw message +-----+ | +------+ /|--------->
+|UDP listener|------------>|queue|-------->|-|parser|- - |
++------------+ +-----+ | +------+ |
+ +--------------+
- +------+ +-----------+ +-----------+
---->|filter|--->|multiplexer|-+->|destination|+
- +------+ +-----------+ +->+-----------+|+
- ^ +-> +-----------+|
- | +-----------+
- +-------+
- |logpath|+
- +-------+|+
- +-------+|
- +-------+
+ +-----------------+ +------+ +-----------+ +-----------+
+--->|charset converter|--->|filter|--->|multiplexer|-+->|destination|+
+ +-----------------+ +------+ +-----------+ +->+-----------+|+
+ ^ +-> +-----------+|
+ | +-----------+
+ +-------+
+ |logpath|+
+ +-------+|+
+ +-------+|
+ +-------+
file +-----------+ +-----+ +--------------+