aboutsummaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authoryaworsky <yaworsky>2005-10-31 13:52:24 +0000
committeryaworsky <yaworsky>2005-10-31 13:52:24 +0000
commit74b7b6121179f8a82a2f96812c9a33e3f650eaed (patch)
treea051354cef72555130299fee3f17aa5948b6b3f4 /daemon
parent10379ee810238a765bf94a4ba48e4fff04fc7b6f (diff)
downloadsyslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.gz
syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.xz
syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.zip
Refactored listener
Diffstat (limited to 'daemon')
-rw-r--r--daemon/Makefile.am5
-rw-r--r--daemon/listener.c335
-rw-r--r--daemon/syslogd.c429
-rw-r--r--daemon/syslogd.h29
-rw-r--r--daemon/udp_listener.c341
5 files changed, 680 insertions, 459 deletions
diff --git a/daemon/Makefile.am b/daemon/Makefile.am
index 06c1ccc..5e2a426 100644
--- a/daemon/Makefile.am
+++ b/daemon/Makefile.am
@@ -13,8 +13,9 @@ endif
AM_CPPFLAGS += -I. -I../include $(GLIB_CFLAGS)
bin_PROGRAMS = syslogd
-syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c listener.c logrotate.c \
- main.c names.c pathnames.c purger.c syslogd.c syslogd.h
+syslogd_SOURCES = conf.c dest_file.c dest_relay.c fifo.c logrotate.c \
+ main.c names.c pathnames.c purger.c syslogd.c syslogd.h \
+ udp_listener.c
syslogd_LDADD = -lws2_32 $(GLIB_LIBS)
endif
diff --git a/daemon/listener.c b/daemon/listener.c
deleted file mode 100644
index 32c6297..0000000
--- a/daemon/listener.c
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * listener.c - syslogd implementation for windows, listener for UDP
- * and "internal" sources
- *
- * Created by Alexander Yaworsky
- *
- * THIS SOFTWARE IS NOT COPYRIGHTED
- *
- * This source code is offered for use in the public domain. You may
- * use, modify or distribute it freely.
- *
- * This code is distributed in the hope that it will be useful but
- * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
- * DISCLAIMED. This includes but is not limited to warranties of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- */
-
-#include <stdio.h>
-#include <winsock2.h>
-
-#include <glib.h>
-
-#include <syslog.h>
-#include <syslogd.h>
-
-static SOCKET *socket_array = NULL;
-static int socket_count = 0;
-
-static struct source **source_references = NULL;
-
-static HANDLE *event_array = NULL;
-static int event_count = 0;
-
-/* message data */
-static unsigned max_datagram_size = 1024;
-static gchar *datagram_buffer = NULL;
-static struct raw_message message;
-
-static CRITICAL_SECTION cs_internal_message;
-static HANDLE internal_message_accepted = NULL;
-static gchar internal_message_buffer[ 1024 ];
-
-/******************************************************************************
- * init_listener
- *
- * create sockets and synchronization objects including ones for "internal"
- * source
- */
-gboolean init_listener()
-{
- gboolean ret = FALSE;
- unsigned n;
- GList *item;
- int i;
- struct source *internal_src;
-
- TRACE_ENTER( "\n" );
-
- /* create critical section and event for the access serialization to internal message buffer
- */
- InitializeCriticalSection( &cs_internal_message );
- internal_message_accepted = CreateEvent( NULL, FALSE, FALSE, NULL );
- if( !internal_message_accepted )
- {
- ERR( "Cannot create event; error %lu\n", GetLastError() );
- goto done;
- }
-
- /* allocate memory for sockets and events;
- * the number of sockets is not greater than number of sources
- */
- n = g_list_length( sources );
- socket_array = g_malloc( n * sizeof(SOCKET) );
-
- /* number of source references is greater by one because of inclusion the event
- * for "internal" source
- * FIXME: how about multiple internal sources?
- */
- source_references = g_malloc( (n + 1) * sizeof(struct source*) );
-
- /* number of events is greater by two because of inclusion the event
- * for "internal" source and the service_stop_event
- */
- event_array = g_malloc( (n + 2) * sizeof(HANDLE) );
-
- /* create sockets */
- for( item = sources; item; item = item->next )
- {
- struct source *src = item->data;
- SOCKET sock;
- unsigned dgram_size;
- int size;
-
- if( src->type == ST_INTERNAL )
- {
- internal_src = src;
- continue;
- }
-
- if( src->type != ST_UDP )
- continue;
-
- sock = socket( AF_INET, SOCK_DGRAM, 0 );
- if( INVALID_SOCKET == sock )
- {
- ERR( "socket() error %lu\n", WSAGetLastError() );
- goto done;
- }
-
- if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
- {
- ERR( "bind() error %lu\n", WSAGetLastError() );
- closesocket( sock );
- goto done;
- }
-
- size = sizeof(dgram_size);
- if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
- {
- ERR( "getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", WSAGetLastError() );
- closesocket( sock );
- goto done;
- }
- TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
- src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
- src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
- ntohs( src->udp.sin_port ), dgram_size );
- if( dgram_size > max_datagram_size )
- max_datagram_size = dgram_size;
-
- source_references[ socket_count ] = src;
- socket_array[ socket_count++ ] = sock;
- }
- source_references[ socket_count ] = internal_src;
-
- /* create events;
- * service_stop_event is added to the array
- */
- while( event_count <= socket_count )
- {
- HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
- if( !evt )
- {
- ERR( "Cannot create event; error %lu\n", GetLastError() );
- goto done;
- }
- event_array[ event_count++ ] = evt;
- }
- event_array[ event_count++ ] = service_stop_event;
-
- /* bind events to sockets */
- for( i = 0; i < socket_count; i++ )
- {
- if( WSAEventSelect( socket_array[ i ], event_array[ i ], FD_READ ) )
- {
- ERR( "WSAEventSelect() error %lu\n", WSAGetLastError() );
- goto done;
- }
- }
-
- /* allocate datagram buffer */
- datagram_buffer = g_malloc( max_datagram_size );
-
- ret = TRUE;
-
-done:
- if( !ret )
- fini_listener();
-
- TRACE_LEAVE( "done; socket_count=%d, event_count=%d, max_datagram_size=%d, ret=%d\n",
- socket_count, event_count, max_datagram_size, (int) ret );
- return ret;
-}
-
-/******************************************************************************
- * fini_listener
- */
-void fini_listener()
-{
- int i;
-
- TRACE_ENTER( "\n" );
-
- for( i = 0; i < socket_count; i++ )
- closesocket( socket_array[ i ] );
- g_free( socket_array );
- socket_array = NULL;
- socket_count = 0;
-
- g_free( source_references );
- source_references = NULL;
-
- /* note that the last event is the service_stop_event
- * and should not be destroyed
- */
- for( i = 0; i < event_count - 1; i++ )
- CloseHandle( event_array[ i ] );
- g_free( event_array );
- event_array = NULL;
- event_count = 0;
-
- g_free( datagram_buffer );
- datagram_buffer = NULL;
-
- if( internal_message_accepted )
- {
- CloseHandle( internal_message_accepted );
- internal_message_accepted = NULL;
- }
- DeleteCriticalSection( &cs_internal_message );
- TRACE_LEAVE( "done\n" );
-}
-
-/******************************************************************************
- * listener
- *
- * wait for a message; generate mark message;
- * allocates a new raw_message structure and assigns its pointer to *msg
- */
-enum listener_status listener( struct raw_message** msg )
-{
- enum listener_status ret = LSNR_ERROR;
- DWORD t, w;
- int r;
- int addrlen;
-
- TRACE_ENTER( "\n" );
-
- for(;;)
- {
- if( !mark_interval )
- t = INFINITE;
- else
- t = mark_interval * 1000;
- w = WaitForMultipleObjects( event_count, event_array, FALSE, t );
- if( WAIT_TIMEOUT == w )
- {
- /* issue mark message */
- log_internal( LOG_NOTICE, "%s", mark_message );
- continue;
- }
- if( WAIT_FAILED == w )
- {
- ERR( "Wait error %lu\n", GetLastError() );
- goto done;
- }
- if( w >= event_count )
- {
- ERR( "Unknown wait error\n" );
- goto done;
- }
- if( w == event_count - 1 )
- {
- /* shut down */
- ret = LSNR_SHUTDOWN;
- goto done;
- }
- if( w == event_count - 2 )
- {
- /* got "internal" message */
- message.source = source_references[ socket_count ];
- if( !message.source )
- {
- /* internal source is not defined, cannot handle message */
- SetEvent( internal_message_accepted );
- continue;
- }
- message.msg = g_strdup( internal_message_buffer );
- SetEvent( internal_message_accepted );
- memset( &message.sender_addr, 0, sizeof(message.sender_addr) );
- goto alloc_msg;
- }
- /* got UDP message, read it */
- addrlen = sizeof(message.sender_addr);
- r = recvfrom( socket_array[ w ], datagram_buffer, max_datagram_size,
- 0, (struct sockaddr*) &message.sender_addr, &addrlen );
- if( r < 0 )
- {
- ERR( "recvfrom() error %lu\n", WSAGetLastError() );
- goto done;
- }
- if( !r )
- continue;
-
- message.msg = g_strndup( datagram_buffer, r );
- message.source = source_references[ w ];
-
- alloc_msg:
- *msg = g_malloc( sizeof(struct raw_message) );
- memcpy( *msg, &message, sizeof(struct raw_message) );
-
- ret = LSNR_GOT_MESSAGE;
- goto done;
- }
-
-done:
- TRACE_LEAVE( "done; ret=%d\n", (int) ret );
- return ret;
-}
-
-/******************************************************************************
- * log_internal
- *
- * generate internal log message
- */
-void log_internal( int pri, char* fmt, ... )
-{
- va_list args;
- SYSTEMTIME stm;
- int len;
- char *p;
-
- TRACE_ENTER( "\n" );
- EnterCriticalSection( &cs_internal_message );
-
- GetLocalTime( &stm );
- len = sprintf( internal_message_buffer, "<%d>%s %2d %02d:%02d:%02d %s syslog: ",
- LOG_SYSLOG | pri,
- str_month[ stm.wMonth - 1 ], stm.wDay, stm.wHour, stm.wMinute, stm.wSecond,
- local_hostname );
- va_start( args, fmt );
- vsnprintf( internal_message_buffer + len, sizeof(internal_message_buffer) - len, fmt, args );
- va_end( args );
- p = strchr( internal_message_buffer, '\n' );
- if( p )
- *p = 0;
- p = strchr( internal_message_buffer, '\r' );
- if( p )
- *p = 0;
-
- SetEvent( event_array[ event_count - 2 ] );
- LeaveCriticalSection( &cs_internal_message );
- TRACE_LEAVE( "done\n" );
-}
diff --git a/daemon/syslogd.c b/daemon/syslogd.c
index 4c64a83..af50793 100644
--- a/daemon/syslogd.c
+++ b/daemon/syslogd.c
@@ -16,7 +16,6 @@
*/
#include <ctype.h>
-#include <process.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -28,8 +27,11 @@
#include <syslog.h>
#include <syslogd.h>
-static struct fifo *raw_message_queue = NULL;
-static HANDLE fifo_semaphore = NULL;
+/* internal source data */
+static struct fifo *internal_message_queue = NULL;
+static HANDLE internal_queue_semaphore = NULL;
+static int internal_source_count = 0;
+static struct source** internal_source_references = NULL;
/* cache for source hostnames */
struct hostname
@@ -49,6 +51,70 @@ char *str_month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
/******************************************************************************
+ * create_message
+ *
+ * Create a new message with refcount=1.
+ * The caller should allocate sender, hostname, program and message strings.
+ * The function simply copies these pointers to the structure and the caller
+ * should not free allocated strings.
+ */
+struct message* create_message( struct source* source,
+ gchar* sender,
+ int facility, int priority,
+ LPSYSTEMTIME timestamp,
+ gchar* hostname,
+ gchar* program,
+ gchar* message )
+{
+ struct message *msg;
+
+ TRACE_ENTER( "\n" );
+
+ msg = g_malloc( sizeof(struct message) );
+ msg->refcount = 1;
+ msg->source = source;
+ msg->sender = sender;
+ msg->facility = facility;
+ msg->priority = priority;
+ msg->timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d",
+ str_month[ timestamp->wMonth - 1 ],
+ timestamp->wDay, timestamp->wHour,
+ timestamp->wMinute, timestamp->wSecond );
+ msg->hostname = hostname;
+ msg->program = program;
+ msg->message = message;
+
+ TRACE_LEAVE( "message=%p\n", msg );
+ return msg;
+}
+
+/******************************************************************************
+ * duplicate_message
+ *
+ * Make a copy of message.
+ */
+struct message* duplicate_message( struct message* msg )
+{
+ struct message *new_msg;
+
+ TRACE_ENTER( "message=%p\n", msg );
+
+ new_msg = g_malloc( sizeof(struct message) );
+ new_msg->refcount = 1;
+ new_msg->source = msg->source;
+ new_msg->sender = g_strdup( msg->sender );
+ new_msg->facility = msg->facility;
+ new_msg->priority = msg->priority;
+ new_msg->timestamp = g_strdup( msg->timestamp );
+ new_msg->hostname = g_strdup( msg->hostname );
+ new_msg->program = g_strdup( msg->program );
+ new_msg->message = g_strdup( msg->message );
+
+ TRACE_LEAVE( "new message=%p\n", new_msg );
+ return new_msg;
+}
+
+/******************************************************************************
* refrence_message
*
* increment reference count
@@ -322,13 +388,11 @@ no_pri:
* parse_timestamp
*
* parse TIMESTAMP part of message;
- * allocate string;
* return pointer to the next char after TIMESTAMP
*/
-static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
+static gchar* parse_timestamp( gchar* msg, LPSYSTEMTIME timestamp )
{
int i;
- SYSTEMTIME stm;
TRACE_ENTER( "\n" );
@@ -337,7 +401,7 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
break;
if( i == 12 )
goto no_timestamp;
- stm.wMonth = i + 1;
+ timestamp->wMonth = i + 1;
if( msg[3] != ' ' )
goto no_timestamp;
@@ -346,13 +410,13 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
{
if( (!isdigit( msg[4] )) || (!isdigit( msg[5] )) )
goto no_timestamp;
- stm.wDay = (msg[4] - '0') * 10 + (msg[5] - '0');
+ timestamp->wDay = (msg[4] - '0') * 10 + (msg[5] - '0');
}
else
{
if( !isdigit( msg[5] ) )
goto no_timestamp;
- stm.wDay = msg[5] - '0';
+ timestamp->wDay = msg[5] - '0';
}
if( msg[6] != ' ' )
@@ -360,26 +424,23 @@ static gchar* parse_timestamp( gchar* msg, gchar** timestamp )
if( (!isdigit( msg[7] )) || (!isdigit( msg[8] )) || msg[9] != ':' )
goto no_timestamp;
- stm.wHour = (msg[7] - '0') * 10 + (msg[8] - '0');
+ timestamp->wHour = (msg[7] - '0') * 10 + (msg[8] - '0');
if( (!isdigit( msg[10] )) || (!isdigit( msg[11] )) || msg[12] != ':' )
goto no_timestamp;
- stm.wMinute = (msg[10] - '0') * 10 + (msg[11] - '0');
+ timestamp->wMinute = (msg[10] - '0') * 10 + (msg[11] - '0');
if( (!isdigit( msg[13] )) || (!isdigit( msg[14] )) || msg[15] != ' ' )
goto no_timestamp;
- stm.wSecond = (msg[13] - '0') * 10 + (msg[14] - '0');
+ timestamp->wSecond = (msg[13] - '0') * 10 + (msg[14] - '0');
msg += 16;
goto done;
no_timestamp:
TRACE_2( "no timestamp\n" );
- GetLocalTime( &stm );
+ GetLocalTime( timestamp );
done:
- *timestamp = g_strdup_printf( "%s %2d %02d:%02d:%02d",
- str_month[ stm.wMonth - 1 ],
- stm.wDay, stm.wHour, stm.wMinute, stm.wSecond );
TRACE_LEAVE( "done\n" );
return msg;
}
@@ -392,30 +453,28 @@ done:
*/
static struct message* parse_raw_message( struct raw_message* raw_msg )
{
- struct message* msg;
gchar *current_part, *next_part;
+ gchar *sender, *hostname, *program, *message;
+ int facility, priority;
+ SYSTEMTIME timestamp;
+ struct message *msg;
TRACE_ENTER( "raw message=%p\n", raw_msg );
- /* allocate and initialize message structure */
- msg = g_malloc( sizeof(struct message) );
- msg->refcount = 1;
- msg->source = raw_msg->source;
-
/* get sender's hostname */
- msg->sender = get_hostname( &raw_msg->sender_addr );
+ sender = get_hostname( &raw_msg->sender_addr );
current_part = raw_msg->msg;
- next_part = parse_PRI( current_part, &msg->facility, &msg->priority );
+ next_part = parse_PRI( current_part, &facility, &priority );
current_part = next_part;
- next_part = parse_timestamp( current_part, &msg->timestamp );
+ next_part = parse_timestamp( current_part, &timestamp );
if( next_part == current_part )
{
/* no valid timestamp */
TRACE_2( "no valid timestamp: msg=%s\n", current_part );
- msg->hostname = g_strdup( msg->sender );
- msg->message = g_strdup( current_part );
+ hostname = g_strdup( sender );
+ message = g_strdup( current_part );
}
else
{
@@ -426,29 +485,33 @@ static struct message* parse_raw_message( struct raw_message* raw_msg )
if( *next_part != ' ' )
{
/* invalid hostname */
- msg->hostname = g_strdup( msg->sender );
- msg->message = g_strdup( current_part );
- TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", msg->hostname, msg->message );
+ hostname = g_strdup( sender );
+ message = g_strdup( current_part );
+ TRACE_2( "invalid hostname; set sender (%s); msg=%s\n", hostname, message );
}
else
{
- msg->hostname = g_strndup( current_part, next_part - current_part );
+ hostname = g_strndup( current_part, next_part - current_part );
while( *next_part == ' ' && *next_part != 0 )
next_part++;
- msg->message = g_strdup( next_part );
- TRACE_2( "hostname=%s; msg=%s\n", msg->hostname, msg->message );
+ message = g_strdup( next_part );
+ TRACE_2( "hostname=%s; msg=%s\n", hostname, message );
}
}
/* try to find program name */
- current_part = msg->message;
+ current_part = message;
next_part = current_part;
while( *next_part != ' ' && *next_part != ':' && *next_part != '[' && *next_part != 0 )
next_part++;
if( *next_part == ' ' || *next_part == 0 )
- msg->program = g_strdup("");
+ program = g_strdup("");
else
- msg->program = g_strndup( current_part, next_part - current_part );
+ program = g_strndup( current_part, next_part - current_part );
+
+ /* create message */
+ msg = create_message( raw_msg->source, sender, facility, priority,
+ &timestamp, hostname, program, message );
/* destroy raw message */
g_free( raw_msg->msg );
@@ -459,39 +522,151 @@ static struct message* parse_raw_message( struct raw_message* raw_msg )
}
/******************************************************************************
- * message_processor
+ * number_of_sources
*
- * main function; extract raw messages from queue and parse them
+ * return the number of sources of the specified type
*/
-static unsigned __stdcall message_processor( void* arg )
+unsigned number_of_sources( enum source_type type )
{
- HANDLE wait_handles[2] = { fifo_semaphore, service_stop_event };
+ unsigned ret = 0;
+ GList *item;
- TRACE_ENTER( "\n" );
- for(;;)
+ for( item = sources; item; item = item->next )
{
- DWORD w;
- struct raw_message *raw_msg;
+ struct source *src = item->data;
- w = WaitForMultipleObjects( 2, wait_handles, FALSE, INFINITE );
- if( WAIT_OBJECT_0 + 1 == w )
- /* shutdown */
- break;
- if( w != WAIT_OBJECT_0 )
- {
- ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
- SetEvent( service_stop_event );
+ if( src->type == type )
+ ret++;
+ }
+ return ret;
+}
+
+/******************************************************************************
+ * log_internal
+ *
+ * Generate internal log message.
+ * This function should be called only from the main thread because there's no
+ * access serialization to the writing end of the queue.
+ */
+void log_internal( int pri, char* fmt, ... )
+{
+ gchar *sender, *hostname, *program, *msg;
+ va_list args;
+ SYSTEMTIME stm;
+ int i;
+ struct message *message;
+
+ TRACE_ENTER( "\n" );
+
+ if( 0 == internal_source_count )
+ goto done;
+
+ sender = g_strdup( local_hostname );
+ hostname = g_strdup( local_hostname );
+ program = g_strdup( "syslog" );
+
+ va_start( args, fmt );
+ msg = g_strdup_vprintf( fmt, args );
+ va_end( args );
+
+ GetLocalTime( &stm );
+
+ message = create_message( internal_source_references[0],
+ sender,
+ LOG_FAC( LOG_SYSLOG ), LOG_PRI( pri ),
+ &stm,
+ hostname, program, msg );
+ for( i = 1;; i++ )
+ {
+ fifo_push( internal_message_queue, message );
+ ReleaseSemaphore( internal_queue_semaphore, 1, NULL );
+ if( i == internal_source_count )
break;
- }
- /* extract raw message from queue */
- raw_msg = fifo_pop( raw_message_queue );
+ message = duplicate_message( message );
+ message->source = internal_source_references[ i ];
+ }
- TRACE_2( "got message %p from queue\n", raw_msg );
+done:
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * shutdown_internal_sources
+ *
+ * dispose all data except message queue and semaphore
+ */
+static void shutdown_internal_sources()
+{
+ TRACE_ENTER( "\n" );
+
+ internal_source_count = 0;
+
+ if( internal_source_references )
+ g_free( internal_source_references );
- mux_message( parse_raw_message( raw_msg ) );
- }
TRACE_LEAVE( "done\n" );
- return 0;
+}
+
+/******************************************************************************
+ * fini_internal_sources
+ */
+static void fini_internal_sources()
+{
+ TRACE_ENTER( "\n" );
+
+ shutdown_internal_sources();
+
+ if( internal_message_queue )
+ fifo_destroy( internal_message_queue );
+
+ if( internal_queue_semaphore )
+ CloseHandle( internal_queue_semaphore );
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * init_internal_sources
+ */
+static gboolean init_internal_sources()
+{
+ gboolean ret = FALSE;
+ GList *item;
+ int i;
+
+ TRACE_ENTER( "\n" );
+
+ internal_message_queue = fifo_create();
+ internal_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
+ if( !internal_queue_semaphore )
+ {
+ ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ internal_source_count = (int) number_of_sources( ST_INTERNAL );
+ if( 0 == internal_source_count )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ internal_source_references = g_malloc( internal_source_count * sizeof(struct source*) );
+
+ for( i = 0, item = sources; item; item = item->next )
+ {
+ struct source *src = item->data;
+ if( ST_INTERNAL == src->type )
+ internal_source_references[ i++ ] = src;
+ }
+ ret = TRUE;
+
+done:
+ if( !ret )
+ fini_internal_sources();
+
+ TRACE_LEAVE( "done; internal_sources=%d, ret=%d\n", internal_source_count, (int) ret );
+ return ret;
}
/******************************************************************************
@@ -502,34 +677,38 @@ static unsigned __stdcall message_processor( void* arg )
static void fini_destinations()
{
GList *dest;
+
+ TRACE_ENTER( "\n" );
+
for( dest = destinations; dest; dest = dest->next )
- ((struct destination*) dest)->fini( (struct destination*) dest );
+ {
+ struct destination *d = dest->data;
+ d->fini( d );
+ }
+
+ TRACE_LEAVE( "done\n" );
}
/******************************************************************************
* main syslogd function
- *
- * global initialization; invoke listener
*/
void syslogd_main()
{
- HANDLE message_processor_thread = NULL;
- unsigned tid;
-
TRACE_ENTER( "\n" );
- SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );
-
if( !read_configuration() )
goto done;
if( !init_purger() )
goto done;
- if( !init_listener() )
+ purge_log_dirs();
+
+ if( !init_internal_sources() )
goto done;
- purge_log_dirs();
+ if( !init_udp_listener() )
+ goto done;
if( source_encoding && destination_encoding )
{
@@ -541,47 +720,85 @@ void syslogd_main()
}
}
- /* create message queue and semaphore;
- * avoid using Glib's asynchronous queues and threading support
- * because synchronization capabilities are very limited;
- */
- raw_message_queue = fifo_create();
- fifo_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
- if( !fifo_semaphore )
- {
- ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
- goto done;
- }
+ log_internal( LOG_NOTICE, "Syslog daemon started" );
- message_processor_thread = (HANDLE) _beginthreadex( NULL, 0, message_processor, NULL, 0, &tid );
- if( !message_processor_thread )
+ /* get messages from queues */
+ for(;;)
{
- ERR( "Cannot create thread; error %lu\n", GetLastError() );
- goto done;
+ HANDLE wait_handles[3] = { udp_queue_semaphore,
+ internal_queue_semaphore,
+ service_stop_event };
+ DWORD t;
+ struct raw_message *raw_msg;
+ struct message *msg;
+
+ if( !mark_interval )
+ t = INFINITE;
+ else
+ t = mark_interval * 1000;
+
+ switch( WaitForMultipleObjects( 3, wait_handles, FALSE, t ) )
+ {
+ case WAIT_OBJECT_0:
+ raw_msg = fifo_pop( udp_message_queue );
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ break;
+
+ case WAIT_OBJECT_0 + 1:
+ msg = fifo_pop( internal_message_queue );
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
+ break;
+
+ case WAIT_OBJECT_0 + 2:
+ goto shutdown;
+
+ case WAIT_TIMEOUT:
+ /* issue mark message */
+ log_internal( LOG_NOTICE, "%s", mark_message );
+ break;
+
+ default:
+ ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
+ SetEvent( service_stop_event );
+ goto shutdown;
+ }
}
- /* get messages from the listener */
+shutdown:
+ log_internal( LOG_NOTICE, "Syslog daemon is shutting down" );
+ shutdown_udp_listener();
+ shutdown_internal_sources();
+
+ /* flush queues */
for(;;)
{
+ HANDLE wait_handles[2] = { udp_queue_semaphore,
+ internal_queue_semaphore };
struct raw_message *raw_msg;
+ struct message *msg;
- switch( listener( &raw_msg ) )
+ switch( WaitForMultipleObjects( 2, wait_handles, FALSE, 0 ) )
{
- case LSNR_ERROR:
- case LSNR_SHUTDOWN:
- goto done;
+ case WAIT_OBJECT_0:
+ raw_msg = fifo_pop( udp_message_queue );
+ TRACE_2( "got raw message %p from UDP listener\n", raw_msg );
+ mux_message( parse_raw_message( raw_msg ) );
+ break;
- case LSNR_GOT_MESSAGE:
- TRACE_2( "got message from %d.%d.%d.%d; source name %s; ptr=%p: %s\n",
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b1,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b2,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b3,
- raw_msg->sender_addr.sin_addr.S_un.S_un_b.s_b4,
- raw_msg->source->name, raw_msg, raw_msg->msg );
- /* add raw message to queue */
- fifo_push( raw_message_queue, raw_msg );
- ReleaseSemaphore( fifo_semaphore, 1, NULL );
+ case WAIT_OBJECT_0 + 1:
+ msg = fifo_pop( internal_message_queue );
+ TRACE_2( "got message %p from internal source %s\n", msg, msg->source->name );
+ mux_message( msg );
break;
+
+ case WAIT_TIMEOUT:
+ goto done;
+
+ default:
+ ERR( "WaitForMultipleObjects() error %lu\n", GetLastError() );
+ goto done;
}
}
@@ -589,24 +806,14 @@ done:
/* signal to all possibly running threads */
SetEvent( service_stop_event );
- if( message_processor_thread )
- {
- /* wait for message processor shutdown */
- WaitForSingleObject( message_processor_thread, INFINITE );
- CloseHandle( message_processor_thread );
- }
-
+ fini_udp_listener();
+ fini_internal_sources();
fini_destinations();
fini_purger();
free_hostnames();
- if( raw_message_queue ) fifo_destroy( raw_message_queue );
if( conversion_descriptor != (GIConv) -1 )
g_iconv_close( conversion_descriptor );
- fini_listener();
-
- if( fifo_semaphore ) CloseHandle( fifo_semaphore );
-
TRACE_LEAVE( "done\n" );
}
diff --git a/daemon/syslogd.h b/daemon/syslogd.h
index 20dfa00..f061f7b 100644
--- a/daemon/syslogd.h
+++ b/daemon/syslogd.h
@@ -42,6 +42,8 @@ extern char *str_month[];
extern void syslogd_main();
+extern void log_internal( int pri, char* fmt, ... );
+
/* options and their default values */
extern gboolean use_dns;
extern gchar *source_encoding;
@@ -52,13 +54,6 @@ extern int hold;
extern gchar *logdir;
/* listener */
-enum listener_status
-{
- LSNR_ERROR,
- LSNR_SHUTDOWN,
- LSNR_GOT_MESSAGE
-};
-
struct raw_message
{
gchar *msg;
@@ -66,10 +61,12 @@ struct raw_message
struct source *source;
};
-extern gboolean init_listener();
-extern void fini_listener();
-extern enum listener_status listener( struct raw_message** msg );
-extern void log_internal( int pri, char* fmt, ... );
+extern struct fifo *udp_message_queue;
+extern HANDLE udp_queue_semaphore;
+
+extern gboolean init_udp_listener();
+extern void shutdown_udp_listener();
+extern void fini_udp_listener();
/* message */
struct message
@@ -85,6 +82,14 @@ struct message
gchar *message;
};
+extern struct message* create_message( struct source* source,
+ gchar* sender,
+ int facility, int priority,
+ LPSYSTEMTIME timestamp,
+ gchar* hostname,
+ gchar* program,
+ gchar* message );
+extern struct message* duplicate_message( struct message* msg );
extern void reference_message( struct message* msg );
extern void release_message( struct message* msg );
@@ -103,6 +108,8 @@ struct source
struct sockaddr_in udp;
};
+extern unsigned number_of_sources( enum source_type type );
+
enum destination_type
{
DT_UNDEFINED,
diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c
new file mode 100644
index 0000000..c039049
--- /dev/null
+++ b/daemon/udp_listener.c
@@ -0,0 +1,341 @@
+/*
+ * udp_listener.c - syslogd implementation for windows, UDP listener
+ *
+ * Created by Alexander Yaworsky
+ *
+ * THIS SOFTWARE IS NOT COPYRIGHTED
+ *
+ * This source code is offered for use in the public domain. You may
+ * use, modify or distribute it freely.
+ *
+ * This code is distributed in the hope that it will be useful but
+ * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
+ * DISCLAIMED. This includes but is not limited to warranties of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ */
+
+#include <process.h>
+#include <stdio.h>
+#include <winsock2.h>
+
+#include <glib.h>
+
+#include <syslog.h>
+#include <syslogd.h>
+
+/* udp listener data */
+static SOCKET *udp_socket_array = NULL;
+static int udp_socket_count = 0;
+
+static struct source **udp_source_references = NULL;
+
+static HANDLE *udp_event_array = NULL;
+static int udp_event_count = 0;
+static HANDLE udp_listener_stop_event = NULL;
+
+static unsigned max_datagram_size = 1024;
+static gchar *datagram_buffer = NULL;
+
+static HANDLE udp_listener_thread_handle = NULL;
+
+struct fifo *udp_message_queue = NULL;
+HANDLE udp_queue_semaphore = NULL;
+
+/******************************************************************************
+ * udp_listener_thread
+ */
+static unsigned __stdcall udp_listener_thread( void* arg )
+{
+ TRACE_ENTER( "\n" );
+
+ SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );
+
+ for(;;)
+ {
+ DWORD w;
+ int r;
+ int addrlen;
+ struct raw_message message;
+ struct raw_message *msg;
+
+ w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE );
+ if( WAIT_FAILED == w )
+ {
+ ERR( "Wait error %lu\n", GetLastError() );
+ goto done;
+ }
+ if( w >= udp_event_count )
+ {
+ ERR( "Unknown wait error\n" );
+ goto done;
+ }
+ if( w == udp_event_count - 1 )
+ {
+ TRACE_2( "shutdown\n" );
+ goto done;
+ }
+ /* got UDP message, read it */
+ addrlen = sizeof(message.sender_addr);
+ r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size,
+ 0, (struct sockaddr*) &message.sender_addr, &addrlen );
+ if( r < 0 )
+ {
+ ERR( "recvfrom() error %lu\n", WSAGetLastError() );
+ goto done;
+ }
+ if( !r )
+ {
+ TRACE_2( "empty datagram\n" );
+ continue;
+ }
+ message.msg = g_strndup( datagram_buffer, r );
+ message.source = udp_source_references[ w ];
+
+ TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n",
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b1,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b2,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b3,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b4,
+ message.source->name, message.msg );
+
+ /* allocate raw message and add it to the queue */
+ msg = g_malloc( sizeof(struct raw_message) );
+ memcpy( msg, &message, sizeof(struct raw_message) );
+ fifo_push( udp_message_queue, msg );
+ ReleaseSemaphore( udp_queue_semaphore, 1, NULL );
+ }
+
+done:
+ TRACE_LEAVE( "done\n" );
+ return 0;
+}
+
+/******************************************************************************
+ * shutdown_udp_listener
+ *
+ * stop listening thread and dispose all objects
+ * except message queue and semaphore
+ */
+void shutdown_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ if( udp_listener_thread_handle )
+ {
+ SetEvent( udp_listener_stop_event );
+ TRACE_2( "wait for shutdown of udp listener thread\n" );
+ WaitForSingleObject( udp_listener_thread_handle, INFINITE );
+ CloseHandle( udp_listener_thread_handle );
+ udp_listener_thread_handle = NULL;
+ }
+
+ if( udp_socket_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_socket_count; i++ )
+ closesocket( udp_socket_array[ i ] );
+ g_free( udp_socket_array );
+ udp_socket_array = NULL;
+ udp_socket_count = 0;
+ }
+
+ if( udp_source_references )
+ {
+ g_free( udp_source_references );
+ udp_source_references = NULL;
+ }
+
+ if( udp_event_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_event_count; i++ )
+ CloseHandle( udp_event_array[ i ] );
+ g_free( udp_event_array );
+ udp_event_array = NULL;
+ udp_event_count = 0;
+ udp_listener_stop_event = NULL;
+ }
+
+ if( datagram_buffer )
+ {
+ g_free( datagram_buffer );
+ datagram_buffer = NULL;
+ }
+ max_datagram_size = 1024;
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * fini_udp_listener
+ */
+void fini_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ shutdown_udp_listener();
+
+ if( udp_message_queue )
+ {
+ fifo_destroy( udp_message_queue );
+ udp_message_queue = NULL;
+ }
+
+ if( udp_queue_semaphore )
+ {
+ CloseHandle( udp_queue_semaphore );
+ udp_queue_semaphore = NULL;
+ }
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * init_udp_listener
+ *
+ * create sockets, queue and thread
+ */
+gboolean init_udp_listener()
+{
+ gboolean ret = FALSE;
+ unsigned n;
+ GList *item;
+ int i;
+
+ TRACE_ENTER( "\n" );
+
+ /* create message queue and semaphore;
+ * we should do this first because of possible early returns
+ * from this function when no sources are defined
+ */
+ udp_message_queue = fifo_create();
+ udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
+ if( !udp_queue_semaphore )
+ {
+ ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ n = number_of_sources( ST_UDP );
+ if( 0 == n )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* allocate memory for sockets, source references and events;
+ * the number of sockets is not greater than number of sources
+ */
+ udp_socket_array = g_malloc( n * sizeof(SOCKET) );
+ udp_source_references = g_malloc( n * sizeof(struct source*) );
+
+ /* number of events is greater by one because of
+ * included udp_listener_stop_event
+ */
+ udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) );
+
+ /* create sockets */
+ for( item = sources; item; item = item->next )
+ {
+ struct source *src = item->data;
+ SOCKET sock;
+ unsigned dgram_size;
+ int size;
+
+ if( src->type != ST_UDP )
+ continue;
+
+ sock = socket( AF_INET, SOCK_DGRAM, 0 );
+ if( INVALID_SOCKET == sock )
+ {
+ ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() );
+ continue;
+ }
+
+ if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
+ {
+ ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+
+ size = sizeof(dgram_size);
+ if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
+ {
+ ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n",
+ src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+ TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
+ src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
+ src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
+ ntohs( src->udp.sin_port ), dgram_size );
+ if( dgram_size > max_datagram_size )
+ max_datagram_size = dgram_size;
+
+ udp_source_references[ udp_socket_count ] = src;
+ udp_socket_array[ udp_socket_count++ ] = sock;
+ }
+
+ if( 0 == udp_socket_count )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* create events */
+ while( udp_event_count < udp_socket_count )
+ {
+ HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
+ if( !evt )
+ {
+ ERR( "Cannot initialize source %s: CreateEvent error %lu\n",
+ udp_source_references[ udp_event_count ]->name, GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = evt;
+ }
+ udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !udp_listener_stop_event )
+ {
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = udp_listener_stop_event;
+
+ /* bind events to sockets */
+ for( i = 0; i < udp_socket_count; i++ )
+ {
+ if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) )
+ {
+ ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n",
+ udp_source_references[ i ]->name, WSAGetLastError() );
+ goto done;
+ }
+ }
+
+ /* allocate datagram buffer */
+ datagram_buffer = g_malloc( max_datagram_size );
+
+ /* create thread */
+ udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n );
+ if( !udp_listener_thread_handle )
+ {
+ ERR( "Cannot create thread; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ ret = TRUE;
+
+done:
+ if( !ret )
+ fini_udp_listener();
+
+ TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n",
+ udp_socket_count, udp_event_count, max_datagram_size, (int) ret );
+ return ret;
+}