diff options
author | yaworsky <yaworsky> | 2005-10-31 13:52:24 +0000 |
---|---|---|
committer | yaworsky <yaworsky> | 2005-10-31 13:52:24 +0000 |
commit | 74b7b6121179f8a82a2f96812c9a33e3f650eaed (patch) | |
tree | a051354cef72555130299fee3f17aa5948b6b3f4 /daemon/udp_listener.c | |
parent | 10379ee810238a765bf94a4ba48e4fff04fc7b6f (diff) | |
download | syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.gz syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.tar.xz syslog-win32-74b7b6121179f8a82a2f96812c9a33e3f650eaed.zip |
Refactored listener
Diffstat (limited to 'daemon/udp_listener.c')
-rw-r--r-- | daemon/udp_listener.c | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c new file mode 100644 index 0000000..c039049 --- /dev/null +++ b/daemon/udp_listener.c @@ -0,0 +1,341 @@ +/* + * udp_listener.c - syslogd implementation for windows, UDP listener + * + * Created by Alexander Yaworsky + * + * THIS SOFTWARE IS NOT COPYRIGHTED + * + * This source code is offered for use in the public domain. You may + * use, modify or distribute it freely. + * + * This code is distributed in the hope that it will be useful but + * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY + * DISCLAIMED. This includes but is not limited to warranties of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + */ + +#include <process.h> +#include <stdio.h> +#include <winsock2.h> + +#include <glib.h> + +#include <syslog.h> +#include <syslogd.h> + +/* udp listener data */ +static SOCKET *udp_socket_array = NULL; +static int udp_socket_count = 0; + +static struct source **udp_source_references = NULL; + +static HANDLE *udp_event_array = NULL; +static int udp_event_count = 0; +static HANDLE udp_listener_stop_event = NULL; + +static unsigned max_datagram_size = 1024; +static gchar *datagram_buffer = NULL; + +static HANDLE udp_listener_thread_handle = NULL; + +struct fifo *udp_message_queue = NULL; +HANDLE udp_queue_semaphore = NULL; + +/****************************************************************************** + * udp_listener_thread + */ +static unsigned __stdcall udp_listener_thread( void* arg ) +{ + TRACE_ENTER( "\n" ); + + SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL ); + + for(;;) + { + DWORD w; + int r; + int addrlen; + struct raw_message message; + struct raw_message *msg; + + w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE ); + if( WAIT_FAILED == w ) + { + ERR( "Wait error %lu\n", GetLastError() ); + goto done; + } + if( w >= udp_event_count ) + { + ERR( "Unknown wait error\n" ); + goto done; + } + if( w == udp_event_count - 1 ) + { + TRACE_2( "shutdown\n" ); + goto done; + } + /* got UDP message, read it */ + addrlen = sizeof(message.sender_addr); + r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size, + 0, (struct sockaddr*) &message.sender_addr, &addrlen ); + if( r < 0 ) + { + ERR( "recvfrom() error %lu\n", WSAGetLastError() ); + goto done; + } + if( !r ) + { + TRACE_2( "empty datagram\n" ); + continue; + } + message.msg = g_strndup( datagram_buffer, r ); + message.source = udp_source_references[ w ]; + + TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n", + message.sender_addr.sin_addr.S_un.S_un_b.s_b1, + message.sender_addr.sin_addr.S_un.S_un_b.s_b2, + message.sender_addr.sin_addr.S_un.S_un_b.s_b3, + message.sender_addr.sin_addr.S_un.S_un_b.s_b4, + message.source->name, message.msg ); + + /* allocate raw message and add it to the queue */ + msg = g_malloc( sizeof(struct raw_message) ); + memcpy( msg, &message, sizeof(struct raw_message) ); + fifo_push( udp_message_queue, msg ); + ReleaseSemaphore( udp_queue_semaphore, 1, NULL ); + } + +done: + TRACE_LEAVE( "done\n" ); + return 0; +} + +/****************************************************************************** + * shutdown_udp_listener + * + * stop listening thread and dispose all objects + * except message queue and semaphore + */ +void shutdown_udp_listener() +{ + TRACE_ENTER( "\n" ); + + if( udp_listener_thread_handle ) + { + SetEvent( udp_listener_stop_event ); + TRACE_2( "wait for shutdown of udp listener thread\n" ); + WaitForSingleObject( udp_listener_thread_handle, INFINITE ); + CloseHandle( udp_listener_thread_handle ); + udp_listener_thread_handle = NULL; + } + + if( udp_socket_array ) + { + int i; + + for( i = 0; i < udp_socket_count; i++ ) + closesocket( udp_socket_array[ i ] ); + g_free( udp_socket_array ); + udp_socket_array = NULL; + udp_socket_count = 0; + } + + if( udp_source_references ) + { + g_free( udp_source_references ); + udp_source_references = NULL; + } + + if( udp_event_array ) + { + int i; + + for( i = 0; i < udp_event_count; i++ ) + CloseHandle( udp_event_array[ i ] ); + g_free( udp_event_array ); + udp_event_array = NULL; + udp_event_count = 0; + udp_listener_stop_event = NULL; + } + + if( datagram_buffer ) + { + g_free( datagram_buffer ); + datagram_buffer = NULL; + } + max_datagram_size = 1024; + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * fini_udp_listener + */ +void fini_udp_listener() +{ + TRACE_ENTER( "\n" ); + + shutdown_udp_listener(); + + if( udp_message_queue ) + { + fifo_destroy( udp_message_queue ); + udp_message_queue = NULL; + } + + if( udp_queue_semaphore ) + { + CloseHandle( udp_queue_semaphore ); + udp_queue_semaphore = NULL; + } + + TRACE_LEAVE( "done\n" ); +} + +/****************************************************************************** + * init_udp_listener + * + * create sockets, queue and thread + */ +gboolean init_udp_listener() +{ + gboolean ret = FALSE; + unsigned n; + GList *item; + int i; + + TRACE_ENTER( "\n" ); + + /* create message queue and semaphore; + * we should do this first because of possible early returns + * from this function when no sources are defined + */ + udp_message_queue = fifo_create(); + udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL ); + if( !udp_queue_semaphore ) + { + ERR( "Cannot create semaphore; error %lu\n", GetLastError() ); + goto done; + } + + n = number_of_sources( ST_UDP ); + if( 0 == n ) + { + ret = TRUE; + goto done; + } + + /* allocate memory for sockets, source references and events; + * the number of sockets is not greater than number of sources + */ + udp_socket_array = g_malloc( n * sizeof(SOCKET) ); + udp_source_references = g_malloc( n * sizeof(struct source*) ); + + /* number of events is greater by one because of + * included udp_listener_stop_event + */ + udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) ); + + /* create sockets */ + for( item = sources; item; item = item->next ) + { + struct source *src = item->data; + SOCKET sock; + unsigned dgram_size; + int size; + + if( src->type != ST_UDP ) + continue; + + sock = socket( AF_INET, SOCK_DGRAM, 0 ); + if( INVALID_SOCKET == sock ) + { + ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() ); + continue; + } + + if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) ) + { + ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() ); + closesocket( sock ); + continue; + } + + size = sizeof(dgram_size); + if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) ) + { + ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", + src->name, WSAGetLastError() ); + closesocket( sock ); + continue; + } + TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n", + src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2, + src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4, + ntohs( src->udp.sin_port ), dgram_size ); + if( dgram_size > max_datagram_size ) + max_datagram_size = dgram_size; + + udp_source_references[ udp_socket_count ] = src; + udp_socket_array[ udp_socket_count++ ] = sock; + } + + if( 0 == udp_socket_count ) + { + ret = TRUE; + goto done; + } + + /* create events */ + while( udp_event_count < udp_socket_count ) + { + HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL ); + if( !evt ) + { + ERR( "Cannot initialize source %s: CreateEvent error %lu\n", + udp_source_references[ udp_event_count ]->name, GetLastError() ); + goto done; + } + udp_event_array[ udp_event_count++ ] = evt; + } + udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL ); + if( !udp_listener_stop_event ) + { + ERR( "Cannot create event; error %lu\n", GetLastError() ); + goto done; + } + udp_event_array[ udp_event_count++ ] = udp_listener_stop_event; + + /* bind events to sockets */ + for( i = 0; i < udp_socket_count; i++ ) + { + if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) ) + { + ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n", + udp_source_references[ i ]->name, WSAGetLastError() ); + goto done; + } + } + + /* allocate datagram buffer */ + datagram_buffer = g_malloc( max_datagram_size ); + + /* create thread */ + udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n ); + if( !udp_listener_thread_handle ) + { + ERR( "Cannot create thread; error %lu\n", GetLastError() ); + goto done; + } + + ret = TRUE; + +done: + if( !ret ) + fini_udp_listener(); + + TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n", + udp_socket_count, udp_event_count, max_datagram_size, (int) ret ); + return ret; +} |