以下代码对一般使用websockets的人有帮助...并且可能是该领域入门的人的好模板。我想将其充实为更通用和可重用的内容,因为我的假设和问题将适用于许多不同的情况。

假设:


Timespan“延迟”用于告诉服务器我的客户端正常,并刷新订阅。经常将其发送到服务器会浪费资源,而发送太晚则可能导致服务器定义的超时。
接收块大小仅是我的客户端与网络缓冲区进行通信的方式。延迟不会影响服务器。 (不确定)
WebSocketStatus可以是其他值,例如closed,opening等,并且由于我不处理这些事件,因此该代码尚无法投入生产。

问题


必须做些什么才能准备好此代码生产?
如何确定发送和接收的最佳块大小?
我应该如何从代码错误,网络错误或服务器错误中恢复?
如何确保要获取的数据是完整的,而不是仅仅剪切一部分数据?仅仅因为其余数据卡在缓冲区中而在过渡期间关闭?

代码

  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Net.WebSockets;
  using System.Text;
  using System.Threading;
  using System.Threading.Tasks;

  namespace BlockchainInfoWebSockets
  {
      class Client
      {
         private static object consoleLock = new object();
         private const int sendChunkSize = 256;
         private const int receiveChunkSize = 256;
         private const bool verbose = true;
         private static readonly TimeSpan delay = TimeSpan.FromMilliseconds(30000);

        static void Main(string[] args)
        {
            Thread.Sleep(1000);
            Connect("ws://ws.blockchain.info:8335/inv").Wait();
            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();
        }

        public static async Task Connect(string uri)
        {
            ClientWebSocket webSocket = null;

            try
            {
                webSocket = new ClientWebSocket();
                await webSocket.ConnectAsync(new Uri(uri), CancellationToken.None);
                await Task.WhenAll(Receive(webSocket), Send(webSocket));
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception: {0}", ex);
            }
            finally
            {
                if (webSocket != null)
                    webSocket.Dispose();
                Console.WriteLine();

                lock (consoleLock)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("WebSocket closed.");
                    Console.ResetColor();
                }
            }
        }
       static UTF8Encoding encoder = new UTF8Encoding();

        private static async Task Send(ClientWebSocket webSocket)
        {

            //byte[] buffer = encoder.GetBytes("{\"op\":\"blocks_sub\"}"); //"{\"op\":\"unconfirmed_sub\"}");
            byte[] buffer = encoder.GetBytes("{\"op\":\"unconfirmed_sub\"}");
            await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);

          while (webSocket.State == WebSocketState.Open)
            {
                LogStatus(false, buffer, buffer.Length);
                await Task.Delay(delay);
            }
        }

        private static async Task Receive(ClientWebSocket webSocket)
        {
            byte[] buffer = new byte[receiveChunkSize];
            while (webSocket.State == WebSocketState.Open)
            {
                var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                if (result.MessageType == WebSocketMessageType.Close)
                {
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
                }
                else
                {
                    LogStatus(true, buffer, result.Count);
                }
            }
        }

        private static void LogStatus(bool receiving, byte[] buffer, int length)
        {
            lock (consoleLock)
            {
                Console.ForegroundColor = receiving ? ConsoleColor.Green : ConsoleColor.Gray;
                //Console.WriteLine("{0} ", receiving ? "Received" : "Sent");

                if (verbose)
                    Console.WriteLine(encoder.GetString(buffer));

                Console.ResetColor();
            }
        }
    }
}


评论

在Receive()中,我们可以添加对碎片处理的支持-当发送的消息很大时会发生...

现在就自己动手做这件事....具体来说,我们应该检查ReceiveAsync()类类型的返回值:System.Net.WebSockets.WebSocketReceiveResult它具有一个标志来表明这是“结束”消息还是它是片段:EndOfMessage [bool]

@Sean我现在尚未接近此代码,但请进行更大的修改

在Receive方法中,不应使byte []缓冲区=新的byte [receiveChunkSize];在while循环中出现吗?

#1 楼


您可能希望通过重载对CancellationToken类的异步方法使用ClientWebSocket,以在退出时正确清理任务。
1024通常是我要用于Web连接的值,MTU(以太网的最大传输单元)1是1500字节,我想将其舍入到2的指数,在这种情况下为2 ^ 10。
我不确定ClientWebSocket类,但是您会要么得到一个WebException例如404,我认为是在StatusCode中,否则可能是您在说WebSocketStatus。记录这些异常可能是您唯一可以做的事情。
除非您有权访问另一端的服务器源代码并能够对其进行更改,除非服务器实现某种方式来告知多少数据它正在发送,无法确定。一个例子是传输的前4个字节是数据的长度。


#2 楼

包装器在关闭连接之前发送和接收一条消息。如果您只是这样做,则可以简单地使用HTTP。当存在一系列读和/或写序列且其间处于不活动状态时,WebSockets很有用。

#3 楼

最佳实践是使用System.Threading.Interlocked.CompareEx创建线程同步对象。就像这样:

private static object consoleLock
private static object GetLock() 
{ 
    System.Threading.Interlocked.CompareExchange(ref consoleLock, new object(), null);          
    return consoleLock; 
}


我会写一个LogStatus的重载,它接受一个字符串和一个Color,以便您可以在Connect方法中清理套接字关闭日志终于封锁了。

评论


\ $ \ begingroup \ $
您介意提供一个链接或描述使用CompareExchange而不是依赖于框架初始化顺序的值吗?
\ $ \ endgroup \ $
–pocheptsov
2015年9月4日在18:45