]> git.karo-electronics.de Git - mv-sheeva.git/blobdiff - fs/dlm/lowcomms.c
Merge branch 'master' into tk71
[mv-sheeva.git] / fs / dlm / lowcomms.c
index 37a34c2c622a12525927956223dcb79c80d000a5..2d8c87b951c2539ee31114f54caa131abbd1d9b7 100644 (file)
@@ -63,6 +63,9 @@
 #define NEEDED_RMEM (4*1024*1024)
 #define CONN_HASH_SIZE 32
 
+/* Number of messages to send before rescheduling */
+#define MAX_SEND_MSG_COUNT 25
+
 struct cbuf {
        unsigned int base;
        unsigned int len;
@@ -108,6 +111,7 @@ struct connection {
 #define CF_INIT_PENDING 4
 #define CF_IS_OTHERCON 5
 #define CF_CLOSE 6
+#define CF_APP_LIMITED 7
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
@@ -295,7 +299,17 @@ static void lowcomms_write_space(struct sock *sk)
 {
        struct connection *con = sock2con(sk);
 
-       if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+       if (!con)
+               return;
+
+       clear_bit(SOCK_NOSPACE, &con->sock->flags);
+
+       if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
+               con->sock->sk->sk_write_pending--;
+               clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
+       }
+
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
                queue_work(send_workqueue, &con->swork);
 }
 
@@ -915,6 +929,7 @@ static void tcp_connect_to_sock(struct connection *con)
        struct sockaddr_storage saddr, src_addr;
        int addr_len;
        struct socket *sock = NULL;
+       int one = 1;
 
        if (con->nodeid == 0) {
                log_print("attempt to connect sock 0 foiled");
@@ -960,6 +975,11 @@ static void tcp_connect_to_sock(struct connection *con)
        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 
        log_print("connecting to %d", con->nodeid);
+
+       /* Turn off Nagle's algorithm */
+       kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
+                         sizeof(one));
+
        result =
                sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
                                   O_NONBLOCK);
@@ -1011,6 +1031,10 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
                goto create_out;
        }
 
+       /* Turn off Nagle's algorithm */
+       kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
+                         sizeof(one));
+
        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                                   (char *)&one, sizeof(one));
 
@@ -1297,6 +1321,7 @@ static void send_to_sock(struct connection *con)
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset;
+       int count = 0;
 
        mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
@@ -1319,14 +1344,27 @@ static void send_to_sock(struct connection *con)
                        ret = kernel_sendpage(con->sock, e->page, offset, len,
                                              msg_flags);
                        if (ret == -EAGAIN || ret == 0) {
+                               if (ret == -EAGAIN &&
+                                   test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
+                                   !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+                                       /* Notify TCP that we're limited by the
+                                        * application window size.
+                                        */
+                                       set_bit(SOCK_NOSPACE, &con->sock->flags);
+                                       con->sock->sk->sk_write_pending++;
+                               }
                                cond_resched();
                                goto out;
                        }
                        if (ret <= 0)
                                goto send_error;
                }
-                       /* Don't starve people filling buffers */
+
+               /* Don't starve people filling buffers */
+               if (++count >= MAX_SEND_MSG_COUNT) {
                        cond_resched();
+                       count = 0;
+               }
 
                spin_lock(&con->writequeue_lock);
                e->offset += ret;
@@ -1430,20 +1468,17 @@ static void work_stop(void)
 
 static int work_start(void)
 {
-       int error;
-       recv_workqueue = create_workqueue("dlm_recv");
-       error = IS_ERR(recv_workqueue);
-       if (error) {
-               log_print("can't start dlm_recv %d", error);
-               return error;
+       recv_workqueue = create_singlethread_workqueue("dlm_recv");
+       if (!recv_workqueue) {
+               log_print("can't start dlm_recv");
+               return -ENOMEM;
        }
 
        send_workqueue = create_singlethread_workqueue("dlm_send");
-       error = IS_ERR(send_workqueue);
-       if (error) {
-               log_print("can't start dlm_send %d", error);
+       if (!send_workqueue) {
+               log_print("can't start dlm_send");
                destroy_workqueue(recv_workqueue);
-               return error;
+               return -ENOMEM;
        }
 
        return 0;