/*
Asynchronous request-reply single-threaded server in Python
that spawns a request handler each time a request is received
This is different from other examples because the number of request handler threads is not defined ahead of time.
Request:
Client DEALER --> Server ROUTER --> Request handler (spawned)
1. Clients send requests via a DEALER socket on port 5570
2. Server receives requests via a ROUTER socket on port 5570
3. Server passes both the request and the client identity directly to request handlers when they are spawned
Reply:
Client DEALER <-- Server ROUTER <-- Server DEALER <-- Request handler DEALER
1. Request handler returns the reply to the Server via a DEALER socket on inproc
2. Server receives the reply from the request handler via a DEALER socket on inproc
3. Server sends the reply to the client via a ROUTER socket on port 5570
4. Client receives the reply via a DEALER socket on port 5570
*/
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace NetmqSample
{
public class ZmqClient
{
public void Request(
string input)
{
var socket =
new DealerSocket();
socket.Options.Identity =
Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
socket.Connect("tcp://127.0.0.1:5570");
socket.SendFrame(input);
Console.WriteLine($"client send: {input} : {DateTime.Now:T}");
var answer =
socket.ReceiveFrameString();
Console.WriteLine($"client received: {answer} : {DateTime.Now:T}");
socket.Dispose();
}
}
public class ZmqServer
{
private DealerSocket _backend;
private RouterSocket _frontend;
public void Run()
{
_frontend =
new RouterSocket();
_frontend.Bind("tcp://*:5570");
_frontend.ReceiveReady +=
Frontend_ReceiveReady;
_backend =
new DealerSocket();
_backend.Bind("inproc://backend");
_backend.ReceiveReady +=
Backend_ReceiveReady;
var poller =
new NetMQPoller { _frontend, _backend };
poller.RunAsync();
Console.WriteLine("server started");
}
private void Backend_ReceiveReady(
object sender, NetMQSocketEventArgs e)
{
var id =
e.Socket.ReceiveFrameString();
var msg =
e.Socket.ReceiveFrameString();
Console.WriteLine($"server backend response: {id} : {msg}");
_frontend.SendFrame(id, true);
_frontend.SendFrame(msg);
}
private void Frontend_ReceiveReady(
object sender, NetMQSocketEventArgs e)
{
var id =
e.Socket.ReceiveFrameString();
var msg =
e.Socket.ReceiveFrameString();
//Console.WriteLine($"server frontend received: {id} : {msg} : {DateTime.Now:T}");
var task =
new Task(() =>
new RequestHandler().Run(id, msg), TaskCreationOptions.LongRunning);
task.Start();
}
}
public class RequestHandler
{
public void Run(
string id,
string msg)
{
var worker =
new DealerSocket(
"inproc://backend");
// Simulate a long-running operation
Thread.Sleep(
2000);
worker.SendFrame(id, true);
worker.SendFrame(msg +
" : " +
DateTime.Now.ToLongTimeString());
worker.Dispose();
}
}
}
class Program
{
static void Main(
string[] args)
{
var server =
new ZmqServer();
server.Run();
Enumerable.Range(0,
2000).ToList().ForEach(x =>
{
Task.Factory.StartNew(() =>
new ZmqClient().Request(x.ToString(
"0000")), TaskCreationOptions.LongRunning);
});
Console.ReadLine();
}
}
转载于:https://www.cnblogs.com/zhahost/p/6013550.html
转载请注明原文地址: https://win8.8miu.com/read-1552312.html