From 91b4a60c1f52ef7eac033379702a751baab6ad7d Mon Sep 17 00:00:00 2001 From: Mark Spencer Date: Mon, 29 Mar 2004 08:09:01 +0000 Subject: [PATCH] Add test program for windowing, implement proper windowing git-svn-id: https://origsvn.digium.com/svn/libpri/trunk@79 2fbb986a-6c06-0410-b554-c9c1f0a7f128 --- Makefile | 9 +- libpri.h | 3 + pri.c | 9 +- pri_internal.h | 1 + pri_q921.h | 1 + q921.c | 43 +++++++-- testprilib.c | 234 +++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 291 insertions(+), 9 deletions(-) create mode 100755 testprilib.c diff --git a/Makefile b/Makefile index 7ca3309..8ec0abe 100755 --- a/Makefile +++ b/Makefile @@ -28,6 +28,7 @@ #LIBPRI_COUNTERS=-DLIBPRI_COUNTERS TOBJS=testpri.o +T2OBJS=testprilib.o STATIC_LIBRARY=libpri.a DYNAMIC_LIBRARY=libpri.so.1.0 STATIC_OBJS=pri.o q921.o prisched.o q931.o @@ -60,6 +61,12 @@ uninstall: pritest: pritest.o $(CC) -o pritest pritest.o -L. -lpri -lzap +testprilib.o: testprilib.c + $(CC) $(CFLAGS) -D_REENTRANT -D_GNU_SOURCE -o $@ -c $< + +testprilib: testprilib.o + $(CC) -o testprilib testprilib.o -L. -lpri -lpthread + pridump: pridump.o $(CC) -o pridump pridump.o -L. -lpri -lzap @@ -77,5 +84,5 @@ $(DYNAMIC_LIBRARY): $(DYNAMIC_OBJS) clean: rm -f *.o *.so *.lo *.so.1 *.so.1.0 - rm -f testpri $(STATIC_LIBRARY) $(DYNAMIC_LIBRARY) + rm -f testpri testprilib $(STATIC_LIBRARY) $(DYNAMIC_LIBRARY) rm -f pritest pridump diff --git a/libpri.h b/libpri.h index 276cc47..62ad699 100755 --- a/libpri.h +++ b/libpri.h @@ -390,4 +390,7 @@ extern void pri_set_overlapdial(struct pri *pri,int state); #define PRI_DUMP_INFO extern void pri_dump_info(struct pri *pri); +/* Get file descriptor */ +extern int pri_fd(struct pri *pri); + #endif diff --git a/pri.c b/pri.c index 12812be..e6103ed 100755 --- a/pri.c +++ b/pri.c @@ -72,8 +72,8 @@ struct pri *pri_new(int fd, int node, int switchtype) p->q931_rxcount = 0; p->q931_txcount = 0; #endif - /* Start Q.921 layer */ - q921_start(p, 1); + /* Start Q.921 layer, Wait if we're the network */ + q921_start(p, p->localtype == PRI_CPE); } return p; } @@ -338,6 +338,11 @@ void pri_set_overlapdial(struct pri *pri,int state) pri->overlapdial = state; } +int pri_fd(struct pri *pri) +{ + return pri->fd; +} + void pri_dump_info(struct pri *pri) { #ifdef LIBPRI_COUNTERS diff --git a/pri_internal.h b/pri_internal.h index 3d34680..502055d 100755 --- a/pri_internal.h +++ b/pri_internal.h @@ -52,6 +52,7 @@ struct pri { /* Q.921 State */ int q921_state; int window; /* Max window size */ + int windowlen; /* Fullness of window */ int v_s; /* Next N(S) for transmission */ int v_a; /* Last acknowledged frame */ int v_r; /* Next frame expected to be received */ diff --git a/pri_q921.h b/pri_q921.h index 3d0f695..c4217f9 100755 --- a/pri_q921.h +++ b/pri_q921.h @@ -134,6 +134,7 @@ typedef union { typedef struct q921_frame { struct q921_frame *next; /* Next in list */ int len; /* Length of header + body */ + int transmitted; /* Have we been transmitted */ q921_i h; } q921_frame; diff --git a/q921.c b/q921.c index 456faf3..6a17deb 100755 --- a/q921.c +++ b/q921.c @@ -170,6 +170,22 @@ static int q921_ack_packet(struct pri *pri, int num) free(f); /* Reset retransmission counter if we actually acked something */ pri->retrans = 0; + /* Decrement window size */ + pri->windowlen--; + /* Search for something to send */ + f = pri->txqueue; + while(f) { + if (!f->transmitted) { + /* Send it now... */ + if (pri->debug & PRI_DEBUG_Q921_STATE) + pri_message("-- Finally transmitting %d, since window opened up\n", f->h.n_s); + f->transmitted++; + pri->windowlen++; + q921_transmit(pri, (q921_h *)(&f->h), f->len); + break; + } + f = f->next; + } return 1; } prev = f; @@ -319,8 +335,11 @@ static void t200_expire(void *vpri) pri_message("-- Retransmitting %d bytes\n", pri->txqueue->len); if (pri->busy) q921_rr(pri, 1, 0); - else + else { + if (!pri->txqueue->transmitted) + pri_error("!! Not good - head of queue has not been transmitted yet\n"); q921_transmit(pri, (q921_h *)&pri->txqueue->h, pri->txqueue->len); + } if (pri->debug & PRI_DEBUG_Q921_STATE) pri_message("-- Rescheduling retransmission (%d)\n", pri->retrans); pri->t200_timer = pri_schedule_event(pri, T_200, t200_expire, pri); @@ -362,6 +381,7 @@ int q921_transmit_iframe(struct pri *pri, void *buf, int len, int cr) break; } f->next = NULL; + f->transmitted = 0; f->len = len + 4; memcpy(f->h.data, buf, len); f->h.n_s = pri->v_s; @@ -374,9 +394,18 @@ int q921_transmit_iframe(struct pri *pri, void *buf, int len, int cr) prev->next = f; else pri->txqueue = f; - /* Immediately transmit unless we're in a recovery state */ + /* Immediately transmit unless we're in a recovery state, or the window + size is too big */ if (!pri->retrans && !pri->busy) { - q921_transmit(pri, (q921_h *)(&f->h), f->len); + if (pri->windowlen < pri->window) { + pri->windowlen++; + q921_transmit(pri, (q921_h *)(&f->h), f->len); + f->transmitted++; + } else { + if (pri->debug & PRI_DEBUG_Q921_STATE) + pri_message("Delaying transmission of %d, window is %d/%d long\n", + f->h.n_s, pri->windowlen, pri->window); + } } if (pri->t203_timer) { if (pri->debug & PRI_DEBUG_Q921_STATE) @@ -468,7 +497,7 @@ void q921_dump(q921_h *h, int len, int showraw, int txrx) pri_message("\n%c [", direction_tag); for (x=0;xraw[x]); - pri_message("]"); + pri_message("]\n"); } switch (h->h.data[0] & Q921_FRAMETYPE_MASK) { @@ -627,6 +656,7 @@ void q921_reset(struct pri *pri) pri->v_r = 0; pri->v_na = 0; pri->window = 7; + pri->windowlen = 0; pri_schedule_del(pri, pri->sabme_timer); pri_schedule_del(pri, pri->t203_timer); pri_schedule_del(pri, pri->t200_timer); @@ -731,8 +761,9 @@ static pri_event *__q921_receive(struct pri *pri, q921_h *h, int len) sendnow = 0; /* Resend the proper I-frame */ for(f=pri->txqueue;f;f=f->next) { - if (sendnow || (f->h.n_s == h->s.n_r)) { - /* Matches the request, or follows in our window */ + if ((sendnow || (f->h.n_s == h->s.n_r)) && f->transmitted) { + /* Matches the request, or follows in our window, and has + already been transmitted. */ sendnow = 1; pri_error("!! Got reject for frame %d, retransmitting frame %d now, updating n_r!\n", h->s.n_r, f->h.n_s); f->h.n_r = pri->v_r; diff --git a/testprilib.c b/testprilib.c new file mode 100755 index 0000000..e0fda9d --- /dev/null +++ b/testprilib.c @@ -0,0 +1,234 @@ +/* + * libpri: An implementation of Primary Rate ISDN + * + * Written by Mark Spencer + * + * Copyright (C) 2001, Linux Support Services, Inc. + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +/* + * This program tests libpri call reception using a zaptel interface. + * Its state machines are setup for RECEIVING CALLS ONLY, so if you + * are trying to both place and receive calls you have to a bit more. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "libpri.h" + +#define DEBUG_LEVEL PRI_DEBUG_ALL + +#define PRI_DEF_NODETYPE PRI_CPE +#define PRI_DEF_SWITCHTYPE PRI_SWITCH_NI2 + +static struct pri *first, *cur; + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + +#define TEST_CALLS 32 + +static void event1(struct pri *pri, pri_event *e) +{ + /* Network */ + int x; + static q931_call *calls[TEST_CALLS]; + char name[256], num[256], dest[256]; + switch(e->gen.e) { + case PRI_EVENT_DCHAN_UP: + printf("Network is up. Sending blast of calls!\n"); + for (x=0;xgen.e), e->gen.e); + } +} + +static void event2(struct pri *pri, pri_event *e) +{ + /* CPE */ + switch(e->gen.e) { + case PRI_EVENT_DCHAN_UP: + default: + printf("PRI 2: %s (%d)\n", pri_event2str(e->gen.e), e->gen.e); + } +} + +static void testmsg(char *s) +{ + char *c; + static int keeplast = 0; + do { + c = strchr(s, '\n'); + if (c) { + *c = '\0'; + c++; + } + if (keeplast) + printf("%s", s); + else if (cur == first) + printf("-1 %s", s); + else + printf("-2 %s", s); + if (c) + printf("\n"); + s = c; + } while(c && *c); + if (!c) + keeplast = 1; + else + keeplast = 0; +} + +static void testerr(char *s) +{ + char *c; + static int keeplast = 0; + do { + c = strchr(s, '\n'); + if (c) { + *c = '\0'; + c++; + } + if (keeplast) + printf("%s", s); + else if (cur == first) + printf("=1 %s", s); + else + printf("=2 %s", s); + if (c) + printf("\n"); + s = c; + } while(c && *c); + if (!c) + keeplast = 1; + else + keeplast = 0; +} + + +static void *dchan(void *data) +{ + /* Joint D-channel */ + struct pri *pri = data; + struct timeval *next, tv; + pri_event *e; + fd_set fds; + int res; + for(;;) { + if (next == pri_schedule_next(pri)) { + gettimeofday(&tv, NULL); + tv.tv_sec = next->tv_sec - tv.tv_sec; + tv.tv_usec = next->tv_usec - tv.tv_usec; + if (tv.tv_usec < 0) { + tv.tv_usec += 1000000; + tv.tv_sec -= 1; + } + if (tv.tv_sec < 0) { + tv.tv_sec = 0; + tv.tv_usec = 0; + } + } + FD_ZERO(&fds); + FD_SET(pri_fd(pri), &fds); + res = select(pri_fd(pri) + 1, &fds, NULL, NULL, next ? &tv : NULL); + pthread_mutex_lock(&lock); + cur = pri; + if (res < 0) { + perror("select"); + } else if (!res) { + e = pri_schedule_run(pri); + } else { + e = pri_check_event(pri); + } + if (e) { + if (first == pri) { + event1(pri, e); + } else { + event2(pri, e); + } + } + pthread_mutex_unlock(&lock); + } + return NULL; +} + + +int main(int argc, char *argv[]) +{ + int pair[2]; + pthread_t tmp; + struct pri *pri; + pri_set_message(testmsg); + pri_set_error(testerr); + if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, pair)) { + perror("socketpair"); + exit(1); + } + if (!(pri = pri_new(pair[0], PRI_NETWORK, PRI_DEF_SWITCHTYPE))) { + perror("pri(0)"); + exit(1); + } + first = pri; + pri_set_debug(pri, DEBUG_LEVEL); + if (pthread_create(&tmp, NULL, dchan, pri)) { + perror("thread(0)"); + exit(1); + } + if (!(pri = pri_new(pair[1], PRI_CPE, PRI_DEF_SWITCHTYPE))) { + perror("pri(1)"); + exit(1); + } + pri_set_debug(pri, DEBUG_LEVEL); + if (pthread_create(&tmp, NULL, dchan, pri)) { + perror("thread(1)"); + exit(1); + } + /* Wait for things to run */ + sleep(5); + exit(0); +} +