荔园在线

荔园之美,在春之萌芽,在夏之绽放,在秋之收获,在冬之沉淀

[回到开始] [上一篇][下一篇]


发信人: zhch@bbs.nju.edu.cn, 信区: InstallBBS
标  题: 自己动手, 实现多线程
发信站: 南京大学小百合站 (Tue Jan 14 00:10:01 2003)
转信站: SZU!news.tiaozhan.com!news.zixia.net!NJU
出  处: bbs.nju.edu.cn

linux的pthread多线程库有点像是用进程来模拟的,效率上比进程没有太大
的提高。而下面这个小程序给出了一个简单的第3方linux多线程的实现。
它实现了简单的线程调度,支持抢占式多任务,并提供了若干线程安全的库函数。
各线程间栈空间分离,堆空间共享。

目前程序还很简单,只是一个示例,但最基本的功能已经有了。效率方面待完
善后应该会有很大提高。

thread1_main();         用户程序入口
m_exec(int func());     建立一个线程
m_exit();               结束当前线程
m_getpid();             获得当前线程ID
m_sleep();              线程安全的sleep()
m_bind();               线程安全的bind()

用户程序可以使用一般的linux库函数,也可以使用m_开头的对应函数。后者比较安全。

下面打算把FB移植这个系统上面试试, 将来如有可能也可向bbs OS方面发展。

程序主要包括mthread.h和mthread.c两个文件, 用户需提供一个thread1_main()函数做为
用户入口函数。

mthread.h:
#include "stdarg.h"
#include "stdio.h"
#include "signal.h"
#include "setjmp.h"
#include "termios.h"
#include "unistd.h"
#include "sys/time.h"
#include "sys/poll.h"
#include "string.h"
#include "time.h"
#include "stdlib.h"
#include "unistd.h"
#include "fcntl.h"
#include "netdb.h"
#include "errno.h"
#include "sys/stat.h"
#include "sys/ipc.h"
#include "sys/shm.h"
#include "sys/types.h"
#include "sys/mman.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "arpa/inet.h"

#define MAX_MTHREAD 16
#define MAX_FD      16
#define MAX_STACK   8192

#define FORMAT(fmt, buf, len) \
        va_list ap; \
        va_start(ap, fmt); \
        vsnprintf(buf, len, fmt, ap); \
        va_end(ap); \
        if(len>=1) buf[len-1]=0

int printk(char *fmt, ...);
int panic(char *fmt, ...);
int thread1_main();

#define TASK_UNUSED 0
#define TASK_RUN    1
#define TASK_SLEEP  2
#define TASK_WAITIN 3

struct TASK {
        int waitfd;
        int stdio;
        int idle_time;
        int sig;
        int (*sig_handler)();
        int state;
        int t_wakeup;
        int pid;
        int ppid;
        int (*entry)();
        sigjmp_buf x;
        sigjmp_buf x0;
};

mthread.c:
#include "mthread.h"

struct TASK task[MAX_MTHREAD];
struct TASK *current;

int uptime;
int last_pid;
int num_switch;

int main() {
        int i;
        for(i=0; i<NSIG; i++) signal(i, SIG_IGN);
        printf("init...\n");
        init(0);
}

int idle() {
        while(1) sleep(1);
}

int init_timer() {
    struct itimerval new, old;
    int timer_handler();
    bzero(&new, sizeof(struct itimerval));
    bzero(&old, sizeof(struct itimerval));
    new.it_interval.tv_sec=0;
    new.it_interval.tv_usec=20000;
    new.it_value.tv_sec=0;
    new.it_value.tv_usec=20000;
    setitimer(ITIMER_REAL, &new, &old);
    signal(SIGALRM, (void*)timer_handler);
}

int init(int i) {
        char stack[MAX_STACK];
        int ii=i;
        int thread1_main(), idle();
        bzero(stack, MAX_STACK);
        if(ii>=MAX_MTHREAD) {
                printf("init ok\n");
                init_timer();
                task[0].entry=idle;
                m_exec(thread1_main);
                schedule();
        }
        if(sigsetjmp(task[ii].x0, 1)) {
                task[ii].entry();
                if(task[ii].stdio) close(task[ii].stdio);
                task[ii].state=TASK_UNUSED;
                current=0;
                schedule();
        }
        memcpy(&task[ii].x, &task[ii].x0, sizeof(sigjmp_buf));
        init(ii+1);
}

int timer_handler() {
        uptime+=20;
        check_events();
        wake_up_all();
        schedule();
}

int check_events() {
        struct pollfd fds[100];
        int i, j;
        for(i=0; i<100; i++) {
                fds[i].fd=i;
                fds[i].events=POLLIN;
                fds[i].revents=0;
        }
        poll(fds, 100, 0);
        for(i=0; i<100; i++) {
                if(fds[i].revents & POLLIN) {
                        for(j=0; j<MAX_MTHREAD; j++) {
                                if(task[j].state==TASK_WAITIN && task[j].wait
fd==i) {
                                        task[j].state=TASK_RUN;
                                }
                        }
                }
        }
}

int wake_up_all() {
        int i;
        for(i=1; i<MAX_MTHREAD; i++) {
                if(task[i].state==TASK_SLEEP && uptime>task[i].t_wakeup) {
                        task[i].state=TASK_RUN;
                }
        }
}

int schedule() {
        int max, maxi, i;
        max=0;
        maxi=0;
        for(i=1; i<MAX_MTHREAD; i++) {
                if(task[i].state!=TASK_RUN) continue;
                if(uptime-task[i].idle_time>max) {
                        max=uptime-task[i].idle_time;
                        maxi=i;
                }
        }
        if(current) {
                if(sigsetjmp(current->x, 1)) return;
        }
        switch_to(&task[maxi]);
}

int switch_to(struct TASK *new_task) {
        new_task->idle_time=uptime;
        if(new_task==current) return;
        current=new_task;
        num_switch++;
        siglongjmp(current->x, 1);
}

int m_exec2(int func(), int fd) {
        int i, ii;
        for(i=1; i<MAX_MTHREAD; i++) {
                ii=(last_pid+i)%MAX_MTHREAD;
                if(ii==0) continue;
                if(task[ii].state==TASK_UNUSED) {
                        last_pid=ii;
                        memcpy(&task[ii].x, &task[ii].x0, sizeof(sigjmp_buf));
                        task[ii].pid=ii;
                        task[ii].ppid=m_getpid();
                        task[ii].state=TASK_RUN;
                        task[ii].entry=func;
                        task[ii].stdio=fd;
                        schedule();
                        return ii;
                }
        }
        return -1;
}

int m_exec(int func()) {
        return m_exec2(func, 0);
}

int m_exit() {
        if(!current) return;
        if(current->stdio) close(current->stdio);
        current->state=TASK_UNUSED;
        schedule();
}

int m_getpid() {
        if(!current) return 0;
        if(current->pid==0) {
                printf("??==0??\n");
        }
        return current->pid;
}

int m_getppid() {
        if(!current) return 0;
        return current->ppid;
}

int m_sleep(int n) {
        m_usleep(n*1000000);
}

int m_usleep(int n) {
        if(!current) return;
        current->t_wakeup=uptime+n/1000;
        current->state=TASK_SLEEP;
        schedule();
}

int printk(char *fmt, ...) {
        char buf[256];
        FORMAT(fmt, buf, 256);
        printf("%s", buf);
}

int panic(char *fmt, ...) {
        char buf[256];
        FORMAT(fmt, buf, 256);
        printf("%s", buf);
        exit(0);
}

int prints(char *fmt, ...) {
        char buf[256];
        int len;
        FORMAT(fmt, buf, 256);
        if(!current) return;
        if(is_close(current->stdio)) {
                printf("closed\n");
                m_exit();
        }
        len=strlen(buf);
        if(len<=0) return len;
        if(write(current->stdio, buf, len)<=0) {
                printf("%d %d pipe broken\n", m_getpid(), current->stdio);
                m_exit();
        }
}

int is_close(int fd) {
        int f=dup(fd);
        if(f>=0) {
                close(f);
                return 0;
        }
        return 1;
}

int m_read(char *buf, int len) {
        if(!current) return 0;
        if(current->stdio<0) return 0;
        current->waitfd=current->stdio;
        current->state=TASK_WAITIN;
        schedule();
        return read(current->waitfd, buf, len);
}

int m_bind(int port, int func()) {
        struct sockaddr_in sock;
        int optval;
        int fd, fd2;
        bzero(&sock, sizeof(struct sockaddr_in));
        sock.sin_family=AF_INET;
        sock.sin_port=htons(port);
        fd=socket(PF_INET, SOCK_STREAM, 0);
        if(fd<0) return -1;
        optval=1;
        setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int));
        if(bind(fd, (struct sockaddr*)&sock, sizeof(struct sockaddr_in))<0)
                return -1;
        if(listen(fd, 4)<0) return -1;
        while(1) {
                int size;
                size=sizeof(struct sockaddr_in);
                printf("wait for accept fd=%d\n", fd);
                fd2=m_accept(fd, (struct sockaddr*)&sock, &size);
                printf("accept ok %d\n", fd2);
                if(fd2<0) continue;
                m_exec2(func, fd2);
        }
}

int m_accept(int fd, struct sockaddr *sock, int *size) {
        if(!current) return 0;
        current->waitfd=fd;
        current->state=TASK_WAITIN;
        schedule();
        printf("OK!\n");
        return accept(fd, sock, size);
}


--
※ 来源:.南京大学小百合站 bbs.nju.edu.cn.[FROM: dsl.nju.edu.cn]


[回到开始] [上一篇][下一篇]

荔园在线首页 友情链接:深圳大学 深大招生 荔园晨风BBS S-Term软件 网络书店