/* * udp_listener.c - syslogd implementation for windows, UDP listener * * Created by Alexander Yaworsky * * THIS SOFTWARE IS NOT COPYRIGHTED * * This source code is offered for use in the public domain. You may * use, modify or distribute it freely. * * This code is distributed in the hope that it will be useful but * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY * DISCLAIMED. This includes but is not limited to warranties of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * */ #include #include #include #include #include #include /* udp listener data */ static SOCKET *udp_socket_array = NULL; static int udp_socket_count = 0; static struct source **udp_source_references = NULL; static HANDLE *udp_event_array = NULL; static int udp_event_count = 0; static HANDLE udp_listener_stop_event = NULL; static unsigned max_datagram_size = 1024; static gchar *datagram_buffer = NULL; static HANDLE udp_listener_thread_handle = NULL; struct fifo *udp_message_queue = NULL; HANDLE udp_queue_event = NULL; CRITICAL_SECTION udp_queue_cs; /****************************************************************************** * udp_listener_thread */ static unsigned __stdcall udp_listener_thread( void* arg ) { TRACE_ENTER( "\n" ); SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL ); for(;;) { DWORD w; int r; int addrlen; struct raw_message message; struct raw_message *msg; w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE ); if( WAIT_FAILED == w ) { ERR( "Wait error %lu\n", GetLastError() ); goto done; } if( w >= udp_event_count ) { ERR( "Unknown wait error\n" ); goto done; } if( w == udp_event_count - 1 ) { TRACE_2( "shutdown\n" ); goto done; } /* got UDP message, read it */ addrlen = sizeof(message.sender_addr); r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size, 0, (struct sockaddr*) &message.sender_addr, &addrlen ); if( r < 0 ) { ERR( "recvfrom() error %lu\n", WSAGetLastError() ); goto done; } if( !r ) { TRACE_2( "empty datagram\n" ); continue; } message.msg = g_strndup( datagram_buffer, r ); message.source = udp_source_references[ w ]; TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n", message.sender_addr.sin_addr.S_un.S_un_b.s_b1, message.sender_addr.sin_addr.S_un.S_un_b.s_b2, message.sender_addr.sin_addr.S_un.S_un_b.s_b3, message.sender_addr.sin_addr.S_un.S_un_b.s_b4, message.source->name, message.msg ); /* allocate raw message and add it to the queue */ msg = g_malloc( sizeof(struct raw_message) ); memcpy( msg, &message, sizeof(struct raw_message) ); EnterCriticalSection( &udp_queue_cs ); if( fifo_push( udp_message_queue, msg ) ) SetEvent( udp_queue_event ); LeaveCriticalSection( &udp_queue_cs ); } done: TRACE_LEAVE( "done\n" ); return 0; } /****************************************************************************** * shutdown_udp_listener * * stop listening thread and dispose all objects * except message queue and event */ void shutdown_udp_listener() { TRACE_ENTER( "\n" ); if( udp_listener_thread_handle ) { SetEvent( udp_listener_stop_event ); TRACE_2( "wait for shutdown of udp listener thread\n" ); WaitForSingleObject( udp_listener_thread_handle, INFINITE ); CloseHandle( udp_listener_thread_handle ); udp_listener_thread_handle = NULL; } if( udp_socket_array ) { int i; for( i = 0; i < udp_socket_count; i++ ) closesocket( udp_socket_array[ i ] ); g_free( udp_socket_array ); udp_socket_array = NULL; udp_socket_count = 0; } if( udp_source_references ) { g_free( udp_source_references ); udp_source_references = NULL; } if( udp_event_array ) { int i; for( i = 0; i < udp_event_count; i++ ) CloseHandle( udp_event_array[ i ] ); g_free( udp_event_array ); udp_event_array = NULL; udp_event_count = 0; udp_listener_stop_event = NULL; } if( datagram_buffer ) { g_free( datagram_buffer ); datagram_buffer = NULL; } max_datagram_size = 1024; TRACE_LEAVE( "done\n" ); } /****************************************************************************** * fini_udp_listener */ void fini_udp_listener() { TRACE_ENTER( "\n" ); shutdown_udp_listener(); if( udp_message_queue ) { fifo_destroy( udp_message_queue ); udp_message_queue = NULL; } if( udp_queue_event ) { DeleteCriticalSection( &udp_queue_cs ); CloseHandle( udp_queue_event ); udp_queue_event = NULL; } TRACE_LEAVE( "done\n" ); } /****************************************************************************** * init_udp_listener * * create sockets, queue and thread */ gboolean init_udp_listener() { gboolean ret = FALSE; unsigned n; GList *item; int i; TRACE_ENTER( "\n" ); /* create message queue and event; * we should do this first because of possible early returns * from this function when no sources are defined */ udp_message_queue = fifo_create(); udp_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL ); if( !udp_queue_event ) { ERR( "Cannot create event; error %lu\n", GetLastError() ); goto done; } InitializeCriticalSection( &udp_queue_cs ); n = number_of_sources( ST_UDP ); if( 0 == n ) { ret = TRUE; goto done; } /* allocate memory for sockets, source references and events; * the number of sockets is not greater than number of sources */ udp_socket_array = g_malloc( n * sizeof(SOCKET) ); udp_source_references = g_malloc( n * sizeof(struct source*) ); /* number of events is greater by one because of * included udp_listener_stop_event */ udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) ); /* create sockets */ for( item = sources; item; item = item->next ) { struct source *src = item->data; SOCKET sock; unsigned dgram_size; int size; if( src->type != ST_UDP ) continue; sock = socket( AF_INET, SOCK_DGRAM, 0 ); if( INVALID_SOCKET == sock ) { ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() ); continue; } if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) ) { ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() ); closesocket( sock ); continue; } size = sizeof(dgram_size); if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) ) { ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n", src->name, WSAGetLastError() ); closesocket( sock ); continue; } TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n", src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2, src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4, ntohs( src->udp.sin_port ), dgram_size ); if( dgram_size > max_datagram_size ) max_datagram_size = dgram_size; udp_source_references[ udp_socket_count ] = src; udp_socket_array[ udp_socket_count++ ] = sock; } if( 0 == udp_socket_count ) { ret = TRUE; goto done; } /* create events */ while( udp_event_count < udp_socket_count ) { HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL ); if( !evt ) { ERR( "Cannot initialize source %s: CreateEvent error %lu\n", udp_source_references[ udp_event_count ]->name, GetLastError() ); goto done; } udp_event_array[ udp_event_count++ ] = evt; } udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL ); if( !udp_listener_stop_event ) { ERR( "Cannot create event; error %lu\n", GetLastError() ); goto done; } udp_event_array[ udp_event_count++ ] = udp_listener_stop_event; /* bind events to sockets */ for( i = 0; i < udp_socket_count; i++ ) { if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) ) { ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n", udp_source_references[ i ]->name, WSAGetLastError() ); goto done; } } /* allocate datagram buffer */ datagram_buffer = g_malloc( max_datagram_size ); /* create thread */ udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n ); if( !udp_listener_thread_handle ) { ERR( "Cannot create thread; error %lu\n", GetLastError() ); goto done; } ret = TRUE; done: if( !ret ) fini_udp_listener(); TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n", udp_socket_count, udp_event_count, max_datagram_size, (int) ret ); return ret; }