aboutsummaryrefslogtreecommitdiffstats
path: root/daemon/udp_listener.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/udp_listener.c')
-rw-r--r--daemon/udp_listener.c341
1 files changed, 341 insertions, 0 deletions
diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c
new file mode 100644
index 0000000..c039049
--- /dev/null
+++ b/daemon/udp_listener.c
@@ -0,0 +1,341 @@
+/*
+ * udp_listener.c - syslogd implementation for windows, UDP listener
+ *
+ * Created by Alexander Yaworsky
+ *
+ * THIS SOFTWARE IS NOT COPYRIGHTED
+ *
+ * This source code is offered for use in the public domain. You may
+ * use, modify or distribute it freely.
+ *
+ * This code is distributed in the hope that it will be useful but
+ * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
+ * DISCLAIMED. This includes but is not limited to warranties of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ */
+
+#include <process.h>
+#include <stdio.h>
+#include <winsock2.h>
+
+#include <glib.h>
+
+#include <syslog.h>
+#include <syslogd.h>
+
+/* udp listener data */
+static SOCKET *udp_socket_array = NULL;
+static int udp_socket_count = 0;
+
+static struct source **udp_source_references = NULL;
+
+static HANDLE *udp_event_array = NULL;
+static int udp_event_count = 0;
+static HANDLE udp_listener_stop_event = NULL;
+
+static unsigned max_datagram_size = 1024;
+static gchar *datagram_buffer = NULL;
+
+static HANDLE udp_listener_thread_handle = NULL;
+
+struct fifo *udp_message_queue = NULL;
+HANDLE udp_queue_semaphore = NULL;
+
+/******************************************************************************
+ * udp_listener_thread
+ */
+static unsigned __stdcall udp_listener_thread( void* arg )
+{
+ TRACE_ENTER( "\n" );
+
+ SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );
+
+ for(;;)
+ {
+ DWORD w;
+ int r;
+ int addrlen;
+ struct raw_message message;
+ struct raw_message *msg;
+
+ w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE );
+ if( WAIT_FAILED == w )
+ {
+ ERR( "Wait error %lu\n", GetLastError() );
+ goto done;
+ }
+ if( w >= udp_event_count )
+ {
+ ERR( "Unknown wait error\n" );
+ goto done;
+ }
+ if( w == udp_event_count - 1 )
+ {
+ TRACE_2( "shutdown\n" );
+ goto done;
+ }
+ /* got UDP message, read it */
+ addrlen = sizeof(message.sender_addr);
+ r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size,
+ 0, (struct sockaddr*) &message.sender_addr, &addrlen );
+ if( r < 0 )
+ {
+ ERR( "recvfrom() error %lu\n", WSAGetLastError() );
+ goto done;
+ }
+ if( !r )
+ {
+ TRACE_2( "empty datagram\n" );
+ continue;
+ }
+ message.msg = g_strndup( datagram_buffer, r );
+ message.source = udp_source_references[ w ];
+
+ TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n",
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b1,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b2,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b3,
+ message.sender_addr.sin_addr.S_un.S_un_b.s_b4,
+ message.source->name, message.msg );
+
+ /* allocate raw message and add it to the queue */
+ msg = g_malloc( sizeof(struct raw_message) );
+ memcpy( msg, &message, sizeof(struct raw_message) );
+ fifo_push( udp_message_queue, msg );
+ ReleaseSemaphore( udp_queue_semaphore, 1, NULL );
+ }
+
+done:
+ TRACE_LEAVE( "done\n" );
+ return 0;
+}
+
+/******************************************************************************
+ * shutdown_udp_listener
+ *
+ * stop listening thread and dispose all objects
+ * except message queue and semaphore
+ */
+void shutdown_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ if( udp_listener_thread_handle )
+ {
+ SetEvent( udp_listener_stop_event );
+ TRACE_2( "wait for shutdown of udp listener thread\n" );
+ WaitForSingleObject( udp_listener_thread_handle, INFINITE );
+ CloseHandle( udp_listener_thread_handle );
+ udp_listener_thread_handle = NULL;
+ }
+
+ if( udp_socket_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_socket_count; i++ )
+ closesocket( udp_socket_array[ i ] );
+ g_free( udp_socket_array );
+ udp_socket_array = NULL;
+ udp_socket_count = 0;
+ }
+
+ if( udp_source_references )
+ {
+ g_free( udp_source_references );
+ udp_source_references = NULL;
+ }
+
+ if( udp_event_array )
+ {
+ int i;
+
+ for( i = 0; i < udp_event_count; i++ )
+ CloseHandle( udp_event_array[ i ] );
+ g_free( udp_event_array );
+ udp_event_array = NULL;
+ udp_event_count = 0;
+ udp_listener_stop_event = NULL;
+ }
+
+ if( datagram_buffer )
+ {
+ g_free( datagram_buffer );
+ datagram_buffer = NULL;
+ }
+ max_datagram_size = 1024;
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * fini_udp_listener
+ */
+void fini_udp_listener()
+{
+ TRACE_ENTER( "\n" );
+
+ shutdown_udp_listener();
+
+ if( udp_message_queue )
+ {
+ fifo_destroy( udp_message_queue );
+ udp_message_queue = NULL;
+ }
+
+ if( udp_queue_semaphore )
+ {
+ CloseHandle( udp_queue_semaphore );
+ udp_queue_semaphore = NULL;
+ }
+
+ TRACE_LEAVE( "done\n" );
+}
+
+/******************************************************************************
+ * init_udp_listener
+ *
+ * create sockets, queue and thread
+ */
+gboolean init_udp_listener()
+{
+ gboolean ret = FALSE;
+ unsigned n;
+ GList *item;
+ int i;
+
+ TRACE_ENTER( "\n" );
+
+ /* create message queue and semaphore;
+ * we should do this first because of possible early returns
+ * from this function when no sources are defined
+ */
+ udp_message_queue = fifo_create();
+ udp_queue_semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
+ if( !udp_queue_semaphore )
+ {
+ ERR( "Cannot create semaphore; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ n = number_of_sources( ST_UDP );
+ if( 0 == n )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* allocate memory for sockets, source references and events;
+ * the number of sockets is not greater than number of sources
+ */
+ udp_socket_array = g_malloc( n * sizeof(SOCKET) );
+ udp_source_references = g_malloc( n * sizeof(struct source*) );
+
+ /* number of events is greater by one because of
+ * included udp_listener_stop_event
+ */
+ udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) );
+
+ /* create sockets */
+ for( item = sources; item; item = item->next )
+ {
+ struct source *src = item->data;
+ SOCKET sock;
+ unsigned dgram_size;
+ int size;
+
+ if( src->type != ST_UDP )
+ continue;
+
+ sock = socket( AF_INET, SOCK_DGRAM, 0 );
+ if( INVALID_SOCKET == sock )
+ {
+ ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() );
+ continue;
+ }
+
+ if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
+ {
+ ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+
+ size = sizeof(dgram_size);
+ if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
+ {
+ ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n",
+ src->name, WSAGetLastError() );
+ closesocket( sock );
+ continue;
+ }
+ TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
+ src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
+ src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
+ ntohs( src->udp.sin_port ), dgram_size );
+ if( dgram_size > max_datagram_size )
+ max_datagram_size = dgram_size;
+
+ udp_source_references[ udp_socket_count ] = src;
+ udp_socket_array[ udp_socket_count++ ] = sock;
+ }
+
+ if( 0 == udp_socket_count )
+ {
+ ret = TRUE;
+ goto done;
+ }
+
+ /* create events */
+ while( udp_event_count < udp_socket_count )
+ {
+ HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
+ if( !evt )
+ {
+ ERR( "Cannot initialize source %s: CreateEvent error %lu\n",
+ udp_source_references[ udp_event_count ]->name, GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = evt;
+ }
+ udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL );
+ if( !udp_listener_stop_event )
+ {
+ ERR( "Cannot create event; error %lu\n", GetLastError() );
+ goto done;
+ }
+ udp_event_array[ udp_event_count++ ] = udp_listener_stop_event;
+
+ /* bind events to sockets */
+ for( i = 0; i < udp_socket_count; i++ )
+ {
+ if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) )
+ {
+ ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n",
+ udp_source_references[ i ]->name, WSAGetLastError() );
+ goto done;
+ }
+ }
+
+ /* allocate datagram buffer */
+ datagram_buffer = g_malloc( max_datagram_size );
+
+ /* create thread */
+ udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n );
+ if( !udp_listener_thread_handle )
+ {
+ ERR( "Cannot create thread; error %lu\n", GetLastError() );
+ goto done;
+ }
+
+ ret = TRUE;
+
+done:
+ if( !ret )
+ fini_udp_listener();
+
+ TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n",
+ udp_socket_count, udp_event_count, max_datagram_size, (int) ret );
+ return ret;
+}