package Net::PSYC::Datagram;

# UDP transport object for Net::PSYC.
#
# An instance wraps one bound UDP socket and keeps
#   SOCKET     the IO::Socket::INET handle
#   IP, PORT   local address/port the socket is bound to
#   TYPE       always 'd'
#   I_BUFFER   raw bytes received so far and not yet parsed
#   O_BUFFER   outgoing queue: a list of fragment lists, each fragment being
#              [ \%vars, $data, $target, $failed_send_attempts ]
#   O_COUNT    index into O_BUFFER of the fragment list write() serves next
#   LF         line terminator detected for the packet currently in I_BUFFER
#   LAST_RECV  return value of the most recent socket recv() (packed peer
#              address, see perlfunc recv)

our $VERSION = '1.0';

use strict;
use warnings;

# IO::Socket re-exports Socket's default symbols into this package; that is
# where sockaddr_in() and inet_ntoa() used below come from.
use IO::Socket::INET;

# Run-time import, exactly as before (deliberately not a compile-time "use").
Net::PSYC->import(qw( watch add W sendmsg same_host send_mmp parse_uniform
    BLOCKING makeMSG make_psyc parse_psyc parse_mmp PSYC_PORT PSYCS_PORT
    register_host register_route make_mmp UNL ));

# Always returns 1.
# NOTE(review): presumably the trust level assigned to this transport --
# confirm against the callers in Net::PSYC.
sub TRUST {
    return 1;
}

# new( $class, [ $addr, [ $port ] ] )
#
# Binds a UDP socket. Without $addr no LocalAddr is passed; without a
# (non-zero) $port LocalPort is undef and the actually bound port is read
# back from the socket.
#
# Returns the new object. If the socket cannot be created the value of $!
# is returned instead, so callers have to check for a reference.
sub new {
    my $class = shift;
    my $addr  = shift || undef;              # NOT 127.1
    my $port  = int(shift || 0) || undef;    # also, NOT 4404

    my %a = (LocalPort => $port, Proto => 'udp');
    $a{LocalAddr} = $addr if $addr;

    my $socket = IO::Socket::INET->new(%a)
        or return $!;

    my $self = {
        'SOCKET'   => $socket,
        'IP'       => $socket->sockhost,
        'PORT'     => $port || $socket->sockport,
        'TYPE'     => 'd',
        'I_BUFFER' => '',
        'O_BUFFER' => [],
        'O_COUNT'  => 0,
        'LF'       => '',
    };
    W1('UDP bind to %s:%s successful', $self->{'IP'}, $self->{'PORT'});
    bless $self, $class;

    # Bit 2 of BLOCKING() suppresses watch(), bit 1 suppresses registering
    # the write callback.
    watch($self) unless (BLOCKING() & 2);
    add($self->{'SOCKET'}, 'w', sub { $self->write() }, 0)
        unless (BLOCKING() & 1);

    return $self;
}

# send( $target, $data, \%vars )
#
# Queues one packet for $target. If BLOCKING() is true the queue is written
# out immediately and the negated result of write() is returned; otherwise
# Net::PSYC::Event::revoke() is called for the socket's 'w' event and 0 is
# returned.
sub send {
    my $self = shift;
    my ($target, $data, $vars) = @_;
    W2('send(%s, %s, %s)', $target, $data, $vars);

    # One fragment list holding a single fragment; the trailing 0 is the
    # number of failed delivery attempts so far.
    push(@{ $self->{'O_BUFFER'} }, [ [ $vars, $data, $target, 0 ] ]);

    if (BLOCKING()) {
        # send the packet instantly
        return !$self->write();
    }
    Net::PSYC::Event::revoke($self->{'SOCKET'}, 'w');
    return 0;
}

# write()
#
# Takes the next fragment from the fragment list at O_COUNT and tries to
# deliver it. Always returns 1.
#
# A fragment whose send() failed fewer than three times is put back and
# write() returns without touching the rest of the queue. Sent and dropped
# fragments both go through the same queue bookkeeping, so a dropped packet
# can no longer leave an empty fragment list behind that blocks O_BUFFER.
sub write {
    my $self      = shift;
    my $queue     = $self->{'O_BUFFER'};
    my $fragments = $queue->[ $self->{'O_COUNT'} ];

    return 1 if (!$fragments);

    # get a packet from the buffer
    my $packet = shift(@$fragments);
    if ($packet && !$self->_transmit($packet)) {
        # send failed, but not for the third time yet: keep it for a retry
        unshift(@$fragments, $packet);
        return 1;
    }

    if (!scalar(@$fragments)) {
        # all fragments of this packet sent (or dropped)
        splice(@$queue, $self->{'O_COUNT'}, 1);
        $self->{'O_COUNT'} = 0 if (!$queue->[ $self->{'O_COUNT'} ]);
    }
    else {
        # fragments of this packet left: move on to the next fragment list
        $self->{'O_COUNT'} = 0 if (!$queue->[ ++$self->{'O_COUNT'} ]);
    }

    if (scalar(@$queue)) {
        if (BLOCKING()) {
            $self->write();
        }
        else {
            Net::PSYC::Event::revoke($self->{'SOCKET'}, 'w');
        }
    }
    return 1;
}

# _transmit( $packet )
#
# Private helper of write(): renders one fragment and sends it to its
# target. Returns true when the fragment is finished (sent or dropped) and
# false when it should be kept for another attempt.
sub _transmit {
    my ($self, $packet) = @_;

    my $target = $packet->[2];
    my (undef, $host, $port) = parse_uniform($target);
    $port ||= PSYC_PORT();
    $packet->[0]->{'_target'} ||= $target;

    # funny, but not what we want.. returns 0.0.0.0 for INADDR_ANY and even
    # when the ip is useful, the port may not - the other side should better
    # use its own peer info. or the perl app provides _source.
    #
    # $vars->{'_source'} |= "psyc://$self->{'IP'}:$self->{'PORT'}/";

    my $m = ".\n";    # empty packet!
    $m .= make_mmp($packet->[0], $packet->[1]);

    unless ($host) {
        W0('This PSYC target (%s) needs a host. Dropping message.', $target);
        return 1;
    }

    # Scalar context: packed address, or undef if the name does not resolve.
    # sockaddr_in() croaks on an undefined address, so drop the packet here.
    my $taddr = gethostbyname($host);
    unless (defined $taddr) {
        W0('Could not resolve host %s of PSYC target %s. Dropping message.',
            $host, $target);
        return 1;
    }
    my $tin = sockaddr_in($port, $taddr);

    if (!defined($self->{'SOCKET'}->send($m, 0, $tin))) {
        if (++$packet->[3] >= 3) {
            W0('Delivery of a PSYC UDP packet to %s failed for the third time. Dropping message.', $target);
            return 1;
        }
        return 0;
    }

    W1('UDP[%s:%s] <= %s', $host, $port, $packet->[0]->{'_source'} || UNL());
    return 1;
}

# read()
#
# Receives up to 8192 bytes from the socket, remembers recv()'s return value
# in LAST_RECV and appends the data to I_BUFFER. Returns 1 when data was
# appended and nothing when the received data is false (empty or "0").
sub read {
    my $self = shift;
    my $data;

    $self->{'LAST_RECV'} = $self->{'SOCKET'}->recv($data, 8192);  # READ socket

    return if (!$data);    # connection lost !?
    # is there a 'proper' way to find out whether the socket is still alive?

    $self->{'I_BUFFER'} .= $data;
    delete $self->{'LF'};    # line terminator is re-detected in recv()
    return 1;
}

# Nothing to negotiate on this transport; always returns 1.
sub negotiate { 1 }

# recv()
#
# returns _one_ mmp-packet as ( \%vars, $data ) .. or nothing if the buffer
# holds no usable packet.
#
# If the packet carries no _source, one is built from the peer address of
# the last socket recv() as "psyc://<dotted-quad>:<port>".
sub recv {
    my $self = shift;

    if (length($self->{'I_BUFFER'}) > 2) {
        # A packet starts with a lone "." line; its terminator tells which
        # line ending the sender uses.
        if ($self->{'LF'} || $self->{'I_BUFFER'} =~ s/\A\.(\r?\n)//) {
            $self->{'LF'} ||= $1;

            my ($vars, $data) =
                parse_mmp(\$self->{'I_BUFFER'}, $self->{'LF'});
            return if (!defined $vars);

            unless (exists $vars->{'_source'}) {
                my $peer = $self->{'LAST_RECV'};
                if (defined $peer && length $peer) {
                    # sockaddr_in() yields the address in packed form;
                    # inet_ntoa() turns it into printable dotted-quad.
                    my ($port, $ip) = sockaddr_in($peer);
                    $vars->{'_source'} =
                        'psyc://' . inet_ntoa($ip) . ":$port";
                }
            }
            return ($vars, $data);
        }
        # TODO : we need to provide a proper algorithm to clean up the
        # in-buffer if we got corrupted packets in it. and we need to
        # detect corrupted packets.. udp sucks noodles! ,-)
    }
    return;
}

1;