Göm meny

TSEA81 - Datorteknik och realtidssystem

Notera att dessa sidor enbart finns i engelsk version

TSEA81 - Computer Engineering and Real-time Systems

Lecture - 4 - Monitors, Message passing

The notation [RT] is used to refer to the book Realtidsprogrammering.

Condition variables

As described in Lecture 3 - Task synchronization, condition variables can be used, together with semaphores, for implementing conditional critical regions. A condition variable has three operations: one operation for initialisation, and two operations denoted Await and Cause.

As described in Lecture 3 - Task synchronization, a condition variable is associated with a semaphore, denoted the associated semaphore. The semaphore shall protect the shared data accessed in the the conditional critical region.

The condition variable operations are defined as follows:

  • Initialisation - during initialisation, the association between the condition variable and the associated semaphore is done. In addition, the condition variable is intialised so that Await-operations and Cause-operations can be performed.
  • Await - the calling task (i.e. the task which executes the the Await-operation) becomes waiting. At the same time, the resource protected by the associated semaphore is released. This means that the Await-operation includes an implicit Signal-operation on the associated semaphore. The calling task is placed in a wait state associated with the condition variable for which the Await-operation is done. When describing this wait state, it is said that the task is waiting on the condition variable.
  • Cause - this operation moves all tasks waiting for the condition variable. The move is done so that, after the move, all moved tasks are instead waiting on the associated semaphore. If no tasks are waiting for the condition variable when a Cause-operation occurs, nothing happens.

In Simple_OS, a condition variable is defined by the data type si_condvar. An Await-operation is done by calling the function si_cv_wait, and a Cause-operation is done by calling the function si_cv_broadcast.

In Linux, a condition variable is defined by the data type pthread_cond_t. An Await-operation is done by calling the function pthread_cond_wait, and a Cause-operation is done by calling the function pthread_cond_broadcast.

The example in Lecture 3 - Task synchronization can be implemented, using a condition variable, here called Change. The condition variable is declared together with the semaphore, as

/* mutex for protection of the buffer */
pthread_mutex_t Mutex; 

/* condition variable for indication of when the buffer changes 
   state, either from full to non-full or from empty 
   to non-empty */
pthread_cond_t Change; 

The condition variable is initalised in a function init_buffer, declared as

/* init_buffer: initialises the buffer */
void init_buffer(void)
{
    /* initialise mutex */ 
    pthread_mutex_init(&Mutex, NULL);
    /* initialise condition variable */
    pthread_cond_init(&Change, NULL);
    /* start writing from first position */ 
    In_Pos = 0; 
    /* start reading from first position */ 
    Out_Pos = 0; 
    /* no elements are stored in the buffer */ 
    Count = 0; 
}

The condition variable is used, in the function put_item, as

/* put_item: stores item in the buffer */
void put_item(char item) 
{
    /* reserve buffer */ 
  pthread_mutex_lock(&Mutex);
    /* check if the buffer is full, and if 
       this is so, wait */ 
    while (Count == BUFFER_SIZE)
    {
        pthread_cond_wait(&Change,&Mutex);
    }

    /* store item in buffer */ 

    Buffer_Data[In_Pos] = item; 
    In_Pos++; 
    if (In_Pos == BUFFER_SIZE)
    {
        In_Pos = 0; 
    }
    Count++; 

    /* notify other tasks that a change 
       has occurred */
    pthread_cond_broadcast(&Change);
    /* release buffer */ 
    pthread_mutex_unlock(&Mutex);
}

The condition variable is used, in the function get_item, as

/* get_item: read an item from the buffer */
char get_item(void) 
{
    /* item to read from buffer */ 
    char item; 
    /* reserve buffer */ 
    pthread_mutex_lock(&Mutex);
    /* check if buffer is empty, and if 
       this is so, wait */
    while (Count == 0)
    {
        pthread_cond_wait(&Change);
    }

    /* read item from buffer */

    item = Buffer_Data[Out_Pos]; 
    Out_Pos++; 
    if (Out_Pos == BUFFER_SIZE)
    {
       Out_Pos = 0; 
    }
    Count--; 

    /* notify other tasks that a change 
       has occurred */
    pthread_cond_broadcast(&Change);
    /* release buffer */ 
    pthread_mutex_unlock(&Mutex);

    /* return the value read */ 
    return item; 
}

Data abstraction

A buffer was declared in Lecture 3 - Task synchronization, as

/* buffer size */ 
#define BUFFER_SIZE 10

/* buffer data */  
char Buffer_Data[BUFFER_SIZE]; 

/* positions in buffer */ 
int In_Pos; 
int Out_Pos; 

/* number of elements in buffer */ 
int Count; 

The above buffer is declared using variables in a program, where the variables are declared as separate variables, on file scope.

A buffer can also be declared using the keyword struct, for the purpose of creating a data structure. In this data structure the variables constituting the buffer are kept together. Further, it is possible to use the keyword struct so that a specific data type, representing buffers, is created. This corresponds to creating a buffer class in an object oriented language.

If the access to variables of the created data type is done only through a selected set of functions, and not through direct access to variables inside the data structure, an abstract data type is obtained. This corresponds to creating a class, where data members are private and where access is only allowed through public member functions (a.k.a. methods).

A code example where the keyword struct is used in different ways is provided via the link struct_example.c.

Monitors

If an abstract data type is created, and if the data fields in the data type represents shared data in a program with tasks, and further if the semaphore protecting the data and also if eventual condition variables used in conditional critical regions accessing the data, are placed alongside the data in a structured data type, a monitor is obtained.

Monitors are described in Chapter 7 in [RT], where a monitor is defined as an abstract data type with functionality for mutual exclusion and conditional critical regions.

Monitors can be implemented using C and a real-time operating system, by defining an abstract data type which uses semaphores and condition variables.

A monitor is used in Assignment 3 - Lift with Monitors, for representation of the elevator which is used in the assignment.

Monitors can be implemented using C and a real-time operating system. For this purpose, a specific programming methodology can be used, where data structures containing the data belonging to a monitor are implemented using struct, and where operations on the data structures are defined as functions, with prototypes placed in header files. This kind of monitors are described in Section 7.1 in [RT].

When implementing abstract data types in C, different methods for memory allocation can be used. The methodology described in Section 7.1 in [RT] uses dynamic memory allocation, where the function malloc is used for memory allocation, and the function free is used for memory deallocation. A pointer type, denoted buffer_type, is defined as

typedef buffer_data_type* buffer_type;

where buffer_data_type is defined using struct according to

/* data structure for buffer */ 
typedef struct
{
    /* the buffer itself */
    char buffer_data[BUFFER_SIZE];
    /* position for next write */ 
    int in_pos;
    /* position for next read */ 
    int out_pos; 
    /* number of elements in the buffer */ 
    int count; 

    /* semaphore for protection of the buffer */
    si_semaphore mutex; 
    /* event to indicate that the buffer has changed 
       state, from full to non-full or from empty to 
       non-empty */ 
    si_condvar change; 
} buffer_data_type;

Memory allocation is done when a buffer is created, by calling malloc from the function that creates a buffer, according to

/* create_buffer: creates a buffer and initialises 
   the created buffer */
buffer_type create_buffer(void)
{
    /* reference to the created buffer */ 
    buffer_type buffer;
    /* allocate memory */ 
    buffer = (buffer_type) malloc(sizeof(buffer_data_type));

    /* start writing at first position */ 
    buffer->in_pos = 0;
    /* start reading at first position */ 
    buffer->out_pos = 0;
    /* no elements are stored in the buffer */ 
    buffer->count = 0; 

    /* initialise semaphore and event variable */ 
    si_sem_init(&buffer->mutex, 1);
    si_cv_init(&buffer->change, &buffer->mutex); 

    return buffer;
}

A program which shall use a buffer can declare a variable, e.g. denoted Buffer, of the type buffer_type, which is assiged a value when create_buffer is called, as

    Buffer = create_buffer(); 

This methodology thus means that a pointer is declared, which then is used to refer to the memory allocated during a call to create_buffer.

An alternative method is to use static memory allocation. This is e.g. necessary if malloc (or any corresponding function) is not available, which could be the case e.g. in an emedded system where the complete C Standard Library is not available. Another situation could be that it is not desirable to call malloc, due to uncertainty regarding the time required to allocate a certain amount of memory. Static memory allocation could then be used, making it possible to determine the amount of memory that is used by the program during compilation.

Static memory allocation can be achieved by declaring a variable, e.g. denoted Buffer, of the type buffer_data_type. The address of this variable is then used, when initialising the buffer, and when reading from or writing to the buffer. The function create_buffer, which allocates memory for a buffer by calling malloc, is replaced by a function, e.g. denoted init_buffer, implemented as

/* init_buffer: initialises buffer */
void init_buffer(buffer_type buffer)
{
    /* start writing at first position */ 
    buffer->in_pos = 0;
    /* start reading at first position */ 
    buffer->out_pos = 0;
    /* no elements are stored in the buffer */ 
    buffer->count = 0; 

    /* initialise semaphore and event variable */ 
    si_sem_init(&buffer->mutex, 1);
    si_cv_init(&buffer->change, &buffer->mutex); 
}

and called according to

    init_buffer(&Buffer); 

This methodology means that a variable which is a data structure, defined using struct, is declared, instead of declaring a pointer variable. The address of the variable refers to memory which is allocated statically.

Assignment 3 - Lift with Monitors

This assignment is more extensive than earlier assignments. The assignment, which covers simulation of an elevator system, shall provide experience in creating a real-time program with the following properties:

  • The use of a monitor, which is specified in a separate program module. The monitor represents the lift, with persons waiting for the lift and passengers travelling with the lift.
  • Conditional critical regions with different types of conditions. This is exemplified e.g. in the function which performs a lift journey, where different conditions must be evaluated, e.g. while a person is waiting for the lift, and during travel when the lift has reached a floor, so that a passenger with this floor as destination floor leaves the lift.
  • Tasks which are all created from one C function, but still have specific properties. This is exemplified by using a unique identity for each person.
  • Message passing, for transfer of the unique identity to each person task.
  • The use of graphics, for visualisation of events. In the assignment, the lift is visualised, with waiting persons and passengers in the lift.
It is recommended that the program is developed incrementally, which facilitates verification and debugging. The assignment therefore gives experience also in this type of development, applied to a real-time program.

Communication using messages

The execution of tasks is often synchronized, e.g. in connection with mutual exclusion, conditional critical regions, and asymmetric synchronization.

These types of synchronization often involves communication between tasks. As an example, a Cause-operation on a condition variable can be regarded as a form of communication, where a task indicates that an event has occurred. In a similar way, one could interpret the asymmetric synchronization in Assignment 2 - Alarm Clock as a form of communication, where a clock task informs an alarm task that an alarm shall be activated, using a Signal-operation on a semaphore.

Sometimes it is desirable that the task communication is done using messages. A task can then send one or more messages to another task. A message contains data which shall be transferred between the tasks. The software used for message handling also handles the necessary synchronization required during the communication, e.g. due to requirements on mutual exclusion and conditional critical regions.

Sending and receiving messages

It is not desirable to assume a certain message structure, e.g. that a message contains an array of characters or an integer number, when developing message handling software. The message handling software must be able to handle general messages, where the structure of the message contents can be decided by the tasks which send and receive messages. This can be achieved by e.g. representing a message as a sequence of bytes.

Tasks that communicate using messages must however agree on a common interpretation of the message contents, e.g. that a message contains an integer number (e.g. a time stamp), followed by a text string (which could represent the actual message contents). In this way, the sequences of bytes sent between the tasks can be interpreted in a correct way.

Message passing

Message based communication can be performed using a buffer, which stores messages, intended for one or more receiving tasks.

A message buffer can be built-in to the software that handles the parallel activities, e.g. a real-time operating system. A task can then send a message to another task without explictly indicating the buffer in which the message shall be stored. Instead, the sending task can indicate which task that shall receive the message, e.g. using an integer number which acts as a task identity. Message communication using a built-in buffer is implemented in Simple_OS, and is used in Assignment 3 - Lift with Monitors, for the purpose of transferring a unique identity to all person tasks in the program. It is also used, as the main method for communication and synchronization, in Assignment 4 - Lift with Message Passing. Message communication using a built-in buffer is treated in Section 8.1.2 in [RT].

Message based communication can also be performed without using a buffer. This can be achieved using symmetric synchronization. A sending task is then required to wait until a message has been acknowledged by a receiving task, and a receiving task is required to wait until a message has been received. This type of communication is e.g. built-in to the programming language Ada, and is there referred to as rendez-vous.

Message based communication using a buffer can be regarded as asynchronous communication, since the execution of the tasks which communicate via the buffer can be done asynchronously, e.g. when a sending task continues its execution directly after a message has been sent, and when a receiving task continues its execution when a message has been sent and receives the message a later time instant. In contrast to this, message based communication using symmetric synchronization, where the sending task and the receiving task wait for each other, can be regarded as synchronous communication. The execution of the sending task, as well as the execution of the receiving task, is then synchronized with the sending and receiving of a message.

Message based communication is often referred to as message passing. Functionality for message passing is available in different real-time operating systems. Information avbout message passing in the The QNX Microkernel in the QNX operating system, can e.g. be found among the QNX Community Resources.

The real-time operating system OSE from Enea uses message based communication, where the messages are referred to as signals. Information about the OSE real-time operating system can be found here.

As an example, consider implementing the clock in Assignment 1 - Introduction, Shared Resources using message passing.

We use a time data type, defined as

/* time data type */ 
typedef struct 
{
    int hours; 
    int minutes; 
    int seconds; 
} time_data_type; 

A message data type is defined, using message types

#define TIME_MESSAGE 0
#define SET_MESSAGE 1

and a message struct data type, defined as

/* data structure for messages */ 
typedef struct
{
    /* message type */
    int type; 
    /* the time */ 
    time_data_type time; 
} message_data_type; 

The clock task is modified. Instead of accessing a global data structure with the current time, the current time is stored inside the task, as a local variable in the C function implementing the task. The current time is then modified, based on messages sent to the clock task. The code for the clock task becomes

/* clock_task: clock task */ 
void clock_task(void) 
{
    /* the current time */ 
    time_data_type time; 

    /* message */
    message_data_type message;

    /* message length */
    int length;

    /* task_id of message sending process */
    int send_task_id;

    /* initialise the time */ 
    set_time(&time, 23, 59, 50); 

    /* infinite loop */
    while (1)
    {
        /* receive message */
        si_message_receive((char *) &message, &length, &send_task_id);

        /* check message type */
        switch(message.type)
        {
            case TIME_MESSAGE:
                /* increment with one second */
                increment_time(&time);
                /* display the time */
                display_time(time); 
                break;
            case SET_MESSAGE:
                /* assign new value to the current time */
                time = message.time;
                /* display the time */
                display_time(time); 
                break;
            default:
                /* an error condition */
                si_ui_show_error("Illegal message type to clock_task");
        }
    }
}

An additional task is introduced. This task shall keep the time by sending time messages to the clock. The code of this task becomes

/* time_task: time task */ 
void time_task(void) 
{
    /* message to be sent to clock task */ 
    message_data_type message; 

    /* infinite loop */ 
    while (1)
    {
        /* it is a move message */ 
        message.type = TIME_MESSAGE; 

        /* send message to clock task */ 
        si_message_send((char *) &message, sizeof(message), Clock_Task_Id); 

        /* wait one second */ 
        si_wait_n_ms(1000); 
    }
}

The task which performes user communication, and which sets the time, is modified. Instead of acting on a global data structure, protected by semaphore, a set message is sent do the clock task. The modified parts of the code are given by declarations, for the message received from the GUI, as

    /* message array */ 
    char ui_message[SI_UI_MAX_MESSAGE_SIZE]; 

and the message to be sent to the clock task, declared as

    /* message to be sent to clock task */ 
    message_data_type message; 

and the actual code sending the message for setting the time, given by

        /* check if it is a set message */ 
        if (strncmp(ui_message, "set", 3) == 0)
        {
            time_from_set_message(ui_message, &hours, &minutes, &seconds); 
            if (time_ok(hours, minutes, seconds))
            {
                /* it is a move message */ 
                message.type = SET_MESSAGE; 
                set_time(&time, hours, minutes, seconds); 
                message.time = time; 

                /* send message to clock task */ 
                si_message_send((char *) &message, sizeof(message), Clock_Task_Id); 
            }
            else
            {
                si_ui_show_error("Illegal value for hours, minutes or seconds"); 
            }
        }

The modified alarm clock program described above can be downloaded using the link set_clock_messages.c

Mailboxes

Message based communication can be performed using an explicit message buffer. The communication can be performed using requirements on sending and receiving tasks, so that a sending task is required to wait when the buffer is full, and a receiving task is required to wait when the buffer is empty. This strategy is often used in the producer-consumer problem, where a buffer also is used, but where it is not required that the buffer can handle items with a general content, as is the case when the buffer stores messages.

A message buffer which is explicitly used by sending and receiving tasks is often referred to as a mailbox. A mailbox can be implemented in a similar way as a buffer with specified contents, but instead using buffer items with a general structure. In the C programming language, this can e.g. be achieved using pointers. Mailboxes are treated in Section 8.1.1 in [RT].

As an example of using a mailbox, consider a program where it is desired to transfer data from one task, a producer, to another task, a consumer. There are requirements on the data transfer, also used in Lecture 3 - Task synchronization, stating that a producer shall wait if the buffer is full, and a consumer shall wait if the buffer is empty.

In this example, there are additional requirements, stating that different types of data shall be possible to send, using only one buffer. This means that the buffer must be able to contain data of different types, stored in the buffer at the same time.

A mailbox can be created for this purpose, by implementing a buffer which allows a mix of different data types for its elements.

In this example we consider requirements saying that is shall be possible to send and receive data of two different types. The types selected here are char and int.

A message data type can be defined, representing both of the desired data types, as

/* data structure for messages */ 
typedef struct
{
    /* message type */
    int type; 
    /* char value, if char message */ 
    char char_value; 
    /* int value, if int message */ 
    int int_value; 
} message_data_type; 

/* message_type defines a pointer to a message */
typedef message_data_type* message_type;

As an alternative, one could create several message types, for the different data types used. For simplicity, in this example, we use only one message data type.

A mailbox can be defined, in a similar way as a buffer was defined in Section Monitors, however with a different data type for the elements in the buffer. Here, the elements are defined using pointers, of type void *. This datatype allows pointers which are general, in the sense that they can point to data structures of arbitrary data types.

The mailbox is defined as a data structure using struct, with elements for the message buffer, but also with variables for protection and synchronization, such as a semaphore for protection and a condition variable for handling the conditional critical regions, as

/* data structure for mailbox */ 
typedef struct
{
    /* the mailbox itself, defined as an array 
       of pointers to messages */
    void *mailbox_data[MAILBOX_SIZE];
    /* position where next message shall be stored */ 
    int in_pos;
    /* position where next message shall be read */ 
    int out_pos; 
    /* number of messages in the mailbox */ 
    int count; 

    /* semaphore for protection of the mailbox */
    si_semaphore mutex; 
    /* condition variable to indicate that the mailbox has changed 
       state, from full to non-full or from empty to 
       non-empty */ 
    si_condvar change; 
} mailbox_data_type;

typedef mailbox_data_type* mailbox_type;

A function for creation and initialization of a mailbox can be implemented as

/* create_mailbox: creates a mailbox and initialises the 
   created mailbox */
mailbox_type create_mailbox(void)
{
    /* reference to the created mailbox */ 
    mailbox_type mailbox;
    /* allocate memory */ 
    mailbox = 
        (mailbox_type) malloc(sizeof(mailbox_data_type));

    /* start writing at first position */ 
    mailbox->in_pos = 0;
    /* start reading at first position */ 
    mailbox->out_pos = 0;
    /* no elements are stored in the mailbox */ 
    mailbox->count = 0; 

    /* initialise semaphore and event variable */ 
    si_sem_init(&mailbox->mutex, 1);
    si_cv_init(&mailbox->change, &mailbox->mutex); 

    return mailbox;
}

A function for sending a message to a mailbox can be created, using the semaphore for protection, and the condition variable for handling of the case when the mailbox is full, as

/* send_message: sends a message to mailbox. The 
   calling process will wait if the mailbox is full. 
   When the message is sent the parameter *message is 
   set to NULL, in order to prevent further references 
   to the message, when it is stored in the mailbox */
void send_message(
    mailbox_type mailbox, void **message)
{
    /* reserve mailbox */
    si_sem_wait(&mailbox->mutex);
    /* check if the mailbox is full, and if 
       this is so, wait */ 
    while (mailbox->count == MAILBOX_SIZE)
    {
        si_cv_wait(&mailbox->change); 
    }

    /* store message in mailbox, and set message 
       pointer to NULL */ 
    mailbox->mailbox_data[mailbox->in_pos] = *message; 
    *message = NULL; 
    mailbox->in_pos++;
    if (mailbox->in_pos == MAILBOX_SIZE)
    {
        mailbox->in_pos = 0;
    }
    mailbox->count++; 

    /* notify other processes that a change 
       has occurred */
    si_cv_broadcast(&mailbox->change); 

    /* release mailbox */
    si_sem_signal(&mailbox->mutex);
}

In the above listing of the function send_message, one can note that the parameter message is declared as double void pointer (i.e. a pointer to a pointer). The reason for this is that the pointer to the message, held initially by the function calling send_message, shall be set to NULL by send_message, in order to prevent the task calling send_message from access to the message contents after the message is sent.

A function for receiving a message from a mailbox can be created, using the semaphore for protection, and the condition variable for handling of the case when the mailbox is empty, as

/* receive_message: get a message from the mailbox. The 
   calling process will wait if the mailbox is empty */ 
void receive_message(
    mailbox_type mailbox, void **message)
{
    /* reserve mailbox */
    si_sem_wait(&mailbox->mutex);

    /* check if mailbox is empty, and if 
       this is so, wait */
    while (mailbox->count == 0)
    {
        si_cv_wait(&mailbox->change); 
    }

    /* get message from mailbox, and set the message 
       pointer where the message was stored to NULL */ 
    *message = mailbox->mailbox_data[mailbox->out_pos];
    mailbox->mailbox_data[mailbox->out_pos] = NULL; 
    mailbox->out_pos++;
    if (mailbox->out_pos == MAILBOX_SIZE) {
        mailbox->out_pos = 0;
    }
    mailbox->count--; 

    /* notify other processes that a change 
       has occurred */
    si_cv_broadcast(&mailbox->change); 

    /* release mailbox */
    si_sem_signal(&mailbox->mutex);
}

Again, it can be seen that the parameter message is declared as double void pointer. The reason for this is that the pointer to the message is an out-parameter from the function receive_message. An alternative solution would be to instead declare the return type of receive_message as void *.

As an example usage of the function send_message, the following code is shown, where two messages are sent, of different types, and where memory for both messages is allocated:

/* send_string: sends str_to_send character-wise to mailbox */ 
void send_string(mailbox_type mailbox, char str_to_send[])
{
    /* loop counter */ 
    size_t i; 
    /* message pointer */ 
    message_type message; 

    /* for each character in str_to_send */ 
    for (i = 0; i < strlen(str_to_send); i++) 
    {
        /* send character as char message */ 
        /* allocate memory */ 
        message = malloc(sizeof(message_data_type)); 
        /* fill in message type and contents */ 
        message->type = CHAR_MESSAGE; 
        message->char_value = str_to_send[i]; 
        message->int_value = 0; /* dummy value */ 
        /* send message */ 
        send_message(mailbox, (void *) &message); 

        /* send character code as int message */ 
        /* allocate memory */ 
        message = malloc(sizeof(message_data_type)); 
        /* fill in message type and contents */ 
        message->type = INT_MESSAGE; 
        message->char_value = '0'; /* dummy value */ 
        message->int_value = (int) (str_to_send[i]); 
        /* send message */ 
        send_message(mailbox, (void *) &message); 
    }
}

As an example usage of the function receive_message, the following code is shown, where a message is received, and different actions are taken, depending on the message type:

/* receive_and_print_message: receives a message from 
   mailbox, and prints message contents and message 
   time stamp */ 
void receive_and_print_message(mailbox_type mailbox)
{
    /* message pointer */ 
    message_type message; 

    /* receive_message */ 
    receive_message(mailbox, (void *) &message); 
    /* print message contents */ 
    switch (message->type)
    {
        case CHAR_MESSAGE: 
            printf("-%c-", message->char_value); 
            break; 
        case INT_MESSAGE: 
            printf("-%d-", message->int_value); 
            break; 
        default: 
            printf("\nERROR: illegal message type %d\n", message->type); 
    }

    /* ensure everything is printed immediately */ 
    fflush(stdout); 

    /* deallocate memory */ 
    free(message); 
}

It can be seen, in the above listing of code calling receive_message, that the memory allocated for the message is deallocated, after the message has been received and processed.


Informationsansvarig: Kent Palmkvist
Senast uppdaterad: 2020-11-15