aboutsummaryrefslogblamecommitdiffstats
path: root/daemon/udp_listener.c
blob: f9c7c1a9fa377f263713aa8d7e620e00c26e1d9b (plain) (tree)









































                                                                      

                              




























































                                                                                        



                                                 










                                                                               
                                 



































































                                                                               
                         
     


                                               


















                                                                               
                                      



                                                                

                                                             
     
                                                                  

                  
                                               
























































































































                                                                                                      
/*
 * udp_listener.c - syslogd implementation for windows, UDP listener
 *
 * Created by Alexander Yaworsky
 *
 * THIS SOFTWARE IS NOT COPYRIGHTED
 *
 * This source code is offered for use in the public domain. You may
 * use, modify or distribute it freely.
 *
 * This code is distributed in the hope that it will be useful but
 * WITHOUT ANY WARRANTY. ALL WARRANTIES, EXPRESS OR IMPLIED ARE HEREBY
 * DISCLAIMED. This includes but is not limited to warranties of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 */

#include <process.h>
#include <stdio.h>
#include <winsock2.h>

#include <glib.h>

#include <syslog.h>
#include <syslogd.h>

/* udp listener data */

/* Bound UDP sockets; the first udp_socket_count entries are valid.
 * The array is sized for the number of ST_UDP sources, but only sources
 * whose socket was successfully created and bound get an entry.
 */
static SOCKET *udp_socket_array = NULL;
static int udp_socket_count = 0;

/* Parallel to udp_socket_array: entry i is the source that socket i serves. */
static struct source **udp_source_references = NULL;

/* Wait handles for the listener thread. Entries 0 .. udp_socket_count-1 are
 * the per-socket events (same index as the socket they are attached to);
 * the last entry is udp_listener_stop_event, which is also kept separately
 * so that shutdown_udp_listener() can signal it.
 */
static HANDLE *udp_event_array = NULL;
static int udp_event_count = 0;
static HANDLE udp_listener_stop_event = NULL;

/* Size of datagram_buffer. Starts at 1024 and is raised during
 * initialization to the largest SO_MAX_MSG_SIZE reported by any socket.
 */
static unsigned max_datagram_size = 1024;
static gchar *datagram_buffer = NULL;

/* Handle of the listener thread; NULL while no thread is running. */
static HANDLE udp_listener_thread_handle = NULL;

/* Queue of received raw messages filled by the listener thread.
 * udp_queue_event is set when fifo_push() returns non-zero, and
 * udp_queue_cs is held around every push. These three are not static;
 * presumably the consumer lives in another module -- confirm there.
 */
struct fifo *udp_message_queue = NULL;
HANDLE udp_queue_event = NULL;
CRITICAL_SECTION udp_queue_cs;

/******************************************************************************
 * udp_listener_thread
 *
 * Thread procedure of the UDP listener. Waits on udp_event_array, reads one
 * datagram from the socket whose event fired, wraps it into a heap-allocated
 * struct raw_message and pushes it onto udp_message_queue.
 *
 * arg is unused. The thread returns 0 when the stop event is signalled or
 * when an unrecoverable wait/receive error occurs.
 *
 * Ownership: both the raw_message and its msg string are allocated here and
 * handed over to the queue; this thread never frees them.
 */
static unsigned __stdcall udp_listener_thread( void* arg )
{
    TRACE_ENTER( "\n" );

    SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL );

    for(;;)
    {
        DWORD w;
        int r;
        int addrlen;
        struct raw_message message;
        struct raw_message *msg;

        w = WaitForMultipleObjects( udp_event_count, udp_event_array, FALSE, INFINITE );
        if( WAIT_FAILED == w )
        {
            ERR( "Wait error %lu\n", GetLastError() );
            goto done;
        }
        if( w >= (DWORD) udp_event_count )
        {
            ERR( "Unknown wait error\n" );
            goto done;
        }
        /* the stop event occupies the last slot of the array */
        if( w == (DWORD) (udp_event_count - 1) )
        {
            TRACE_2( "shutdown\n" );
            goto done;
        }

        /* The wait reports the lowest signalled index, and the stop event
         * has the highest one, so a socket that always has data pending
         * would hide a shutdown request indefinitely. Poll the stop event
         * explicitly; it is manual-reset, so polling does not consume it.
         */
        if( WAIT_OBJECT_0 == WaitForSingleObject( udp_listener_stop_event, 0 ) )
        {
            TRACE_2( "shutdown\n" );
            goto done;
        }

        /* got UDP message, read it */
        addrlen = sizeof(message.sender_addr);
        r = recvfrom( udp_socket_array[ w ], datagram_buffer, max_datagram_size,
                      0, (struct sockaddr*) &message.sender_addr, &addrlen );
        if( r < 0 )
        {
            int err = WSAGetLastError();

            /* Sockets with an event selected are non-blocking, so a signalled
             * FD_READ may still yield WSAEWOULDBLOCK; WSAECONNRESET and
             * WSAEMSGSIZE concern a single datagram only. None of these
             * should take the whole listener down: drop and keep listening.
             */
            if( WSAEWOULDBLOCK == err || WSAECONNRESET == err || WSAEMSGSIZE == err )
            {
                TRACE_2( "recvfrom() recoverable error %d; datagram skipped\n", err );
                continue;
            }
            ERR( "recvfrom() error %lu\n", (unsigned long) err );
            goto done;
        }
        if( !r )
        {
            TRACE_2( "empty datagram\n" );
            continue;
        }
        /* copy out of the shared receive buffer; g_strndup NUL-terminates */
        message.msg = g_strndup( datagram_buffer, r );
        message.source = udp_source_references[ w ];

        TRACE_2( "got message from %d.%d.%d.%d; source name %s; %s\n",
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b1,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b2,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b3,
                 message.sender_addr.sin_addr.S_un.S_un_b.s_b4,
                 message.source->name, message.msg );

        /* allocate raw message and add it to the queue */
        msg = g_malloc( sizeof(struct raw_message) );
        memcpy( msg, &message, sizeof(struct raw_message) );
        EnterCriticalSection( &udp_queue_cs );
        /* wake the consumer only when fifo_push() asks for it (non-zero) */
        if( fifo_push( udp_message_queue, msg ) )
            SetEvent( udp_queue_event );
        LeaveCriticalSection( &udp_queue_cs );
    }

done:
    TRACE_LEAVE( "done\n" );
    return 0;
}

/******************************************************************************
 * shutdown_udp_listener
 *
 * stop listening thread and dispose all objects
 * except message queue and event
 */
void shutdown_udp_listener()
{
    TRACE_ENTER( "\n" );

    if( udp_listener_thread_handle )
    {
        SetEvent( udp_listener_stop_event );
        TRACE_2( "wait for shutdown of udp listener thread\n" );
        WaitForSingleObject( udp_listener_thread_handle, INFINITE );
        CloseHandle( udp_listener_thread_handle );
        udp_listener_thread_handle = NULL;
    }

    if( udp_socket_array )
    {
        int i;

        for( i = 0; i < udp_socket_count; i++ )
            closesocket( udp_socket_array[ i ] );
        g_free( udp_socket_array );
        udp_socket_array = NULL;
        udp_socket_count = 0;
    }

    if( udp_source_references )
    {
        g_free( udp_source_references );
        udp_source_references = NULL;
    }

    if( udp_event_array )
    {
        int i;

        for( i = 0; i < udp_event_count; i++ )
            CloseHandle( udp_event_array[ i ] );
        g_free( udp_event_array );
        udp_event_array = NULL;
        udp_event_count = 0;
        udp_listener_stop_event = NULL;
    }

    if( datagram_buffer )
    {
        g_free( datagram_buffer );
        datagram_buffer = NULL;
    }
    max_datagram_size = 1024;

    TRACE_LEAVE( "done\n" );
}

/******************************************************************************
 * fini_udp_listener
 *
 * Complete teardown: performs shutdown_udp_listener() and afterwards
 * destroys the message queue together with its event and critical section.
 */
void fini_udp_listener()
{
    TRACE_ENTER( "\n" );

    /* thread, sockets, events and buffer go first */
    shutdown_udp_listener();

    if( NULL != udp_message_queue )
    {
        fifo_destroy( udp_message_queue );
        udp_message_queue = NULL;
    }

    /* init_udp_listener() initializes the critical section only after the
     * queue event has been created, so a non-NULL event handle also tells
     * us that the critical section exists and must be deleted.
     */
    if( NULL != udp_queue_event )
    {
        DeleteCriticalSection( &udp_queue_cs );
        CloseHandle( udp_queue_event );
        udp_queue_event = NULL;
    }

    TRACE_LEAVE( "done\n" );
}

/******************************************************************************
 * init_udp_listener
 *
 * create sockets, queue and thread
 */
gboolean init_udp_listener()
{
    gboolean ret = FALSE;
    unsigned n;
    GList *item;
    int i;

    TRACE_ENTER( "\n" );

    /* create message queue and event;
     * we should do this first because of possible early returns
     * from this function when no sources are defined
     */
    udp_message_queue = fifo_create();
    udp_queue_event = CreateEvent( NULL, TRUE, FALSE, NULL );
    if( !udp_queue_event )
    {
        ERR( "Cannot create event; error %lu\n", GetLastError() );
        goto done;
    }
    InitializeCriticalSection( &udp_queue_cs );

    n = number_of_sources( ST_UDP );
    if( 0 == n )
    {
        ret = TRUE;
        goto done;
    }

    /* allocate memory for sockets, source references and events;
     * the number of sockets is not greater than number of sources
     */
    udp_socket_array = g_malloc( n * sizeof(SOCKET) );
    udp_source_references = g_malloc( n * sizeof(struct source*) );

    /* number of events is greater by one because of
     * included udp_listener_stop_event
     */
    udp_event_array = g_malloc( (n + 1) * sizeof(HANDLE) );

    /* create sockets */
    for( item = sources; item; item = item->next )
    {
        struct source *src = item->data;
        SOCKET sock;
        unsigned dgram_size;
        int size;

        if( src->type != ST_UDP )
            continue;

        sock = socket( AF_INET, SOCK_DGRAM, 0 );
        if( INVALID_SOCKET == sock )
        {
            ERR( "Cannot create source %s: socket() error %lu\n", src->name, WSAGetLastError() );
            continue;
        }

        if( bind( sock, (struct sockaddr*) &src->udp, sizeof(src->udp) ) )
        {
            ERR( "Cannot create source %s: bind() error %lu\n", src->name, WSAGetLastError() );
            closesocket( sock );
            continue;
        }

        size = sizeof(dgram_size);
        if( getsockopt( sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char*) &dgram_size, &size ) )
        {
            ERR( "Cannot create source %s: getsockopt( SO_MAX_MSG_SIZE ) error %lu\n",
                 src->name, WSAGetLastError() );
            closesocket( sock );
            continue;
        }
        TRACE( "datagram size for %d.%d.%d.%d:%d is %u\n",
               src->udp.sin_addr.S_un.S_un_b.s_b1, src->udp.sin_addr.S_un.S_un_b.s_b2,
               src->udp.sin_addr.S_un.S_un_b.s_b3, src->udp.sin_addr.S_un.S_un_b.s_b4,
               ntohs( src->udp.sin_port ), dgram_size );
        if( dgram_size > max_datagram_size )
            max_datagram_size = dgram_size;

        udp_source_references[ udp_socket_count ] = src;
        udp_socket_array[ udp_socket_count++ ] = sock;
    }

    if( 0 == udp_socket_count )
    {
        ret = TRUE;
        goto done;
    }

    /* create events */
    while( udp_event_count < udp_socket_count )
    {
        HANDLE evt = CreateEvent( NULL, FALSE, FALSE, NULL );
        if( !evt )
        {
            ERR( "Cannot initialize source %s: CreateEvent error %lu\n",
                 udp_source_references[ udp_event_count ]->name, GetLastError() );
            goto done;
        }
        udp_event_array[ udp_event_count++ ] = evt;
    }
    udp_listener_stop_event = CreateEvent( NULL, TRUE, FALSE, NULL );
    if( !udp_listener_stop_event )
    {
        ERR( "Cannot create event; error %lu\n", GetLastError() );
        goto done;
    }
    udp_event_array[ udp_event_count++ ] = udp_listener_stop_event;

    /* bind events to sockets */
    for( i = 0; i < udp_socket_count; i++ )
    {
        if( WSAEventSelect( udp_socket_array[ i ], udp_event_array[ i ], FD_READ ) )
        {
            ERR( "Cannot initialize source %s: WSAEventSelect() error %lu\n",
                 udp_source_references[ i ]->name, WSAGetLastError() );
            goto done;
        }
    }

    /* allocate datagram buffer */
    datagram_buffer = g_malloc( max_datagram_size );

    /* create thread */
    udp_listener_thread_handle = (HANDLE) _beginthreadex( NULL, 0, udp_listener_thread, NULL, 0, &n );
    if( !udp_listener_thread_handle )
    {
        ERR( "Cannot create thread; error %lu\n", GetLastError() );
        goto done;
    }

    ret = TRUE;

done:
    if( !ret )
        fini_udp_listener();

    TRACE_LEAVE( "done; udp_socket_count=%d, udp_event_count=%d, max_datagram_size=%d, ret=%d\n",
                 udp_socket_count, udp_event_count, max_datagram_size, (int) ret );
    return ret;
}