/*
* udp_listener.c - syslogd implementation for windows, UDP listener
*
* Created by Alexander Yaworsky
*
* THIS SOFTWARE IS NOT COPYRIGHTED
*
* This source code is offered for use in the public domain. You may
* use, modify or distribute it freely.
*
* This code is distributed in the hope that it will be useful but
* WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
* DISCLAIMED. This includes but is not limited to warranties of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
*/
#include <process.h>
#include <stdio.h>
#include <winsock2.h>
#include <glib.h>
#include <syslog.h>
#include <syslogd.h>
/* udp listener data */

/* Bound UDP sockets, one per successfully created UDP source;
 * allocated by init_udp_listener(), closed and freed by
 * shutdown_udp_listener().
 */
static SOCKET *udp_socket_array = NULL;
/* Number of valid entries in udp_socket_array and udp_source_references. */
static int udp_socket_count = 0;
/* Parallel to udp_socket_array: element i is the source whose socket is
 * udp_socket_array[ i ]. The sources themselves are not owned here; only
 * the array is freed on shutdown.
 */
static struct source **udp_source_references = NULL;
/* Handles waited on by the listener thread: element i is the event bound
 * to udp_socket_array[ i ]; the last element is udp_listener_stop_event.
 */
static HANDLE *udp_event_array = NULL;
/* Number of valid entries in udp_event_array. */
static int udp_event_count = 0;
/* Manual-reset event signalled by shutdown_udp_listener() to make the
 * listener thread exit; also stored as the last entry of udp_event_array
 * and closed together with that array.
 */
static HANDLE udp_listener_stop_event = NULL;
/* Size of datagram_buffer in bytes; starts at 1024 and is raised to the
 * largest SO_MAX_MSG_SIZE reported by the created sockets; reset to 1024
 * on shutdown.
 */
static unsigned max_datagram_size = 1024;
/* Receive buffer of max_datagram_size bytes used by the listener thread. */
static gchar *datagram_buffer = NULL;
/* Handle of the listener thread, NULL when the thread is not running. */
static HANDLE udp_listener_thread_handle = NULL;

/* The three objects below have external linkage. */

/* Queue of heap-allocated struct raw_message items pushed by the
 * listener thread; created in init_udp_listener() and destroyed in
 * fini_udp_listener().
 */
struct fifo *udp_message_queue = NULL;
/* Manual-reset event set by the listener thread when fifo_push()
 * returns nonzero.
 */
HANDLE udp_queue_event = NULL;
/* Critical section held by the listener thread while pushing to
 * udp_message_queue and setting udp_queue_event.
 */
CRITICAL_SECTION udp_queue_cs;
/******************************************************************************
 * udp_listener_thread
 *
 * Thread procedure: waits on udp_event_array, reads one datagram from the
 * socket whose event fired and appends it to udp_message_queue as a newly
 * allocated struct raw_message.
 *
 * The last handle of udp_event_array is the stop event; when it is
 * signalled the thread exits. The thread also exits on a wait failure or
 * on an unrecoverable recvfrom() error. Per-datagram receive conditions
 * (WSAEWOULDBLOCK, WSAECONNRESET, WSAEMSGSIZE) do not stop the thread.
 *
 * arg is not used; the return value is always 0.
 */
static unsigned __stdcall udp_listener_thread( void* arg )
{
    TRACE_ENTER( "\n" );

    SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );

    for(;;)
    {
        DWORD w;
        int r;
        int addrlen;
        struct raw_message message;
        struct raw_message *msg;

        w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE );
        if( WAIT_FAILED == w )
        {
            ERR( "Wait error %lu\n", GetLastError() );
            goto done;
        }
        /* anything outside the handle array (abandoned, timeout) is unexpected */
        if( w >= (DWORD) udp_event_count )
        {
            ERR( "Unknown wait error\n" );
            goto done;
        }
        /* the stop event occupies the last slot of the array */
        if( w == (DWORD) udp_event_count - 1 )
        {
            TRACE_2( "shutdown\n" );
            goto done;
        }

        /* got UDP message, read it */
        addrlen = sizeof(message.sender_addr);
        r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size,
                      0, (struct sockaddr*) &message.sender_addr, &addrlen );
        if( r < 0 )
        {
            /* fetch the code once, before any other call can overwrite it */
            int err = WSAGetLastError();

            if( WSAEWOULDBLOCK == err )
            {
                /* sockets bound with WSAEventSelect() are non-blocking and
                 * an event may fire with nothing left to read
                 */
                TRACE_2( "no datagram available\n" );
                continue;
            }
            if( WSAECONNRESET == err || WSAEMSGSIZE == err )
            {
                /* these concern a single datagram, not the socket itself;
                 * drop the datagram and keep listening
                 */
                ERR( "recvfrom() error %lu, datagram dropped\n", (unsigned long) err );
                continue;
            }
            ERR( "recvfrom() error %lu\n", (unsigned long) err );
            goto done;
        }
        if( !r )
        {
            TRACE_2( "empty datagram\n" );
            continue;
        }

        /* copy the payload out of the shared buffer as a NUL-terminated string */
        message.msg = g_strndup( datagram_buffer, r );
        message.source = udp_source_references[ w ];
        TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n",
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b1,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b2,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b3,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b4,
                 message.source->name, message.msg );

        /* allocate raw message and add it to the queue */
        msg = g_malloc( sizeof(struct raw_message) );
        *msg = message;

        EnterCriticalSection( &udp_queue_cs );
        /* NOTE(review): a nonzero fifo_push() result presumably means the
         * consumer has to be woken up; confirm against the fifo implementation
         */
        if( fifo_push( udp_message_queue, msg ) )
            SetEvent( udp_queue_event );
        LeaveCriticalSection( &udp_queue_cs );
    }

done:
    TRACE_LEAVE( "done\n" );
    return 0;
}
/******************************************************************************
 * shutdown_udp_listener
 *
 * Stops the listener thread (if it is running) and releases everything
 * the listener owns: sockets, source reference table, events and the
 * datagram buffer. max_datagram_size goes back to its initial 1024.
 *
 * The message queue, its event and its critical section are left alone;
 * fini_udp_listener() takes care of those.
 */
void shutdown_udp_listener()
{
    int idx;

    TRACE_ENTER( "\n" );

    /* signal the stop event and wait until the thread has really exited,
     * so nothing below is torn down while the thread still uses it
     */
    if( NULL != udp_listener_thread_handle )
    {
        SetEvent( udp_listener_stop_event );
        TRACE_2( "wait for shutdown of udp listener thread\n" );
        WaitForSingleObject( udp_listener_thread_handle, INFINITE );
        CloseHandle( udp_listener_thread_handle );
        udp_listener_thread_handle = NULL;
    }

    /* sockets */
    if( NULL != udp_socket_array )
    {
        idx = 0;
        while( idx < udp_socket_count )
            closesocket( udp_socket_array[ idx++ ] );
        g_free( udp_socket_array );
        udp_socket_array = NULL;
        udp_socket_count = 0;
    }

    /* only the table is freed, the sources belong to someone else;
     * g_free() ignores NULL
     */
    g_free( udp_source_references );
    udp_source_references = NULL;

    /* events; the stop event is the last element of the array, so it is
     * closed by the same loop
     */
    if( NULL != udp_event_array )
    {
        idx = 0;
        while( idx < udp_event_count )
            CloseHandle( udp_event_array[ idx++ ] );
        g_free( udp_event_array );
        udp_event_array = NULL;
        udp_event_count = 0;
        udp_listener_stop_event = NULL;
    }

    /* receive buffer */
    g_free( datagram_buffer );
    datagram_buffer = NULL;
    max_datagram_size = 1024;

    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
 * fini_udp_listener
 *
 * Complete teardown: runs shutdown_udp_listener() and then disposes the
 * message queue, the queue critical section and the queue event.
 */
void fini_udp_listener()
{
    TRACE_ENTER( "\n" );

    /* thread, sockets, events and buffer first */
    shutdown_udp_listener();

    if( NULL != udp_message_queue )
    {
        fifo_destroy( udp_message_queue );
        udp_message_queue = NULL;
    }

    /* init_udp_listener() initializes the critical section only after the
     * queue event has been created, so a non-NULL event is what tells us
     * that the critical section exists too
     */
    if( NULL != udp_queue_event )
    {
        DeleteCriticalSection( &udp_queue_cs );
        CloseHandle( udp_queue_event );
        udp_queue_event = NULL;
    }

    TRACE_LEAVE( "done\n" );
}
/******************************************************************************
* init_udp_listener
*
* create sockets, queue and thread
*/
gboolean init_udp_listener()
{
gboolean ret = FALSE;
unsigned n;
GList *item;
int i;
TRACE_ENTER( "\n" );
/* create message queue and event;
* we should do this first because of possible early returns
* from this function when no sources are defined
*/
udp_message_queue = fifo_create();
udp_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL );
if( !udp_queue_event )
{
ERR( "Cannot create event; error %lu\n", GetLastError() );
goto done;
}
InitializeCriticalSection( &udp_queue_cs );
n = number_of_sources( ST_UDP );
if( 0 == n )
{
ret = TRUE;
goto done;
}
/* allocate memory for sockets, source references and events;
* the number of sockets is not greater than number of sources
*/
udp_socket_array = g_malloc( n * sizeof(SOCKET) );
udp_source_references = g_malloc( n * sizeof(struct source*) );
/* number of events is greater by one because of
* included udp_listener_stop_event
*/
udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) );
/* create sockets */
for( item = sources; item; item = item->next )
{
struct source *src = item->data;
SOCKET sock;
unsigned dgram_size;
int size;
if( src->type != ST_UDP )
continue;
sock = socket( AF_INET, SOCK_DGRAM, 0 );
if( INVALID_SOCKET == sock )
{
ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() );
continue;
}
if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
{
ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() );
closesocket( sock );
continue;
}
size = sizeof(dgram_size);
if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
{
ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n",
src->name, WSAGetLastError() );
closesocket( sock );
continue;
}
TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
ntohs( src->udp.sin_port ), dgram_size );
if( dgram_size > max_datagram_size )
max_datagram_size = dgram_size;
udp_source_references[ udp_socket_count ] = src;
udp_socket_array[ udp_socket_count++ ] = sock;
}
if( 0 == udp_socket_count )
{
ret = TRUE;
goto done;
}
/* create events */
while( udp_event_count < udp_socket_count )
{
HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
if( !evt )
{
ERR( "Cannot initialize source %s: CreateEvent error %lu\n",
udp_source_references[ udp_event_count ]->name, GetLastError() );
goto done;
}
udp_event_array[ udp_event_count++ ] = evt;
}
udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL );
if( !udp_listener_stop_event )
{
ERR( "Cannot create event; error %lu\n", GetLastError() );
goto done;
}
udp_event_array[ udp_event_count++ ] = udp_listener_stop_event;
/* bind events to sockets */
for( i = 0; i < udp_socket_count; i++ )
{
if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) )
{
ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n",
udp_source_references[ i ]->name, WSAGetLastError() );
goto done;
}
}
/* allocate datagram buffer */
datagram_buffer = g_malloc( max_datagram_size );
/* create thread */
udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n );
if( !udp_listener_thread_handle )
{
ERR( "Cannot create thread; error %lu\n", GetLastError() );
goto done;
}
ret = TRUE;
done:
if( !ret )
fini_udp_listener();
TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n",
udp_socket_count, udp_event_count, max_datagram_size, (int) ret );
return ret;
}