corosync  2.3.5
totempg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2005 MontaVista Software, Inc.
3  * Copyright (c) 2005 OSDL.
4  * Copyright (c) 2006-2012 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Steven Dake (sdake@redhat.com)
9  * Author: Mark Haverkamp (markh@osdl.org)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  * this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  * contributors may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /*
39  * FRAGMENTATION AND PACKING ALGORITHM:
40  *
41  * Assemble the entire message into one buffer
42  * if full fragment
43  * store fragment into lengths list
44  * for each full fragment
45  * multicast fragment
 46  * set length and fragment fields of pg message
47  * store remaining multicast into head of fragmentation data and set lens field
48  *
49  * If a message exceeds the maximum packet size allowed by the totem
50  * single ring protocol, the protocol could lose forward progress.
51  * Statically calculating the allowed data amount doesn't work because
52  * the amount of data allowed depends on the number of fragments in
53  * each message. In this implementation, the maximum fragment size
54  * is dynamically calculated for each fragment added to the message.
55 
56  * It is possible for a message to be two bytes short of the maximum
57  * packet size. This occurs when a message or collection of
58  * messages + the mcast header + the lens are two bytes short of the
59  * end of the packet. Since another len field consumes two bytes, the
60  * len field would consume the rest of the packet without room for data.
61  *
62  * One optimization would be to forgo the final len field and determine
63  * it from the size of the udp datagram. Then this condition would no
64  * longer occur.
65  */
66 
67 /*
68  * ASSEMBLY AND UNPACKING ALGORITHM:
69  *
70  * copy incoming packet into assembly data buffer indexed by current
71  * location of end of fragment
72  *
73  * if not fragmented
74  * deliver all messages in assembly data buffer
75  * else
76  * if msg_count > 1 and fragmented
77  * deliver all messages except last message in assembly data buffer
78  * copy last fragmented section to start of assembly data buffer
79  * else
80  * if msg_count = 1 and fragmented
81  * do nothing
82  *
83  */
84 
85 #include <config.h>
86 
87 #ifdef HAVE_ALLOCA_H
88 #include <alloca.h>
89 #endif
90 #include <sys/types.h>
91 #include <sys/socket.h>
92 #include <netinet/in.h>
93 #include <arpa/inet.h>
94 #include <sys/uio.h>
95 #include <stdio.h>
96 #include <stdlib.h>
97 #include <string.h>
98 #include <assert.h>
99 #include <pthread.h>
100 #include <errno.h>
101 #include <limits.h>
102 
103 #include <corosync/swab.h>
104 #include <corosync/list.h>
105 #include <qb/qbloop.h>
106 #include <qb/qbipcs.h>
107 #include <corosync/totem/totempg.h>
108 #define LOGSYS_UTILS_ONLY 1
109 #include <corosync/logsys.h>
110 
111 #include "totemmrp.h"
112 #include "totemsrp.h"
113 
/*
 * min() as an expression macro.  The entire expansion and both result
 * operands are parenthesized so the macro composes correctly inside
 * larger expressions (the previous form `((a) < (b)) ? a : b` broke for
 * e.g. `min(x, y) + 1`, which parsed as `cond ? x : (y + 1)`).
 * Arguments may still be evaluated twice - avoid side effects.
 */
#define min(a,b) (((a) < (b)) ? (a) : (b))
115 
117  short version;
118  short type;
119 };
120 
121 #if !(defined(__i386__) || defined(__x86_64__))
122 /*
 123  * Need align on architectures different than i386 or x86_64
124  */
125 #define TOTEMPG_NEED_ALIGN 1
126 #endif
127 
128 /*
129  * totempg_mcast structure
130  *
131  * header: Identify the mcast.
132  * fragmented: Set if this message continues into next message
133  * continuation: Set if this message is a continuation from last message
134  * msg_count Indicates how many packed messages are contained
135  * in the mcast.
136  * Also, the size of each packed message and the messages themselves are
137  * appended to the end of this structure when sent.
138  */
141  unsigned char fragmented;
142  unsigned char continuation;
143  unsigned short msg_count;
144  /*
145  * short msg_len[msg_count];
146  */
147  /*
148  * data for messages
149  */
150 };
151 
152 /*
153  * Maximum packet size for totem pg messages
154  */
155 #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
156  sizeof (struct totempg_mcast))
157 
158 /*
159  * Local variables used for packing small messages
160  */
161 static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
162 
163 static int mcast_packed_msg_count = 0;
164 
165 static int totempg_reserved = 1;
166 
167 static unsigned int totempg_size_limit;
168 
169 static totem_queue_level_changed_fn totem_queue_level_changed = NULL;
170 
171 static uint32_t totempg_threaded_mode = 0;
172 
173 /*
174  * Function and data used to log messages
175  */
176 static int totempg_log_level_security;
177 static int totempg_log_level_error;
178 static int totempg_log_level_warning;
179 static int totempg_log_level_notice;
180 static int totempg_log_level_debug;
181 static int totempg_subsys_id;
182 static void (*totempg_log_printf) (
183  int level,
184  int subsys,
185  const char *function,
186  const char *file,
187  int line,
188  const char *format, ...) __attribute__((format(printf, 6, 7)));
189 
191 
192 static totempg_stats_t totempg_stats;
193 
197 };
198 
199 struct assembly {
200  unsigned int nodeid;
201  unsigned char data[MESSAGE_SIZE_MAX];
202  int index;
203  unsigned char last_frag_num;
205  struct list_head list;
206 };
207 
208 static void assembly_deref (struct assembly *assembly);
209 
210 static int callback_token_received_fn (enum totem_callback_token_type type,
211  const void *data);
212 
213 DECLARE_LIST_INIT(assembly_list_inuse);
214 
215 DECLARE_LIST_INIT(assembly_list_free);
216 
217 DECLARE_LIST_INIT(assembly_list_inuse_trans);
218 
219 DECLARE_LIST_INIT(assembly_list_free_trans);
220 
221 DECLARE_LIST_INIT(totempg_groups_list);
222 
223 /*
224  * Staging buffer for packed messages. Messages are staged in this buffer
225  * before sending. Multiple messages may fit which cuts down on the
226  * number of mcasts sent. If a message doesn't completely fit, then
227  * the mcast header has a fragment bit set that says that there are more
228  * data to follow. fragment_size is an index into the buffer. It indicates
229  * the size of message data and where to place new message data.
 230  * fragment_continuation indicates whether the first packed message in
231  * the buffer is a continuation of a previously packed fragment.
232  */
233 static unsigned char *fragmentation_data;
234 
235 static int fragment_size = 0;
236 
237 static int fragment_continuation = 0;
238 
239 static int totempg_waiting_transack = 0;
240 
242  void (*deliver_fn) (
243  unsigned int nodeid,
244  const void *msg,
245  unsigned int msg_len,
246  int endian_conversion_required);
247 
248  void (*confchg_fn) (
249  enum totem_configuration_type configuration_type,
250  const unsigned int *member_list, size_t member_list_entries,
251  const unsigned int *left_list, size_t left_list_entries,
252  const unsigned int *joined_list, size_t joined_list_entries,
253  const struct memb_ring_id *ring_id);
254 
256 
258  int32_t q_level;
259 
260  struct list_head list;
261 };
262 
263 static unsigned char next_fragment = 1;
264 
265 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
266 
267 static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
268 
269 static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
270 
271 #define log_printf(level, format, args...) \
272 do { \
273  totempg_log_printf(level, \
274  totempg_subsys_id, \
275  __FUNCTION__, __FILE__, __LINE__, \
276  format, ##args); \
277 } while (0);
278 
279 static int msg_count_send_ok (int msg_count);
280 
281 static int byte_count_send_ok (int byte_count);
282 
283 static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
284 {
285  log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
286  totempg_waiting_transack = waiting_trans_ack;
287 }
288 
/*
 * Look up (or create) the fragment-assembly buffer for a given nodeid.
 *
 * Two pairs of inuse/free lists exist: the "_trans" pair is used while
 * the cluster is waiting for a transitional ack (totempg_waiting_transack
 * set by totempg_waiting_trans_ack_cb), keeping messages from the old and
 * new configurations assembled separately.
 *
 * Search order: existing inuse entry for the nodeid, then recycle an
 * entry from the free list, and only then malloc a fresh one.
 * Always returns non-NULL (allocation failure currently asserts).
 */
static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct list_head *list;
	struct list_head *active_assembly_list_inuse;
	struct list_head *active_assembly_list_free;

	/* Select the normal or transitional list pair (see comment above). */
	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
		active_assembly_list_free = &assembly_list_free_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
		active_assembly_list_free = &assembly_list_free;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	for (list = active_assembly_list_inuse->next;
		list != active_assembly_list_inuse;
		list = list->next) {

		assembly = list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list get one from free list if available
	 */
	if (list_empty (active_assembly_list_free) == 0) {
		assembly = list_entry (active_assembly_list_free->next, struct assembly, list);
		list_del (&assembly->list);
		list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		/*
		 * NOTE(review): the upstream source also resets
		 * throw_away_mode to THROW_AWAY_INACTIVE at this point; that
		 * statement appears to have been lost in this extract -
		 * verify against the original totempg.c.
		 */
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	/*
	 * NOTE(review): as above, the throw_away_mode initialization is
	 * missing from this extract - verify against the original source.
	 */
	list_init (&assembly->list);
	list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}
350 
351 static void assembly_deref (struct assembly *assembly)
352 {
353  struct list_head *active_assembly_list_free;
354 
355  if (totempg_waiting_transack) {
356  active_assembly_list_free = &assembly_list_free_trans;
357  } else {
358  active_assembly_list_free = &assembly_list_free;
359  }
360 
361  list_del (&assembly->list);
362  list_add (&assembly->list, active_assembly_list_free);
363 }
364 
365 static void assembly_deref_from_normal_and_trans (int nodeid)
366 {
367  int j;
368  struct list_head *list, *list_next;
369  struct list_head *active_assembly_list_inuse;
370  struct list_head *active_assembly_list_free;
371  struct assembly *assembly;
372 
373  for (j = 0; j < 2; j++) {
374  if (j == 0) {
375  active_assembly_list_inuse = &assembly_list_inuse;
376  active_assembly_list_free = &assembly_list_free;
377  } else {
378  active_assembly_list_inuse = &assembly_list_inuse_trans;
379  active_assembly_list_free = &assembly_list_free_trans;
380  }
381 
382  for (list = active_assembly_list_inuse->next;
383  list != active_assembly_list_inuse;
384  list = list_next) {
385 
386  list_next = list->next;
387  assembly = list_entry (list, struct assembly, list);
388 
389  if (nodeid == assembly->nodeid) {
390  list_del (&assembly->list);
391  list_add (&assembly->list, active_assembly_list_free);
392  }
393  }
394  }
395 
396 }
397 
398 static inline void app_confchg_fn (
399  enum totem_configuration_type configuration_type,
400  const unsigned int *member_list, size_t member_list_entries,
401  const unsigned int *left_list, size_t left_list_entries,
402  const unsigned int *joined_list, size_t joined_list_entries,
403  const struct memb_ring_id *ring_id)
404 {
405  int i;
406  struct totempg_group_instance *instance;
407  struct list_head *list;
408 
409  /*
410  * For every leaving processor, add to free list
411  * This also has the side effect of clearing out the dataset
412  * In the leaving processor's assembly buffer.
413  */
414  for (i = 0; i < left_list_entries; i++) {
415  assembly_deref_from_normal_and_trans (left_list[i]);
416  }
417 
418  for (list = totempg_groups_list.next;
419  list != &totempg_groups_list;
420  list = list->next) {
421 
422  instance = list_entry (list, struct totempg_group_instance, list);
423 
424  if (instance->confchg_fn) {
425  instance->confchg_fn (
426  configuration_type,
427  member_list,
428  member_list_entries,
429  left_list,
430  left_list_entries,
431  joined_list,
432  joined_list_entries,
433  ring_id);
434  }
435  }
436 }
437 
/*
 * Byte-swap the group header of a packed message in place.
 *
 * Layout: group_len[0] holds the number of groups, followed by that many
 * 16-bit group-name lengths, then the name bytes (which need no swap).
 * On platforms that cannot do unaligned 16-bit access the swap is done
 * in an aligned scratch copy and written back.
 */
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *lens;
	int idx;
	char *work;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for not i386 or x86_64
	 */
	if ((size_t)msg % 4 != 0) {
		work = alloca(msg_len);
		memcpy(work, msg, msg_len);
	} else {
		work = msg;
	}
#else
	work = msg;
#endif

	lens = (unsigned short *)work;
	lens[0] = swab16(lens[0]);
	for (idx = 1; idx <= lens[0]; idx++) {
		lens[idx] = swab16(lens[idx]);
	}

	/* If we swapped in a scratch copy, publish the result. */
	if (work != msg) {
		memcpy(msg, work, msg_len);
	}
}
470 
/*
 * Decide whether a packed message should be delivered to a group instance.
 *
 * The message begins with a header: group_len[0] = number of group names,
 * followed by group_len[0] 16-bit lengths, then the concatenated names.
 * Returns 1 when any name in the message matches any group in groups_b
 * (length and bytewise content equal), 0 otherwise.
 *
 * *adjust_iovec is always set to the total header size (lengths + names),
 * i.e. the offset of the payload the caller should deliver to the app.
 */
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	/* Callers always pass a single already-assembled buffer. */
	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for not i386 or x86_64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	/* group_len[0] = count; names start right after the length array. */
	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);


	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		/* Advance to the next concatenated group name. */
		group_name += group_len[i];
	}
	return (0);
}
527 
528 
529 static inline void app_deliver_fn (
530  unsigned int nodeid,
531  void *msg,
532  unsigned int msg_len,
533  int endian_conversion_required)
534 {
535  struct totempg_group_instance *instance;
536  struct iovec stripped_iovec;
537  unsigned int adjust_iovec;
538  struct iovec *iovec;
539  struct list_head *list;
540 
541  struct iovec aligned_iovec = { NULL, 0 };
542 
543  if (endian_conversion_required) {
544  group_endian_convert (msg, msg_len);
545  }
546 
547  /*
548  * TODO: segmentation/assembly need to be redesigned to provide aligned access
549  * in all cases to avoid memory copies on non386 archs. Probably broke backwars
550  * compatibility
551  */
552 
553 #ifdef TOTEMPG_NEED_ALIGN
554  /*
555  * Align data structure for not i386 or x86_64
556  */
557  aligned_iovec.iov_base = alloca(msg_len);
558  aligned_iovec.iov_len = msg_len;
559  memcpy(aligned_iovec.iov_base, msg, msg_len);
560 #else
561  aligned_iovec.iov_base = msg;
562  aligned_iovec.iov_len = msg_len;
563 #endif
564 
565  iovec = &aligned_iovec;
566 
567  for (list = totempg_groups_list.next;
568  list != &totempg_groups_list;
569  list = list->next) {
570 
571  instance = list_entry (list, struct totempg_group_instance, list);
572  if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
573  stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
574  stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
575 
576 #ifdef TOTEMPG_NEED_ALIGN
577  /*
578  * Align data structure for not i386 or x86_64
579  */
580  if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) {
581  /*
582  * Deal with misalignment
583  */
584  stripped_iovec.iov_base =
585  alloca (stripped_iovec.iov_len);
586  memcpy (stripped_iovec.iov_base,
587  (char *)iovec->iov_base + adjust_iovec,
588  stripped_iovec.iov_len);
589  }
590 #endif
591  instance->deliver_fn (
592  nodeid,
593  stripped_iovec.iov_base,
594  stripped_iovec.iov_len,
595  endian_conversion_required);
596  }
597  }
598 }
599 
600 static void totempg_confchg_fn (
601  enum totem_configuration_type configuration_type,
602  const unsigned int *member_list, size_t member_list_entries,
603  const unsigned int *left_list, size_t left_list_entries,
604  const unsigned int *joined_list, size_t joined_list_entries,
605  const struct memb_ring_id *ring_id)
606 {
607 // TODO optimize this
608  app_confchg_fn (configuration_type,
609  member_list, member_list_entries,
610  left_list, left_list_entries,
611  joined_list, joined_list_entries,
612  ring_id);
613 }
614 
/*
 * Deliver callback registered with totemmrp: unpack one received mcast
 * frame, append its payload to the per-node assembly buffer, and deliver
 * every complete packed message to the registered groups.
 *
 * Frame layout: struct totempg_mcast, then msg_count 16-bit lengths,
 * then the concatenated (possibly fragmented) message data.
 */
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	/*
	 * NOTE(review): this casts away const and, when converting
	 * endianness, rewrites msg_count inside the caller's buffer.
	 */
	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	/* Copy header+lengths aside so the lengths can be swapped safely. */
	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}

	/* Append this frame's payload at the current reassembly offset. */
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it. We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			/* Skip the discarded first block, deliver from the second. */
			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			/* Deliver each complete packed message in order. */
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			/* Continuation mismatch: drop data until a fresh message. */
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			/* Move the trailing partial fragment to the buffer start. */
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}
737 
738 /*
739  * Totem Process Group Abstraction
740  * depends on poll abstraction, POSIX, IPV4
741  */
742 
744 
745 int callback_token_received_fn (enum totem_callback_token_type type,
746  const void *data)
747 {
748  struct totempg_mcast mcast;
749  struct iovec iovecs[3];
750 
751  if (totempg_threaded_mode == 1) {
752  pthread_mutex_lock (&mcast_msg_mutex);
753  }
754  if (mcast_packed_msg_count == 0) {
755  if (totempg_threaded_mode == 1) {
756  pthread_mutex_unlock (&mcast_msg_mutex);
757  }
758  return (0);
759  }
760  if (totemmrp_avail() == 0) {
761  if (totempg_threaded_mode == 1) {
762  pthread_mutex_unlock (&mcast_msg_mutex);
763  }
764  return (0);
765  }
766  mcast.header.version = 0;
767  mcast.header.type = 0;
768  mcast.fragmented = 0;
769 
770  /*
771  * Was the first message in this buffer a continuation of a
772  * fragmented message?
773  */
774  mcast.continuation = fragment_continuation;
775  fragment_continuation = 0;
776 
777  mcast.msg_count = mcast_packed_msg_count;
778 
779  iovecs[0].iov_base = (void *)&mcast;
780  iovecs[0].iov_len = sizeof (struct totempg_mcast);
781  iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
782  iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
783  iovecs[2].iov_base = (void *)&fragmentation_data[0];
784  iovecs[2].iov_len = fragment_size;
785  (void)totemmrp_mcast (iovecs, 3, 0);
786 
787  mcast_packed_msg_count = 0;
788  fragment_size = 0;
789 
790  if (totempg_threaded_mode == 1) {
791  pthread_mutex_unlock (&mcast_msg_mutex);
792  }
793  return (0);
794 }
795 
796 /*
797  * Initialize the totem process group abstraction
798  */
800  qb_loop_t *poll_handle,
801  struct totem_config *totem_config)
802 {
803  int res;
804 
805  totempg_totem_config = totem_config;
806  totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
807  totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
808  totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
809  totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
810  totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
811  totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
812  totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
813 
814  fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
815  if (fragmentation_data == 0) {
816  return (-1);
817  }
818 
819  totemsrp_net_mtu_adjust (totem_config);
820 
821  res = totemmrp_initialize (
822  poll_handle,
823  totem_config,
824  &totempg_stats,
825  totempg_deliver_fn,
826  totempg_confchg_fn,
827  totempg_waiting_trans_ack_cb);
828 
830  &callback_token_received_handle,
832  0,
833  callback_token_received_fn,
834  0);
835 
836  totempg_size_limit = (totemmrp_avail() - 1) *
837  (totempg_totem_config->net_mtu -
838  sizeof (struct totempg_mcast) - 16);
839 
840  list_init (&totempg_groups_list);
841 
842  return (res);
843 }
844 
845 void totempg_finalize (void)
846 {
847  if (totempg_threaded_mode == 1) {
848  pthread_mutex_lock (&totempg_mutex);
849  }
851  if (totempg_threaded_mode == 1) {
852  pthread_mutex_unlock (&totempg_mutex);
853  }
854 }
855 
856 /*
857  * Multicast a message
858  */
859 static int mcast_msg (
860  struct iovec *iovec_in,
861  unsigned int iov_len,
862  int guarantee)
863 {
864  int res = 0;
865  struct totempg_mcast mcast;
866  struct iovec iovecs[3];
867  struct iovec iovec[64];
868  int i;
869  int dest, src;
870  int max_packet_size = 0;
871  int copy_len = 0;
872  int copy_base = 0;
873  int total_size = 0;
874 
875  if (totempg_threaded_mode == 1) {
876  pthread_mutex_lock (&mcast_msg_mutex);
877  }
879 
880  /*
881  * Remove zero length iovectors from the list
882  */
883  assert (iov_len < 64);
884  for (dest = 0, src = 0; src < iov_len; src++) {
885  if (iovec_in[src].iov_len) {
886  memcpy (&iovec[dest++], &iovec_in[src],
887  sizeof (struct iovec));
888  }
889  }
890  iov_len = dest;
891 
892  max_packet_size = TOTEMPG_PACKET_SIZE -
893  (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
894 
895  mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
896 
897  /*
898  * Check if we would overwrite new message queue
899  */
900  for (i = 0; i < iov_len; i++) {
901  total_size += iovec[i].iov_len;
902  }
903 
904  if (byte_count_send_ok (total_size + sizeof(unsigned short) *
905  (mcast_packed_msg_count)) == 0) {
906 
907  if (totempg_threaded_mode == 1) {
908  pthread_mutex_unlock (&mcast_msg_mutex);
909  }
910  return(-1);
911  }
912 
913  mcast.header.version = 0;
914  for (i = 0; i < iov_len; ) {
915  mcast.fragmented = 0;
916  mcast.continuation = fragment_continuation;
917  copy_len = iovec[i].iov_len - copy_base;
918 
919  /*
920  * If it all fits with room left over, copy it in.
921  * We need to leave at least sizeof(short) + 1 bytes in the
922  * fragment_buffer on exit so that max_packet_size + fragment_size
923  * doesn't exceed the size of the fragment_buffer on the next call.
924  */
925  if ((copy_len + fragment_size) <
926  (max_packet_size - sizeof (unsigned short))) {
927 
928  memcpy (&fragmentation_data[fragment_size],
929  (char *)iovec[i].iov_base + copy_base, copy_len);
930  fragment_size += copy_len;
931  mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
932  next_fragment = 1;
933  copy_len = 0;
934  copy_base = 0;
935  i++;
936  continue;
937 
938  /*
939  * If it just fits or is too big, then send out what fits.
940  */
941  } else {
942  unsigned char *data_ptr;
943 
944  copy_len = min(copy_len, max_packet_size - fragment_size);
945  if( copy_len == max_packet_size )
946  data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
947  else {
948  data_ptr = fragmentation_data;
949  memcpy (&fragmentation_data[fragment_size],
950  (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
951  }
952 
953  memcpy (&fragmentation_data[fragment_size],
954  (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
955  mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
956 
957  /*
958  * if we're not on the last iovec or the iovec is too large to
959  * fit, then indicate a fragment. This also means that the next
960  * message will have the continuation of this one.
961  */
962  if ((i < (iov_len - 1)) ||
963  ((copy_base + copy_len) < iovec[i].iov_len)) {
964  if (!next_fragment) {
965  next_fragment++;
966  }
967  fragment_continuation = next_fragment;
968  mcast.fragmented = next_fragment++;
969  assert(fragment_continuation != 0);
970  assert(mcast.fragmented != 0);
971  } else {
972  fragment_continuation = 0;
973  }
974 
975  /*
976  * assemble the message and send it
977  */
978  mcast.msg_count = ++mcast_packed_msg_count;
979  iovecs[0].iov_base = (void *)&mcast;
980  iovecs[0].iov_len = sizeof(struct totempg_mcast);
981  iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
982  iovecs[1].iov_len = mcast_packed_msg_count *
983  sizeof(unsigned short);
984  iovecs[2].iov_base = (void *)data_ptr;
985  iovecs[2].iov_len = max_packet_size;
986  assert (totemmrp_avail() > 0);
987  res = totemmrp_mcast (iovecs, 3, guarantee);
988  if (res == -1) {
989  goto error_exit;
990  }
991 
992  /*
993  * Recalculate counts and indexes for the next.
994  */
995  mcast_packed_msg_lens[0] = 0;
996  mcast_packed_msg_count = 0;
997  fragment_size = 0;
998  max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
999 
1000  /*
1001  * If the iovec all fit, go to the next iovec
1002  */
1003  if ((copy_base + copy_len) == iovec[i].iov_len) {
1004  copy_len = 0;
1005  copy_base = 0;
1006  i++;
1007 
1008  /*
1009  * Continue with the rest of the current iovec.
1010  */
1011  } else {
1012  copy_base += copy_len;
1013  }
1014  }
1015  }
1016 
1017  /*
1018  * Bump only if we added message data. This may be zero if
1019  * the last buffer just fit into the fragmentation_data buffer
1020  * and we were at the last iovec.
1021  */
1022  if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
1023  mcast_packed_msg_count++;
1024  }
1025 
1026 error_exit:
1027  if (totempg_threaded_mode == 1) {
1028  pthread_mutex_unlock (&mcast_msg_mutex);
1029  }
1030  return (res);
1031 }
1032 
1033 /*
1034  * Determine if a message of msg_size could be queued
1035  */
1036 static int msg_count_send_ok (
1037  int msg_count)
1038 {
1039  int avail = 0;
1040 
1041  avail = totemmrp_avail ();
1042  totempg_stats.msg_queue_avail = avail;
1043 
1044  return ((avail - totempg_reserved) > msg_count);
1045 }
1046 
1047 static int byte_count_send_ok (
1048  int byte_count)
1049 {
1050  unsigned int msg_count = 0;
1051  int avail = 0;
1052 
1053  avail = totemmrp_avail ();
1054 
1055  msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1056 
1057  return (avail >= msg_count);
1058 }
1059 
1060 static int send_reserve (
1061  int msg_size)
1062 {
1063  unsigned int msg_count = 0;
1064 
1065  msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1066  totempg_reserved += msg_count;
1067  totempg_stats.msg_reserved = totempg_reserved;
1068 
1069  return (msg_count);
1070 }
1071 
1072 static void send_release (
1073  int msg_count)
1074 {
1075  totempg_reserved -= msg_count;
1076  totempg_stats.msg_reserved = totempg_reserved;
1077 }
1078 
/*
 * On normal-memory builds, size the message queue to four maximum-sized
 * messages worth of fragments at the configured network MTU.
 */
1079 #ifndef HAVE_SMALL_MEMORY_FOOTPRINT
1080 #undef MESSAGE_QUEUE_MAX
1081 #define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
1082 #endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
1083 
/*
 * Percentage of the transmit queue currently in use, counting both
 * queued messages and outstanding reservations.
 * NOTE(review): "precent" is a typo for "percent", but the name is
 * referenced by check_q_level() so it must stay as-is here.
 */
1084 static uint32_t q_level_precent_used(void)
1085 {
1086  return (100 - (((totemmrp_avail() - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
1087 }
1088 
/*
 * totempg_callback_token_create: mutex-protected wrapper around
 * totemmrp_callback_token_create.
 * NOTE(review): original lines 1089/1091 (the function-name line and
 * the `type` parameter) were dropped by the HTML extraction of this
 * listing — restore from upstream before compiling.
 */
1090  void **handle_out,
1092  int delete,
1093  int (*callback_fn) (enum totem_callback_token_type type, const void *),
1094  const void *data)
1095 {
1096  unsigned int res;
1097  if (totempg_threaded_mode == 1) {
1098  pthread_mutex_lock (&callback_token_mutex);
1099  }
1100  res = totemmrp_callback_token_create (handle_out, type, delete,
1101  callback_fn, data);
1102  if (totempg_threaded_mode == 1) {
1103  pthread_mutex_unlock (&callback_token_mutex);
1104  }
1105  return (res);
1106 }
1107 
/*
 * totempg_callback_token_destroy: mutex-protected wrapper around
 * totemmrp_callback_token_destroy.
 * NOTE(review): original line 1108 (the function-name line) was
 * dropped by the HTML extraction of this listing.
 */
1109  void *handle_out)
1110 {
1111  if (totempg_threaded_mode == 1) {
1112  pthread_mutex_lock (&callback_token_mutex);
1113  }
1114  totemmrp_callback_token_destroy (handle_out);
1115  if (totempg_threaded_mode == 1) {
1116  pthread_mutex_unlock (&callback_token_mutex);
1117  }
1118 }
1119 
1120 /*
1121  * vi: set autoindent tabstop=4 shiftwidth=4 :
1122  */
1123 
/*
 * totempg_groups_initialize: allocate a group instance, register its
 * deliver/confchg callbacks, and link it onto totempg_groups_list.
 * Returns 0 on success, -1 on allocation failure.
 * NOTE(review): original line 1124 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1125  void **totempg_groups_instance,
1126 
1127  void (*deliver_fn) (
1128  unsigned int nodeid,
1129  const void *msg,
1130  unsigned int msg_len,
1131  int endian_conversion_required),
1132 
1133  void (*confchg_fn) (
1134  enum totem_configuration_type configuration_type,
1135  const unsigned int *member_list, size_t member_list_entries,
1136  const unsigned int *left_list, size_t left_list_entries,
1137  const unsigned int *joined_list, size_t joined_list_entries,
1138  const struct memb_ring_id *ring_id))
1139 {
1140  struct totempg_group_instance *instance;
1141 
1142  if (totempg_threaded_mode == 1) {
1143  pthread_mutex_lock (&totempg_mutex);
1144  }
1145 
1146  instance = malloc (sizeof (struct totempg_group_instance));
1147  if (instance == NULL) {
1148  goto error_exit;
1149  }
1150 
1151  instance->deliver_fn = deliver_fn;
1152  instance->confchg_fn = confchg_fn;
1153  instance->groups = 0;
1154  instance->groups_cnt = 0;
/*
 * NOTE(review): q_level is compared against TOTEM_Q_LEVEL_* elsewhere
 * (check_q_level), yet it is initialized with QB_LOOP_MED, which looks
 * like a constant from an unrelated enum — confirm against upstream.
 */
1155  instance->q_level = QB_LOOP_MED;
1156  list_init (&instance->list);
1157  list_add (&instance->list, &totempg_groups_list);
1158 
1159  if (totempg_threaded_mode == 1) {
1160  pthread_mutex_unlock (&totempg_mutex);
1161  }
1162  *totempg_groups_instance = instance;
1163  return (0);
1164 
1165 error_exit:
1166  if (totempg_threaded_mode == 1) {
1167  pthread_mutex_unlock (&totempg_mutex);
1168  }
1169  return (-1);
1170 }
1171 
/*
 * totempg_groups_join: append group_cnt group descriptors to the
 * instance's group array (grown with realloc).  Returns 0 on success or
 * ENOMEM.  On realloc failure the original array is left intact.
 * NOTE(review): original line 1172 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1173  void *totempg_groups_instance,
1174  const struct totempg_group *groups,
1175  size_t group_cnt)
1176 {
1177  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1178  struct totempg_group *new_groups;
1179  unsigned int res = 0;
1180 
1181  if (totempg_threaded_mode == 1) {
1182  pthread_mutex_lock (&totempg_mutex);
1183  }
1184 
1185  new_groups = realloc (instance->groups,
1186  sizeof (struct totempg_group) *
1187  (instance->groups_cnt + group_cnt));
1188  if (new_groups == 0) {
1189  res = ENOMEM;
1190  goto error_exit;
1191  }
1192  memcpy (&new_groups[instance->groups_cnt],
1193  groups, group_cnt * sizeof (struct totempg_group));
1194  instance->groups = new_groups;
1195  instance->groups_cnt += group_cnt;
1196 
1197 error_exit:
1198  if (totempg_threaded_mode == 1) {
1199  pthread_mutex_unlock (&totempg_mutex);
1200  }
1201  return (res);
1202 }
1203 
/*
 * totempg_groups_leave: currently a no-op stub — it takes and releases
 * the mutex, ignores its arguments, and always returns 0; groups are
 * never actually removed from the instance.
 * NOTE(review): original line 1204 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1205  void *totempg_groups_instance,
1206  const struct totempg_group *groups,
1207  size_t group_cnt)
1208 {
1209  if (totempg_threaded_mode == 1) {
1210  pthread_mutex_lock (&totempg_mutex);
1211  }
1212 
1213  if (totempg_threaded_mode == 1) {
1214  pthread_mutex_unlock (&totempg_mutex);
1215  }
1216  return (0);
1217 }
1218 
/*
 * Upper bounds used to size the on-stack iovec/group arrays in the
 * mcast functions below.
 */
1219 #define MAX_IOVECS_FROM_APP 32
1220 #define MAX_GROUPS_PER_MSG 32
1221 
/*
 * totempg_groups_mcast_joined: prepend the instance's group name table
 * (count + per-group lengths + group names) to the caller's iovecs and
 * multicast the combined message via mcast_msg.
 * NOTE(review): original line 1222 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 * NOTE(review): there is no visible check that groups_cnt <=
 * MAX_GROUPS_PER_MSG or iov_len <= MAX_IOVECS_FROM_APP before the
 * on-stack arrays are filled — confirm callers enforce these limits.
 */
1223  void *totempg_groups_instance,
1224  const struct iovec *iovec,
1225  unsigned int iov_len,
1226  int guarantee)
1227 {
1228  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1229  unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1230  struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1231  int i;
1232  unsigned int res;
1233 
1234  if (totempg_threaded_mode == 1) {
1235  pthread_mutex_lock (&totempg_mutex);
1236  }
1237 
1238  /*
1239  * Build group_len structure and the iovec_mcast structure
1240  */
1241  group_len[0] = instance->groups_cnt;
1242  for (i = 0; i < instance->groups_cnt; i++) {
1243  group_len[i + 1] = instance->groups[i].group_len;
1244  iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1245  iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
1246  }
1247  iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1248  iovec_mcast[0].iov_base = group_len;
1249  for (i = 0; i < iov_len; i++) {
1250  iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1251  iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1252  }
1253 
1254  res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1255 
1256  if (totempg_threaded_mode == 1) {
1257  pthread_mutex_unlock (&totempg_mutex);
1258  }
1259 
1260  return (res);
1261 }
1262 
1263 static void check_q_level(
1264  void *totempg_groups_instance)
1265 {
1266  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1267  int32_t old_level = instance->q_level;
1268  int32_t percent_used = q_level_precent_used();
1269 
1270  if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
1271  instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
1272  } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
1273  instance->q_level = TOTEM_Q_LEVEL_LOW;
1274  } else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
1275  instance->q_level = TOTEM_Q_LEVEL_GOOD;
1276  } else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
1277  instance->q_level = TOTEM_Q_LEVEL_HIGH;
1278  }
1279  if (totem_queue_level_changed && old_level != instance->q_level) {
1280  totem_queue_level_changed(instance->q_level);
1281  }
1282 }
1283 
/*
 * totempg_check_q_level: public entry point that re-evaluates the queue
 * level for the given instance (see check_q_level).
 * NOTE(review): original line 1284 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1285  void *totempg_groups_instance)
1286 {
1287  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1288 
1289  check_q_level(instance);
1290 }
1291 
/*
 * totempg_groups_joined_reserve: compute the total size of the message
 * (group headers plus application iovecs) and, if it fits under the
 * size limit and the queue has room, reserve fragments for it.
 * Returns the reservation count, 0 when the queue is full, or -1 when
 * the message exceeds totempg_size_limit.
 * NOTE(review): original line 1292 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 * NOTE(review): 'reserved' is an unsigned int assigned -1 for the error
 * case and then returned; this relies on the implicit conversion back
 * to the (presumably int) return type — confirm against the prototype.
 */
1293  void *totempg_groups_instance,
1294  const struct iovec *iovec,
1295  unsigned int iov_len)
1296 {
1297  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1298  unsigned int size = 0;
1299  unsigned int i;
1300  unsigned int reserved = 0;
1301 
1302  if (totempg_threaded_mode == 1) {
1303  pthread_mutex_lock (&totempg_mutex);
1304  pthread_mutex_lock (&mcast_msg_mutex);
1305  }
1306 
1307  for (i = 0; i < instance->groups_cnt; i++) {
1308  size += instance->groups[i].group_len;
1309  }
1310  for (i = 0; i < iov_len; i++) {
1311  size += iovec[i].iov_len;
1312  }
1313 
1314  if (size >= totempg_size_limit) {
1315  reserved = -1;
1316  goto error_exit;
1317  }
1318 
1319  if (byte_count_send_ok (size)) {
1320  reserved = send_reserve (size);
1321  } else {
1322  reserved = 0;
1323  }
1324 
1325 error_exit:
1326  check_q_level(instance);
1327 
1328  if (totempg_threaded_mode == 1) {
1329  pthread_mutex_unlock (&mcast_msg_mutex);
1330  pthread_mutex_unlock (&totempg_mutex);
1331  }
1332  return (reserved);
1333 }
1334 
1335 
/*
 * totempg_groups_joined_release: give back msg_count previously
 * reserved fragments (see totempg_groups_joined_reserve) under both
 * mutexes.  Always returns 0.
 * NOTE(review): original line 1336 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1337 {
1338  if (totempg_threaded_mode == 1) {
1339  pthread_mutex_lock (&totempg_mutex);
1340  pthread_mutex_lock (&mcast_msg_mutex);
1341  }
1342  send_release (msg_count);
1343  if (totempg_threaded_mode == 1) {
1344  pthread_mutex_unlock (&mcast_msg_mutex);
1345  pthread_mutex_unlock (&totempg_mutex);
1346  }
1347  return 0;
1348 }
1349 
/*
 * totempg_groups_mcast_groups: like totempg_groups_mcast_joined, but
 * the destination groups are passed explicitly instead of taken from
 * the instance.  Builds the group-name header iovec and multicasts the
 * combined message via mcast_msg.
 * NOTE(review): original line 1350 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 * NOTE(review): no visible bound check of groups_cnt/iov_len against
 * MAX_GROUPS_PER_MSG/MAX_IOVECS_FROM_APP — confirm callers enforce it.
 */
1351  void *totempg_groups_instance,
1352  int guarantee,
1353  const struct totempg_group *groups,
1354  size_t groups_cnt,
1355  const struct iovec *iovec,
1356  unsigned int iov_len)
1357 {
1358  unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1359  struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1360  int i;
1361  unsigned int res;
1362 
1363  if (totempg_threaded_mode == 1) {
1364  pthread_mutex_lock (&totempg_mutex);
1365  }
1366 
1367  /*
1368  * Build group_len structure and the iovec_mcast structure
1369  */
1370  group_len[0] = groups_cnt;
1371  for (i = 0; i < groups_cnt; i++) {
1372  group_len[i + 1] = groups[i].group_len;
1373  iovec_mcast[i + 1].iov_len = groups[i].group_len;
1374  iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
1375  }
1376  iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1377  iovec_mcast[0].iov_base = group_len;
1378  for (i = 0; i < iov_len; i++) {
1379  iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1380  iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1381  }
1382 
1383  res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1384 
1385  if (totempg_threaded_mode == 1) {
1386  pthread_mutex_unlock (&totempg_mutex);
1387  }
1388  return (res);
1389 }
1390 
1391 /*
1392  * Returns -1 if error, 0 if can't send, 1 if can send the message
1393  */
/*
 * totempg_groups_send_ok_groups: sum the sizes of the group headers and
 * iovecs and ask msg_count_send_ok whether that many bytes fit.
 * NOTE(review): original line 1394 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 * NOTE(review): the total byte size is passed to msg_count_send_ok,
 * which treats its argument as a message *count* — compare with the
 * byte_count_send_ok path used elsewhere; confirm against upstream.
 */
1395  void *totempg_groups_instance,
1396  const struct totempg_group *groups,
1397  size_t groups_cnt,
1398  const struct iovec *iovec,
1399  unsigned int iov_len)
1400 {
1401  unsigned int size = 0;
1402  unsigned int i;
1403  unsigned int res;
1404 
1405  if (totempg_threaded_mode == 1) {
1406  pthread_mutex_lock (&totempg_mutex);
1407  }
1408 
1409  for (i = 0; i < groups_cnt; i++) {
1410  size += groups[i].group_len;
1411  }
1412  for (i = 0; i < iov_len; i++) {
1413  size += iovec[i].iov_len;
1414  }
1415 
1416  res = msg_count_send_ok (size);
1417 
1418  if (totempg_threaded_mode == 1) {
1419  pthread_mutex_unlock (&totempg_mutex);
1420  }
1421  return (res);
1422 }
1423 
/*
 * totempg_ifaces_get: pass-through to totemmrp_ifaces_get to fetch the
 * interface addresses, status strings, and interface count for nodeid.
 * NOTE(review): original line 1424 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1425  unsigned int nodeid,
1426  struct totem_ip_address *interfaces,
1427  unsigned int interfaces_size,
1428  char ***status,
1429  unsigned int *iface_count)
1430 {
1431  int res;
1432 
1433  res = totemmrp_ifaces_get (
1434  nodeid,
1435  interfaces,
1436  interfaces_size,
1437  status,
1438  iface_count);
1439 
1440  return (res);
1441 }
1442 
/*
 * totempg_event_signal: pass-through to totemmrp_event_signal.
 * NOTE(review): original line 1443 (the function-name line, carrying
 * the type/value parameters) was dropped by the HTML extraction.
 */
1444 {
1445  totemmrp_event_signal (type, value);
1446 }
1447 
/*
 * Expose the file-scope statistics block to callers (read-only by
 * convention; the void* return erases the totempg_stats_t type).
 */
1448 void* totempg_get_stats (void)
1449 {
1450  return &totempg_stats;
1451 }
1452 
/*
 * totempg_crypto_set: pass-through to totemmrp_crypto_set to configure
 * the cipher and hash used on the wire.
 * NOTE(review): original line 1453 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1454  const char *cipher_type,
1455  const char *hash_type)
1456 {
1457  int res;
1458 
1459  res = totemmrp_crypto_set (cipher_type, hash_type);
1460 
1461  return (res);
1462 }
1463 
/*
 * totempg_ring_reenable: pass-through to totemmrp_ring_reenable.
 * NOTE(review): original line 1464 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1465 {
1466  int res;
1467 
1468  res = totemmrp_ring_reenable ();
1469 
1470  return (res);
1471 }
1472 
1473 #define ONE_IFACE_LEN 63
1474 const char *totempg_ifaces_print (unsigned int nodeid)
1475 {
1476  static char iface_string[256 * INTERFACE_MAX];
1477  char one_iface[ONE_IFACE_LEN+1];
1478  struct totem_ip_address interfaces[INTERFACE_MAX];
1479  char **status;
1480  unsigned int iface_count;
1481  unsigned int i;
1482  int res;
1483 
1484  iface_string[0] = '\0';
1485 
1486  res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
1487  if (res == -1) {
1488  return ("no interface found for nodeid");
1489  }
1490 
1491  res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
1492 
1493  for (i = 0; i < iface_count; i++) {
1494  snprintf (one_iface, ONE_IFACE_LEN,
1495  "r(%d) ip(%s) ",
1496  i, totemip_print (&interfaces[i]));
1497  strcat (iface_string, one_iface);
1498  }
1499  return (iface_string);
1500 }
1501 
/*
 * Pass-through: this node's totem nodeid, from totemmrp.
 */
1502 unsigned int totempg_my_nodeid_get (void)
1503 {
1504  return (totemmrp_my_nodeid_get());
1505 }
1506 
/*
 * totempg_my_family_get: pass-through to totemmrp_my_family_get.
 * NOTE(review): original line 1507 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1508 {
1509  return (totemmrp_my_family_get());
1510 }
/*
 * totempg_service_ready_register: pass-through registration of the
 * service-ready callback with totemmrp.
 * NOTE(review): original line 1511 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1512  void (*totem_service_ready) (void))
1513 {
1514  totemmrp_service_ready_register (totem_service_ready);
1515 }
1516 
/*
 * totempg_queue_level_register_callback: store the queue-level-changed
 * callback invoked by check_q_level on level transitions.
 * NOTE(review): original line 1517 (the function-name line, carrying
 * the fn parameter) was dropped by the HTML extraction.
 */
1518 {
1519  totem_queue_level_changed = fn;
1520 }
1521 
/*
 * totempg_member_add: pass-through to totemmrp_member_add.
 * NOTE(review): original line 1522 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1523  const struct totem_ip_address *member,
1524  int ring_no)
1525 {
1526  return totemmrp_member_add (member, ring_no);
1527 }
1528 
/*
 * totempg_member_remove: pass-through to totemmrp_member_remove.
 * NOTE(review): original line 1529 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1530  const struct totem_ip_address *member,
1531  int ring_no)
1532 {
1533  return totemmrp_member_remove (member, ring_no);
1534 }
1535 
/*
 * totempg_threaded_mode_enable: switch this layer into threaded mode so
 * the mutex guards throughout this file take effect.
 * NOTE(review): original lines 1536 (the function-name line) and 1539
 * were dropped by the HTML extraction; the symbol index suggests 1539
 * called totemmrp_threaded_mode_enable() — confirm against upstream.
 */
1537 {
1538  totempg_threaded_mode = 1;
1540 }
1541 
/*
 * totempg_trans_ack: pass-through to totemmrp_trans_ack.
 * NOTE(review): original line 1542 (the function-name line) was dropped
 * by the HTML extraction of this listing.
 */
1543 {
1544  totemmrp_trans_ack ();
1545 }
1546 
unsigned char last_frag_num
Definition: totempg.c:203
int totempg_initialize(qb_loop_t *poll_handle, struct totem_config *totem_config)
Initialize the totem process groups abstraction.
Definition: totempg.c:799
void(*) in log_level_security)
Definition: totem.h:82
int totemmrp_ifaces_get(unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemmrp.c:199
void totemmrp_finalize(void)
Definition: totemmrp.c:154
Totem Single Ring Protocol.
#define TOTEMPG_NEED_ALIGN
Definition: totempg.c:125
uint32_t value
int totempg_groups_initialize(void **totempg_groups_instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
Definition: totempg.c:1124
size_t group_len
Definition: totempg.h:57
struct list_head * next
Definition: list.h:47
void totemmrp_callback_token_destroy(void *handle_out)
Definition: totemmrp.c:188
void * totempg_get_stats(void)
Definition: totempg.c:1448
const char * totemip_print(const struct totem_ip_address *addr)
Definition: totemip.c:214
Totem Single Ring Protocol.
int totempg_groups_join(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
Definition: totempg.c:1172
void totempg_queue_level_register_callback(totem_queue_level_changed_fn fn)
Definition: totempg.c:1517
#define TOTEMPG_PACKET_SIZE
Definition: totempg.c:155
unsigned char fragmented
Definition: totempg.c:141
totem_configuration_type
Definition: coroapi.h:110
struct totempg_group * groups
Definition: totempg.c:255
int totemmrp_initialize(qb_loop_t *poll_handle, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Initialize the group messaging interface.
Definition: totemmrp.c:118
void totempg_trans_ack(void)
Definition: totempg.c:1542
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:4754
int totemmrp_member_add(const struct totem_ip_address *member, int ring_no)
Definition: totemmrp.c:256
int totemmrp_avail(void)
Return number of available messages that can be queued.
Definition: totemmrp.c:173
struct message_header header
Definition: totemsrp.c:60
int guarantee
Definition: totemsrp.c:66
struct list_head list
Definition: totempg.c:260
#define log_printf(level, format, args...)
Definition: totempg.c:271
int totemmrp_my_family_get(void)
Definition: totemmrp.c:233
int totemmrp_crypto_set(const char *cipher_type, const char *hash_type)
Definition: totemmrp.c:219
void totemmrp_trans_ack(void)
Definition: totemmrp.c:283
void(* totem_queue_level_changed_fn)(enum totem_q_level level)
Definition: totempg.h:182
Definition: list.h:46
int totempg_groups_send_ok_groups(void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
Definition: totempg.c:1394
unsigned char data[MESSAGE_SIZE_MAX]
Definition: totempg.c:201
int totempg_groups_mcast_groups(void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
Definition: totempg.c:1350
int totempg_ifaces_get(unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totempg.c:1424
#define INTERFACE_MAX
Definition: coroapi.h:75
#define MAX_GROUPS_PER_MSG
Definition: totempg.c:1220
const char * totempg_ifaces_print(unsigned int nodeid)
Definition: totempg.c:1474
void(* deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totempg.c:242
void totempg_threaded_mode_enable(void)
Definition: totempg.c:1536
DECLARE_LIST_INIT(assembly_list_inuse)
int totemmrp_member_remove(const struct totem_ip_address *member, int ring_no)
Definition: totemmrp.c:267
void(* confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totempg.c:248
int totempg_groups_leave(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
Definition: totempg.c:1204
struct list_head list
Definition: totempg.c:205
int totemmrp_mcast(struct iovec *iovec, unsigned int iov_len, int priority)
Multicast a message.
Definition: totemmrp.c:162
int totempg_my_family_get(void)
Definition: totempg.c:1507
unsigned char continuation
Definition: totempg.c:142
const void * group
Definition: totempg.h:56
void * callback_token_received_handle
Definition: totempg.c:743
int totemmrp_callback_token_create(void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemmrp.c:178
int totemmrp_ring_reenable(void)
Definition: totemmrp.c:238
Linked list API.
int totempg_groups_mcast_joined(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
Definition: totempg.c:1222
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:75
void totempg_check_q_level(void *totempg_groups_instance)
Definition: totempg.c:1284
unsigned int totempg_my_nodeid_get(void)
Definition: totempg.c:1502
typedef __attribute__
uint32_t msg_queue_avail
Definition: totem.h:290
totem_event_type
Definition: totem.h:212
int totempg_groups_joined_release(int msg_count)
Definition: totempg.c:1336
void totempg_event_signal(enum totem_event_type type, int value)
Definition: totempg.c:1443
unsigned short msg_count
Definition: totempg.c:143
int totempg_member_remove(const struct totem_ip_address *member, int ring_no)
Definition: totempg.c:1529
unsigned int totemmrp_my_nodeid_get(void)
Definition: totemmrp.c:228
void totempg_callback_token_destroy(void *handle_out)
Definition: totempg.c:1108
unsigned int net_mtu
Definition: totem.h:165
void totemmrp_service_ready_register(void(*totem_service_ready)(void))
Definition: totemmrp.c:248
throw_away_mode
Definition: totempg.c:194
int totempg_groups_joined_reserve(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len)
Definition: totempg.c:1292
int totempg_ring_reenable(void)
Definition: totempg.c:1464
#define MESSAGE_QUEUE_MAX
Definition: totempg.c:1081
#define swab16(x)
Definition: swab.h:35
void totemmrp_threaded_mode_enable(void)
Definition: totemmrp.c:278
uint32_t msg_reserved
Definition: totem.h:289
void totempg_service_ready_register(void(*totem_service_ready)(void))
Definition: totempg.c:1511
unsigned int nodeid
Definition: totempg.c:200
#define min(a, b)
Definition: totempg.c:114
#define FRAME_SIZE_MAX
Definition: totem.h:50
#define list_entry(ptr, type, member)
Definition: list.h:84
#define ONE_IFACE_LEN
Definition: totempg.c:1473
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:163
enum throw_away_mode throw_away_mode
Definition: totempg.c:204
int totempg_crypto_set(const char *cipher_type, const char *hash_type)
Definition: totempg.c:1453
void totemmrp_event_signal(enum totem_event_type type, int value)
Definition: totemmrp.c:194
char type
Definition: totemrrp.c:518
#define MESSAGE_SIZE_MAX
Definition: coroapi.h:84
unsigned int nodeid
Definition: coroapi.h:65
void totempg_finalize(void)
Definition: totempg.c:845
struct memb_ring_id ring_id
Definition: totemsrp.c:64
int totempg_member_add(const struct totem_ip_address *member, int ring_no)
Definition: totempg.c:1522
int index
Definition: totempg.c:202
Totem Single Ring Protocol.
int totempg_callback_token_create(void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totempg.c:1089
static void(*) struct totem_config totempg_totem_config)
Definition: totempg.c:190
struct totempg_mcast_header header
Definition: totempg.c:140
totem_callback_token_type
Definition: coroapi.h:117