summaryrefslogtreecommitdiff
path: root/src/sender_client.cc
diff options
context:
space:
mode:
authorJoel Klinghed <the_jk@yahoo.com>2015-06-09 00:21:45 +0200
committerJoel Klinghed <the_jk@yahoo.com>2015-06-09 00:21:45 +0200
commit8f01c9a51e32891b862489e4082d2c20aa1fc883 (patch)
treec8f21cb9768c0853da6f68decae316c1f9c7a2f5 /src/sender_client.cc
parent913cfd1c7ef7a145036a8416d4ea815cb5cdb601 (diff)
Improve sender and sender_client
1) Add sockguard in sockutils.hh to help with closing sockets 2) Make sender fork in background 3) Make sender_client start a sender if needed (and sender_bin specified in config)
Diffstat (limited to 'src/sender_client.cc')
-rw-r--r--src/sender_client.cc200
1 files changed, 135 insertions, 65 deletions
diff --git a/src/sender_client.cc b/src/sender_client.cc
index 69899ec..df83a74 100644
--- a/src/sender_client.cc
+++ b/src/sender_client.cc
@@ -4,6 +4,7 @@
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>
+#include <sys/wait.h>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
@@ -18,28 +19,30 @@ namespace stuff {
namespace {
+long WRITE_TIMEOUT = 5;
+long CONNECT_TIMEOUT = 5;
+
class SenderClientImpl : public SenderClient {
public:
- SenderClientImpl()
- : sock_(-1) {
- }
- ~SenderClientImpl() override {
- if (sock_ != -1) {
- close(sock_);
- }
+ SenderClientImpl(std::shared_ptr<Error> error)
+ : error_(error) {
}
bool open(const Config* config) {
if (!config) return false;
sender_ = config->get("sender", "");
- if (sender_.empty()) return false;
+ if (sender_.empty()) {
+ if (error_) error_->error("Config missing sender");
+ return false;
+ }
+ sender_bin_ = config->get("sender_bin", "");
return true;
}
void send(const std::string& channel, const std::string& message) override {
struct timeval target;
gettimeofday(&target, NULL);
- target.tv_sec += 5;
+ target.tv_sec += WRITE_TIMEOUT;
send(channel, message, &target);
}
@@ -47,7 +50,7 @@ public:
private:
void send(const std::string& channel, const std::string& message,
const struct timeval* target) {
- if (sock_ == -1) {
+ if (!sock_) {
if (!setup()) return;
}
@@ -60,7 +63,7 @@ private:
ssize_t ret;
if (pos < 4) {
size_t const avail = 4 - pos;
- ret = write(sock_,
+ ret = write(sock_.get(),
reinterpret_cast<char*>(&size1) + pos, avail);
if (ret > 0) {
pos += ret;
@@ -68,14 +71,14 @@ private:
}
} else if (pos < 4 + channel.size()) {
size_t const avail = 4 + channel.size() - pos;
- ret = write(sock_, channel.data() + pos - 4, avail);
+ ret = write(sock_.get(), channel.data() + pos - 4, avail);
if (ret > 0) {
pos += ret;
if (static_cast<size_t>(ret) == avail) continue;
}
} else if (pos < 8 + channel.size()) {
size_t const avail = 8 + channel.size() - pos;
- ret = write(sock_, reinterpret_cast<char*>(&size2)
+ ret = write(sock_.get(), reinterpret_cast<char*>(&size2)
+ pos - 4 - channel.size(), avail);
if (ret > 0) {
pos += ret;
@@ -83,7 +86,8 @@ private:
}
} else {
size_t const avail = len - pos;
- ret = write(sock_, message.data() + pos - 8 - channel.size(),
+ ret = write(sock_.get(),
+ message.data() + pos - 8 - channel.size(),
avail);
if (ret > 0) {
pos += ret;
@@ -94,45 +98,26 @@ private:
if (ret < 0) {
if (errno == EINTR) continue;
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- close(sock_);
- sock_ = -1;
+ sock_.reset();
return send(channel, message);
}
}
fd_set write_set;
FD_ZERO(&write_set);
- FD_SET(sock_, &write_set);
+ FD_SET(sock_.get(), &write_set);
while (true) {
struct timeval timeout;
- gettimeofday(&timeout, NULL);
- if (target->tv_sec == timeout.tv_sec) {
- timeout.tv_sec = 0;
- if (target->tv_usec > timeout.tv_usec) {
- timeout.tv_usec = target->tv_usec - timeout.tv_usec;
- } else {
- timeout.tv_usec = 0;
- }
- } else if (target->tv_sec > timeout.tv_sec) {
- timeout.tv_sec = target->tv_sec - timeout.tv_sec;
- if (target->tv_usec >= timeout.tv_usec) {
- timeout.tv_usec = target->tv_usec - timeout.tv_usec;
- } else {
- timeout.tv_sec--;
- timeout.tv_usec =
- 1000000l + target->tv_usec - timeout.tv_usec;
- }
- } else {
+ if (!calc_timeout(target, &timeout)) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
}
- auto ret = select(sock_ + 1, nullptr, &write_set, nullptr,
+ auto ret = select(sock_.get() + 1, nullptr, &write_set, nullptr,
&timeout);
if (ret < 0 && errno == EINTR) continue;
if (ret <= 0) {
// Timeout or error
- close(sock_);
- sock_ = -1;
+ sock_.reset();
return send(channel, message);
}
break;
@@ -140,8 +125,81 @@ private:
}
}
+ bool connect_timeout(int sock, struct sockaddr* addr, socklen_t addrlen,
+ const struct timeval* target) {
+ if (!make_nonblocking(sock)) {
+ error_->error("Unable to make non-blocking socket", errno);
+ return false;
+ }
+ while (true) {
+ if (connect(sock, addr, addrlen) == 0) {
+ return true;
+ }
+ if (errno == EINTR) continue;
+ if (errno != EINPROGRESS) return false;
+ fd_set write_set;
+ FD_ZERO(&write_set);
+ FD_SET(sock, &write_set);
+ while (true) {
+ struct timeval timeout;
+ if (!calc_timeout(target, &timeout)) {
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 0;
+ }
+ auto ret = select(sock + 1, nullptr, &write_set, nullptr,
+ &timeout);
+ if (ret < 0) {
+ if (errno == EINTR) continue;
+ return false;
+ }
+ if (ret == 0) {
+ errno = ETIMEDOUT;
+ return false;
+ }
+ int err;
+ socklen_t len = sizeof(int);
+ if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &len)) {
+ return false;
+ }
+ if (err != 0) {
+ errno = err;
+ return false;
+ }
+ return true;
+ }
+ }
+ }
+
+ bool setup_start() {
+ if (sender_bin_.empty()) return false;
+ auto pid = fork();
+ if (pid < 0) {
+ if (error_) error_->error("Error forking", errno);
+ return false;
+ }
+ if (pid == 0) {
+ char* argv[2];
+ argv[0] = const_cast<char*>(sender_bin_.c_str());
+ argv[1] = nullptr;
+ _exit(execv(argv[0], argv));
+ } else {
+ int status;
+ auto ret = waitpid(pid, &status, 0);
+ if (ret == -1) {
+ if (error_) {
+ error_->error("Error waiting for sender bin", errno);
+ }
+ }
+ return true;
+ }
+ }
+
bool setup() {
- size_t pos = sender_.find(':');
+ struct timeval target;
+ gettimeofday(&target, NULL);
+ target.tv_sec += CONNECT_TIMEOUT;
+
+ auto pos = sender_.find(':');
if (pos != std::string::npos) {
// host:port
struct addrinfo hints, *res;
@@ -151,57 +209,69 @@ private:
hints.ai_protocol = IPPROTO_TCP;
if (getaddrinfo(sender_.substr(0, pos).c_str(),
sender_.substr(pos + 1).c_str(), &hints, &res)) {
+ if (error_) error_->error("Error resolving: " + sender_);
return false;
}
for (auto ptr = res; ptr; ptr = ptr->ai_next) {
- sock_ = socket(ptr->ai_family, ptr->ai_socktype,
- ptr->ai_protocol);
- if (sock_ == -1) continue;
- // TODO: Make async
- if (connect(sock_, res->ai_addr, res->ai_addrlen)) {
- close(sock_);
- sock_ = -1;
+ sock_.reset(socket(ptr->ai_family, ptr->ai_socktype,
+ ptr->ai_protocol));
+ if (!sock_) continue;
+ if (!connect_timeout(sock_.get(), res->ai_addr, res->ai_addrlen,
+ &target)) {
+ sock_.reset();
continue;
}
break;
- }
- freeaddrinfo(res);
- if (sock_ == -1) return false;
+ }
+ freeaddrinfo(res);
+ if (!sock_) {
+ if (errno == ECONNREFUSED && setup_start()) {
+ return setup();
+ }
+ if (error_) error_->error("Socket/Connect failed", errno);
+ return false;
+ }
} else {
// socket
- sock_ = socket(PF_LOCAL, SOCK_STREAM, 0);
- if (sock_ == -1) return false;
+ sock_.reset(socket(PF_LOCAL, SOCK_STREAM, 0));
+ if (!sock_) {
+ if (error_) {
+ error_->error("Unable to create unix socket", errno);
+ }
+ return false;
+ }
struct sockaddr_un name;
name.sun_family = AF_LOCAL;
strncpy(name.sun_path, sender_.c_str(), sizeof(name.sun_path));
name.sun_path[sizeof(name.sun_path) - 1] = '\0';
- // TODO: Make async
- if (connect(sock_, reinterpret_cast<struct sockaddr*>(&name),
- SUN_LEN(&name))) {
- close(sock_);
- sock_ = 1;
+ if (!connect_timeout(sock_.get(),
+ reinterpret_cast<struct sockaddr*>(&name),
+ SUN_LEN(&name), &target)) {
+ if ((errno == ECONNREFUSED ||
+ errno == ENOENT) && setup_start()) {
+ return setup();
+ }
+ if (error_) error_->error("Connect failed", errno);
+ sock_.reset();
return false;
}
}
- if (!make_nonblocking(sock_)) {
- close(sock_);
- sock_ = -1;
- return false;
- }
-
return true;
}
std::string sender_;
- int sock_;
+ std::string sender_bin_;
+ std::shared_ptr<Error> error_;
+ sockguard sock_;
};
} // namespace
// static
-std::unique_ptr<SenderClient> SenderClient::create(const Config* config) {
- std::unique_ptr<SenderClientImpl> ret(new SenderClientImpl());
+std::unique_ptr<SenderClient> SenderClient::create(
+ const Config* config, std::shared_ptr<Error> error) {
+ std::unique_ptr<SenderClientImpl> ret(new SenderClientImpl(error));
if (!ret->open(config)) return nullptr;
return std::move(ret);
}