plebble
send_queue.h
Go to the documentation of this file.
1 /*
2 -------------------------------------------------------------------------------
3  PLEBBLE
4 -------------------------------------------------------------------------------
5  Copyright (C) 2019-2020 KATLAS Technology. (http://katlastechnology.com)
6  Copyright (C) 2017-2020 Marcos Mayorga. (mm@mm-studios.com)
7 
8  This file is part of our Plebble(R) Platform.
9 
10  The code below cannot be copied, used for any purpose other than the one
11  agreed and/or distributed without the express permission of
12  KATLAS Technology.
13 -------------------------------------------------------------------------------
14 
15 
16 */
17 #ifndef USGOV_3cf53a0bc929f5209a42abf8163c17b908f6d7dfda0b34da09ce1d75d036a947
18 #define USGOV_3cf53a0bc929f5209a42abf8163c17b908f6d7dfda0b34da09ce1d75d036a947
19 
20 #include <queue>
21 #include <vector>
22 #include <unordered_map>
23 #include <us/gov/bgtask.h>
24 #include <us/gov/config.h>
25 #include "busyled.h"
26 
27 namespace us{ namespace gov {
28 namespace socket {
29 using namespace std;
30 
31  struct datagram;
32  struct client;
33 
34  struct qi:pair<datagram*,client*> {
36  qi(datagram* d, client*c);
37  qi(const qi&)=delete;
38  ~qi();
39  short skip{0};
40  };
41 
43  bool operator () (qi*& lhs, qi*& rhs) const;
44  };
45 
46  typedef priority_queue<qi*,vector<qi*>,priority_service> pri_q_t;
47 
48  struct queue_t: pri_q_t, bgtask { //datagrams with lower service code have more priority
49  typedef pri_q_t b;
50  typedef bgtask t;
51 
52  struct ongoing_t: unordered_map<client*,qi*> {
53  qi* reset(qi*);
54  void set(qi*);
55  };
56 
57  static constexpr size_t wmh{CFG_SENDQ_WMH}; //watermark high
58  static constexpr size_t schunk{CFG_SENDQ_SCHUNK};
59 
61  ~queue_t() override;
62 
63  ko send(datagram* d, client* cli, uint16_t pri);
64 
65  qi* next();
66  qi* next_();
68  size_t get_size() const;
69  void wait();
70  void dump(const string& prefix, ostream&) const;
71  void clear();
72 // virtual void flood(const datagram&, const client&) const;
73  void purge(client*);
74  void run();
75  void onwakeup();
77 
78  condition_variable cv;
79  mutable mutex mx;
80  #if CFG_COUNTERS==1
81  uint64_t sent{0};
82  uint64_t dropped{0};
83  uint64_t bytes_sent{0};
84  #endif
86  //pri_q_t skipped;
88  #if CFG_LOGS==1
89  string logdir;
90  #endif
91  };
92 
94 
95 }
96 }}
97 
98 #endif
99 
100 
101 
us::gov::socket::queue_t::~queue_t
~queue_t() override
bgtask.h
us::gov::socket::pri_q_t
priority_queue< qi *, vector< qi * >, priority_service > pri_q_t
Definition: send_queue.h:46
c
us::gov::socket::queue_t c
Definition: send_queue.cpp:25
us::gov::socket::queue_t::replace
qi * replace(qi *)
us::gov::socket::client
Definition: client.h:44
us.ko
Definition: ko.java:20
us::gov::socket::queue_t::t
bgtask t
Definition: send_queue.h:50
us::gov::socket::datagram::completed
bool completed() const
log_start
#define log_start
Definition: gov.h:85
us
Definition: daemon.h:22
us::gov::socket::queue_t::get_size
size_t get_size() const
CFG_SENDQ_SCHUNK
#define CFG_SENDQ_SCHUNK
Definition: config.h:40
unlikely
#define unlikely(x)
Definition: likely.h:30
us::gov::socket::queue_t::purge
void purge(client *)
us::gov::socket::busyled_t
Definition: busyled.h:27
us.pair.first
f first
Definition: pair.java:20
us::gov::socket::qi
Definition: send_queue.h:34
us::gov::socket::queue_t
Definition: send_queue.h:48
likely.h
us::gov::socket::queue_t::ongoing_t
Definition: send_queue.h:52
us::gov::socket::queue_t::run
void run()
us::gov::socket::queue_t::set_busy_handler
void set_busy_handler(busyled_t::handler_t *hsend)
us::gov::socket::queue_t::b
pri_q_t b
Definition: send_queue.h:49
us::gov::socket::send_queue_t
queue_t send_queue_t
Definition: send_queue.h:93
us::gov::socket::datagram::dend
uint32_t dend
Definition: datagram.h:201
us::gov::socket::datagram
Definition: datagram.h:44
us::gov::socket
Definition: busyled.h:23
us::gov::socket::datagram::service
uint16_t service
Definition: datagram.h:200
us::gov::socket::qi::qi
qi(const qi &)=delete
us::gov::socket::queue_t::onwakeup
void onwakeup()
us.pair.second
s second
Definition: pair.java:21
us::gov::socket::busyled_t::handler_t
Definition: busyled.h:28
us::gov::socket::qi::~qi
~qi()
Definition: send_queue.cpp:310
us::gov::socket::qi::b
pair< datagram *, client * > b
Definition: send_queue.h:35
us::gov::socket::queue_t::dump
void dump(const string &prefix, ostream &) const
us::gov::socket::queue_t::next_
qi * next_()
us::ko
const ko_t * ko
Definition: ko.h:27
datagram.h
CFG_SENDQ_WMH
#define CFG_SENDQ_WMH
Definition: config.h:39
us::gov::socket::priority_service
Definition: send_queue.h:42
us::gov::socket::queue_t::ongoing
ongoing_t ongoing
Definition: send_queue.h:85
c
Definition: client.cpp:417
us::ok
static constexpr ko ok
Definition: ko.h:28
us::gov::socket::queue_t::send
ko send(datagram *d, client *cli, uint16_t pri)
likely
#define likely(x)
Definition: likely.h:29
us::gov::socket::queue_t::mx
mutex mx
Definition: send_queue.h:79
us::gov::socket::datagram.size
long size()
Definition: datagram.cs:178
us::gov::socket::queue_t::ongoing_t::set
void set(qi *)
us::gov::socket::queue_t::wait
void wait()
us::gov::socket::queue_t::ongoing_t::reset
qi * reset(qi *)
us::gov::socket::queue_t::busyled
busyled_t busyled
Definition: send_queue.h:87
std
Definition: app.h:380
send_queue.h
us::gov::socket::queue_t::queue_t
queue_t()
us::gov::socket::queue_t::cv
condition_variable cv
Definition: send_queue.h:78
config.h
busyled.h
us::gov::socket::priority_service::operator()
bool operator()(qi *&lhs, qi *&rhs) const
Definition: send_queue.cpp:315
us::gov::socket::queue_t::clear
void clear()
us::gov::socket::qi::skip
short skip
Definition: send_queue.h:39
us.pair
Definition: pair.java:19
gov.h
us::gov::socket::client::endpoint
string endpoint() const
client.h
log
#define log
Definition: gov.h:83
us::gov::socket::queue_t::next
qi * next()
us::gov::bgtask
Definition: bgtask.h:31